blob: 9e95bc14b597308b02685f745e232f3af67c466f [file] [log] [blame]
// Copyright 2016 Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"runtime"
)
type RateLimit struct {
requests chan struct{}
finished chan int
released chan int
stop chan struct{}
}
// NewRateLimit starts a new rate limiter with maxExecs number of executions
// allowed to happen at a time. If maxExecs is <= 0, it will default to the
// number of logical CPUs on the system.
//
// With Finish and Release, we'll keep track of outstanding buffer sizes to be
// written. If that size goes above maxMem, we'll prevent starting new
// executions.
//
// The total memory use may be higher due to current executions. This just
// prevents runaway memory use due to slower writes.
func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
if maxExecs <= 0 {
maxExecs = runtime.NumCPU()
}
if maxMem <= 0 {
// Default to 512MB
maxMem = 512 * 1024 * 1024
}
ret := &RateLimit{
requests: make(chan struct{}),
// Let all of the pending executions to mark themselves as finished,
// even if our goroutine isn't processing input.
finished: make(chan int, maxExecs),
released: make(chan int),
stop: make(chan struct{}),
}
go ret.goFunc(maxExecs, maxMem)
return ret
}
// RequestExecution blocks until another execution can be allowed to run.
func (r *RateLimit) RequestExecution() Execution {
<-r.requests
return r.finished
}
type Execution chan<- int
// Finish will mark your execution as finished, and allow another request to be
// approved.
//
// bufferSize may be specified to count memory buffer sizes, and must be
// matched with calls to RateLimit.Release to mark the buffers as released.
func (e Execution) Finish(bufferSize int) {
e <- bufferSize
}
// Call Release when finished with a buffer recorded with Finish.
func (r *RateLimit) Release(bufferSize int) {
r.released <- bufferSize
}
// Stop the background goroutine
func (r *RateLimit) Stop() {
close(r.stop)
}
func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
var curExecs int
var curMemory int64
for {
var requests chan struct{}
if curExecs < maxExecs && curMemory < maxMem {
requests = r.requests
}
select {
case requests <- struct{}{}:
curExecs++
case amount := <-r.finished:
curExecs--
curMemory += int64(amount)
if curExecs < 0 {
panic("curExecs < 0")
}
case amount := <-r.released:
curMemory -= int64(amount)
case <-r.stop:
return
}
}
}