[docs]@versionadded(version="2.1")classMemcachedStorage(Storage):""" Rate limit storage with memcached as backend. Depends on :pypi:`emcache` """STORAGE_SCHEME=["async+memcached"]"""The storage scheme for memcached to be used in an async context"""DEPENDENCIES=["emcache"]def__init__(self,uri:str,wrap_exceptions:bool=False,**options:Union[float,str,bool],)->None:""" :param uri: memcached location of the form ``async+memcached://host:port,host:port`` :param wrap_exceptions: Whether to wrap storage exceptions in :exc:`limits.errors.StorageError` before raising it. :param options: all remaining keyword arguments are passed directly to the constructor of :class:`emcache.Client` :raise ConfigurationError: when :pypi:`emcache` is not available """parsed=urllib.parse.urlparse(uri)self.hosts=[]forhost,portin(loc.split(":")forlocinparsed.netloc.strip().split(",")ifloc.strip()):self.hosts.append((host,int(port)))self._options=optionsself._storage=Nonesuper().__init__(uri,wrap_exceptions=wrap_exceptions,**options)self.dependency=self.dependencies["emcache"].module@propertydefbase_exceptions(self,)->Union[Type[Exception],Tuple[Type[Exception],...]]:# pragma: no coverreturn(self.dependency.ClusterNoAvailableNodes,self.dependency.CommandError,)asyncdefget_storage(self)->EmcacheClientP:ifnotself._storage:self._storage=awaitself.dependency.create_client([self.dependency.MemcachedHostAddress(h,p)forh,pinself.hosts],**self._options,)assertself._storagereturnself._storage
[docs]asyncdefget(self,key:str)->int:""" :param key: the key to get the counter value for """item=await(awaitself.get_storage()).get(key.encode("utf-8"))returnitemandint(item.value)or0
[docs]asyncdefclear(self,key:str)->None:""" :param key: the key to clear rate limits for """await(awaitself.get_storage()).delete(key.encode("utf-8"))
[docs]asyncdefincr(self,key:str,expiry:int,elastic_expiry:bool=False,amount:int=1)->int:""" increments the counter for a given rate limit key :param key: the key to increment :param expiry: amount in seconds for the key to expire in :param elastic_expiry: whether to keep extending the rate limit window every hit. :param amount: the number to increment by """storage=awaitself.get_storage()limit_key=key.encode("utf-8")expire_key=f"{key}/expires".encode()added=Truetry:awaitstorage.add(limit_key,f"{amount}".encode(),exptime=expiry)exceptself.dependency.NotStoredStorageCommandError:added=Falsestorage=awaitself.get_storage()ifnotadded:value=awaitstorage.increment(limit_key,amount)oramountifelastic_expiry:awaitstorage.touch(limit_key,exptime=expiry)awaitstorage.set(expire_key,str(expiry+time.time()).encode("utf-8"),exptime=expiry,noreply=False,)returnvalueelse:awaitstorage.set(expire_key,str(expiry+time.time()).encode("utf-8"),exptime=expiry,noreply=False,)returnamount
[docs]asyncdefget_expiry(self,key:str)->int:""" :param key: the key to get the expiry for """storage=awaitself.get_storage()item=awaitstorage.get(f"{key}/expires".encode())returnint(itemandfloat(item.value)ortime.time())
[docs]asyncdefcheck(self)->bool:""" Check if storage is healthy by calling the ``get`` command on the key ``limiter-check`` """try:storage=awaitself.get_storage()awaitstorage.get(b"limiter-check")returnTrueexcept:# noqareturnFalse