Improve token pool to prevent rate limits

This commit is contained in:
Zed 2021-01-13 14:32:26 +01:00
parent 2d788704b1
commit 51b1567af6
2 changed files with 38 additions and 20 deletions

View file

@ -31,9 +31,6 @@ proc genHeaders*(token: Token = nil): HttpHeaders =
"DNT": "1" "DNT": "1"
}) })
proc rateLimitError(): ref RateLimitError =
newException(RateLimitError, "rate limited with " & getPoolInfo())
proc fetch*(url: Uri; oldApi=false): Future[JsonNode] {.async.} = proc fetch*(url: Uri; oldApi=false): Future[JsonNode] {.async.} =
once: once:
pool = HttpPool() pool = HttpPool()
@ -54,12 +51,16 @@ proc fetch*(url: Uri; oldApi=false): Future[JsonNode] {.async.} =
echo resp.status, ": ", body echo resp.status, ": ", body
result = newJNull() result = newJNull()
if not oldApi and resp.headers.hasKey(rl & "limit"): if not oldApi and resp.headers.hasKey(rl & "reset"):
token.remaining = parseInt(resp.headers[rl & "remaining"]) let time = fromUnix(parseInt(resp.headers[rl & "reset"]))
token.reset = fromUnix(parseInt(resp.headers[rl & "reset"])) if token.reset != time:
token.remaining = parseInt(resp.headers[rl & "limit"])
token.reset = time
if result.getError notin {invalidToken, forbidden, badToken}: if result.getError notin {invalidToken, forbidden, badToken}:
token.release() token.lastUse = getTime()
else:
echo "fetch error: ", result.getError
except Exception: except Exception:
echo "error: ", url echo "error: ", url
raise rateLimitError() raise rateLimitError()

View file

@ -1,4 +1,4 @@
import asyncdispatch, httpclient, times, sequtils, json, math import asyncdispatch, httpclient, times, sequtils, json, math, random
import strutils, strformat import strutils, strformat
import types, agents, consts, http_pool import types, agents, consts, http_pool
@ -6,11 +6,20 @@ var
clientPool {.threadvar.}: HttpPool clientPool {.threadvar.}: HttpPool
tokenPool {.threadvar.}: seq[Token] tokenPool {.threadvar.}: seq[Token]
lastFailed: Time lastFailed: Time
minFail = initDuration(seconds=10) minFail = initDuration(minutes=30)
proc getPoolInfo*: string =
if tokenPool.len == 0: return "token pool empty"
let avg = tokenPool.mapIt(it.remaining).sum() div tokenPool.len
return &"{tokenPool.len} tokens, average remaining: {avg}"
proc rateLimitError*(): ref RateLimitError =
newException(RateLimitError, "rate limited with " & getPoolInfo())
proc fetchToken(): Future[Token] {.async.} = proc fetchToken(): Future[Token] {.async.} =
if getTime() - lastFailed < minFail: if getTime() - lastFailed < minFail:
return Token() raise rateLimitError()
let headers = newHttpHeaders({ let headers = newHttpHeaders({
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
@ -33,7 +42,6 @@ proc fetchToken(): Future[Token] {.async.} =
init: time, lastUse: time) init: time, lastUse: time)
except Exception as e: except Exception as e:
lastFailed = getTime() lastFailed = getTime()
result = Token()
echo "fetching token failed: ", e.msg echo "fetching token failed: ", e.msg
proc expired(token: Token): bool {.inline.} = proc expired(token: Token): bool {.inline.} =
@ -49,19 +57,25 @@ proc isLimited(token: Token): bool {.inline.} =
token.expired token.expired
proc release*(token: Token) = proc release*(token: Token) =
if token != nil and not token.expired: if token != nil and token.expired:
token.lastUse = getTime() tokenPool.delete(tokenPool.find(token))
tokenPool.insert(token)
proc getToken*(): Future[Token] {.async.} = proc getToken*(): Future[Token] {.async.} =
for i in 0 ..< tokenPool.len: for i in 0 ..< tokenPool.len:
if not result.isLimited: break if not result.isLimited: break
result.release() result.release()
result = tokenPool.pop() result = tokenPool.sample()
if result.isLimited: if result.isLimited:
result.release() result.release()
result = await fetchToken() result = await fetchToken()
tokenPool.add result
echo getPoolInfo()
if result == nil:
raise rateLimitError()
dec result.remaining
proc poolTokens*(amount: int) {.async.} = proc poolTokens*(amount: int) {.async.} =
var futs: seq[Future[Token]] var futs: seq[Future[Token]]
@ -69,7 +83,14 @@ proc poolTokens*(amount: int) {.async.} =
futs.add fetchToken() futs.add fetchToken()
for token in futs: for token in futs:
release(await token) var newToken: Token
try: newToken = await token
except: discard
if newToken != nil:
tokenPool.add newToken
echo getPoolInfo()
proc initTokenPool*(cfg: Config) {.async.} = proc initTokenPool*(cfg: Config) {.async.} =
clientPool = HttpPool() clientPool = HttpPool()
@ -78,7 +99,3 @@ proc initTokenPool*(cfg: Config) {.async.} =
if tokenPool.countIt(not it.isLimited) < cfg.minTokens: if tokenPool.countIt(not it.isLimited) < cfg.minTokens:
await poolTokens(min(4, cfg.minTokens - tokenPool.len)) await poolTokens(min(4, cfg.minTokens - tokenPool.len))
await sleepAsync(2000) await sleepAsync(2000)
proc getPoolInfo*: string =
let avg = tokenPool.mapIt(it.remaining).sum()
return &"{tokenPool.len} tokens, average remaining: {avg}"