get key from database 数据库中的数据 get key from database get key from database 数据库中的数据 --------- get key from database get key from database 数据库中的数据 --------- --------- 数据库中的数据 get key from database 数据库中的数据 --------- get key from database get key from database 数据库中的数据 --------- --------- 数据库中的数据 --------- get key from database 数据库中的数据 --------- 数据库中的数据 --------- get key from database 数据库中的数据 ---------
// Copyright 2013 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file.
// Package singleflight provides a duplicate function call suppression // mechanism. package singleflight // import "golang.org/x/sync/singleflight"
// errGoexit indicates the runtime.Goexit was called in // the user given function. var errGoexit = errors.New("runtime.Goexit was called")
// A panicError is an arbitrary value recovered from a panic // with the stack trace during the execution of given function. type panicError struct { value interface{} stack []byte }
// The first line of the stack trace is of the form "goroutine N [status]:" // but by the time the panic reaches Do the goroutine may no longer exist // and its status will have changed. Trim out the misleading line. if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { stack = stack[line+1:] } return &panicError{value: v, stack: stack} }
// call is an in-flight or completed singleflight.Do call type call struct { wg sync.WaitGroup
// These fields are written once before the WaitGroup is done // and are only read after the WaitGroup is done. // 函数的返回值,在 wg 返回前只会写入一次 val interface{} err error
// forgotten indicates whether Forget was called with this call's key // while the call was still in flight. // 使用调用了 Forgot 方法 forgotten bool
// These fields are read and written with the singleflight // mutex held before the WaitGroup is done, and are read but // not written after the WaitGroup is done. // 统计调用次数以及返回的 channel dups int chans []chan<- Result }
// Group represents a class of work and forms a namespace in // which units of work can be executed with duplicate suppression. type Group struct { mu sync.Mutex // protects m m map[string]*call // lazily initialized }
// Result holds the results of Do, so they can be passed // on a channel. type Result struct { Val interface{} Err error Shared bool }
// Do executes and returns the results of the given function, making // sure that only one execution is in-flight for a given key at a // time. If a duplicate comes in, the duplicate caller waits for the // original to complete and receives the same results. // The return value shared indicates whether v was given to multiple callers. func(g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock()
// DoChan is like Do but returns a channel that will receive the // results when they are ready. // // The returned channel will not be closed. // Do chan 和 Do 类似,其实就是一个是同步等待,一个是异步返回,主要实现上: // 如果调用 DoChan 会给 call.chans 添加一个 channel 这样等第一次调用执行完毕之后就会循环向这些 channel 写入数据 func(g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan<- Result{ch}} c.wg.Add(1) g.m[key] = c g.mu.Unlock()
go g.doCall(c, key, fn)
return ch }
// doCall handles the single call for a key. // 这个方法的实现有意思,使用了两个 defer 巧妙的将 runtime 的错误 // 和我们传入 function 的 panic 区别开来 // 避免了由于传入的 function panic 导致的死锁 func(g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { normalReturn := false recovered := false
// use double-defer to distinguish panic from runtime.Goexit, // more details see https://golang.org/cl/134395 // 第一个 defer 检查 runtime 错误 deferfunc() { // the given function invoked runtime.Goexit // 如果既没有正常执行完毕,又没有 recover 那就说明需要直接退出了 if !normalReturn && !recovered { c.err = errGoexit }
if e, ok := c.err.(*panicError); ok { // In order to prevent the waiting channels from being blocked forever, // needs to ensure that this panic cannot be recovered. // 如果返回的是 panic 错误,为了避免 channel 死锁,我们需要确保这个 panic 无法被恢复 iflen(c.chans) > 0 { gopanic(e) select {} // Keep this goroutine around so that it will appear in the crash dump. } else { panic(e) } } elseif c.err == errGoexit { // Already in the process of goexit, no need to call again // 已经准备退出了,也就不用做其他操作了 } else { // Normal return // 正常情况下向 channel 写入数据 for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } } }()
// 使用一个匿名函数来执行 func() { deferfunc() { if !normalReturn { // Ideally, we would wait to take a stack trace until we've determined // whether this is a panic or a runtime.Goexit. // // Unfortunately, the only way we can distinguish the two is to see // whether the recover stopped the goroutine from terminating, and by // the time we know that, the part of the stack trace relevant to the // panic has been discarded. // 如果 panic 了我们就 recover 掉,然后 new 一个 panic 的错误 // 后面在上层重新 panic if r := recover(); r != nil { c.err = newPanicError(r) } } }()
// Forget tells the singleflight to forget about a key. Future calls // to Do for this key will call the function rather than waiting for // an earlier call to complete. // 用于手动释放某个 key 下次调用就不会阻塞等待了 func(g *Group) Forget(key string) { g.mu.Lock() if c, ok := g.m[key]; ok { c.forgotten = true } delete(g.m, key) g.mu.Unlock() }