自定义回调
信息
对于 PROXY 前往此处
回调类
你可以创建一个自定义回调类,以精确记录 litellm 中发生的事件。
import litellm
from litellm.integrations.custom_logger import CustomLogger
from litellm import completion, acompletion
class MyCustomHandler(CustomLogger):
def log_pre_api_call(self, model, messages, kwargs):
print(f"Pre-API Call")
def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
print(f"Post-API Call")
def log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Success")
def log_failure_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Failure")
#### ASYNC #### - for acompletion/aembeddings
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Async Success")
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Async Failure")
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
## sync
response = completion(model="gpt-3.5-turbo", messages=[{ "role": "user", "content": "Hi 👋 - i'm openai"}],
stream=True)
for chunk in response:
continue
## async
import asyncio
def async completion():
response = await acompletion(model="gpt-3.5-turbo", messages=[{ "role": "user", "content": "Hi 👋 - i'm openai"}],
stream=True)
async for chunk in response:
continue
asyncio.run(completion())
回调函数
如果你只想在特定事件(例如输入时)进行日志记录,你可以使用回调函数。
你可以设置自定义回调来触发:
litellm.input_callback
- 在调用 LLM API 之前跟踪输入/转换后的输入litellm.success_callback
- 在调用 LLM API 之后跟踪输入/输出litellm.failure_callback
- 跟踪 litellm 调用中的输入/输出 + 异常
定义自定义回调函数
创建一个接受特定参数的自定义回调函数
def custom_callback(
kwargs, # kwargs to completion
completion_response, # response from completion
start_time, end_time # start/end time
):
# Your custom code here
print("LITELLM: in custom callback function")
print("kwargs", kwargs)
print("completion_response", completion_response)
print("start_time", start_time)
print("end_time", end_time)
设置自定义回调函数
import litellm
litellm.success_callback = [custom_callback]
使用你的自定义回调函数
import litellm
from litellm import completion
# Assign the custom callback function
litellm.success_callback = [custom_callback]
response = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "Hi 👋 - i'm openai"
}
]
)
print(response)
异步回调函数
对于异步操作,我们建议使用 Custom Logger 类。
from litellm.integrations.custom_logger import CustomLogger
from litellm import acompletion
class MyCustomHandler(CustomLogger):
#### ASYNC ####
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Async Success")
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Async Failure")
import asyncio
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
def async completion():
response = await acompletion(model="gpt-3.5-turbo", messages=[{ "role": "user", "content": "Hi 👋 - i'm openai"}],
stream=True)
async for chunk in response:
continue
asyncio.run(completion())
函数
如果你只想传入一个异步函数用于日志记录。
LiteLLM 目前仅支持异步完成/嵌入调用的异步成功回调函数。
import asyncio, litellm
async def async_test_logging_fn(kwargs, completion_obj, start_time, end_time):
print(f"On Async Success!")
async def test_chat_openai():
try:
# litellm.set_verbose = True
litellm.success_callback = [async_test_logging_fn]
response = await litellm.acompletion(model="gpt-3.5-turbo",
messages=[{
"role": "user",
"content": "Hi 👋 - i'm openai"
}],
stream=True)
async for chunk in response:
continue
except Exception as e:
print(e)
pytest.fail(f"An error occurred - {str(e)}")
asyncio.run(test_chat_openai())
信息
我们正在积极努力将此扩展到其他事件类型。 如果你需要,请告诉我们!
kwargs 中有什么?
注意,我们将 kwargs 参数传递给自定义回调。
def custom_callback(
kwargs, # kwargs to completion
completion_response, # response from completion
start_time, end_time # start/end time
):
# Your custom code here
print("LITELLM: in custom callback function")
print("kwargs", kwargs)
print("completion_response", completion_response)
print("start_time", start_time)
print("end_time", end_time)
这是一个字典,包含所有模型调用详细信息(我们接收到的参数、发送到 http 端点的值、我们接收到的响应、错误情况下的堆栈跟踪等)。
所有这些都通过我们的 Logger 记录在 model_call_details 中。
以下是你可以在 kwargs 字典中预期到的具体内容
### DEFAULT PARAMS ###
"model": self.model,
"messages": self.messages,
"optional_params": self.optional_params, # model-specific params passed in
"litellm_params": self.litellm_params, # litellm-specific params passed in (e.g. metadata passed to completion call)
"start_time": self.start_time, # datetime object of when call was started
### PRE-API CALL PARAMS ### (check via kwargs["log_event_type"]="pre_api_call")
"input" = input # the exact prompt sent to the LLM API
"api_key" = api_key # the api key used for that LLM API
"additional_args" = additional_args # any additional details for that API call (e.g. contains optional params sent)
### POST-API CALL PARAMS ### (check via kwargs["log_event_type"]="post_api_call")
"original_response" = original_response # the original http response received (saved via response.text)
### ON-SUCCESS PARAMS ### (check via kwargs["log_event_type"]="successful_api_call")
"complete_streaming_response" = complete_streaming_response # the complete streamed response (only set if `completion(..stream=True)`)
"end_time" = end_time # datetime object of when call was completed
### ON-FAILURE PARAMS ### (check via kwargs["log_event_type"]="failed_api_call")
"exception" = exception # the Exception raised
"traceback_exception" = traceback_exception # the traceback generated via `traceback.format_exc()`
"end_time" = end_time # datetime object of when call was completed
缓存命中
缓存命中记录在成功事件中,表示为 kwarg["cache_hit"]
。
这是一个访问它的例子
import litellm
from litellm.integrations.custom_logger import CustomLogger
from litellm import completion, acompletion, Cache
class MyCustomHandler(CustomLogger):
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Success")
print(f"Value of Cache hit: {kwargs['cache_hit']"})
async def test_async_completion_azure_caching():
customHandler_caching = MyCustomHandler()
litellm.cache = Cache(type="redis", host=os.environ['REDIS_HOST'], port=os.environ['REDIS_PORT'], password=os.environ['REDIS_PASSWORD'])
litellm.callbacks = [customHandler_caching]
unique_time = time.time()
response1 = await litellm.acompletion(model="azure/chatgpt-v-2",
messages=[{
"role": "user",
"content": f"Hi 👋 - i'm async azure {unique_time}"
}],
caching=True)
await asyncio.sleep(1)
print(f"customHandler_caching.states pre-cache hit: {customHandler_caching.states}")
response2 = await litellm.acompletion(model="azure/chatgpt-v-2",
messages=[{
"role": "user",
"content": f"Hi 👋 - i'm async azure {unique_time}"
}],
caching=True)
await asyncio.sleep(1) # success callbacks are done in parallel
print(f"customHandler_caching.states post-cache hit: {customHandler_caching.states}")
assert len(customHandler_caching.errors) == 0
assert len(customHandler_caching.states) == 4 # pre, post, success, success
获取完整的流式响应
LiteLLM 会在最终的流式块中将完整的流式响应作为 kwargs 的一部分传递给你的自定义回调函数。
# litellm.set_verbose = False
def custom_callback(
kwargs, # kwargs to completion
completion_response, # response from completion
start_time, end_time # start/end time
):
# print(f"streaming response: {completion_response}")
if "complete_streaming_response" in kwargs:
print(f"Complete Streaming Response: {kwargs['complete_streaming_response']}")
# Assign the custom callback function
litellm.success_callback = [custom_callback]
response = completion(model="claude-instant-1", messages=messages, stream=True)
for idx, chunk in enumerate(response):
pass
记录额外元数据
LiteLLM 在完成调用中接受一个元数据字典。你可以通过 completion(..., metadata={"key": "value"})
将额外元数据传递到你的完成调用中。
由于这是一个 litellm 特定参数,它可以通过 kwargs 访问["litellm_params"]
from litellm import completion
import os, litellm
## set ENV variables
os.environ["OPENAI_API_KEY"] = "your-api-key"
messages = [{ "content": "Hello, how are you?","role": "user"}]
def custom_callback(
kwargs, # kwargs to completion
completion_response, # response from completion
start_time, end_time # start/end time
):
print(kwargs["litellm_params"]["metadata"])
# Assign the custom callback function
litellm.success_callback = [custom_callback]
response = litellm.completion(model="gpt-3.5-turbo", messages=messages, metadata={"hello": "world"})
示例
自定义回调以跟踪流式 + 非流式成本
默认情况下,在成功时(同步 + 异步),响应成本可以通过日志对象中的 kwargs["response_cost"]
访问
# Step 1. Write your custom callback function
def track_cost_callback(
kwargs, # kwargs to completion
completion_response, # response from completion
start_time, end_time # start/end time
):
try:
response_cost = kwargs["response_cost"] # litellm calculates response cost for you
print("regular response_cost", response_cost)
except:
pass
# Step 2. Assign the custom callback function
litellm.success_callback = [track_cost_callback]
# Step 3. Make litellm.completion call
response = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "Hi 👋 - i'm openai"
}
]
)
print(response)
自定义回调以记录转换后的 LLMs 输入
def get_transformed_inputs(
kwargs,
):
params_to_model = kwargs["additional_args"]["complete_input_dict"]
print("params to model", params_to_model)
litellm.input_callback = [get_transformed_inputs]
def test_chat_openai():
try:
response = completion(model="claude-2",
messages=[{
"role": "user",
"content": "Hi 👋 - i'm openai"
}])
print(response)
except Exception as e:
print(e)
pass
输出
params to model {'model': 'claude-2', 'prompt': "\n\nHuman: Hi 👋 - i'm openai\n\nAssistant: ", 'max_tokens_to_sample': 256}
自定义回调以写入 Mixpanel
import mixpanel
import litellm
from litellm import completion
def custom_callback(
kwargs, # kwargs to completion
completion_response, # response from completion
start_time, end_time # start/end time
):
# Your custom code here
mixpanel.track("LLM Response", {"llm_response": completion_response})
# Assign the custom callback function
litellm.success_callback = [custom_callback]
response = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "Hi 👋 - i'm openai"
}
]
)
print(response)