python - 在 python 中使用 @functools.lru_decorator 实现 LRU 缓存
问题描述
所以我一直在尝试为我的项目实现 LRU 缓存,使用 python functools lru_cache。作为参考,我使用了 this。以下是参考中使用的代码。
def timed_lru_cache(maxsize, seconds):
def wrapper_cache(func):
func = lru_cache(maxsize=maxsize)(func)
func.lifetime = timedelta(seconds=seconds)
func.expiration = datetime.utcnow() + func.lifetime
@wraps(func)
def wrapped_func(*args, **kwargs):
if datetime.utcnow() >= func.expiration:
func.cache_clear()
func.expiration = datetime.utcnow() + func.lifetime
return func(*args, **kwargs)
return wrapped_func
return wrapper_cache
@timed_lru_cache(maxsize=config.cache_size, seconds=config.ttl)
def load_into_cache(id):
return object
在包装的 func 部分中,func.cache_clear()
, 清除整个缓存以及所有项目。我需要帮助才能在插入后仅删除超过过期时间的元素。有什么解决办法吗?
解决方案
我不认为适应现有的那么容易lru_cache
,并且我认为链接的方法不是很清楚。
相反,我从头开始实现了一个定时 lru 缓存。有关用法,请参阅顶部的文档字符串。
args
它根据输入的和存储一个键kwargs
,并管理两个结构:
- 的映射
key => (expiry, result)
- 最近使用的列表,其中第一项是最近最少使用的
每次您尝试获取项目时,都会在“最近使用”列表中查找密钥。如果它不存在,它将被添加到列表和映射中。如果它在那里,我们检查到期是否在过去。如果是,我们重新计算结果并更新。否则我们可以只返回映射中的任何内容。
from datetime import datetime, timedelta
from functools import wraps
from typing import Any, Dict, List, Optional, Tuple
class TimedLRUCache:
""" Cache that caches results based on an expiry time, and on least recently used.
Items are eliminated first if they expire, and then if too many "recent" items are being
stored.
There are two methods of using this cache, either the `get` method`, or calling this as a
decorator. The `get` method accepts any arbitrary function, but on the parameters are
considered in the key, so it is advisable not to mix function.
>>> cache = TimedLRUCache(5)
>>> def foo(i):
... return i + 1
>>> cache.get(foo, 1) # runs foo
>>> cache.get(foo, 1) # returns the previously calculated result
As a decorator is more familiar:
>>> @TimedLRUCache(5)
... def foo(i):
... return i + 1
>>> foo(1) # runs foo
>>> foo(1) # returns the previously calculated result
Either method can allow for fine-grained control of the cache:
>>> five_second_cache = TimedLRUCache(5)
>>> @five_second_cache
... def foo(i):
... return i + 1
>>> five_second_cache.clear_cache() # resets the cache (clear every item)
>>> five_second_cache.prune() # clear invalid items
"""
_items: Dict[int, Tuple[datetime, Any]]
_recently_added: List[int]
delta: timedelta
max_size: int
def __init__(self, seconds: Optional[int] = None, max_size: Optional[int] = None):
self.delta = timedelta(seconds=seconds) if seconds else None
self.max_size = max_size
self._items = {}
self._recently_added = []
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
return self.get(func, args, kwargs)
return wrapper
@staticmethod
def _get_key(args, kwargs) -> int:
""" Get the thing we're going to use to lookup items in the cache. """
key = (args, tuple(sorted(kwargs.items())))
return hash(key)
def _update(self, key: int, item: Any) -> None:
""" Make sure an item is up to date. """
if key in self._recently_added:
self._recently_added.remove(key)
# the first item in the list is the least recently used
self._recently_added.append(key)
self._items[key] = (datetime.now() + self.delta, item)
# when this function is called, something has changed, so we can also sort out the cache
self.prune()
def prune(self):
""" Clear out everything that no longer belongs in the cache
First delete everything that has expired. Then delete everything that isn't recent (only
if there is a `max_size`).
"""
# clear out anything that no longer belongs in the cache.
current_time = datetime.now()
# first get rid of things which have expired
for key, (expiry, item) in self._items.items():
if expiry < current_time:
del self._items[key]
self._recently_added.remove(key)
# then make sure there aren't too many recent items
if self.max_size:
self._recently_added[:-self.max_size] = []
def clear_cache(self):
""" Clear everything from the cache """
self._items = {}
self._recently_added = []
def get(self, func, args, kwargs):
""" Given a function and its arguments, get the result using the cache
Get the key from the arguments of the function. If the key is in the cache, and the
expiry time of that key hasn't passed, return the result from the cache.
If the key *has* expired, or there are too many "recent" items, recalculate the result,
add it to the cache, and then return the result.
"""
key = self._get_key(args, kwargs)
current_time = datetime.now()
if key in self._recently_added:
# there is something in the cache
expiry, item = self._items.get(key)
if expiry < current_time:
# the item has expired, so we need to get the new value
new_item = func(*args, **kwargs)
self._update(key, new_item)
return new_item
else:
# we can use the existing value
return item
else:
# never seen this before, so add it
new_item = func(*args, **kwargs)
self._update(key, new_item)
return new_item
推荐阅读
- flutter - Navigator.pop(context) 在 Flutter 中完成堆栈中的最后一个 Activity 时显示黑屏
- node.js - 如何在 Opsworks (AWS) 中使用 Apollo 服务器包部署 Nodejs 应用程序
- unit-testing - 模拟 perl 反引号的返回码和输出
- javascript - 如何在 PF 6 中禁用 p:progressBar 过渡动画
- react-native - 无法在 Android 上打开键盘,同时出现模态
- php - 获取具有 id 的多维数组中的最高日期
- javascript - 如何将 JS 库合并到我的 NPM 构建脚本中?
- spring-boot - Kotlin 的导入注释中预期的名称
- javascript - 导入 sendgrid 邮件时它不起作用
- python - 决定重建时忽略一些标志更改