- 快召唤伙伴们来围观吧
- 微博 QQ QQ空间 贴吧
- 文档嵌入链接
- 复制
- 微信扫一扫分享
- 已成功复制到剪贴板
蔡超 - Go高效并发编程
展开查看详情
1 .Go Concurrent Programming chao.cai@mobvista.com QCon2018 Beijing
2 .Shared Memory Model Shared Memory Model
3 .class Worker implements Runnable{ private volatile boolean isRunning = false; @Override public void run() { while(isRunning) { //do something } Lock lock = ...; } lock.lock(); try{ } //process (thread-safe) }catch(Exception ex){ }finally{ lock.unlock(); }
4 .CSP Tony Hoare
5 .Actor Model
6 .Channel
7 .
8 .Nonblocking Call
9 .private static FutureTask<String> service() { FutureTask<String> task = new FutureTask<String>(()->"Do something"); new Thread(task).start(); return task; } FutureTask<String> ret = service(); System.out.println("Do something else”); System.out.println(ret.get()); func Service() <-chan string { ret := make(chan string) go func() { ret <- "Do something" }() return ret } func TestService(t *testing.T) { r := Service() //Nonblock call fmt.Println("Do something else") //do something else fmt.Println(<-r) //Waiting for the r }
10 .Util Anyone Responses func UntilAnyoneResponse(callables []Callable) interface{} { ch := make(chan interface{}, len(callables)) for _, callable := range callables { go func(c Callable) { ch <- c.Call() }(callable) } return <-ch }
11 .Rate Limit
12 .func CreateTokenBucket(sizeOfBucket int, numOfTokens int, tokenFillingInterval time.Duration) chan time.Time { bucket := make(chan time.Time, sizeOfBucket) //fill the bucket firstly for j := 0; j < sizeOfBucket; j++ { bucket <- time.Now() } go func() { for t := range time.Tick(tokenFillingInterval) { for i := 0; i < numOfTokens; i++ { bucket <- t } } }() return bucket }
13 .func GetToken(tokenBucket chan time.Time, timeout time.Duration) (time.Time, error) { var token time.Time if timeout != 0 { select { case token = <-tokenBucket: return token, nil case <-time.After(timeout): return token, errors.New("Failed to get token for time out") } } token = <-tokenBucket return token, nil }
14 .Traps
15 .func f1() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func() { fmt.Println(i) wg.Done() }() } wg.Wait() }
16 .func f2() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(p int) { fmt.Println(p) wg.Done() }(i) } wg.Wait() }
17 .Goroutine Leak
18 .
19 .M System Thread P Processor G Goroutine
20 .func UntilAnyoneResponse(callables []Callable) interface{} { ch := make(chan interface{}) for _, callable := range callables { go func(c Callable) { ch <- c.Call() }(callable) } return <-ch } func UntilAnyoneResponse(callables []Callable) interface{} { ch := make(chan interface{}, len(callables)) for _, callable := range callables { go func(c Callable) { ch <- c.Call() }(callable) } return <-ch }
21 .runtime.NumGoroutine() You might be surprised!
22 . Recipes • Rate Limit • Timeout/Circuit Break • Number of Goroutine Limit
23 .Cancellation
24 .class Worker implements Runnable{ private volatile boolean isRunning = false; @Override public void run() { while(isRunning) { //do something } } }
25 . Broadcast by Channel func isCancelled(cancelChan chan struct{}) bool { select { case <-cancelChan: return true default: return false } }
26 .Broadcast by Channel func cancel_1(cancelChan chan struct{}) { cancelChan <- struct{}{} }
27 .Broadcast by Channel func cancel_2(cancelChan chan struct{}) { fmt.Println("Cancelled!") close(cancelChan) }
28 . Context Main Handle(Req1) Handle(Req2) Search(A) Search(B) Search(C) Search(A) Search(B) Search(C)
29 .func main() { ctx, cancel := context.WithCancel(context.Background()) go func(ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println(“Stop processing ...") return default: fmt.Println("Running...") time.Sleep(2 * time.Second) } } }(ctx) time.Sleep(10 * time.Second) fmt.Println(“Done. Stop it!”) cancel() … }