基础调用
现代商业 LLM 大都提供 OpenAI 兼容的 API 接口。正确的调用方式需要考虑网络环境、超时设置和错误处理。
from openai import OpenAI, APIError, RateLimitError, APITimeoutError
# Shared API client used by the snippets below.
client = OpenAI(
    api_key="sk-...",
    base_url="https://api.openai.com/v1",
    max_retries=0,  # SDK retries disabled; retrying is controlled manually
    timeout=30.0,  # request timeout
)
流式输出(Streaming)
流式输出能让用户看到模型逐 token 生成内容,显著提升交互体验。
def stream_response(messages):
    """Stream a chat completion to stdout and return the complete text.

    Args:
        messages: Chat messages forwarded to the completions endpoint.

    Returns:
        The concatenation of every content fragment received.
    """
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
        stream_options={"include_usage": True},
    )
    pieces = []
    for event in stream:
        # A chunk with an empty ``choices`` list has no delta to read.
        delta = event.choices[0].delta if event.choices else None
        if delta and delta.content:
            pieces.append(delta.content)
            print(delta.content, end="", flush=True)
        # The final chunk carries the usage information.
        if getattr(event, "usage", None):
            print(f"\n\nToken 用量:{event.usage}")
    return "".join(pieces)
流式场景下的缓存策略
import hashlib
def stream_with_cache(messages):
    """Yield completion text for ``messages``, replaying a cached answer if any.

    On a cache hit the stored fragments are yielded and the API is not
    called. On a miss the response is streamed from the API, every non-empty
    fragment is yielded as it arrives, and the fragment list is stored with
    ``ttl=3600`` once the stream has been consumed to the end.

    Args:
        messages: Chat messages; their ``str()`` form determines the cache key.

    Yields:
        Text fragments of the completion, in order.
    """
    # MD5 only derives a lookup key here; it is not used for security.
    cache_key = hashlib.md5(str(messages).encode()).hexdigest()
    cached = cache.get(cache_key)
    if cached:
        yield from cached
        return

    collected = []
    response = client.chat.completions.create(
        model="gpt-4o", messages=messages, stream=True
    )
    for chunk in response:
        # Chunks may arrive with an empty ``choices`` list; indexing [0] on
        # them would raise IndexError.
        if not chunk.choices:
            continue
        content = chunk.choices[0].delta.content
        # Skip None/empty deltas so they are neither emitted nor cached.
        if not content:
            continue
        collected.append(content)
        yield content
    # Reached only when the consumer drained the stream, so a partially
    # consumed answer is never written to the cache.
    cache.set(cache_key, collected, ttl=3600)
重试策略
网络问题和 API 限流不可避免,正确的重试策略至关重要。
import time
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
def _is_retryable(retry_state):
    """Tell tenacity whether the attempt that just finished should be retried.

    Retries rate limits, timeouts, API errors without an HTTP status code
    and 5xx responses. Other 4xx responses (e.g. 400, 401) are not retried.
    """
    outcome = retry_state.outcome
    if outcome is None or not outcome.failed:
        return False
    error = outcome.exception()
    if isinstance(error, (RateLimitError, APITimeoutError)):
        return True
    if not isinstance(error, APIError):
        return False
    # APIError is the SDK's base error class, so it also covers 400/401
    # responses, which fail identically on every attempt. Errors without a
    # status code are typically connection-level failures.
    status = getattr(error, "status_code", None)
    return status is None or status >= 500


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=_is_retryable,
    before_sleep=lambda retry_state: print(
        f"第 {retry_state.attempt_number} 次重试,等待 {retry_state.next_action.sleep} 秒"
    ),
)
def call_llm(messages):
    """Request a chat completion, retrying transient failures with backoff.

    At most 3 attempts are made, with exponential waits bounded to 2-60.
    Non-retryable errors such as 400 or 401 propagate immediately as the
    original SDK exception.

    Args:
        messages: Chat messages forwarded to the completions endpoint.

    Returns:
        The completion object returned by the client.
    """
    return client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
    )
