Know more about Go concurrency bugs

2019/07/10 Golang

Study notes on Go concurrency bugs, collected from a GitHub project.

blocking-bugs

kubernetes

/*
Fix the deadlock by refactoring workers to acquire a read lock *after* work is
   popped from the queue. This allows writers to get locks while workers are idle,
   while preserving the worker pause semantics necessary to allow safe sync.
*/
diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go
index b2ae6d1..e341e1c 100644
--- a/pkg/controller/resourcequota/resource_quota_controller.go
+++ b/pkg/controller/resourcequota/resource_quota_controller.go

@@ -237,15 +237,13 @@ func (rq *ResourceQuotaController) addQuota(obj interface{}) {
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
 	workFunc := func() bool {
-
-		rq.workerLock.RLock()
-		defer rq.workerLock.RUnlock()
-
 		key, quit := queue.Get()
 		if quit {
 			return true
 		}
 		defer queue.Done(key)
+		rq.workerLock.RLock() // take the RLock only after queue.Get() has returned an item
+		defer rq.workerLock.RUnlock()
 		err := rq.syncHandler(key.(string))
 		if err == nil {
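
A standalone sketch of the failure mode (hypothetical names, not the real controller): a worker that takes the read lock before blocking in the dequeue pins the lock while idle, so a writer that wants to pause the workers can never acquire it.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var workerLock sync.RWMutex
	queue := make(chan string) // nothing is ever enqueued

	// Buggy order: RLock is taken before the blocking receive, so the
	// idle worker pins the read lock forever.
	go func() {
		workerLock.RLock()
		defer workerLock.RUnlock()
		<-queue
	}()

	time.Sleep(100 * time.Millisecond)

	got := make(chan struct{})
	go func() {
		workerLock.Lock() // a "pause the workers" writer can never get this
		workerLock.Unlock()
		close(got)
	}()

	select {
	case <-got:
		fmt.Println("writer acquired the lock")
	case <-time.After(time.Second):
		fmt.Println("writer starved; the RLock must come after the dequeue")
	}
}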

diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go
index 0c87111..2d40014 100644
--- a/pkg/watch/mux.go
+++ b/pkg/watch/mux.go
@@ -56,9 +56,10 @@ func (m *Mux) Watch() Interface {
 	id := m.nextWatcher
 	m.nextWatcher++
 	w := &muxWatcher{
-		result: make(chan Event),
-		id:     id,
-		m:      m,
+		result:  make(chan Event),
+		stopped: make(chan struct{}),
+		id:      id,
+		m:       m,
 	}
 	m.watchers[id] = w
 	return w
@@ -119,15 +120,20 @@ func (m *Mux) distribute(event Event) {
 	m.lock.Lock()
 	defer m.lock.Unlock()
 	for _, w := range m.watchers {
-		w.result <- event  
+		select { // select on result or stopped so a send to a stopped watcher cannot block; this is a general pattern
+		case w.result <- event:
+		case <-w.stopped:
+		}
 	}
 }

 // muxWatcher handles a single watcher of a mux
 type muxWatcher struct {
-	result chan Event
-	id     int64
-	m      *Mux
+	result  chan Event
+	stopped chan struct{}
+	stop    sync.Once
+	id      int64
+	m       *Mux
 }

 // ResultChan returns a channel to use for waiting on events.
@@ -137,5 +143,8 @@ func (mw *muxWatcher) ResultChan() <-chan Event {

 // Stop stops watching and removes mw from its list.
 func (mw *muxWatcher) Stop() {
-	mw.m.stopWatching(mw.id)
+	mw.stop.Do(func() {
+		close(mw.stopped)
+		mw.m.stopWatching(mw.id)
+	})
 }
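
The same select-plus-stopped shape in isolation, as a minimal sketch: the broadcaster never blocks on a watcher that has already stopped, and sync.Once makes Stop idempotent so stopped cannot be closed twice.

package main

import (
	"fmt"
	"sync"
)

type watcher struct {
	result  chan string
	stopped chan struct{}
	stop    sync.Once
}

func (w *watcher) Stop() {
	w.stop.Do(func() { close(w.stopped) })
}

func main() {
	w := &watcher{result: make(chan string), stopped: make(chan struct{})}
	w.Stop()
	w.Stop() // safe: sync.Once prevents a double-close panic

	// Distribute without blocking: a stopped watcher simply drops the event.
	select {
	case w.result <- "event":
		fmt.Println("delivered")
	case <-w.stopped:
		fmt.Println("watcher already stopped, event dropped")
	}
}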


 diff --git a/federation/pkg/federation-controller/util/federated_informer.go b/federation/pkg/federation-controller/util/federated_informer.go
 index 24de649..19f7b8b 100644
 --- a/federation/pkg/federation-controller/util/federated_informer.go
 +++ b/federation/pkg/federation-controller/util/federated_informer.go
 @@ -344,8 +344,6 @@ func (f *federatedInformerImpl) getReadyClusterUnlocked(name string) (*federatio

  // Synced returns true if the view is synced (for the first time)
  func (f *federatedInformerImpl) ClustersSynced() bool {
 -	f.Lock()
 -	defer f.Unlock()
  	return f.clusterInformer.controller.HasSynced()
  }

 @@ -452,18 +450,31 @@ func (fs *federatedStoreImpl) GetKeyFor(item interface{}) string {
  // Checks whether stores for all clusters form the lists (and only these) are there and
  // are synced.
  func (fs *federatedStoreImpl) ClustersSynced(clusters []*federation_api.Cluster) bool {
 -	fs.federatedInformer.Lock()
 -	defer fs.federatedInformer.Unlock()

 -	if len(fs.federatedInformer.targetInformers) != len(clusters) {
 +	// Get the list of informers to check under a lock and check it outside.
 +	okSoFar, informersToCheck := func() (bool, []informer) {
 +		fs.federatedInformer.Lock()
 +		defer fs.federatedInformer.Unlock()
 +
 +		if len(fs.federatedInformer.targetInformers) != len(clusters) {
 +			return false, []informer{}
 +		}
 +		informersToCheck := make([]informer, 0, len(clusters))
 +		for _, cluster := range clusters {
 +			if targetInformer, found := fs.federatedInformer.targetInformers[cluster.Name]; found {
 +				informersToCheck = append(informersToCheck, targetInformer)
 +			} else {
 +				return false, []informer{}
 +			}
 +		}
 +		return true, informersToCheck
 +	}()
 +
 +	if !okSoFar {
  		return false
  	}
 -	for _, cluster := range clusters {
 -		if targetInformer, found := fs.federatedInformer.targetInformers[cluster.Name]; found {
 -			if !targetInformer.controller.HasSynced() {
 -				return false
 -			}
 -		} else {
 +	for _, informerToCheck := range informersToCheck {
 +		if !informerToCheck.controller.HasSynced() {
  			return false
  		}
    }
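
The underlying idiom, sketched with hypothetical types: hold the lock only long enough to snapshot what needs checking, then run the potentially slow HasSynced-style calls with no lock held.

package main

import (
	"fmt"
	"sync"
)

type informer struct{ synced bool }

func (i informer) HasSynced() bool { return i.synced } // possibly slow in real code

type registry struct {
	mu        sync.Mutex
	informers map[string]informer
}

// Synced snapshots the informers under the lock and checks them outside it.
func (r *registry) Synced(names []string) bool {
	r.mu.Lock()
	toCheck := make([]informer, 0, len(names))
	for _, n := range names {
		inf, ok := r.informers[n]
		if !ok {
			r.mu.Unlock()
			return false
		}
		toCheck = append(toCheck, inf)
	}
	r.mu.Unlock()

	for _, inf := range toCheck {
		if !inf.HasSynced() {
			return false
		}
	}
	return true
}

func main() {
	r := &registry{informers: map[string]informer{"a": {true}, "b": {true}}}
	fmt.Println(r.Synced([]string{"a", "b"}))
}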

    diff --git a/Godeps/_workspace/src/github.com/docker/spdystream/connection.go b/Godeps/_workspace/src/github.com/docker/spdystream/connection.go
    index 846b934..6e623c4 100644
    --- a/Godeps/_workspace/src/github.com/docker/spdystream/connection.go
    +++ b/Godeps/_workspace/src/github.com/docker/spdystream/connection.go
    @@ -89,10 +89,25 @@ Loop:
     			if timer != nil {
     				timer.Stop()
     			}
    +
    +			// Start a goroutine to drain resetChan. This is needed because we've seen
    +			// some unit tests with large numbers of goroutines get into a situation
    +			// where resetChan fills up, at least 1 call to Write() is still trying to
    +			// send to resetChan, the connection gets closed, and this case statement
    +			// attempts to grab the write lock that Write() already has, causing a
    +			// deadlock.
    +			//
    +			// See https://github.com/docker/spdystream/issues/49 for more details.
    +			go func() {
    +				for _ = range resetChan { // drain the resetChan
    +				}
    +			}()
    +
     			i.writeLock.Lock()
     			close(resetChan)
     			i.resetChan = nil
     			i.writeLock.Unlock()
    +
     			break Loop
     		}
      }
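
A compact sketch of the drain trick: before taking the write lock and closing a channel that blocked senders may still be writing to, start a goroutine that drains it so those senders can finish and release their locks.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var writeLock sync.Mutex
	resetChan := make(chan int) // unbuffered: senders block until received

	// A writer holding writeLock is stuck sending on resetChan.
	go func() {
		writeLock.Lock()
		defer writeLock.Unlock()
		resetChan <- 1
	}()
	time.Sleep(50 * time.Millisecond)

	// Drain resetChan so the blocked writer can finish and drop its lock...
	go func() {
		for range resetChan {
		}
	}()

	// ...otherwise this Lock would deadlock against the stuck writer.
	writeLock.Lock()
	close(resetChan)
	writeLock.Unlock()
	fmt.Println("closed without deadlock")
}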


Previously, we had no way to stop resync goroutine when ListAndWatch
returned.  goroutine leaked every time ListAndWatch returned, for
example, with error.  This commit adds another channel to signal that
resync goroutine should exit when ListAndWatch returns.

diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go
index 8c8aee3..785cab2 100644
--- a/pkg/client/cache/reflector.go
+++ b/pkg/client/cache/reflector.go
@@ -259,12 +259,16 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 	r.setLastSyncResourceVersion(resourceVersion)

 	resyncerrc := make(chan error, 1)
+	cancelCh := make(chan struct{})
+	defer close(cancelCh) // the deferred close stops the resync goroutine below once ListAndWatch returns
 	go func() {
 		for {
 			select {
 			case <-resyncCh:
 			case <-stopCh:
 				return
+			case <-cancelCh:
+				return
 			}
 			glog.V(4).Infof("%s: forcing resync", r.name)
 			if err := r.store.Resync(); err != nil {

package main

import "fmt"

// Playground sketch of the same pattern: closing cancelCh inside Cc guarantees
// goroutine1 can always exit, even though nothing is ever sent on rchan.
func main() {
	fmt.Println("Hello, playground")
	// fmt.Println(UniqRands(13,13))
	rchan := make(chan int)
	cancelCh := make(chan struct{})

	go func() { // goroutine1
		for {
			select {
			case <-rchan:
			case <-cancelCh:
				fmt.Println("=======")
				return
			}
		}
	}()

	Cc(cancelCh) // closing cancelCh guarantees goroutine1 cannot stay blocked on rchan forever

}

func Cc(cancelCh chan struct{}) {
	defer close(cancelCh)
	fmt.Println("Close cancelCh")
}


avoid double RLock() in cpumanager

diff --git a/pkg/kubelet/cm/cpumanager/state/state_mem.go b/pkg/kubelet/cm/cpumanager/state/state_mem.go
index 797cdb1..816c898 100644
--- a/pkg/kubelet/cm/cpumanager/state/state_mem.go
+++ b/pkg/kubelet/cm/cpumanager/state/state_mem.go
@@ -56,9 +56,6 @@ func (s *stateMemory) GetDefaultCPUSet() cpuset.CPUSet {
 }

 func (s *stateMemory) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
-	s.RLock()
-	defer s.RUnlock()
-
 	if res, ok := s.GetCPUSet(containerID); ok { // GetCPUSet takes the lock itself, so the caller must not RLock again; nested read locks risk deadlock
 		return res
 	}
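
Why the nested RLock is dangerous even though sync.RWMutex allows many readers: a writer that arrives between the outer and inner RLock blocks new read locks, so the inner RLock waits on the writer while the writer waits on the outer RLock. A minimal sketch of that interleaving:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.RWMutex
	innerDone := make(chan struct{})

	go func() {
		mu.RLock()                        // outer read lock (the caller, like GetCPUSetOrDefault)
		time.Sleep(50 * time.Millisecond) // window in which a writer can queue up
		mu.RLock()                        // inner read lock (the callee, like GetCPUSet)
		mu.RUnlock()
		mu.RUnlock()
		close(innerDone)
	}()

	time.Sleep(10 * time.Millisecond)
	go func() {
		mu.Lock() // a writer arrives while the outer read lock is held
		mu.Unlock()
	}()

	select {
	case <-innerDone:
		fmt.Println("no deadlock this time")
	case <-time.After(time.Second):
		fmt.Println("deadlock: the inner RLock waits behind the writer, the writer waits for the outer RUnlock")
	}
}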

etcd

diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go
index 736e988..1ac5682 100644
--- a/contrib/raftexample/raft.go
+++ b/contrib/raftexample/raft.go
@@ -41,10 +41,11 @@ type raftNode struct {
 	commitC     chan *string             // entries committed to log (k,v)
 	errorC      chan error               // errors from raft session

-	id     int      // client ID for raft session
-	peers  []string // raft peer URLs
-	join   bool     // node is joining an existing cluster
-	waldir string   // path to WAL directory
+	id        int      // client ID for raft session
+	peers     []string // raft peer URLs
+	join      bool     // node is joining an existing cluster
+	waldir    string   // path to WAL directory
+	lastIndex uint64   // index of log at start

 	// raft backing for the commit/error channel
 	node        raft.Node
@@ -90,8 +91,8 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
 		switch ents[i].Type {
 		case raftpb.EntryNormal:
 			if len(ents[i].Data) == 0 {
-				// ignore conf changes and empty messages
-				continue
+				// ignore empty messages
+				break
 			}
 			s := string(ents[i].Data)
 			select {
@@ -103,7 +104,6 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
 		case raftpb.EntryConfChange:
 			var cc raftpb.ConfChange
 			cc.Unmarshal(ents[i].Data)
-
 			rc.node.ApplyConfChange(cc)
 			switch cc.Type {
 			case raftpb.ConfChangeAddNode:
@@ -118,6 +118,15 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
 				rc.transport.RemovePeer(types.ID(cc.NodeID))
 			}
 		}
+
+		// special nil commit to signal replay has finished
+		if ents[i].Index == rc.lastIndex {
+			select {
+			case rc.commitC <- nil:
+			case <-rc.stopc:
+				return false
+			}
+		}
 	}
 	return true
 }
@@ -144,19 +153,22 @@ func (rc *raftNode) openWAL() *wal.WAL {
 	return w
 }

-// replayWAL replays WAL entries into the raft instance and the commit
-// channel and returns an appendable WAL.
+// replayWAL replays WAL entries into the raft instance.
 func (rc *raftNode) replayWAL() *wal.WAL {
 	w := rc.openWAL()
-	_, _, ents, err := w.ReadAll()
+	_, st, ents, err := w.ReadAll()
 	if err != nil {
 		log.Fatalf("raftexample: failed to read WAL (%v)", err)
 	}
 	// append to storage so raft starts at the right place in log
 	rc.raftStorage.Append(ents)
-	rc.publishEntries(ents)
-	// send nil value so client knows commit channel is current
-	rc.commitC <- nil
+	// send nil once lastIndex is published so client knows commit channel is current
+	if len(ents) > 0 {
+		rc.lastIndex = ents[len(ents)-1].Index
+	} else {
+		rc.commitC <- nil
+	}
+	rc.raftStorage.SetHardState(st)
 	return w
 }

 diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go
 index 1259e0f..89b5208 100644
 --- a/clientv3/concurrency/election.go
 +++ b/clientv3/concurrency/election.go
 @@ -16,6 +16,7 @@ package concurrency

  import (
  	"errors"
 +	"fmt"

  	v3 "github.com/coreos/etcd/clientv3"
  	"github.com/coreos/etcd/mvcc/mvccpb"
 @@ -50,22 +51,39 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
  		return serr
  	}

 -	k, rev, err := NewUniqueKV(ctx, e.client, e.keyPrefix, val, v3.WithLease(s.Lease()))
 -	if err == nil {
 -		err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(rev-1))
 +	k := fmt.Sprintf("%s/%x", e.keyPrefix, s.Lease())
 +	txn := e.client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
 +	txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
 +	txn = txn.Else(v3.OpGet(k))
 +	resp, err := txn.Commit()
 +	if err != nil {
 +		return err
 +	}
 +
 +	e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
 +	if !resp.Succeeded {
 +		kv := resp.Responses[0].GetResponseRange().Kvs[0]
 +		e.leaderRev = kv.CreateRevision
 +		if string(kv.Value) != val {
 +			if err = e.Proclaim(ctx, val); err != nil {
 +				e.Resign(ctx)
 +				return err
 +			}
 +		}
  	}

 +	err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1))
  	if err != nil {
  		// clean up in case of context cancel
  		select {
  		case <-ctx.Done():
 -			e.client.Delete(e.client.Ctx(), k)
 +			e.Resign(e.client.Ctx())
  		default:
 +			e.leaderSession = nil
  		}
  		return err
  	}

 -	e.leaderKey, e.leaderRev, e.leaderSession = k, rev, s
  	return nil
  }

 diff --git a/clientv3/concurrency/key.go b/clientv3/concurrency/key.go
 index 65e2b89..74d495d 100644
 --- a/clientv3/concurrency/key.go
 +++ b/clientv3/concurrency/key.go
 @@ -17,34 +17,12 @@ package concurrency
  import (
  	"fmt"
  	"math"
 -	"time"

  	v3 "github.com/coreos/etcd/clientv3"
  	"github.com/coreos/etcd/mvcc/mvccpb"
  	"golang.org/x/net/context"
  )

 -// NewUniqueKey creates a new key from a given prefix.
 -func NewUniqueKey(ctx context.Context, kv v3.KV, pfx string, opts ...v3.OpOption) (string, int64, error) {
 -	return NewUniqueKV(ctx, kv, pfx, "", opts...)
 -}
 -
 -func NewUniqueKV(ctx context.Context, kv v3.KV, pfx, val string, opts ...v3.OpOption) (string, int64, error) {
 -	for {
 -		newKey := fmt.Sprintf("%s/%v", pfx, time.Now().UnixNano())
 -		put := v3.OpPut(newKey, val, opts...)
 -		cmp := v3.Compare(v3.ModRevision(newKey), "=", 0)
 -		resp, err := kv.Txn(ctx).If(cmp).Then(put).Commit()
 -		if err != nil {
 -			return "", 0, err
 -		}
 -		if !resp.Succeeded {
 -			continue
 -		}
 -		return newKey, resp.Header.Revision, nil
 -	}
 -}
 -
  func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
  	cctx, cancel := context.WithCancel(ctx)
  	defer cancel()
 diff --git a/integration/v3_election_test.go b/integration/v3_election_test.go
 index d355986..dcadd78 100644
 --- a/integration/v3_election_test.go
 +++ b/integration/v3_election_test.go
 @@ -146,3 +146,26 @@ func TestElectionFailover(t *testing.T) {
  	// leader must ack election (otherwise, Campaign may see closed conn)
  	<-electedc
  }
 +
 +// TestElectionSessionRelock ensures that campaigning twice on the same election
 +// with the same lock will Proclaim instead of deadlocking.
 +func TestElectionSessionRecampaign(t *testing.T) {
 +	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
 +	defer clus.Terminate(t)
 +	cli := clus.RandClient()
 +
 +	e := concurrency.NewElection(cli, "test-elect")
 +	if err := e.Campaign(context.TODO(), "abc"); err != nil {
 +		t.Fatal(err)
 +	}
 +	e2 := concurrency.NewElection(cli, "test-elect")
 +	if err := e2.Campaign(context.TODO(), "def"); err != nil {
 +		t.Fatal(err)
 +	}
 +
 +	ctx, cancel := context.WithCancel(context.TODO())
 +	defer cancel()
 +	if resp := <-e.Observe(ctx); len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) != "def" {
 +		t.Fatalf("expected value=%q, got response %v", "def", resp)
 +	}
 +}

 diff --git a/auth/simple_token.go b/auth/simple_token.go
 index 5b608af..ff48c51 100644
 --- a/auth/simple_token.go
 +++ b/auth/simple_token.go
 @@ -32,27 +32,26 @@ import (
  const (
  	letters                  = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
  	defaultSimpleTokenLength = 16
 +)
 +
 +// var for testing purposes
 +var (
  	simpleTokenTTL           = 5 * time.Minute
  	simpleTokenTTLResolution = 1 * time.Second
  )

  type simpleTokenTTLKeeper struct {
 -	tokens              map[string]time.Time
 -	addSimpleTokenCh    chan string
 -	resetSimpleTokenCh  chan string
 -	deleteSimpleTokenCh chan string
 -	stopCh              chan chan struct{}
 -	deleteTokenFunc     func(string)
 +	tokensMu        sync.Mutex
 +	tokens          map[string]time.Time
 +	stopCh          chan chan struct{}
 +	deleteTokenFunc func(string)
  }

  func NewSimpleTokenTTLKeeper(deletefunc func(string)) *simpleTokenTTLKeeper {
  	stk := &simpleTokenTTLKeeper{
 -		tokens:              make(map[string]time.Time),
 -		addSimpleTokenCh:    make(chan string, 1),
 -		resetSimpleTokenCh:  make(chan string, 1),
 -		deleteSimpleTokenCh: make(chan string, 1),
 -		stopCh:              make(chan chan struct{}),
 -		deleteTokenFunc:     deletefunc,
 +		tokens:          make(map[string]time.Time),
 +		stopCh:          make(chan chan struct{}),
 +		deleteTokenFunc: deletefunc,
  	}
  	go stk.run()
  	return stk
 @@ -66,37 +65,34 @@ func (tm *simpleTokenTTLKeeper) stop() {
  }

  func (tm *simpleTokenTTLKeeper) addSimpleToken(token string) {
 -	tm.addSimpleTokenCh <- token
 +	tm.tokens[token] = time.Now().Add(simpleTokenTTL)
  }

  func (tm *simpleTokenTTLKeeper) resetSimpleToken(token string) {
 -	tm.resetSimpleTokenCh <- token
 +	if _, ok := tm.tokens[token]; ok {
 +		tm.tokens[token] = time.Now().Add(simpleTokenTTL)
 +	}
  }

  func (tm *simpleTokenTTLKeeper) deleteSimpleToken(token string) {
 -	tm.deleteSimpleTokenCh <- token
 +	delete(tm.tokens, token)
  }
 +
  func (tm *simpleTokenTTLKeeper) run() {
  	tokenTicker := time.NewTicker(simpleTokenTTLResolution)
  	defer tokenTicker.Stop()
  	for {
  		select {
 -		case t := <-tm.addSimpleTokenCh:
 -			tm.tokens[t] = time.Now().Add(simpleTokenTTL)
 -		case t := <-tm.resetSimpleTokenCh:
 -			if _, ok := tm.tokens[t]; ok {
 -				tm.tokens[t] = time.Now().Add(simpleTokenTTL)
 -			}
 -		case t := <-tm.deleteSimpleTokenCh:
 -			delete(tm.tokens, t)
  		case <-tokenTicker.C:
  			nowtime := time.Now()
 +			tm.tokensMu.Lock()
  			for t, tokenendtime := range tm.tokens {
  				if nowtime.After(tokenendtime) {
  					tm.deleteTokenFunc(t)
  					delete(tm.tokens, t)
  				}
  			}
 +			tm.tokensMu.Unlock()
  		case waitCh := <-tm.stopCh:
  			tm.tokens = make(map[string]time.Time)
  			waitCh <- struct{}{}
 @@ -108,7 +104,7 @@ func (tm *simpleTokenTTLKeeper) run() {
  type tokenSimple struct {
  	indexWaiter       func(uint64) <-chan struct{}
  	simpleTokenKeeper *simpleTokenTTLKeeper
 -	simpleTokensMu    sync.RWMutex
 +	simpleTokensMu    sync.Mutex
  	simpleTokens      map[string]string // token -> username
  }

 @@ -128,6 +124,7 @@ func (t *tokenSimple) genTokenPrefix() (string, error) {
  }

  func (t *tokenSimple) assignSimpleTokenToUser(username, token string) {
 +	t.simpleTokenKeeper.tokensMu.Lock()
  	t.simpleTokensMu.Lock()

  	_, ok := t.simpleTokens[token]
 @@ -138,18 +135,23 @@ func (t *tokenSimple) assignSimpleTokenToUser(username, token string) {
  	t.simpleTokens[token] = username
  	t.simpleTokenKeeper.addSimpleToken(token)
  	t.simpleTokensMu.Unlock()
 +	t.simpleTokenKeeper.tokensMu.Unlock()
  }

  func (t *tokenSimple) invalidateUser(username string) {
 +	if t.simpleTokenKeeper == nil {
 +		return
 +	}
 +	t.simpleTokenKeeper.tokensMu.Lock()
  	t.simpleTokensMu.Lock()
 -	defer t.simpleTokensMu.Unlock()
 -
  	for token, name := range t.simpleTokens {
  		if strings.Compare(name, username) == 0 {
  			delete(t.simpleTokens, token)
  			t.simpleTokenKeeper.deleteSimpleToken(token)
  		}
  	}
 +	t.simpleTokensMu.Unlock()
 +	t.simpleTokenKeeper.tokensMu.Unlock()
  }

  func newDeleterFunc(t *tokenSimple) func(string) {
 @@ -172,7 +174,6 @@ func (t *tokenSimple) disable() {
  		t.simpleTokenKeeper.stop()
  		t.simpleTokenKeeper = nil
  	}
 -
  	t.simpleTokensMu.Lock()
  	t.simpleTokens = make(map[string]string) // invalidate all tokens
  	t.simpleTokensMu.Unlock()
 @@ -182,14 +183,14 @@ func (t *tokenSimple) info(ctx context.Context, token string, revision uint64) (
  	if !t.isValidSimpleToken(ctx, token) {
  		return nil, false
  	}
 -
 -	t.simpleTokensMu.RLock()
 -	defer t.simpleTokensMu.RUnlock()
 +	t.simpleTokenKeeper.tokensMu.Lock()
 +	t.simpleTokensMu.Lock()
  	username, ok := t.simpleTokens[token]
  	if ok {
  		t.simpleTokenKeeper.resetSimpleToken(token)
  	}
 -
 +	t.simpleTokensMu.Unlock()
 +	t.simpleTokenKeeper.tokensMu.Unlock()
  	return &AuthInfo{Username: username, Revision: revision}, ok
  }

diff --git a/raft/node.go b/raft/node.go
index 5fce584..c8410fd 100644
--- a/raft/node.go
+++ b/raft/node.go
@@ -462,8 +462,12 @@ func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {

 func (n *node) Status() Status {
 	c := make(chan Status)
-	n.status <- c
-	return <-c
+	select {
+	case n.status <- c:
+		return <-c // without the n.done case below, the old code could block here forever once the node had stopped
+	case <-n.done:
+		return Status{}
+	}
 }
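
The general shape as a sketch: a request/response handshake over channels needs an escape hatch for the case where the serving loop has already exited, otherwise the caller blocks forever on the send.

package main

import "fmt"

type Status struct{ Term uint64 }

type node struct {
	status chan chan Status
	done   chan struct{}
}

func (n *node) Status() Status {
	c := make(chan Status)
	select {
	case n.status <- c:
		return <-c // the run loop accepted the request and will reply
	case <-n.done:
		return Status{} // run loop is gone; don't block forever
	}
}

func main() {
	n := &node{status: make(chan chan Status), done: make(chan struct{})}
	close(n.done) // simulate a node whose run loop has stopped
	fmt.Printf("%+v\n", n.Status())
}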

 diff --git a/proxy/grpcproxy/watch_broadcasts.go b/proxy/grpcproxy/watch_broadcasts.go
 index fc18b74..3ca6fa2 100644
 --- a/proxy/grpcproxy/watch_broadcasts.go
 +++ b/proxy/grpcproxy/watch_broadcasts.go
 @@ -116,13 +116,12 @@ func (wbs *watchBroadcasts) empty() bool { return len(wbs.bcasts) == 0 }

  func (wbs *watchBroadcasts) stop() {
  	wbs.mu.Lock()
 -	defer wbs.mu.Unlock()
 -
  	for wb := range wbs.bcasts {
  		wb.stop()
  	}
  	wbs.bcasts = nil
  	close(wbs.updatec)
 +	wbs.mu.Unlock()
 	<-wbs.donec // with the old defer, this wait still held wbs.mu; if donec never fired, the lock was never released and other callers deadlocked
  }

  var b []byte
  -	errc := make(chan error)
  +	// buffer the errc channel so that sends to errc don't block inside the goroutines
  +	errc := make(chan error, 2)

  func (b *simpleBalancer) Close() error {
   	b.mu.Lock()
  -	defer b.mu.Unlock()
   	// In case gRPC calls close twice. TODO: remove the checking
   	// when we are sure that gRPC wont call close twice.
   	if b.closed {
  +		b.mu.Unlock()
  +		<-b.donec // unlock before blocking on the channel and before returning
   		return nil
   	}
   	b.closed = true
  -	close(b.notifyCh)
  +	close(b.stopc)
   	b.pinAddr = ""

   	// In the case of following scenario:
  @@ -236,6 +292,13 @@ func (b *simpleBalancer) Close() error {
   		// terminate all waiting Get()s
   		close(b.upc)
   	}
  +
  +	b.mu.Unlock()
  +
  +	// wait for updateNotifyLoop to finish
  +	<-b.donec
  +	close(b.notifyCh)
  +
   	return nil
   }

   diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go
   index 555f2f5..753cc2a 100644
   --- a/contrib/raftexample/raft.go
   +++ b/contrib/raftexample/raft.go
   @@ -184,21 +184,23 @@ func (rc *raftNode) serveChannels() {
    	ticker := time.NewTicker(100 * time.Millisecond)
    	defer ticker.Stop()

   -	// event loop on client proposals and raft updates
   +	// send proposals over raft
   +	stopc := make(chan struct{}, 1)
   +	go func() {
   +		for prop := range rc.proposeC {
   +			// blocks until accepted by raft state machine
   +			rc.node.Propose(context.TODO(), []byte(prop))
   +		}
   +		// client closed channel; shutdown raft if not already
   +		stopc <- struct{}{}
   +	}()
   +
   +	// event loop on raft state machine updates
    	for {
    		select {
    		case <-ticker.C:
    			rc.node.Tick()

   -		// send proposals over raft
   -		case prop, ok := <-rc.proposeC:
   -			if !ok {
   -				// client closed channel; shut down
   -				rc.stop()
   -				return
   -			}
   -			rc.node.Propose(context.TODO(), []byte(prop))
   -
    		// store raft entries to wal, then publish over commit channel
    		case rd := <-rc.node.Ready():
    			rc.wal.Save(rd.HardState, rd.Entries)
   @@ -210,6 +212,10 @@ func (rc *raftNode) serveChannels() {
    		case err := <-rc.transport.ErrorC:
    			rc.writeError(err)
    			return
   +
   +		case <-stopc:
   +			rc.stop()
   +			return
    		}
    	}
    }

    diff --git a/client/client.go b/client/client.go
    index da86a0b..ece4cc0 100644
    --- a/client/client.go
    +++ b/client/client.go
    @@ -378,9 +378,12 @@ func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Respon
     		return nil, nil, err
     	}

    -	hctx, hcancel := context.WithCancel(ctx)
    +	var hctx context.Context
    +	var hcancel context.CancelFunc
     	if c.headerTimeout > 0 {
     		hctx, hcancel = context.WithTimeout(ctx, c.headerTimeout)
    +	} else {
    +		hctx, hcancel = context.WithCancel(ctx)
     	}
    defer hcancel()

    diff --git a/clientv3/client.go b/clientv3/client.go
    index 4ec061d..99ea726 100644
    --- a/clientv3/client.go
    +++ b/clientv3/client.go
    @@ -87,12 +87,13 @@ func NewFromURL(url string) (*Client, error) {
     // Close shuts down the client's etcd connections.
     func (c *Client) Close() error {
     	c.mu.Lock()
    -	defer c.mu.Unlock()
     	if c.cancel == nil {
    +		c.mu.Unlock()
     		return nil
     	}
     	c.cancel()
     	c.cancel = nil
    +	c.mu.Unlock()
     	c.Watcher.Close()
     	c.Lease.Close()
     	return c.conn.Close()
    @@ -126,14 +127,22 @@ func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
     	} else {
     		opts = append(opts, grpc.WithInsecure())
     	}
    +
    +	proto := "tcp"
     	if url, uerr := url.Parse(endpoint); uerr == nil && url.Scheme == "unix" {
    -		f := func(a string, t time.Duration) (net.Conn, error) {
    -			return net.DialTimeout("unix", a, t)
    -		}
    +		proto = "unix"
     		// strip unix:// prefix so certs work
     		endpoint = url.Host
    -		opts = append(opts, grpc.WithDialer(f))
     	}
    +	f := func(a string, t time.Duration) (net.Conn, error) {
    +		select {
    +		case <-c.ctx.Done():
    +			return nil, c.ctx.Err()
    +		default:
    +		}
    +		return net.DialTimeout(proto, a, t)
    +	}
    +	opts = append(opts, grpc.WithDialer(f))

     	conn, err := grpc.Dial(endpoint, opts...)
     	if err != nil {
    @@ -156,11 +165,11 @@ func newClient(cfg *Config) (*Client, error) {
     		creds = &c
     	}
     	// use a temporary skeleton client to bootstrap first connection
    -	conn, err := cfg.RetryDialer(&Client{cfg: *cfg, creds: creds})
    +	ctx, cancel := context.WithCancel(context.TODO())
    +	conn, err := cfg.RetryDialer(&Client{cfg: *cfg, creds: creds, ctx: ctx})
     	if err != nil {
     		return nil, err
     	}
    -	ctx, cancel := context.WithCancel(context.TODO())
     	client := &Client{
     		conn:   conn,
     		cfg:    *cfg,
    @@ -198,6 +207,13 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.Cli
     		// conn has already been updated
     		return c.conn, nil
     	}
    +
    +	oldConn.Close()
    +	if st, _ := oldConn.State(); st != grpc.Shutdown {
    +		// wait for shutdown so grpc doesn't leak sleeping goroutines
    +		oldConn.WaitForStateChange(c.ctx, st)
    +	}
    +
     	conn, dialErr := c.cfg.RetryDialer(c)
     	if dialErr != nil {
     		c.errors = append(c.errors, dialErr)

grpc-go

diff --git a/balancer.go b/balancer.go
index 2acc882..57c20c6 100644
--- a/balancer.go
+++ b/balancer.go
@@ -201,6 +201,10 @@ func (rr *roundRobin) watchAddrUpdates() error {
 	if rr.done {
 		return ErrClientConnClosing
 	}
+	select {
+	case <-rr.addrCh:
+	default:
+	}
 	rr.addrCh <- open
 	return nil
 }
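
A sketch of the drain-then-send idiom used in this fix, assuming a capacity-1 notification channel whose senders are serialized (the real code sends under rr.mu): dropping a stale, unconsumed notification first guarantees the send never blocks, and the receiver still observes the latest update.

package main

import "fmt"

// notify coalesces notifications on a capacity-1 channel without blocking.
func notify(ch chan struct{}) {
	select {
	case <-ch: // drop a stale, unconsumed notification
	default:
	}
	ch <- struct{}{} // cannot block: the buffer slot is now free
}

func main() {
	addrCh := make(chan struct{}, 1)
	notify(addrCh)
	notify(addrCh) // would block forever without the drain above
	<-addrCh
	fmt.Println("received the coalesced notification")
}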

 diff --git a/stream.go b/stream.go
 index 0ee572c..537d4b3 100644
 --- a/stream.go
 +++ b/stream.go
 @@ -133,8 +133,12 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
  	// Listen on ctx.Done() to detect cancellation when there is no pending
  	// I/O operations on this stream.
  	go func() {
 -		<-s.Context().Done()
 -		cs.closeTransportStream(transport.ContextErr(s.Context().Err()))
 +		select {
 +		case <-t.Error():
 +			// Incur transport error, simply exit.
 +		case <-s.Context().Done():
 +			cs.closeTransportStream(transport.ContextErr(s.Context().Err()))
 +		}
  	}()
  	return cs, nil
  }

  diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go
  index 7d14007..2ee5dbb 100644
  --- a/benchmark/worker/benchmark_client.go
  +++ b/benchmark/worker/benchmark_client.go
  @@ -200,8 +200,8 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe
   		for j := 0; j < rpcCountPerConn; j++ {
   			go func(client testpb.BenchmarkServiceClient) {
   				defer wg.Done()
  +				done := make(chan bool)
   				for {
  -					done := make(chan bool)
   					go func() {
   						start := time.Now()
   						if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
  @@ -212,7 +212,10 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe
   						bc.mu.Lock()
   						bc.histogram.Add(int64(elapse / time.Nanosecond))
   						bc.mu.Unlock()
  -						done <- true
  +						select {
  +						case <-bc.stop:
  +						case done <- true:
  +						}
   					}()
   					select {
   					case <-bc.stop:
  @@ -259,8 +262,8 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
   		for j := 0; j < rpcCountPerConn; j++ {
   			go func(stream testpb.BenchmarkService_StreamingCallClient) {
   				defer wg.Done()
  +				done := make(chan bool)
   				for {
  -					done := make(chan bool) // allocating the chan inside the loop creates a new channel on every iteration
   					go func() {
   						start := time.Now()
   						if err := doRPC(stream, reqSize, respSize); err != nil {
  @@ -271,7 +274,10 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
   						bc.mu.Lock()
   						bc.histogram.Add(int64(elapse / time.Nanosecond))
   						bc.mu.Unlock()
  -						done <- true
  +						select {
  +						case <-bc.stop:
  +						case done <- true:
  +						}
   					}()

select is non-deterministic: when stopCh and the ticker are ready at the same time, the stopCh branch is not guaranteed to be chosen. The right pattern is to check stopCh first with a non-blocking select before doing the work:

ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
    select {
    case <-stopCh:
        return
    default:
    }
    f()
    select {
    case <-stopCh:
        return
    case <-ticker.C:
    }
}
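
A runnable sketch of that pattern, with hypothetical f and interval: the non-blocking check in front of f means that even when the ticker and stopCh become ready together, the loop does not keep running f iteration after iteration once stop has been requested.

package main

import (
	"fmt"
	"time"
)

// loopUntil runs f repeatedly until stopCh is closed, checking stopCh
// before every call so a simultaneously-ready ticker cannot win the select
// and let f run again after stop has been observed.
func loopUntil(f func(), stopCh <-chan struct{}, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		default:
		}
		f()
		select {
		case <-stopCh:
			return
		case <-ticker.C:
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	calls := 0
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(stopCh)
	}()
	loopUntil(func() { calls++ }, stopCh, time.Millisecond)
	fmt.Println("stopped after", calls, "calls")
}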

var group sync.WaitGroup
group.Add(len(pm.plugins))
for _, p := range pm.plugins {
    go func(p *plugin) {
        defer group.Done()
        // ... load the plugin ...
    }(p)
    group.Wait() // BUG: waiting inside the loop blocks after the first goroutine
}
// group.Wait() belongs here, after the loop has started every goroutine
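
A corrected, self-contained version of the sketch above (the plugin type and its work are hypothetical): Wait is called once, after the loop, so every goroutine is started before anything blocks on the WaitGroup.

package main

import (
	"fmt"
	"sync"
)

type plugin struct{ name string }

func main() {
	plugins := []*plugin{{"a"}, {"b"}, {"c"}}

	var group sync.WaitGroup
	group.Add(len(plugins))
	for _, p := range plugins {
		go func(p *plugin) {
			defer group.Done()
			fmt.Println("loaded", p.name)
		}(p)
	}
	group.Wait() // outside the loop: waits for all plugins at once
}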

func goroutine1() {
    m.Lock()
    ch <- request // blocks: no one can receive while goroutine2 waits for the lock
    m.Unlock()
}

func goroutine2() {
    for {
        m.Lock() // blocks: goroutine1 still holds the lock
        m.Unlock()
        request := <-ch
    }
}
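
One way out of that cycle, sketched below, is to keep the channel operations outside the critical section so neither goroutine blocks while holding the mutex; selecting on a done/stop channel, as in the fixes above, is the other common escape.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Mutex
	ch := make(chan int)
	var wg sync.WaitGroup
	wg.Add(2)

	go func() { // goroutine1: do the send with the lock released
		defer wg.Done()
		m.Lock()
		request := 42 // prepare the request under the lock
		m.Unlock()
		ch <- request // safe: no lock held while blocking
	}()

	go func() { // goroutine2
		defer wg.Done()
		m.Lock()
		m.Unlock()
		fmt.Println("received", <-ch)
	}()

	wg.Wait()
}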


docker

diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go
index 0ac0cb3..8794e89 100644
--- a/daemon/logger/jsonfilelog/read.go
+++ b/daemon/logger/jsonfilelog/read.go
@@ -9,6 +9,7 @@ import (
 	"os"
 	"time"

+	"golang.org/x/net/context"
 	"gopkg.in/fsnotify.v1"

 	"github.com/Sirupsen/logrus"
@@ -172,9 +173,22 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 	}
 	defer func() {
 		f.Close()
+		fileWatcher.Remove(name)
 		fileWatcher.Close()
 	}()

+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go func() {
+		select {
+		case <-logWatcher.WatchClose():
+			fileWatcher.Remove(name)
+			cancel()
+		case <-ctx.Done():
+			return
+		}
+	}()
+
 	var retries int
 	handleRotate := func() error {
 		f.Close()
@@ -209,8 +223,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 			case fsnotify.Rename, fsnotify.Remove:
 				select {
 				case <-notifyRotate:
-				case <-logWatcher.WatchClose():
-					fileWatcher.Remove(name)
+				case <-ctx.Done():
 					return errDone
 				}
 				if err := handleRotate(); err != nil {
@@ -232,8 +245,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 				return errRetry
 			}
 			return err
-		case <-logWatcher.WatchClose():
-			fileWatcher.Remove(name)
+		case <-ctx.Done():
 			return errDone
 		}
 	}
@@ -290,7 +302,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 		}
 		select {
 		case logWatcher.Msg <- msg:
-		case <-logWatcher.WatchClose():
+		case <-ctx.Done():
 			logWatcher.Msg <- msg
 			for {
 				msg, err := decodeLogLine(dec, l)

  for _, pair := range tests {
-		go func() {
-			srv.Write(pair[0])
-		}()
+		go func(x []byte) {
+			srv.Write(x)
+}(pair[0]) // pass pair[0] by value so each goroutine writes its own copy and does not race on the loop variable
 		n, err := l.Read(read)
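
The same loop-variable hazard in miniature: before Go 1.22 all iterations share one variable, so a closure that captures it races with the next iteration; passing the value as an argument, as the diff does, gives each goroutine its own copy.

package main

import (
	"fmt"
	"sync"
)

func main() {
	pairs := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	var wg sync.WaitGroup

	for _, pair := range pairs {
		wg.Add(1)
		go func(x []byte) { // x is this iteration's value, not the shared loop variable
			defer wg.Done()
			fmt.Printf("writing %s\n", x)
		}(pair)
	}
	wg.Wait()
}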
