I’m trying to rate-limit the API calls in my code. I already found a nice Python library, ratelimiter==1.0.2.post0:
https://pypi.python.org/pypi/ratelimiter
However, this library can only limit the rate in a local scope, i.e. within a single function or loop:
    from ratelimiter import RateLimiter

    # Decorator
    @RateLimiter(max_calls=10, period=1)
    def do_something():
        pass

    # Context Manager
    rate_limiter = RateLimiter(max_calls=10, period=1)
    for i in range(100):
        with rate_limiter:
            do_something()
Because I have several functions in different places that make API calls, I want to limit the API calls in a global scope.
For example, suppose I want to limit API calls to one per second, and I have two functions, x and y, each of which makes an API call.
    @rate(...)
    def x():
        ...

    @rate(...)
    def y():
        ...
By decorating the functions with the limiter, I can limit the rate of each function individually.
However, if I execute the two functions sequentially, the limiters lose track of the total number of API calls because they are unaware of each other. So y will be called right after x finishes, without waiting another second, which violates the one-call-per-second restriction.
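The effect can be reproduced without any real API. The sketch below uses a hypothetical stand-in for the `rate(...)` decorator (it is not the ratelimiter library's implementation): each use of `rate` keeps its own private timestamp, so x and y never see each other's calls. `FakeClock` is also an assumption introduced here so that nothing actually sleeps.

```python
import functools


class FakeClock:
    """Hypothetical test clock: sleep() advances time instead of blocking."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


def rate(period, clock):
    """Stand-in limiter: one call per `period` seconds PER DECORATED FUNCTION."""
    def decorator(func):
        last_call = [None]  # state private to this one function

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if last_call[0] is not None:
                wait = period - (clock.time() - last_call[0])
                if wait > 0:
                    clock.sleep(wait)
            last_call[0] = clock.time()
            return func(*args, **kwargs)
        return wrapper
    return decorator


clock = FakeClock()
call_times = []  # (function name, simulated time) for each fake API call


@rate(period=1.0, clock=clock)
def x():
    call_times.append(("x", clock.time()))


@rate(period=1.0, clock=clock)
def y():
    call_times.append(("y", clock.time()))


x()
y()  # runs immediately: y's limiter knows nothing about x's call
x()  # only this call waits, because x itself was called before

print(call_times)
```

Both x and y are recorded at the same simulated time, which is exactly the violation described above; only the second call to x is delayed.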
Is there any way, or any library, that I can use to limit the rate globally in Python?
Answer
In the end, I implemented my own Throttler class. By routing every API request through its request method, we can keep track of all API requests in one place. Because the function to call is passed to request as a parameter, the class can also cache the result in order to reduce API calls.
    import datetime
    import logging
    import time


    class TooManyRequestsError(Exception):
        def __str__(self):
            return "More than 30 requests have been made in the last five seconds."


    class Throttler(object):
        cache = {}

        def __init__(self, max_rate, window, throttle_stop=False, cache_age=1800):
            # Dict of max number of requests of the API rate limit for each source
            self.max_rate = max_rate
            # Dict of duration of the API rate limit for each source
            self.window = window
            # Whether to throw an error (when True) if the limit is reached, or wait until another request
            self.throttle_stop = throttle_stop
            # The time, in seconds, for which to cache a response
            self.cache_age = cache_age

            # Initialization
            self.next_reset_at = dict()
            self.num_requests = dict()

            now = datetime.datetime.now()
            for source in self.max_rate:
                self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))
                self.num_requests[source] = 0

        def request(self, source, method, do_cache=False):
            now = datetime.datetime.now()

            # if cache exists, no need to make api call
            # (method.__name__ replaces method.func_name, which only exists in Python 2)
            key = source + method.__name__
            if do_cache and key in self.cache:
                timestamp, data = self.cache.get(key)
                logging.info('{} exists in cached @ {}'.format(key, timestamp))

                # total_seconds() is used because .seconds ignores the days component
                if (now - timestamp).total_seconds() < self.cache_age:
                    logging.info('retrieved cache for {}'.format(key))
                    return data

            # <--- MAKE API CALLS ---> #

            # reset the count if the period passed
            if now > self.next_reset_at.get(source):
                self.num_requests[source] = 0
                self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))

            # throttle request
            def halt(wait_time):
                if self.throttle_stop:
                    raise TooManyRequestsError()
                else:
                    # Wait the required time, plus a bit of extra padding time.
                    time.sleep(wait_time + 0.1)

            # if exceed max rate, need to wait
            if self.num_requests.get(source) >= self.max_rate.get(source):
                logging.info('back off: {} until {}'.format(source, self.next_reset_at.get(source)))
                halt((self.next_reset_at.get(source) - now).total_seconds())

            self.num_requests[source] += 1
            response = method()  # potential exception raise

            # cache the response
            if do_cache:
                self.cache[key] = (now, response)
                logging.info('cached instance for {}, {}'.format(source, method))

            return response
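For the simple case from the question (one global limit, no per-source settings and no caching), the same idea can be reduced to a single limiter object shared by every decorated function. This is only a minimal sketch under those assumptions, not a replacement for the Throttler class: `GlobalRateLimiter`, `api_limit`, and the `clock`/`sleep` parameters are hypothetical names introduced here, and the 0.2-second period is chosen just to keep the demo short.

```python
import functools
import threading
import time


class GlobalRateLimiter:
    """Allow at most one call per `period` seconds across ALL decorated functions."""

    def __init__(self, period, clock=time.monotonic, sleep=time.sleep):
        self.period = period
        self._clock = clock
        self._sleep = sleep
        self._lock = threading.Lock()
        self._next_allowed = None  # earliest time the next call may start

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # The state is shared by every decorated function, so guard it.
            with self._lock:
                now = self._clock()
                if self._next_allowed is not None and now < self._next_allowed:
                    self._sleep(self._next_allowed - now)
                    now = self._clock()
                self._next_allowed = now + self.period
            return func(*args, **kwargs)
        return wrapper


# One module-level instance that every API-calling function shares.
api_limit = GlobalRateLimiter(period=0.2)


@api_limit
def x():
    return time.monotonic()  # placeholder for a real API call


@api_limit
def y():
    return time.monotonic()  # placeholder for a real API call


t_x = x()
t_y = y()  # waits until roughly 0.2 s after x was admitted
print("gap between x and y: {:.2f}s".format(t_y - t_x))
```

Because x and y are decorated with the same instance, a call to y right after x has to wait out the remainder of the period. The lock is held while sleeping, which also serializes callers from multiple threads.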