uber 开源的限流算法,代码不多,尝试理解但是没怎么理解。有没有大佬指点一二.主要不理解的点在于这个 for 循环
// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *limiter) Take() time.Time {
newState := state{}
taken := false
for !taken {
now := t.clock.Now()
previousStatePointer := atomic.LoadPointer(&t.state)
oldState := (*state)(previousStatePointer)
newState = state{}
newState.last = now
// If this is our first request, then we allow it.
if oldState.last.IsZero() {
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
continue
}
// sleepFor calculates how much time we should sleep based on
// the perRequest budget and how long the last request took.
// Since the request may take longer than the budget, this number
// can get negative, and is summed across requests.
newState.sleepFor += t.perRequest - now.Sub(oldState.last)
// We shouldn't allow sleepFor to get too negative, since it would mean that
// a service that slowed down a lot for a short period of time would get
// a much higher RPS following that.
if newState.sleepFor < t.maxSlack {
newState.sleepFor = t.maxSlack
}
if newState.sleepFor > 0 {
newState.last = newState.last.Add(newState.sleepFor)
}
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
t.clock.Sleep(newState.sleepFor)
return newState.last
}
1
CRVV 2020-12-08 20:47:43 +08:00
var counter int64
然后如果有多个线程执行下面这个循环,每次 break 都会给 counter 加一 for { counter0 := atomic.LoadInt64(&counter) if atomic.CompareAndSwap(&counter, counter0, counter+1) { break } } 这种就是所谓的 lock-free,把它写成用 lock 的形式,再把那几个 if 去掉,这个函数其实就是 func (t *limiter) Take() time.Time { mutex.Lock() oldState := t.state newState = state{} now := t.clock.Now() newState.last = now newState.sleepFor += t.perRequest - now.Sub(oldState.last) t.state = &newState mutex.Unlock() t.clock.Sleep(newState.sleepFor) return newState.last } 然后应该很好懂了 |