不同错误的处理策略(除 429 外的 4xx 属于客户端错误,重试无法解决,应直接报错):
| 错误类型 | 处理方式 | 等待时间 |
|---|---|---|
| 429 RateLimit | 指数退避 + jitter | 从 2 秒开始 |
| 503 Service Unavailable | 短等待重试 | 从 1 秒开始 |
| 401 Unauthorized | 不重试,直接报错 | - |
| 400 Bad Request | 不重试,检查参数 | - |
| 500 Internal | 退避重试,最多 3 次 | 从 5 秒开始 |
上下文管理
LLM 的上下文窗口有限,且输入 token 越多成本越高。合理的上下文管理策略可以平衡质量与成本。
Token 计算
import tiktoken
def count_tokens(text, model="gpt-4o"):
    """Return the number of tokens ``text`` encodes to for ``model``.

    Args:
        text: String to tokenize. ``None`` or an empty string counts as 0.
        model: Model name used to look up the tokenizer.

    Returns:
        The token count as an int.
    """
    if not text:
        return 0
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # tiktoken raises KeyError for model names it has no mapping for;
        # fall back to the encoding used by the gpt-4o family.
        encoding = tiktoken.get_encoding("o200k_base")
    return len(encoding.encode(text))
def estimate_cost(input_tokens, output_tokens, model="gpt-4o"):
    """Estimate the price of one call from its token counts.

    Args:
        input_tokens: Number of prompt tokens.
        output_tokens: Number of completion tokens.
        model: Model name. Dated snapshot names such as
            "gpt-4o-mini-2024-07-18" are priced as the longest known model
            name they start with; unknown names are priced as "gpt-4o".

    Returns:
        The estimated cost, in the currency unit of the rate table.
    """
    rates = {
        "gpt-4o": {"input": 2.50, "output": 10.00},  # per million tokens
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    }
    r = rates.get(model)
    if r is None:
        # An exact lookup would price "gpt-4o-mini-<date>" at the gpt-4o
        # rate, so try the longest known name first.
        name = model if isinstance(model, str) else ""
        known = sorted(rates, key=len, reverse=True)
        family = next((k for k in known if name.startswith(k)), "gpt-4o")
        r = rates[family]
    input_cost = input_tokens / 1_000_000 * r["input"]
    output_cost = output_tokens / 1_000_000 * r["output"]
    return input_cost + output_cost
滑动窗口压缩
def compress_history(messages, max_tokens=8000):
    """Drop the oldest turns until the history fits within ``max_tokens``.

    The first message (assumed to be the system prompt) is always kept, and
    the list is never shrunk below two messages. ``messages`` is modified in
    place and also returned.

    Args:
        messages: List of chat message dicts with a "content" entry.
        max_tokens: Token budget for the summed message contents.

    Returns:
        The same list object, with the oldest turns removed.
    """
    # Tokenize each message once; the counts are reused when dropping.
    costs = [count_tokens(m["content"]) for m in messages]
    total = sum(costs)
    cut = 1  # index 0 is skipped so the system prompt survives
    while total > max_tokens and len(messages) - (cut - 1) > 2:
        total -= costs[cut]
        cut += 1
    # One slice deletion instead of repeated pop(1), which shifts the tail
    # of the list on every call.
    del messages[1:cut]
    return messages
摘要压缩
对于长对话历史,可以用 LLM 自动摘要旧内容。
def summarize_history(old_messages):
    """Condense earlier turns into a single system message using the LLM.

    Args:
        old_messages: Chat message dicts whose "content" values are joined
            with newlines and sent for summarization.

    Returns:
        A one-element list holding a system message with the summary text.
    """
    summary_prompt = "请用 100 字以内总结以下对话内容:"
    text = "\n".join(m["content"] for m in old_messages)
    completion = call_llm([{"role": "user", "content": summary_prompt + "\n" + text}])
    # call_llm returns the whole completion object; interpolating it directly
    # would embed its repr, so take the text of the first choice.
    summary = completion.choices[0].message.content
    return [{"role": "system", "content": f"对话历史摘要:{summary}"}]
