跳到主要内容

流式 + 异步

特性LiteLLM SDKLiteLLM 代理
流式传输从此处开始从此处开始
异步从此处开始从此处开始
异步流式传输从此处开始从此处开始

流式响应

LiteLLM 支持通过将 stream=True 作为参数传递给完成函数来流式传输模型响应

用法

from litellm import completion
messages = [{"role": "user", "content": "Hey, how's it going?"}]
response = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
for part in response:
print(part.choices[0].delta.content or "")

辅助函数

LiteLLM 还提供了一个辅助函数,用于从分块列表中重建完整的流式响应。

from litellm import completion
messages = [{"role": "user", "content": "Hey, how's it going?"}]
response = completion(model="gpt-3.5-turbo", messages=messages, stream=True)

for chunk in response:
chunks.append(chunk)

print(litellm.stream_chunk_builder(chunks, messages=messages))

异步完成

LiteLLM 的异步完成。LiteLLM 提供了一个名为 acompletion 的异步版本的完成函数

用法

from litellm import acompletion
import asyncio

async def test_get_response():
user_message = "Hello, how are you?"
messages = [{"content": user_message, "role": "user"}]
response = await acompletion(model="gpt-3.5-turbo", messages=messages)
return response

response = asyncio.run(test_get_response())
print(response)

异步流式传输

我们在返回的流对象中实现了 __anext__() 函数。这使得可以对流对象进行异步迭代。

用法

以下是与 openai 一起使用的示例。

from litellm import acompletion
import asyncio, os, traceback

async def completion_call():
try:
print("test acompletion + streaming")
response = await acompletion(
model="gpt-3.5-turbo",
messages=[{"content": "Hello, how are you?", "role": "user"}],
stream=True
)
print(f"response: {response}")
async for chunk in response:
print(chunk)
except:
print(f"error occurred: {traceback.format_exc()}")
pass

asyncio.run(completion_call())

错误处理 - 无限循环

有时模型可能会进入无限循环,并不断重复相同的分块 - 例如 问题

使用以下方法跳出循环

litellm.REPEATED_STREAMING_CHUNK_LIMIT = 100 # # catch if model starts looping the same chunk while streaming. Uses high default to prevent false positives.

LiteLLM 为此提供了错误处理,它会检查一个分块是否重复出现 'n' 次(默认为 100)。如果超过此限制,它将引发 litellm.InternalServerError,以便进行重试逻辑。

import litellm 
import os

litellm.set_verbose = False
loop_amount = litellm.REPEATED_STREAMING_CHUNK_LIMIT + 1
chunks = [
litellm.ModelResponse(**{
"id": "chatcmpl-123",
"object": "chat.completion.chunk",
"created": 1694268190,
"model": "gpt-3.5-turbo-0125",
"system_fingerprint": "fp_44709d6fcb",
"choices": [
{"index": 0, "delta": {"content": "How are you?"}, "finish_reason": "stop"}
],
}, stream=True)
] * loop_amount
completion_stream = litellm.ModelResponseListIterator(model_responses=chunks)

response = litellm.CustomStreamWrapper(
completion_stream=completion_stream,
model="gpt-3.5-turbo",
custom_llm_provider="cached_response",
logging_obj=litellm.Logging(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey"}],
stream=True,
call_type="completion",
start_time=time.time(),
litellm_call_id="12345",
function_id="1245",
),
)

for chunk in response:
continue # expect to raise InternalServerError