Source code for datacatalog.linkedstores.basestore.ratelimit

import os
import random
import time
from pprint import pprint

__all__ = ['RateLimiter', 'RateLimitExceeded']

class RateLimitExceeded(Exception):
    """Raised by ``RateLimiter.limit()`` when the call budget is exhausted.

    Only raised when the limiter was created with ``except_on_limit=True``;
    otherwise the limiter sleeps instead of raising.
    """
class RateLimiter(object):
    """Adds a limit() method to a class

    Calls to ``limit()`` are counted. Up to ``batch_size`` counted calls
    pass straight through. Once the count exceeds ``batch_size``, the
    limiter either resets (if more than ``batch_window`` seconds have
    passed since the window started), raises ``RateLimitExceeded`` (if
    ``except_on_limit`` is set), or sleeps for up to ``batch_pause``
    seconds and then resets.

    Attributes:
        batch_size (int): Number of counted calls allowed before limiting
        batch_window (float): Length of the limiting window, in seconds
        batch_pause (float): Maximum time to sleep when limited, in seconds
        batch_random (bool): Whether to scale the pause by a random factor
        except_on_limit (bool): Raise instead of sleeping when limited
        start_time (float): ``time.time()`` at the start of the current
            window, or None before the first ``limit()`` call
        current_time (float): ``time.time()`` at the most recent
            ``limit()`` call or reset, or None before the first call
        count (int): Counted calls in the current window
    """

    BATCH_SIZE = 1024
    BATCH_WINDOW = 60
    BATCH_PAUSE = 2.50
    BATCH_RANDOM = True

    def __init__(self, *args,
                 batch_size=BATCH_SIZE,
                 batch_window=BATCH_WINDOW,
                 batch_pause=BATCH_PAUSE,
                 batch_random=BATCH_RANDOM,
                 except_on_limit=False,
                 **kwargs):
        """Set up the rate limiter state

        Args:
            batch_size (int, optional): Counted calls allowed per window
            batch_window (float, optional): Window length in seconds
            batch_pause (float, optional): Maximum sleep in seconds
            batch_random (bool, optional): Randomize the sleep duration
            except_on_limit (bool, optional): Raise ``RateLimitExceeded``
                rather than sleeping when the limit is hit

        Note:
            Extra positional and keyword arguments are accepted and
            ignored; they are not forwarded to any parent ``__init__``.
        """
        self.batch_size = batch_size
        self.batch_window = batch_window
        self.batch_pause = batch_pause
        self.batch_random = batch_random
        # The window opens lazily on the first limit() call.
        self.start_time = None
        self.current_time = None
        self.count = 0
        self.except_on_limit = except_on_limit

    def limit(self, increment=True):
        """Enforce a rate limit

        Args:
            increment (bool, optional): Whether to increase the usage count

        Raises:
            RateLimitExceeded: This is raised when the usage rate is
                exceeded and ``except_on_limit`` is True

        Returns:
            bool: Always returns True
        """
        now = time.time()
        if self.start_time is None:
            self.start_time = now
        self.current_time = now

        if increment:
            self.count = self.count + 1

        # While the count is within batch_size it is always OK to proceed.
        if self.count <= self.batch_size:
            return True

        # Over budget, but the window has already expired: start a fresh
        # window. The reset zeroes the counter, so the call that triggered
        # it is not counted against the new window.
        if (now - self.start_time) > self.batch_window:
            self.reset_limit()
            return True

        # Over budget inside the window: either fail loudly...
        if self.except_on_limit:
            raise RateLimitExceeded(
                'Rate limit of {} calls/{} sec. exceeded. Try again in {} sec.'.format(
                    self.batch_size, self.batch_window, self.remaining_time))

        # ...or pause briefly (optionally a random fraction of batch_pause),
        # then reset the counter and proceed.
        sleep_seconds = getattr(self, 'batch_pause', 0)
        if self.batch_random:
            sleep_seconds = random.random() * sleep_seconds
        time.sleep(sleep_seconds)
        self.reset_limit()
        return True

    @property
    def elapsed(self):
        """Time in seconds elapsed in current rate-limiting window

        Measured from the window start to the most recent ``limit()`` call
        or reset, not to the moment the property is read. Returns 0.0 if
        ``limit()`` has not been called yet.
        """
        if self.start_time is None or self.current_time is None:
            # No window has been opened yet, so nothing has elapsed.
            return 0.0
        return self.current_time - self.start_time

    @property
    def remaining(self):
        """A tuple of time and calls remaining
        """
        return self.remaining_time, self.remaining_calls

    @property
    def remaining_calls(self):
        """How many metered calls remain
        """
        return self.batch_size - self.count

    @property
    def remaining_time(self):
        """How long until the current limiting window expires
        """
        return self.batch_window - self.elapsed

    def reset_limit(self):
        """Reset rate limiter to default state

        Starts a new window at the current time and zeroes the call count.
        """
        # Use one timestamp so elapsed is exactly zero right after a reset.
        now = time.time()
        self.start_time = now
        self.current_time = now
        self.count = 0