路由器 - 负载均衡
LiteLLM 管理
- 在多个部署(例如 Azure/OpenAI)之间进行负载均衡
- 优先处理重要请求以确保它们不会失败(即排队)
- 基本可靠性逻辑 - 在多个部署/提供商之间进行冷却、回退、超时和重试(固定 + 指数退避)。
在生产环境中,litellm 支持使用 Redis 来跟踪冷却服务器和使用情况(管理 tpm/rpm 限制)。
如果您想要一个服务器来在不同的 LLM API 之间进行负载均衡,请使用我们的 LiteLLM 代理服务器
负载均衡
(鸣谢 @paulpierre 和 sweep proxy 为此实现的贡献) 查看代码
快速开始
在多个 azure/bedrock/提供商 部署之间进行负载均衡。如果调用失败,LiteLLM 将处理在不同区域的重试。
- SDK
- 代理
from litellm import Router
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias -> loadbalance between models with same `model_name`
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
}
}, {
"model_name": "gpt-4",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/gpt-4",
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": os.getenv("AZURE_API_BASE"),
"api_version": os.getenv("AZURE_API_VERSION"),
}
}, {
"model_name": "gpt-4",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-4",
"api_key": os.getenv("OPENAI_API_KEY"),
}
},
]
router = Router(model_list=model_list)
# openai.ChatCompletion.create replacement
# requests with model="gpt-3.5-turbo" will pick a deployment where model_name="gpt-3.5-turbo"
response = await router.acompletion(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
# openai.ChatCompletion.create replacement
# requests with model="gpt-4" will pick a deployment where model_name="gpt-4"
response = await router.acompletion(model="gpt-4",
messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
查看详细的代理负载均衡/回退文档此处
- 使用多个部署设置 model_list
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/<your-deployment-name>
api_base: <your-azure-endpoint>
api_key: <your-azure-api-key>
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/gpt-turbo-small-ca
api_base: https://my-endpoint-canada-berri992.openai.azure.com/
api_key: <your-azure-api-key>
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/gpt-turbo-large
api_base: https://openai-france-1234.openai.azure.com/
api_key: <your-azure-api-key>
- 启动代理
litellm --config /path/to/config.yaml
- 测试!
curl -X POST 'http://0.0.0.0:4000/chat/completions' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer sk-1234' \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{"role": "user", "content": "Hi there!"}
],
"mock_testing_rate_limit_error": true
}'
可用端点
router.completion()
- 调用 100+ LLM 的聊天完成端点router.acompletion()
- 异步聊天完成调用router.embedding()
- Azure、OpenAI、Huggingface 端点的嵌入端点router.aembedding()
- 异步嵌入调用router.text_completion()
- 旧的 OpenAI/v1/completions
端点格式的完成调用router.atext_completion()
- 异步文本完成调用router.image_generation()
- OpenAI/v1/images/generations
端点格式的完成调用router.aimage_generation()
- 异步图像生成调用
高级 - 路由策略 ⭐️
路由策略 - 加权选择、速率限制感知、最少忙碌、基于延迟、基于成本
路由器提供 4 种策略,用于在多个部署之间路由您的调用
- 速率限制感知 v2 (异步)
- 基于延迟
- (默认) 加权选择 (异步)
- 速率限制感知
- 最少忙碌
- 自定义路由策略
- 最低成本路由 (异步)
🎉 新增 这是基于使用情况路由的异步实现。
如果超出 tpm/rpm 限制,则过滤掉部署 - 如果您传入了部署的 tpm/rpm 限制。
路由到该分钟内 TPM 使用率最低的部署。
在生产环境中,我们使用 Redis 跟踪多个部署的使用情况 (TPM/RPM)。此实现使用 异步 Redis 调用 (redis.incr 和 redis.mget)。
对于 Azure,每 1000 TPM 可获得 6 RPM
- sdk
- 代理
from litellm import Router
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
"tpm": 100000,
"rpm": 10000,
},
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
"tpm": 100000,
"rpm": 1000,
},
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
"tpm": 100000,
"rpm": 1000,
},
}]
router = Router(model_list=model_list,
redis_host=os.environ["REDIS_HOST"],
redis_password=os.environ["REDIS_PASSWORD"],
redis_port=os.environ["REDIS_PORT"],
routing_strategy="usage-based-routing-v2" # 👈 KEY CHANGE
enable_pre_call_checks=True, # enables router rate limits for concurrent calls
)
response = await router.acompletion(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
print(response)
1. 在配置中设置策略
model_list:
- model_name: gpt-3.5-turbo # model alias
litellm_params: # params for litellm completion/embedding call
model: azure/chatgpt-v-2 # actual model name
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
tpm: 100000
rpm: 10000
- model_name: gpt-3.5-turbo
litellm_params: # params for litellm completion/embedding call
model: gpt-3.5-turbo
api_key: os.getenv(OPENAI_API_KEY)
tpm: 100000
rpm: 1000
router_settings:
routing_strategy: usage-based-routing-v2 # 👈 KEY CHANGE
redis_host: <your-redis-host>
redis_password: <your-redis-password>
redis_port: <your-redis-port>
enable_pre_call_check: true
general_settings:
master_key: sk-1234
2. 启动代理
litellm --config /path/to/config.yaml
3. 测试!
curl --location 'http://localhost:4000/v1/chat/completions' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer sk-1234' \
--data '{
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Hey, how's it going?"}]
}'
选择响应时间最低的部署。
它缓存并根据请求从部署发送和接收的时间更新部署的响应时间。
from litellm import Router
import asyncio
model_list = [{ ... }]
# init router
router = Router(model_list=model_list,
routing_strategy="latency-based-routing",# 👈 set routing strategy
enable_pre_call_check=True, # enables router rate limits for concurrent calls
)
## CALL 1+2
tasks = []
response = None
final_response = None
for _ in range(2):
tasks.append(router.acompletion(model=model, messages=messages))
response = await asyncio.gather(*tasks)
if response is not None:
## CALL 3
await asyncio.sleep(1) # let the cache update happen
picked_deployment = router.lowestlatency_logger.get_available_deployments(
model_group=model, healthy_deployments=router.healthy_deployments
)
final_response = await router.acompletion(model=model, messages=messages)
print(f"min deployment id: {picked_deployment}")
print(f"model id: {final_response._hidden_params['model_id']}")
assert (
final_response._hidden_params["model_id"]
== picked_deployment["model_info"]["id"]
)
设置时间窗口
设置时间窗口,用于在计算部署平均延迟时考虑多久之前的数据。
在路由器中
router = Router(..., routing_strategy_args={"ttl": 10})
在代理中
router_settings:
routing_strategy_args: {"ttl": 10}
设置最低延迟缓冲区
设置一个缓冲区,在此缓冲区内的部署都是可供调用选择的候选。
例如
如果您有 5 个部署
https://litellm-prod-1.openai.azure.com/: 0.07s
https://litellm-prod-2.openai.azure.com/: 0.1s
https://litellm-prod-3.openai.azure.com/: 0.1s
https://litellm-prod-4.openai.azure.com/: 0.1s
https://litellm-prod-5.openai.azure.com/: 4.66s
为了防止最初所有请求都压垮 prod-1
,我们可以设置一个 50% 的缓冲区,以考虑部署 prod-2, prod-3, prod-4
。
在路由器中
router = Router(..., routing_strategy_args={"lowest_latency_buffer": 0.5})
在代理中
router_settings:
routing_strategy_args: {"lowest_latency_buffer": 0.5}
默认 根据提供的 每分钟请求数 (rpm) 或每分钟令牌数 (tpm) 选择部署
如果未提供 rpm
或 tpm
,它将随机选择一个部署
您还可以设置 weight
参数,指定何时应选择哪个模型。
- 基于 RPM 的混洗
- 基于权重的混洗
LiteLLM 代理 Config.yaml
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-v-2
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
rpm: 900
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-functioncalling
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
rpm: 10
Python SDK
from litellm import Router
import asyncio
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"rpm": 900, # requests per minute for this API
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"rpm": 10,
}
},]
# init router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
LiteLLM 代理 Config.yaml
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-v-2
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
weight: 9
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-functioncalling
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
weight: 1
Python SDK
from litellm import Router
import asyncio
model_list = [{
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": {
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"weight": 9, # pick this 90% of the time
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"weight": 1,
}
}]
# init router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
这将路由到该分钟内 TPM 使用率最低的部署。
在生产环境中,我们使用 Redis 跟踪多个部署的使用情况 (TPM/RPM)。
如果您传入了部署的 tpm/rpm 限制,这也将对照这些限制进行检查,并过滤掉任何将超出限制的部署。
对于 Azure,您的 RPM = TPM/6。
from litellm import Router
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 100000,
"rpm": 10000,
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 100000,
"rpm": 1000,
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tpm": 100000,
"rpm": 1000,
}]
router = Router(model_list=model_list,
redis_host=os.environ["REDIS_HOST"],
redis_password=os.environ["REDIS_PASSWORD"],
redis_port=os.environ["REDIS_PORT"],
routing_strategy="usage-based-routing"
enable_pre_call_check=True, # enables router rate limits for concurrent calls
)
response = await router.acompletion(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
print(response)
选择正在处理的进行中调用数量最少的部署。
from litellm import Router
import asyncio
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
}
}]
# init router
router = Router(model_list=model_list, routing_strategy="least-busy")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
插入自定义路由策略以选择部署
步骤 1. 定义您的自定义路由策略
from litellm.router import CustomRoutingStrategyBase
class CustomRoutingStrategy(CustomRoutingStrategyBase):
async def async_get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
Asynchronously retrieves the available deployment based on the given parameters.
Args:
model (str): The name of the model.
messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.
Returns:
Returns an element from litellm.router.model_list
"""
print("In CUSTOM async get available deployment")
model_list = router.model_list
print("router model list=", model_list)
for model in model_list:
if isinstance(model, dict):
if model["litellm_params"]["model"] == "openai/very-special-endpoint":
return model
pass
def get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
Synchronously retrieves the available deployment based on the given parameters.
Args:
model (str): The name of the model.
messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.
Returns:
Returns an element from litellm.router.model_list
"""
pass
步骤 2. 使用自定义路由策略初始化路由器
from litellm import Router
router = Router(
model_list=[
{
"model_name": "azure-model",
"litellm_params": {
"model": "openai/very-special-endpoint",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/", # If you are Krrish, this is OpenAI Endpoint3 on our Railway endpoint :)
"api_key": "fake-key",
},
"model_info": {"id": "very-special-endpoint"},
},
{
"model_name": "azure-model",
"litellm_params": {
"model": "openai/fast-endpoint",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
"api_key": "fake-key",
},
"model_info": {"id": "fast-endpoint"},
},
],
set_verbose=True,
debug_level="DEBUG",
timeout=1,
) # type: ignore
router.set_custom_routing_strategy(CustomRoutingStrategy()) # 👈 Set your routing strategy here
步骤 3. 测试您的路由策略。在运行 router.acompletion
请求时,预期您的自定义路由策略将被调用
for _ in range(10):
response = await router.acompletion(
model="azure-model", messages=[{"role": "user", "content": "hello"}]
)
print(response)
_picked_model_id = response._hidden_params["model_id"]
print("picked model=", _picked_model_id)
选择成本最低的部署
工作原理
- 获取所有健康的部署
- 选择所有未超出其提供的
rpm/tpm
限制的部署 - 对于每个部署,检查
litellm_param["model"]
是否存在于litellm_model_cost_map
中- 如果部署不存在于
litellm_model_cost_map
中 -> 使用 deployment_cost=$1
- 如果部署不存在于
- 选择成本最低的部署
from litellm import Router
import asyncio
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {"model": "gpt-4"},
"model_info": {"id": "openai-gpt-4"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {"model": "groq/llama3-8b-8192"},
"model_info": {"id": "groq-llama"},
},
]
# init router
router = Router(model_list=model_list, routing_strategy="cost-based-routing")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
print(response._hidden_params["model_id"]) # expect groq-llama, since groq/llama has lowest cost
return response
asyncio.run(router_acompletion())
使用自定义输入/输出定价
设置 litellm_params["input_cost_per_token"]
和 litellm_params["output_cost_per_token"]
以在使用自定义定价进行路由时
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"input_cost_per_token": 0.00003,
"output_cost_per_token": 0.00003,
},
"model_info": {"id": "chatgpt-v-experimental"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-1",
"input_cost_per_token": 0.000000001,
"output_cost_per_token": 0.00000001,
},
"model_info": {"id": "chatgpt-v-1"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-5",
"input_cost_per_token": 10,
"output_cost_per_token": 12,
},
"model_info": {"id": "chatgpt-v-5"},
},
]
# init router
router = Router(model_list=model_list, routing_strategy="cost-based-routing")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
print(response._hidden_params["model_id"]) # expect chatgpt-v-1, since chatgpt-v-1 has lowest cost
return response
asyncio.run(router_acompletion())
基本可靠性
加权部署
在部署上设置 weight
以使其比其他部署更频繁地被选中。
这适用于 simple-shuffle 路由策略(如果未选择路由策略,这是默认策略)。
- SDK
- 代理
from litellm import Router
model_list = [
{
"model_name": "o1",
"litellm_params": {
"model": "o1-preview",
"api_key": os.getenv("OPENAI_API_KEY"),
"weight": 1
},
},
{
"model_name": "o1",
"litellm_params": {
"model": "o1-preview",
"api_key": os.getenv("OPENAI_API_KEY"),
"weight": 2 # 👈 PICK THIS DEPLOYMENT 2x MORE OFTEN THAN o1-preview
},
},
]
router = Router(model_list=model_list, routing_strategy="cost-based-routing")
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
model_list:
- model_name: o1
litellm_params:
model: o1
api_key: os.environ/OPENAI_API_KEY
weight: 1
- model_name: o1
litellm_params:
model: o1-preview
api_key: os.environ/OPENAI_API_KEY
weight: 2 # 👈 PICK THIS DEPLOYMENT 2x MORE OFTEN THAN o1-preview
最大并行请求数 (异步)
用于路由器上异步请求的信号量。限制对部署的最大并发调用数。在高流量场景下很有用。
如果设置了 tpm/rpm,但未给出最大并行请求限制,我们将使用 RPM 或计算出的 RPM (tpm/1000/6) 作为最大并行请求限制。
from litellm import Router
model_list = [{
"model_name": "gpt-4",
"litellm_params": {
"model": "azure/gpt-4",
...
"max_parallel_requests": 10 # 👈 SET PER DEPLOYMENT
}
}]
### OR ###
router = Router(model_list=model_list, default_max_parallel_requests=20) # 👈 SET DEFAULT MAX PARALLEL REQUESTS
# deployment max parallel requests > default max parallel requests
冷却
设置一个模型在被冷却一分钟之前,一分钟内允许失败的调用次数限制。
- SDK
- 代理
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
allowed_fails=1, # cooldown model if it fails > 1 call in a minute.
cooldown_time=100 # cooldown the deployment for 100 seconds if it num_fails > allowed_fails
)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
设置全局值
router_settings:
allowed_fails: 3 # cooldown model if it fails > 1 call in a minute.
cooldown_time: 30 # (in seconds) how long to cooldown model if fails/min > allowed_fails
默认值
- allowed_fails: 3
- cooldown_time: 5 秒 (constants.py 中的
DEFAULT_COOLDOWN_TIME_SECONDS
)
按模型设置
model_list:
- model_name: fake-openai-endpoint
litellm_params:
model: predibase/llama-3-8b-instruct
api_key: os.environ/PREDIBASE_API_KEY
tenant_id: os.environ/PREDIBASE_TENANT_ID
max_new_tokens: 256
cooldown_time: 0 # 👈 KEY CHANGE
预期响应
No deployments available for selected model, Try again in 60 seconds. Passed model=claude-3-5-sonnet. pre-call-checks=False, allowed_model_region=n/a.
禁用冷却
- SDK
- 代理
from litellm import Router
router = Router(..., disable_cooldowns=True)
router_settings:
disable_cooldowns: True
重试
对于异步和同步函数,我们都支持重试失败的请求。
对于 RateLimitError,我们实现指数退避
对于一般错误,我们立即重试
下面快速看一下如何设置 num_retries = 3
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
num_retries=3)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
我们还支持设置重试失败请求之前的最小等待时间。这通过 retry_after
参数实现。
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
num_retries=3, retry_after=5) # waits min 5s before retrying request
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
[高级]: 基于错误类型的自定义重试、冷却
- 如果您想根据接收到的异常设置
num_retries
,请使用RetryPolicy
- 使用
AllowedFailsPolicy
设置在冷却部署之前,每分钟允许的自定义allowed_fails
次数
- SDK
- 代理
示例
retry_policy = RetryPolicy(
ContentPolicyViolationErrorRetries=3, # run 3 retries for ContentPolicyViolationErrors
AuthenticationErrorRetries=0, # run 0 retries for AuthenticationErrorRetries
)
allowed_fails_policy = AllowedFailsPolicy(
ContentPolicyViolationErrorAllowedFails=1000, # Allow 1000 ContentPolicyViolationError before cooling down a deployment
RateLimitErrorAllowedFails=100, # Allow 100 RateLimitErrors before cooling down a deployment
)
用法示例
from litellm.router import RetryPolicy, AllowedFailsPolicy
retry_policy = RetryPolicy(
ContentPolicyViolationErrorRetries=3, # run 3 retries for ContentPolicyViolationErrors
AuthenticationErrorRetries=0, # run 0 retries for AuthenticationErrorRetries
BadRequestErrorRetries=1,
TimeoutErrorRetries=2,
RateLimitErrorRetries=3,
)
allowed_fails_policy = AllowedFailsPolicy(
ContentPolicyViolationErrorAllowedFails=1000, # Allow 1000 ContentPolicyViolationError before cooling down a deployment
RateLimitErrorAllowedFails=100, # Allow 100 RateLimitErrors before cooling down a deployment
)
router = litellm.Router(
model_list=[
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
},
},
{
"model_name": "bad-model", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": "bad-key",
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
},
},
],
retry_policy=retry_policy,
allowed_fails_policy=allowed_fails_policy,
)
response = await router.acompletion(
model=model,
messages=messages,
)
router_settings:
retry_policy: {
"BadRequestErrorRetries": 3,
"ContentPolicyViolationErrorRetries": 4
}
allowed_fails_policy: {
"ContentPolicyViolationErrorAllowedFails": 1000, # Allow 1000 ContentPolicyViolationError before cooling down a deployment
"RateLimitErrorAllowedFails": 100 # Allow 100 RateLimitErrors before cooling down a deployment
}
缓存
在生产环境中,我们建议使用 Redis 缓存。为了在本地快速测试,我们也支持简单的内存缓存。
内存缓存
router = Router(model_list=model_list,
cache_responses=True)
print(response)
Redis 缓存
router = Router(model_list=model_list,
redis_host=os.getenv("REDIS_HOST"),
redis_password=os.getenv("REDIS_PASSWORD"),
redis_port=os.getenv("REDIS_PORT"),
cache_responses=True)
print(response)
传入 Redis URL,附加 kwargs
router = Router(model_list: Optional[list] = None,
## CACHING ##
redis_url=os.getenv("REDIS_URL")",
cache_kwargs= {}, # additional kwargs to pass to RedisCache (see caching.py)
cache_responses=True)
预调用检查(上下文窗口、欧盟区域)
启用预调用检查以过滤掉
- 上下文窗口限制小于调用消息数的部署。
- 欧盟区域之外的部署
- SDK
- 代理
1. 启用预调用检查
from litellm import Router
# ...
router = Router(model_list=model_list, enable_pre_call_checks=True) # 👈 Set to True
2. 设置模型列表
对于 Azure 部署的上下文窗口检查,设置基础模型。从此列表中选择基础模型,所有 Azure 模型都以 azure/
开头。
对于“欧盟区域”过滤,设置部署的“region_name”。
注意: 我们会根据您的 litellm 参数自动推断 Vertex AI、Bedrock 和 IBM WatsonxAI 的 region_name。对于 Azure,设置 litellm.enable_preview = True
。
model_list = [
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"region_name": "eu" # 👈 SET 'EU' REGION NAME
"base_model": "azure/gpt-35-turbo", # 👈 (Azure-only) SET BASE MODEL
},
},
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
{
"model_name": "gemini-pro",
"litellm_params: {
"model": "vertex_ai/gemini-pro-1.5",
"vertex_project": "adroit-crow-1234",
"vertex_location": "us-east1" # 👈 AUTOMATICALLY INFERS 'region_name'
}
}
]
router = Router(model_list=model_list, enable_pre_call_checks=True)
3. 测试!
- 上下文窗口检查
- 欧盟区域检查
"""
- Give a gpt-3.5-turbo model group with different context windows (4k vs. 16k)
- Send a 5k prompt
- Assert it works
"""
from litellm import Router
import os
model_list = [
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"base_model": "azure/gpt-35-turbo",
},
"model_info": {
"base_model": "azure/gpt-35-turbo",
}
},
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
]
router = Router(model_list=model_list, enable_pre_call_checks=True)
text = "What is the meaning of 42?" * 5000
response = router.completion(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": text},
{"role": "user", "content": "Who was Alexander?"},
],
)
print(f"response: {response}")
"""
- Give 2 gpt-3.5-turbo deployments, in eu + non-eu regions
- Make a call
- Assert it picks the eu-region model
"""
from litellm import Router
import os
model_list = [
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"region_name": "eu"
},
"model_info": {
"id": "1"
}
},
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"model_info": {
"id": "2"
}
},
]
router = Router(model_list=model_list, enable_pre_call_checks=True)
response = router.completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Who was Alexander?"}],
)
print(f"response: {response}")
print(f"response id: {response._hidden_params['model_id']}")
转到此处了解如何在代理上执行此操作
跨模型组缓存
如果您想在 2 个不同的模型组(例如 Azure 部署和 OpenAI)之间进行缓存,请使用缓存组。
import litellm, asyncio, time
from litellm import Router
# set os env
os.environ["OPENAI_API_KEY"] = ""
os.environ["AZURE_API_KEY"] = ""
os.environ["AZURE_API_BASE"] = ""
os.environ["AZURE_API_VERSION"] = ""
async def test_acompletion_caching_on_router_caching_groups():
# tests acompletion + caching on router
try:
litellm.set_verbose = True
model_list = [
{
"model_name": "openai-gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo-0613",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
{
"model_name": "azure-gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": os.getenv("AZURE_API_BASE"),
"api_version": os.getenv("AZURE_API_VERSION")
},
}
]
messages = [
{"role": "user", "content": f"write a one sentence poem {time.time()}?"}
]
start_time = time.time()
router = Router(model_list=model_list,
cache_responses=True,
caching_groups=[("openai-gpt-3.5-turbo", "azure-gpt-3.5-turbo")])
response1 = await router.acompletion(model="openai-gpt-3.5-turbo", messages=messages, temperature=1)
print(f"response1: {response1}")
await asyncio.sleep(1) # add cache is async, async sleep for cache to get set
response2 = await router.acompletion(model="azure-gpt-3.5-turbo", messages=messages, temperature=1)
assert response1.id == response2.id
assert len(response1.choices[0].message.content) > 0
assert response1.choices[0].message.content == response2.choices[0].message.content
except Exception as e:
traceback.print_exc()
asyncio.run(test_acompletion_caching_on_router_caching_groups())
警报 🚨
将以下事件的警报发送到 slack / 您的 webhook url
- LLM API 异常
- 慢速 LLM 响应
从 https://api.slack.com/messaging/webhooks 获取 slack webhook url
用法
初始化 AlertingConfig
并将其传递给 litellm.Router
。以下代码将触发警报,因为 api_key=bad-key
无效
from litellm.router import AlertingConfig
import litellm
import os
router = litellm.Router(
model_list=[
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "bad_key",
},
}
],
alerting_config= AlertingConfig(
alerting_threshold=10, # threshold for slow / hanging llm responses (in seconds). Defaults to 300 seconds
webhook_url= os.getenv("SLACK_WEBHOOK_URL") # webhook you want to send alerts to
),
)
try:
await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}],
)
except:
pass
跟踪 Azure 部署成本
问题:使用 azure/gpt-4-1106-preview
时,Azure 在响应中返回 gpt-4
。这导致成本跟踪不准确
解决方案 ✅ :在您的路由器初始化时设置 model_info["base_model"]
,以便 litellm 使用正确的模型计算 Azure 成本
步骤 1. 路由器设置
from litellm import Router
model_list = [
{ # list of model deployments
"model_name": "gpt-4-preview", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"model_info": {
"base_model": "azure/gpt-4-1106-preview" # azure/gpt-4-1106-preview will be used for cost tracking, ensure this exists in litellm model_prices_and_context_window.json
}
},
{
"model_name": "gpt-4-32k",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"model_info": {
"base_model": "azure/gpt-4-32k" # azure/gpt-4-32k will be used for cost tracking, ensure this exists in litellm model_prices_and_context_window.json
}
}
]
router = Router(model_list=model_list)
步骤 2. 在自定义回调中访问 response_cost
,litellm 会为您计算响应成本
import litellm
from litellm.integrations.custom_logger import CustomLogger
class MyCustomHandler(CustomLogger):
def log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Success")
response_cost = kwargs.get("response_cost")
print("response_cost=", response_cost)
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
# router completion call
response = router.completion(
model="gpt-4-32k",
messages=[{ "role": "user", "content": "Hi who are you"}]
)
默认 litellm.completion/embedding 参数
您还可以为 litellm 完成/嵌入调用设置默认参数。方法如下
from litellm import Router
fallback_dict = {"gpt-3.5-turbo": "gpt-3.5-turbo-16k"}
router = Router(model_list=model_list,
default_litellm_params={"context_window_fallback_dict": fallback_dict})
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
自定义回调 - 跟踪 API 密钥、API 端点、使用的模型
如果您需要跟踪每次完成调用使用的 api_key、api 端点、模型、custom_llm_provider,您可以设置一个自定义回调
用法
import litellm
from litellm.integrations.custom_logger import CustomLogger
class MyCustomHandler(CustomLogger):
def log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Success")
print("kwargs=", kwargs)
litellm_params= kwargs.get("litellm_params")
api_key = litellm_params.get("api_key")
api_base = litellm_params.get("api_base")
custom_llm_provider= litellm_params.get("custom_llm_provider")
response_cost = kwargs.get("response_cost")
# print the values
print("api_key=", api_key)
print("api_base=", api_base)
print("custom_llm_provider=", custom_llm_provider)
print("response_cost=", response_cost)
def log_failure_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Failure")
print("kwargs=")
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
# Init Router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
# router completion call
response = router.completion(
model="gpt-3.5-turbo",
messages=[{ "role": "user", "content": "Hi who are you"}]
)
部署路由器
如果您想要一个服务器来在不同的 LLM API 之间进行负载均衡,请使用我们的 LiteLLM 代理服务器
调试路由器
基本调试
设置 Router(set_verbose=True)
from litellm import Router
router = Router(
model_list=model_list,
set_verbose=True
)
详细调试
设置 Router(set_verbose=True,debug_level="DEBUG")
from litellm import Router
router = Router(
model_list=model_list,
set_verbose=True,
debug_level="DEBUG" # defaults to INFO
)
非常详细的调试
设置 litellm.set_verbose=True
和 Router(set_verbose=True,debug_level="DEBUG")
from litellm import Router
import litellm
litellm.set_verbose = True
router = Router(
model_list=model_list,
set_verbose=True,
debug_level="DEBUG" # defaults to INFO
)
路由器通用设置
用法
router = Router(model_list=..., router_general_settings=RouterGeneralSettings(async_only_mode=True))
规格
class RouterGeneralSettings(BaseModel):
async_only_mode: bool = Field(
default=False
) # this will only initialize async clients. Good for memory utils
pass_through_all_models: bool = Field(
default=False
) # if passed a model not llm_router model list, pass through the request to litellm.acompletion/embedding