// Copyright 2017 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file.
// NewWeighted creates a new weighted semaphore with the given // maximum combined weight for concurrent access. funcNewWeighted(n int64) *Weighted { w := &Weighted{size: n} return w }
// Weighted provides a way to bound concurrent access to a resource. // The callers can request access with a given weight. type Weighted struct { size int64// 字段用来记录信号量拥有的最大资源数 cur int64// 标识当前已被使用的资源数 mu sync.Mutex // 互斥锁,用来提供对其他字段的临界区保护 waiters list.List // 表示申请资源时由于可使用资源不够而陷入阻塞等待的调用者列表 }
// Acquire acquires the semaphore with a weight of n, blocking until resources // are available or ctx is done. On success, returns nil. On failure, returns // ctx.Err() and leaves the semaphore unchanged. // // If ctx is already done, Acquire may still succeed without blocking.
// 如果恰好有足够的资源,也没有排队等待获取资源的goroutine,则将cur加上n后直接返回 if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() returnnil }
// 请求的资源数 > 能提供的最大资源数, 则该任务处理不了,走错误处理逻辑 if n > s.size { // Don't make other Acquire calls block on one that's doomed to fail. s.mu.Unlock() // 依赖ctx的状态返回,否则一直等待 <-ctx.Done() return ctx.Err() }
// 现存资源不够, 需要把调用者加入到等待队列中 // 创建了一个ready chan,以便被通知唤醒 ready := make(chanstruct{}) //如果调用者请求不到信号量的资源就会被加入等待者列表里 w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) s.mu.Unlock()
// 等待 select { case <-ctx.Done(): // context的Done被关闭 err := ctx.Err() s.mu.Lock() select { case <-ready: // 如果被唤醒了,则忽略ctx的状态 // Acquired the semaphore after we were canceled. Rather than trying to // fix up the queue, just pretend we didn't notice the cancelation. err = nil default: // 通知waiter isFront := s.waiters.Front() == elem s.waiters.Remove(elem) // If we're at the front and there're extra tokens left, notify other waiters. // 通知其它的waiters,检查是否有足够的资源 if isFront && s.size > s.cur { s.notifyWaiters() } } s.mu.Unlock() return err
case <-ready: // 等待者被唤醒 returnnil } }
// TryAcquire acquires the semaphore with a weight of n without blocking. // On success, returns true. On failure, returns false and leaves the semaphore unchanged. func(s *Weighted) TryAcquire(n int64) bool { s.mu.Lock() success := s.size-s.cur >= n && s.waiters.Len() == 0 if success { s.cur += n } s.mu.Unlock() return success }
// Release releases the semaphore with a weight of n. //Release方法很简单, 它将当前计数值减去释放的资源数 n, 并调用notifyWaiters方法,尝试唤醒等待队列中的调用者,看是否有足够的资源被获取 func(s *Weighted) Release(n int64) { s.mu.Lock() s.cur -= n if s.cur < 0 { s.mu.Unlock() panic("semaphore: released more than held") } s.notifyWaiters() s.mu.Unlock() }
// notifyWaiters方法 会逐个检查队列里等待的调用者,如果现存资源 够等待者请求的数量n,或者是没有等待者了,就返回 func(s *Weighted) notifyWaiters() { for { next := s.waiters.Front() if next == nil { break// No more waiters blocked. // 没有等待者了,直接返回 }
w := next.Value.(waiter) if s.size-s.cur < w.n { // 如果现有资源不够队列头调用者请求的资源数,就退出所有等待者会继续等待 // 这里还是按照先入先出的方式处理是为了避免饥饿
// Not enough tokens for the next waiter. We could keep going (to try to // find a waiter with a smaller request), but under load that could cause // starvation for large requests; instead, we leave all remaining waiters // blocked. // // Consider a semaphore used as a read-write lock, with N tokens, N // readers, and one writer. Each reader can Acquire(1) to obtain a read // lock. The writer can Acquire(N) to obtain a write lock, excluding all // of the readers. If we allow the readers to jump ahead in the queue, // the writer will starve — there is always one token available for every // reader. break }