Caching - In-Memory Cache, Redis, s3, Redis Semantic Cache, Disk Cache
Initialize Cache - In-Memory, Redis, s3 Bucket, Redis Semantic, Disk Cache, Qdrant Semantic Cache
- Redis Cache
- s3 Cache
- Redis Semantic Cache
- Qdrant Semantic Cache
- In-Memory Cache
- Disk Cache
Install redis
pip install redis
For the hosted version, you can set up your own Redis DB here: https://redis.ac.cn/try-free/
import litellm
from litellm import completion
from litellm.caching.caching import Cache
litellm.cache = Cache(type="redis", host=<host>, port=<port>, password=<password>)
# Make completion calls
response1 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke."}]
)
response2 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke."}]
)
# response1 == response2, response 1 is cached
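To verify the cache hit, you can compare response IDs the same way the semantic-cache examples below do; a cached response carries the same id as the original:
assert response1.id == response2.id  # cached responses share the original response's id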
Install boto3
pip install boto3
Set AWS environment variables
AWS_ACCESS_KEY_ID = "AKI*******"
AWS_SECRET_ACCESS_KEY = "WOl*****"
import litellm
from litellm import completion
from litellm.caching.caching import Cache
# pass s3-bucket name
litellm.cache = Cache(type="s3", s3_bucket_name="cache-bucket-litellm", s3_region_name="us-west-2")
# Make completion calls
response1 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke."}]
)
response2 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke."}]
)
# response1 == response2, response 1 is cached
Install the redisvl client
pip install redisvl==0.4.1
For the hosted version, you can set up your own Redis DB here: https://redis.ac.cn/try-free/
import os
import random

import litellm
from litellm import completion
from litellm.caching.caching import Cache

random_number = random.randint(1, 100000)  # add a random number to ensure it's always adding / reading from cache
print("testing semantic caching")
litellm.cache = Cache(
type="redis-semantic",
host=os.environ["REDIS_HOST"],
port=os.environ["REDIS_PORT"],
password=os.environ["REDIS_PASSWORD"],
similarity_threshold=0.8, # similarity threshold for cache hits, 0 == no similarity, 1 = exact matches, 0.5 == 50% similarity
ttl=120,
redis_semantic_cache_embedding_model="text-embedding-ada-002", # this model is passed to litellm.embedding(), any litellm.embedding() model is supported here
)
response1 = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": f"write a one sentence poem about: {random_number}",
}
],
max_tokens=20,
)
print(f"response1: {response1}")
random_number = random.randint(1, 100000)
response2 = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": f"write a one sentence poem about: {random_number}",
}
],
max_tokens=20,
)
print(f"response2: {response1}")
assert response1.id == response2.id
# response1 == response2, response 1 is cached
You can set up your own cloud Qdrant cluster by following the instructions here: https://qdrant.org.cn/documentation/quickstart-cloud/
To set up a Qdrant cluster locally, follow the instructions here: https://qdrant.org.cn/documentation/quickstart/
import os
import random

import litellm
from litellm import completion
from litellm.caching.caching import Cache

random_number = random.randint(1, 100000)  # add a random number to ensure it's always adding / reading from cache
print("testing semantic caching")
litellm.cache = Cache(
type="qdrant-semantic",
qdrant_api_base=os.environ["QDRANT_API_BASE"],
qdrant_api_key=os.environ["QDRANT_API_KEY"],
qdrant_collection_name="your_collection_name", # any name of your collection
similarity_threshold=0.7, # similarity threshold for cache hits, 0 == no similarity, 1 = exact matches, 0.5 == 50% similarity
qdrant_quantization_config="binary",  # one of the 'binary', 'product', or 'scalar' quantizations supported by qdrant
qdrant_semantic_cache_embedding_model="text-embedding-ada-002", # this model is passed to litellm.embedding(), any litellm.embedding() model is supported here
)
response1 = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": f"write a one sentence poem about: {random_number}",
}
],
max_tokens=20,
)
print(f"response1: {response1}")
random_number = random.randint(1, 100000)
response2 = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": f"write a one sentence poem about: {random_number}",
}
],
max_tokens=20,
)
print(f"response2: {response2}")
assert response1.id == response2.id
# response1 == response2, response 1 is cached
Quick Start
import litellm
from litellm import completion
from litellm.caching.caching import Cache
litellm.cache = Cache()
# Make completion calls
response1 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke."}],
caching=True
)
response2 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke."}],
caching=True
)
# response1 == response2, response 1 is cached
Quick Start
Install diskcache
pip install diskcache
Then you can use the disk cache as follows.
import litellm
from litellm import completion
from litellm.caching.caching import Cache
litellm.cache = Cache(type="disk")
# Make completion calls
response1 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke."}],
caching=True
)
response2 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke."}],
caching=True
)
# response1 == response2, response 1 is cached
If you run the code twice, response1 on the second run will be served from the cache file written during the first run.
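If you want to control where the cache file lives, the `disk_cache_dir` init param (listed under Cache Initialization Parameters below) lets you pick the directory; the path here is just an example:
litellm.cache = Cache(type="disk", disk_cache_dir="/tmp/litellm-disk-cache")  # example path, choose your own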
Switch Cache On / Off Per LiteLLM Call
LiteLLM supports 4 cache-control parameters:
- `no-cache`: Optional(bool) When `True`, will not return a cached response; the actual endpoint is called instead.
- `no-store`: Optional(bool) When `True`, will not cache the response.
- `ttl`: Optional(int) - Will cache the response for the user-defined amount of time (in seconds).
- `s-maxage`: Optional(int) Will only accept cached responses that are within the user-defined range (in seconds).
- No-Cache
- No-Store
- ttl
- s-maxage
`no-cache` example usage - when `True`, a cached response will not be returned:
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "hello who are you"
}
],
cache={"no-cache": True},
)
`no-store` example usage - when `True`, the response will not be cached:
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "hello who are you"
}
],
cache={"no-store": True},
)
`ttl` example usage - cache the response for 10 seconds:
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "hello who are you"
}
],
cache={"ttl": 10},
)
`s-maxage` example usage - will only accept cached responses that are at most 60 seconds old:
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "hello who are you"
}
],
cache={"s-maxage": 60},
)
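Because `cache` is a plain dict, these controls can be combined in a single call; a hedged sketch, assuming litellm reads each key independently:
response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[
        {
            "role": "user",
            "content": "hello who are you"
        }
    ],
    cache={"ttl": 600, "s-maxage": 60},  # assumption: store for 10 minutes, but only accept hits newer than 60s
)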
Cache Context Manager - Enable, Disable, Update Cache
Use the context manager to easily enable, disable, and update the litellm cache.
Enable Cache
Quick Start Enable
litellm.enable_cache()
Advanced Params
litellm.enable_cache(
type: Optional[Literal["local", "redis", "s3", "disk"]] = "local",
host: Optional[str] = None,
port: Optional[str] = None,
password: Optional[str] = None,
supported_call_types: Optional[
List[Literal["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"]]
] = ["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"],
**kwargs,
)
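For example, to enable a Redis-backed cache (assuming `os` is imported and the Redis env vars are set):
litellm.enable_cache(
    type="redis",
    host=os.environ["REDIS_HOST"],
    port=os.environ["REDIS_PORT"],
    password=os.environ["REDIS_PASSWORD"],
)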
Disable Cache
Switch caching off
litellm.disable_cache()
Update Cache Params (Redis Host, Port, etc.)
Update cache params
litellm.update_cache(
type: Optional[Literal["local", "redis", "s3", "disk"]] = "local",
host: Optional[str] = None,
port: Optional[str] = None,
password: Optional[str] = None,
supported_call_types: Optional[
List[Literal["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"]]
] = ["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"],
**kwargs,
)
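For example, to move an existing cache over to a different Redis host (all values here are illustrative):
litellm.update_cache(
    type="redis",
    host="my-new-redis-host",
    port="6379",
    password="my-new-password",
)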
Custom Cache Keys:
Define a function to return your cache key
# this function takes in *args, **kwargs and returns the key you want to use for caching
def custom_get_cache_key(*args, **kwargs):
# return key to use for your cache:
key = kwargs.get("model", "") + str(kwargs.get("messages", "")) + str(kwargs.get("temperature", "")) + str(kwargs.get("logit_bias", ""))
print("key for cache", key)
return key
Set your function as litellm.cache.get_cache_key
from litellm.caching.caching import Cache
cache = Cache(type="redis", host=os.environ['REDIS_HOST'], port=os.environ['REDIS_PORT'], password=os.environ['REDIS_PASSWORD'])
cache.get_cache_key = custom_get_cache_key # set get_cache_key function for your cache
litellm.cache = cache # set litellm.cache to your cache
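With the custom key function in place, any two calls that agree on model, messages, temperature, and logit_bias produce the same key and therefore share a cache entry; a minimal sketch:
response1 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}],
    temperature=0.2,
)
response2 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}],
    temperature=0.2,
)
# custom_get_cache_key returns the same key for both calls, so response2 is served from the cache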
How to write custom add/get cache functions
1. Init Cache
from litellm.caching.caching import Cache
cache = Cache()
2. Define custom add/get cache functions
def add_cache(self, result, *args, **kwargs):
    # your logic for writing `result` to the cache
    pass

def get_cache(self, *args, **kwargs):
    # your logic for reading from the cache; return None on a cache miss
    pass
3. Point the cache's add/get functions at your custom add/get functions
cache.add_cache = add_cache
cache.get_cache = get_cache
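For illustration, a hedged end-to-end sketch backed by a plain dict (`my_store` is a name invented for this example, and the `self` parameter mirrors the signatures above):
my_store = {}

def add_cache(self, result, *args, **kwargs):
    # write the result under the key computed for this call
    my_store[self.get_cache_key(*args, **kwargs)] = result

def get_cache(self, *args, **kwargs):
    # return the cached result, or None on a cache miss
    return my_store.get(self.get_cache_key(*args, **kwargs))

cache.add_cache = add_cache
cache.get_cache = get_cache
litellm.cache = cache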
Cache Initialization Parameters
def __init__(
self,
type: Optional[Literal["local", "redis", "redis-semantic", "qdrant-semantic", "s3", "disk"]] = "local",
supported_call_types: Optional[
List[Literal["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"]]
] = ["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"],
ttl: Optional[float] = None,
default_in_memory_ttl: Optional[float] = None,
# redis cache params
host: Optional[str] = None,
port: Optional[str] = None,
password: Optional[str] = None,
namespace: Optional[str] = None,
default_in_redis_ttl: Optional[float] = None,
redis_flush_size=None,
# redis semantic cache params
similarity_threshold: Optional[float] = None,
redis_semantic_cache_embedding_model: str = "text-embedding-ada-002",
redis_semantic_cache_index_name: Optional[str] = None,
# s3 Bucket, boto3 configuration
s3_bucket_name: Optional[str] = None,
s3_region_name: Optional[str] = None,
s3_api_version: Optional[str] = None,
s3_path: Optional[str] = None, # if you wish to save to a specific path
s3_use_ssl: Optional[bool] = True,
s3_verify: Optional[Union[bool, str]] = None,
s3_endpoint_url: Optional[str] = None,
s3_aws_access_key_id: Optional[str] = None,
s3_aws_secret_access_key: Optional[str] = None,
s3_aws_session_token: Optional[str] = None,
s3_config: Optional[Any] = None,
# disk cache params
disk_cache_dir=None,
# qdrant cache params
qdrant_api_base: Optional[str] = None,
qdrant_api_key: Optional[str] = None,
qdrant_collection_name: Optional[str] = None,
qdrant_quantization_config: Optional[str] = None,
qdrant_semantic_cache_embedding_model="text-embedding-ada-002",
**kwargs
):
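For example, to cache only chat-completion calls with a 10-minute TTL, per the signature above:
litellm.cache = Cache(
    type="local",
    ttl=600,  # entries expire after 10 minutes
    supported_call_types=["completion", "acompletion"],  # skip caching embedding / transcription calls
)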
Logging
Cache hits are logged in success events as `kwarg["cache_hit"]`.
Here's an example of accessing it:
import asyncio
import os
import time

import litellm
from litellm.integrations.custom_logger import CustomLogger
from litellm import completion, acompletion, Cache
# create custom callback for success_events
class MyCustomHandler(CustomLogger):
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Success")
print(f"Value of Cache hit: {kwargs['cache_hit']"})
async def test_async_completion_azure_caching():
# set custom callback
customHandler_caching = MyCustomHandler()
litellm.callbacks = [customHandler_caching]
# init cache
litellm.cache = Cache(type="redis", host=os.environ['REDIS_HOST'], port=os.environ['REDIS_PORT'], password=os.environ['REDIS_PASSWORD'])
unique_time = time.time()
response1 = await litellm.acompletion(model="azure/chatgpt-v-2",
messages=[{
"role": "user",
"content": f"Hi 👋 - i'm async azure {unique_time}"
}],
caching=True)
await asyncio.sleep(1)
print(f"customHandler_caching.states pre-cache hit: {customHandler_caching.states}")
response2 = await litellm.acompletion(model="azure/chatgpt-v-2",
messages=[{
"role": "user",
"content": f"Hi 👋 - i'm async azure {unique_time}"
}],
caching=True)
await asyncio.sleep(1) # success callbacks are done in parallel
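On the second `acompletion` call, the success callback should fire with `kwargs['cache_hit'] == True`, since the prompt (including `unique_time`) is identical to the first call.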