
Router - Load Balancing

LiteLLM manages:

  • Load-balancing across multiple deployments (e.g. Azure/OpenAI)
  • Prioritizing important requests to ensure they don't fail (i.e. queueing)
  • Basic reliability logic - cooldowns, fallbacks, timeouts and retries (fixed + exponential backoff) across multiple deployments/providers.

In production, litellm supports using Redis to track cooldown servers and usage (managing tpm/rpm limits).

Info

If you want a server to load balance across different LLM APIs, use our LiteLLM Proxy Server

Load Balancing

(Credit to @paulpierre and sweep proxy for their contributions to this implementation) See Code

Quick Start

Load balance across multiple azure/bedrock/provider deployments. LiteLLM will handle retrying in different regions if a call fails.

import os
from litellm import Router

model_list = [{ # list of model deployments
    "model_name": "gpt-3.5-turbo", # model alias -> loadbalance between models with same `model_name`
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}, {
    "model_name": "gpt-4",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/gpt-4",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "api_version": os.getenv("AZURE_API_VERSION"),
    }
}, {
    "model_name": "gpt-4",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "gpt-4",
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}]

router = Router(model_list=model_list)

# openai.ChatCompletion.create replacement
# requests with model="gpt-3.5-turbo" will pick a deployment where model_name="gpt-3.5-turbo"
response = await router.acompletion(model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hey, how's it going?"}])

print(response)

# openai.ChatCompletion.create replacement
# requests with model="gpt-4" will pick a deployment where model_name="gpt-4"
response = await router.acompletion(model="gpt-4",
                messages=[{"role": "user", "content": "Hey, how's it going?"}])

print(response)

Available Endpoints

  • router.completion() - chat completions endpoint to call 100+ LLMs
  • router.acompletion() - async chat completion calls
  • router.embedding() - embedding endpoint for Azure, OpenAI, Huggingface endpoints (see the sketch after this list)
  • router.aembedding() - async embedding calls
  • router.text_completion() - completion calls in the old OpenAI /v1/completions endpoint format
  • router.atext_completion() - async text completion calls
  • router.image_generation() - completion calls in the OpenAI /v1/images/generations endpoint format
  • router.aimage_generation() - async image generation calls
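
The embedding and text-completion endpoints follow the same load-balancing pattern as the chat endpoints above. A minimal sketch, assuming an OpenAI key in the environment and a hypothetical embedding deployment (any OpenAI-compatible embedding model works here):

import os
from litellm import Router

# hypothetical model_list with a single embedding deployment
model_list = [{
    "model_name": "text-embedding-ada-002", # model alias
    "litellm_params": {
        "model": "text-embedding-ada-002",  # actual model name
        "api_key": os.getenv("OPENAI_API_KEY"),
    },
}]

router = Router(model_list=model_list)

# sync embedding call - load-balanced the same way as router.completion()
response = router.embedding(
    model="text-embedding-ada-002",
    input=["Hey, how's it going?"],
)
print(response)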

Advanced - Routing Strategies ⭐️

Routing strategies - weighted pick, rate-limit aware, least-busy, latency-based, cost-based

The router provides 4 strategies for routing your calls across multiple deployments:

🎉 NEW This is an async implementation of usage-based routing.

Filters out deployments if they're over their tpm/rpm limits - if you pass in the deployments' tpm/rpm limits.

Routes to the deployment with the lowest TPM usage for that minute.

In production, we use Redis to track usage (TPM/RPM) across multiple deployments. This implementation uses async Redis calls (redis.incr and redis.mget).

For Azure, you get 6 RPM per 1000 TPM.
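
For intuition, here is a minimal sketch of the selection logic described above (illustrative only - the function and data shapes are hypothetical, not LiteLLM's internal code):

def pick_deployment(deployments: list[dict], usage: dict) -> dict:
    """Illustrative: filter out deployments over their tpm/rpm limits,
    then route to the lowest-TPM deployment for the current minute.
    `usage` maps deployment id -> (tpm_used, rpm_used) for this minute,
    e.g. read from Redis (redis.mget) in the real implementation."""
    eligible = [
        d for d in deployments
        if usage[d["id"]][0] < d["tpm"] and usage[d["id"]][1] < d["rpm"]
    ]
    if not eligible:
        raise RuntimeError("No deployments available - all over their tpm/rpm limits")
    # route to the deployment with the lowest TPM usage this minute
    return min(eligible, key=lambda d: usage[d["id"]][0])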

import os
from litellm import Router

model_list = [{ # list of model deployments
    "model_name": "gpt-3.5-turbo", # model alias
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "tpm": 100000,
        "rpm": 10000,
    },
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "tpm": 100000,
        "rpm": 1000,
    },
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
        "tpm": 100000,
        "rpm": 1000,
    },
}]
router = Router(model_list=model_list,
                redis_host=os.environ["REDIS_HOST"],
                redis_password=os.environ["REDIS_PASSWORD"],
                redis_port=os.environ["REDIS_PORT"],
                routing_strategy="usage-based-routing-v2", # 👈 KEY CHANGE
                enable_pre_call_checks=True, # enables router rate limits for concurrent calls
)

response = await router.acompletion(model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hey, how's it going?"}])

print(response)

Basic Reliability

Weighted Deployments

Set a weight on a deployment to have it picked more often than other deployments.

This works for the simple-shuffle routing strategy (this is the default, if no routing strategy is selected).

import os
from litellm import Router

model_list = [
    {
        "model_name": "o1",
        "litellm_params": {
            "model": "o1-preview",
            "api_key": os.getenv("OPENAI_API_KEY"),
            "weight": 1
        },
    },
    {
        "model_name": "o1",
        "litellm_params": {
            "model": "o1-preview",
            "api_key": os.getenv("OPENAI_API_KEY"),
            "weight": 2 # 👈 PICK THIS DEPLOYMENT 2x MORE OFTEN THAN THE OTHER ONE
        },
    },
]

router = Router(model_list=model_list) # weights apply under the default simple-shuffle strategy

response = await router.acompletion(
    model="o1",
    messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)

Max Parallel Requests (Async)

Semaphore used for async requests on the router. Limits the max concurrent calls made to a deployment. Useful in high-traffic scenarios.

If tpm/rpm is set, and no max parallel request limit is given, we use the RPM or the calculated RPM (tpm/1000/6) as the max parallel request limit.
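
A minimal sketch of that fallback rule (the helper name here is hypothetical, not LiteLLM's internals):

def resolve_max_parallel_requests(max_parallel_requests=None, rpm=None, tpm=None, default=None):
    """Hypothetical helper mirroring the fallback described above."""
    if max_parallel_requests is not None:
        return max_parallel_requests  # explicit per-deployment limit wins
    if rpm is not None:
        return rpm                    # else fall back to the deployment's RPM
    if tpm is not None:
        return int(tpm / 1000 / 6)    # else the calculated RPM (tpm/1000/6)
    return default                    # else the router-level default_max_parallel_requests

assert resolve_max_parallel_requests(tpm=120000) == 20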

from litellm import Router

model_list = [{
    "model_name": "gpt-4",
    "litellm_params": {
        "model": "azure/gpt-4",
        ...
        "max_parallel_requests": 10 # 👈 SET PER DEPLOYMENT
    }
}]

### OR ###

router = Router(model_list=model_list, default_max_parallel_requests=20) # 👈 SET DEFAULT MAX PARALLEL REQUESTS


# deployment max parallel requests > default max parallel requests

See Code

Cooldowns

Set the limit for how many calls a model is allowed to fail within a minute, before it's cooled down for a minute.

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                allowed_fails=1,   # cooldown model if it fails > 1 call in a minute.
                cooldown_time=100  # cooldown the deployment for 100 seconds if num_fails > allowed_fails
)

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")

Expected Response

No deployments available for selected model, Try again in 60 seconds. Passed model=gpt-3.5-turbo. pre-call-checks=False, allowed_model_region=n/a.

Disable cooldowns

from litellm import Router 


router = Router(..., disable_cooldowns=True)

Retries

For both async + sync functions, we support retrying failed requests.

For RateLimitError we implement exponential backoff.

For generic errors, we retry immediately.

Here's a quick look at how we can set num_retries = 3:

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                num_retries=3)

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")

We also support setting a minimum time to wait before retrying a failed request. This is via the retry_after parameter.

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                num_retries=3, retry_after=5) # waits min 5s before retrying request

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")

[Advanced]: Custom Retries, Cooldowns based on Error Type

  • Use RetryPolicy if you want to set num_retries based on the Exception received
  • Use AllowedFailsPolicy to set a custom number of allowed_fails/minute before cooling down a deployment

See All Exception Types

Example:

retry_policy = RetryPolicy(
    ContentPolicyViolationErrorRetries=3, # run 3 retries for ContentPolicyViolationErrors
    AuthenticationErrorRetries=0,         # run 0 retries for AuthenticationErrors
)

allowed_fails_policy = AllowedFailsPolicy(
    ContentPolicyViolationErrorAllowedFails=1000, # Allow 1000 ContentPolicyViolationErrors before cooling down a deployment
    RateLimitErrorAllowedFails=100,               # Allow 100 RateLimitErrors before cooling down a deployment
)

Usage Example:

import os
import litellm
from litellm.router import RetryPolicy, AllowedFailsPolicy

retry_policy = RetryPolicy(
    ContentPolicyViolationErrorRetries=3, # run 3 retries for ContentPolicyViolationErrors
    AuthenticationErrorRetries=0,         # run 0 retries for AuthenticationErrors
    BadRequestErrorRetries=1,
    TimeoutErrorRetries=2,
    RateLimitErrorRetries=3,
)

allowed_fails_policy = AllowedFailsPolicy(
    ContentPolicyViolationErrorAllowedFails=1000, # Allow 1000 ContentPolicyViolationErrors before cooling down a deployment
    RateLimitErrorAllowedFails=100,               # Allow 100 RateLimitErrors before cooling down a deployment
)

router = litellm.Router(
    model_list=[
        {
            "model_name": "gpt-3.5-turbo", # openai model name
            "litellm_params": { # params for litellm completion/embedding call
                "model": "azure/chatgpt-v-2",
                "api_key": os.getenv("AZURE_API_KEY"),
                "api_version": os.getenv("AZURE_API_VERSION"),
                "api_base": os.getenv("AZURE_API_BASE"),
            },
        },
        {
            "model_name": "bad-model", # openai model name
            "litellm_params": { # params for litellm completion/embedding call
                "model": "azure/chatgpt-v-2",
                "api_key": "bad-key",
                "api_version": os.getenv("AZURE_API_VERSION"),
                "api_base": os.getenv("AZURE_API_BASE"),
            },
        },
    ],
    retry_policy=retry_policy,
    allowed_fails_policy=allowed_fails_policy,
)

response = await router.acompletion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hey, how's it going?"}],
)

Caching

In production, we recommend using a Redis cache. For quick local testing, we also support a simple in-memory cache.

In-memory Cache

router = Router(model_list=model_list, 
                cache_responses=True)
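
A quick sketch of what this buys you - two identical requests, where the second should be served from the cache (assuming the router above and a valid model_list):

messages = [{"role": "user", "content": "Hey, how's it going?"}]

# first call hits the LLM API; the response gets cached
response1 = router.completion(model="gpt-3.5-turbo", messages=messages)

# identical call - served from the in-memory cache instead of the API
response2 = router.completion(model="gpt-3.5-turbo", messages=messages)

print(response1.choices[0].message.content == response2.choices[0].message.content) # expect True once the cache is set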

Redis Cache

router = Router(model_list=model_list, 
                redis_host=os.getenv("REDIS_HOST"),
                redis_password=os.getenv("REDIS_PASSWORD"),
                redis_port=os.getenv("REDIS_PORT"),
                cache_responses=True)

Pass in a Redis URL, additional kwargs:

router = Router(model_list=model_list,
                ## CACHING ##
                redis_url=os.getenv("REDIS_URL"),
                cache_kwargs={}, # additional kwargs to pass to RedisCache (see caching.py)
                cache_responses=True)

Pre-Call Checks (Context Window, EU-Regions)

Enable pre-call checks to filter out:

  1. Deployments with a context window smaller than the messages in the call.
  2. Deployments outside the EU region.

1. Enable pre-call checks

from litellm import Router 
# ...
router = Router(model_list=model_list, enable_pre_call_checks=True) # 👈 Set to True

2. Set Model List

For context-window checks on Azure deployments, set the base model. Pick the base model from this list - all Azure models start with azure/.

For the "EU-region" filtering, set the deployment's region_name.

Note: We automatically infer the region_name for Vertex AI, Bedrock, and IBM WatsonxAI based on your litellm params. For Azure, set litellm.enable_preview_features = True.

See Code

model_list = [
    {
        "model_name": "gpt-3.5-turbo", # model group name
        "litellm_params": { # params for litellm completion/embedding call
            "model": "azure/chatgpt-v-2",
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE"),
            "region_name": "eu", # 👈 SET 'EU' REGION NAME
            "base_model": "azure/gpt-35-turbo", # 👈 (Azure-only) SET BASE MODEL
        },
    },
    {
        "model_name": "gpt-3.5-turbo", # model group name
        "litellm_params": { # params for litellm completion/embedding call
            "model": "gpt-3.5-turbo-1106",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
    },
    {
        "model_name": "gemini-pro",
        "litellm_params": {
            "model": "vertex_ai/gemini-pro-1.5",
            "vertex_project": "adroit-crow-1234",
            "vertex_location": "us-east1" # 👈 AUTOMATICALLY INFERS 'region_name'
        }
    }
]

router = Router(model_list=model_list, enable_pre_call_checks=True)

3. Test it!

"""
- Give a gpt-3.5-turbo model group with different context windows (4k vs. 16k)
- Send a 5k prompt
- Assert it works
"""
from litellm import Router
import os

model_list = [
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"base_model": "azure/gpt-35-turbo",
},
"model_info": {
"base_model": "azure/gpt-35-turbo",
}
},
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
]

router = Router(model_list=model_list, enable_pre_call_checks=True)

text = "What is the meaning of 42?" * 5000

response = router.completion(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": text},
{"role": "user", "content": "Who was Alexander?"},
],
)

print(f"response: {response}")

Caching across model groups

If you want to cache across 2 different model groups (e.g. Azure deployments and OpenAI), use caching groups.

import litellm, asyncio, time, os, traceback
from litellm import Router

# set os env
os.environ["OPENAI_API_KEY"] = ""
os.environ["AZURE_API_KEY"] = ""
os.environ["AZURE_API_BASE"] = ""
os.environ["AZURE_API_VERSION"] = ""

async def test_acompletion_caching_on_router_caching_groups():
    # tests acompletion + caching on router
    try:
        litellm.set_verbose = True
        model_list = [
            {
                "model_name": "openai-gpt-3.5-turbo",
                "litellm_params": {
                    "model": "gpt-3.5-turbo-0613",
                    "api_key": os.getenv("OPENAI_API_KEY"),
                },
            },
            {
                "model_name": "azure-gpt-3.5-turbo",
                "litellm_params": {
                    "model": "azure/chatgpt-v-2",
                    "api_key": os.getenv("AZURE_API_KEY"),
                    "api_base": os.getenv("AZURE_API_BASE"),
                    "api_version": os.getenv("AZURE_API_VERSION")
                },
            }
        ]

        messages = [
            {"role": "user", "content": f"write a one sentence poem {time.time()}?"}
        ]
        start_time = time.time()
        router = Router(model_list=model_list,
                        cache_responses=True,
                        caching_groups=[("openai-gpt-3.5-turbo", "azure-gpt-3.5-turbo")])
        response1 = await router.acompletion(model="openai-gpt-3.5-turbo", messages=messages, temperature=1)
        print(f"response1: {response1}")
        await asyncio.sleep(1) # add cache is async, async sleep for cache to get set
        response2 = await router.acompletion(model="azure-gpt-3.5-turbo", messages=messages, temperature=1)
        assert response1.id == response2.id
        assert len(response1.choices[0].message.content) > 0
        assert response1.choices[0].message.content == response2.choices[0].message.content
    except Exception as e:
        traceback.print_exc()

asyncio.run(test_acompletion_caching_on_router_caching_groups())

Alerting 🚨

Send alerts on the following events to slack / your webhook url:

  • LLM API Exceptions
  • Slow LLM Responses

Get a slack webhook url from https://api.slack.com/messaging/webhooks

Usage

Initialize an AlertingConfig and pass it to litellm.Router. The following code will trigger an alert, since api_key="bad_key" is invalid:

from litellm.router import AlertingConfig
import litellm
import os

router = litellm.Router(
    model_list=[
        {
            "model_name": "gpt-3.5-turbo",
            "litellm_params": {
                "model": "gpt-3.5-turbo",
                "api_key": "bad_key",
            },
        }
    ],
    alerting_config=AlertingConfig(
        alerting_threshold=10, # threshold for slow / hanging llm responses (in seconds). Defaults to 300 seconds
        webhook_url=os.getenv("SLACK_WEBHOOK_URL") # webhook you want to send alerts to
    ),
)
try:
    await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}],
    )
except:
    pass

Track cost for Azure Deployments

Problem: Azure returns gpt-4 in the response when azure/gpt-4-1106-preview is used. This leads to inaccurate cost tracking.

Solution ✅: Set model_info["base_model"] on your router init, so litellm uses the correct model for calculating Azure costs.

Step 1. Router Setup

import os
from litellm import Router

model_list = [
    { # list of model deployments
        "model_name": "gpt-4-preview", # model alias
        "litellm_params": { # params for litellm completion/embedding call
            "model": "azure/chatgpt-v-2", # actual model name
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "model_info": {
            "base_model": "azure/gpt-4-1106-preview" # azure/gpt-4-1106-preview will be used for cost tracking, ensure this exists in litellm model_prices_and_context_window.json
        }
    },
    {
        "model_name": "gpt-4-32k",
        "litellm_params": { # params for litellm completion/embedding call
            "model": "azure/chatgpt-functioncalling",
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "model_info": {
            "base_model": "azure/gpt-4-32k" # azure/gpt-4-32k will be used for cost tracking, ensure this exists in litellm model_prices_and_context_window.json
        }
    }
]

router = Router(model_list=model_list)

Step 2. Access response_cost in a custom callback. litellm calculates the response cost for you.

import litellm
from litellm.integrations.custom_logger import CustomLogger

class MyCustomHandler(CustomLogger):
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        print(f"On Success")
        response_cost = kwargs.get("response_cost")
        print("response_cost=", response_cost)

customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]

# router completion call
response = router.completion(
    model="gpt-4-32k",
    messages=[{"role": "user", "content": "Hi who are you"}]
)

Default litellm.completion/embedding params

You can also set default params for litellm completion/embedding calls. Here's how to do that:

from litellm import Router

fallback_dict = {"gpt-3.5-turbo": "gpt-3.5-turbo-16k"}

router = Router(model_list=model_list, 
                default_litellm_params={"context_window_fallback_dict": fallback_dict})

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")

Custom callbacks - Track API Key, API Endpoint, Model Used

If you need to track the api_key, api endpoint, model, and custom_llm_provider used for each completion call, you can set up a custom callback.

Usage

import litellm
from litellm import Router
from litellm.integrations.custom_logger import CustomLogger

class MyCustomHandler(CustomLogger):
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        print(f"On Success")
        print("kwargs=", kwargs)
        litellm_params = kwargs.get("litellm_params")
        api_key = litellm_params.get("api_key")
        api_base = litellm_params.get("api_base")
        custom_llm_provider = litellm_params.get("custom_llm_provider")
        response_cost = kwargs.get("response_cost")

        # print the values
        print("api_key=", api_key)
        print("api_base=", api_base)
        print("custom_llm_provider=", custom_llm_provider)
        print("response_cost=", response_cost)

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        print(f"On Failure")
        print("kwargs=", kwargs)

customHandler = MyCustomHandler()

litellm.callbacks = [customHandler]

# Init Router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")

# router completion call
response = router.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hi who are you"}]
)

Deploy Router

If you want a server to load balance across different LLM APIs, use our LiteLLM Proxy Server

Debugging Router

Basic Debugging

Set Router(set_verbose=True)

from litellm import Router

router = Router(
    model_list=model_list,
    set_verbose=True
)

Detailed Debugging

Set Router(set_verbose=True, debug_level="DEBUG")

from litellm import Router

router = Router(
    model_list=model_list,
    set_verbose=True,
    debug_level="DEBUG" # defaults to INFO
)

Very Detailed Debugging

Set litellm.set_verbose=True and Router(set_verbose=True, debug_level="DEBUG")

from litellm import Router
import litellm

litellm.set_verbose = True

router = Router(
    model_list=model_list,
    set_verbose=True,
    debug_level="DEBUG" # defaults to INFO
)

Router General Settings

Usage

router = Router(model_list=..., router_general_settings=RouterGeneralSettings(async_only_mode=True))

Spec

class RouterGeneralSettings(BaseModel):
    async_only_mode: bool = Field(
        default=False
    ) # this will only initialize async clients. Good for memory utils
    pass_through_all_models: bool = Field(
        default=False
    ) # if a model not in the llm_router model list is passed, pass the request through to litellm.acompletion/embedding