Go语言的协程并发机制,使得Go非常适用于大规模高并发后端服务器程序开发。越来越多的开发团队开始用Go开发自己的系统,大量的开发人员开始迁移到Go语言。由于大量的后台开发人员都是从Java/C/C++迁移到Go,其中的并发编程机制存在着一定差异,常常会由于惯性思维导致一些低效和错误的实现,而并没有真正发挥Go语言的并发优势。 本讲座针对那些从传统语言迁移至Go的开发人员,比较了Go语言及传统服务器开发语言的并发编程模式,指出了沿用传统思维易导致的复杂性和错误,以及如何利用Go的并发编程新特性更加简单和高效地实现常见的并发场景。

注脚

展开查看详情

1.Go Concurrent Programming chao.cai@mobvista.com QCon2018 Beijing

2.Shared Memory Model Shared Memory Model

3.class Worker implements Runnable{ private volatile boolean isRunning = false; @Override public void run() { while(isRunning) { //do something } Lock lock = ...; } lock.lock(); try{ } //process (thread-safe) }catch(Exception ex){ }finally{ lock.unlock(); }

4.CSP Tony Hoare

5.Actor Model

6.Channel

7.

8.Nonblocking Call

9.private static FutureTask<String> service() { FutureTask<String> task = new FutureTask<String>(()->"Do something"); new Thread(task).start(); return task; } FutureTask<String> ret = service(); System.out.println("Do something else”); System.out.println(ret.get()); func Service() <-chan string { ret := make(chan string) go func() { ret <- "Do something" }() return ret } func TestService(t *testing.T) { r := Service() //Nonblock call fmt.Println("Do something else") //do something else fmt.Println(<-r) //Waiting for the r }

10.Util Anyone Responses func UntilAnyoneResponse(callables []Callable) interface{} { ch := make(chan interface{}, len(callables)) for _, callable := range callables { go func(c Callable) { ch <- c.Call() }(callable) } return <-ch }

11.Rate Limit

12.func CreateTokenBucket(sizeOfBucket int, numOfTokens int, tokenFillingInterval time.Duration) chan time.Time { bucket := make(chan time.Time, sizeOfBucket) //fill the bucket firstly for j := 0; j < sizeOfBucket; j++ { bucket <- time.Now() } go func() { for t := range time.Tick(tokenFillingInterval) { for i := 0; i < numOfTokens; i++ { bucket <- t } } }() return bucket }

13.func GetToken(tokenBucket chan time.Time, timeout time.Duration) (time.Time, error) { var token time.Time if timeout != 0 { select { case token = <-tokenBucket: return token, nil case <-time.After(timeout): return token, errors.New("Failed to get token for time out") } } token = <-tokenBucket return token, nil }

14.Traps

15.func f1() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func() { fmt.Println(i) wg.Done() }() } wg.Wait() }

16.func f2() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(p int) { fmt.Println(p) wg.Done() }(i) } wg.Wait() }

17.Goroutine Leak

18.

19.M System Thread P Processor G Goroutine

20.func UntilAnyoneResponse(callables []Callable) interface{} { ch := make(chan interface{}) for _, callable := range callables { go func(c Callable) { ch <- c.Call() }(callable) } return <-ch } func UntilAnyoneResponse(callables []Callable) interface{} { ch := make(chan interface{}, len(callables)) for _, callable := range callables { go func(c Callable) { ch <- c.Call() }(callable) } return <-ch }

21.runtime.NumGoroutine() You might be surprised!

22. Recipes • Rate Limit • Timeout/Circuit Break • Number of Goroutine Limit

23.Cancellation

24.class Worker implements Runnable{ private volatile boolean isRunning = false; @Override public void run() { while(isRunning) { //do something } } }

25. Broadcast by Channel func isCancelled(cancelChan chan struct{}) bool { select { case <-cancelChan: return true default: return false } }

26.Broadcast by Channel func cancel_1(cancelChan chan struct{}) { cancelChan <- struct{}{} }

27.Broadcast by Channel func cancel_2(cancelChan chan struct{}) { fmt.Println("Cancelled!") close(cancelChan) }

28. Context Main Handle(Req1) Handle(Req2) Search(A) Search(B) Search(C) Search(A) Search(B) Search(C)

29.func main() { ctx, cancel := context.WithCancel(context.Background()) go func(ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println(“Stop processing ...") return default: fmt.Println("Running...") time.Sleep(2 * time.Second) } } }(ctx) time.Sleep(10 * time.Second) fmt.Println(“Done. Stop it!”) cancel() … }