成本控制
模型分级路由
根据任务复杂度选择不同模型,可以大幅降低成本。
def route_request(task_type, messages):
    """Send ``messages`` to a model selected by ``task_type``.

    Args:
        task_type: "simple", "reasoning", "creative", or anything else.
        messages: Chat messages forwarded to the completions endpoint.

    Returns:
        The completion object returned by the client.
    """

    def dispatch(model):
        return client.chat.completions.create(model=model, messages=messages)

    # Simple tasks go to the mini model.
    if task_type == "simple":
        return dispatch("gpt-4o-mini")
    # Reasoning and creative tasks use the standard model.
    if task_type in ("reasoning", "creative"):
        return dispatch("gpt-4o")
    # Every other task type falls back to the mini model.
    return dispatch("gpt-4o-mini")
Batch API
对于非实时场景,使用 Batch API 可以获得 50% 的价格折扣,代价是结果不会立即返回,而是在 24 小时的完成窗口内异步产出。
def submit_batch_jobs(jobs):
    """Submit non-realtime chat requests as a single Batch API job.

    Args:
        jobs: Iterable of ``(job_id, messages)`` pairs. ``job_id`` is
            converted to a string and used as the request's ``custom_id``.

    Returns:
        The id of the created batch.

    Raises:
        ValueError: If ``jobs`` contains no requests.
    """
    import json  # local import: only this function needs it

    lines = [
        json.dumps(
            {
                "custom_id": str(job_id),
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "messages": messages,
                    "max_tokens": 500,
                },
            },
            ensure_ascii=False,
        )
        for job_id, messages in jobs
    ]
    if not lines:
        raise ValueError("jobs must contain at least one request")

    # The batch input must be uploaded as a JSONL file (one request object
    # per line); a Python list cannot be sent as file content.
    payload = "\n".join(lines).encode("utf-8")
    uploaded = client.files.create(
        file=("batch_input.jsonl", payload),
        purpose="batch",
    )
    batch = client.batches.create(
        input_file_id=uploaded.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",  # time window within which the batch completes
    )
    return batch.id
输出长度控制
合理限制 max_tokens 可以避免模型过度生成。
# Cap the completion length with an explicit token budget.
response = client.chat.completions.create(
    model="gpt-4o",
    temperature=0.7,
    messages=messages,
    max_tokens=token_budget,
)
监控与日志
import logging
import time
logger = logging.getLogger("llm_api")


def monitored_call(messages, **kwargs):
    """Run a chat completion and log its model, token usage, latency and cost.

    Args:
        messages: Chat messages forwarded to the completions endpoint.
        **kwargs: Extra keyword arguments passed through to the client call.

    Returns:
        The response object returned by the client, unchanged.
    """
    # perf_counter is monotonic, so the measured latency cannot be distorted
    # by wall-clock adjustments the way time.time() differences can.
    start = time.perf_counter()
    response = client.chat.completions.create(messages=messages, **kwargs)
    elapsed = time.perf_counter() - start

    # Usage (and, for streamed responses, the model attribute) may be
    # missing; log None for those fields instead of raising after the call
    # has already succeeded.
    model = getattr(response, "model", kwargs.get("model"))
    usage = getattr(response, "usage", None)
    input_tokens = usage.prompt_tokens if usage else None
    output_tokens = usage.completion_tokens if usage else None
    cost = estimate_cost(input_tokens, output_tokens, model) if usage else None

    logger.info({
        "model": model,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "latency_ms": round(elapsed * 1000),
        "cost": cost,
    })
    return response