FisherHub Docs

API 调用最佳实践

基础调用

现代商业 LLM 大都提供 OpenAI 兼容的 API 接口。正确的调用方式需要考虑网络环境、超时设置和错误处理。

from openai import OpenAI, APIError, RateLimitError, APITimeoutError

# Shared client instance used by the examples that follow.
# NOTE(review): "sk-..." is a placeholder; load the real key from an
# environment variable or secret store rather than hard-coding it.
client = OpenAI(
    base_url="https://api.openai.com/v1",
    api_key="sk-...",
    timeout=30.0,       # request timeout
    max_retries=0       # SDK retries set to 0; retrying is controlled manually
)

流式输出(Streaming)

流式输出能让用户看到模型逐 token 生成内容,显著提升交互体验。

def stream_response(messages):
    """Stream a chat completion to stdout and return the full reply text.

    Each text delta is printed as soon as it arrives. Whenever a chunk
    carries usage data, that usage is printed on its own line.

    Args:
        messages: Chat messages forwarded to the completions endpoint.

    Returns:
        The concatenation of every streamed text delta.
    """
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
        stream_options={"include_usage": True},
    )

    pieces = []
    for chunk in stream:
        # Chunks with an empty `choices` list carry no text delta.
        if chunk.choices:
            delta = chunk.choices[0].delta
            text = delta.content if delta else None
            if text:
                pieces.append(text)
                print(text, end="", flush=True)

        # The last chunk carries the usage information.
        usage = getattr(chunk, "usage", None)
        if usage:
            print(f"\n\nToken 用量:{usage}")

    return "".join(pieces)

流式场景下的缓存策略

import hashlib

def stream_with_cache(messages):
    """Yield a streamed chat completion, replaying it from cache when possible.

    On a cache hit the stored text pieces are yielded and no request is
    made. On a miss the completion is streamed from the API, each non-empty
    text delta is yielded as it arrives, and the collected pieces are stored
    under the cache key with a TTL of 3600.

    The cache is written only after the stream has been consumed to the
    end, so a consumer that stops iterating early never stores a partial
    reply.

    Args:
        messages: Chat messages; their ``str()`` form is hashed into the
            cache key.

    Yields:
        Non-empty text pieces of the reply, in order.
    """
    cache_key = hashlib.md5(str(messages).encode()).hexdigest()
    cached = cache.get(cache_key)
    if cached:
        yield from cached
        return

    collected = []
    response = client.chat.completions.create(
        model="gpt-4o", messages=messages, stream=True
    )
    for chunk in response:
        # Some chunks (for example usage-only ones) have an empty `choices`
        # list; skip them instead of failing on choices[0].
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        content = delta.content if delta else None
        # Deltas without text (role-only or final chunks) add nothing to
        # the reply, so they are neither yielded nor cached.
        if not content:
            continue
        collected.append(content)
        yield content

    cache.set(cache_key, collected, ttl=3600)

重试策略

网络问题和 API 限流不可避免,正确的重试策略至关重要。

import time
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

from openai import APIConnectionError, InternalServerError


# Only transient failures are retried: rate limits (429), timeouts,
# connection errors and 5xx server errors. Retrying on the APIError base
# class would also retry 400/401 responses, which cannot succeed on a
# second attempt.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type(
        (RateLimitError, APITimeoutError, APIConnectionError, InternalServerError)
    ),
    before_sleep=lambda retry_state: print(
        f"第 {retry_state.attempt_number} 次重试,等待 {retry_state.next_action.sleep} 秒"
    )
)
def call_llm(messages):
    """Request a gpt-4o chat completion, retrying transient failures.

    Up to 3 attempts are made, with exponential backoff between 2 and 60
    seconds. A message is printed before each wait.

    Args:
        messages: Chat messages forwarded to the completions endpoint.

    Returns:
        The completion object returned by the client.
    """
    return client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )

不同错误的处理策略:

| 错误类型 | 处理方式 | 等待时间 |
| --- | --- | --- |
| 429 RateLimit | 指数退避 + jitter | 从 2 秒开始 |
| 503 Service Unavailable | 短等待重试 | 从 1 秒开始 |
| 401 Unauthorized | 不重试,直接报错 | - |
| 400 Bad Request | 不重试,检查参数 | - |
| 500 Internal | 退避重试,最多 3 次 | 从 5 秒开始 |

上下文管理

LLM 的上下文窗口有限,且输入 token 越多成本越高。合理的上下文管理策略可以平衡质量与成本。

Token 计算

import tiktoken

def count_tokens(text, model="gpt-4o"):
    """Return how many tokens `text` encodes to for `model`.

    Args:
        text: Text to tokenize. ``None`` or an empty string counts as 0.
        model: Model name used to pick the tokenizer.

    Returns:
        The token count as an int.
    """
    if not text:
        return 0
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # tiktoken does not recognise this model name (a newer release or
        # an OpenAI-compatible provider). Fall back to the encoding used by
        # gpt-4o so callers get an estimate instead of a crash.
        encoding = tiktoken.get_encoding("o200k_base")
    return len(encoding.encode(text))

def estimate_cost(input_tokens, output_tokens, model="gpt-4o"):
    """Estimate the cost of a request from its token counts.

    Args:
        input_tokens: Number of prompt tokens.
        output_tokens: Number of completion tokens.
        model: Model name. Dated snapshot names such as
            "gpt-4o-mini-2024-07-18" are matched to the longest known
            model name they start with. Unknown models are priced at the
            gpt-4o rate.

    Returns:
        The combined input and output cost, in the currency of the rate
        table (presumably USD; confirm against the pricing page).
    """
    rates = {
        "gpt-4o": {"input": 2.50, "output": 10.00},  # per million tokens
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    }
    r = rates.get(model)
    if r is None:
        # API responses report snapshot names, so an exact lookup would
        # price "gpt-4o-mini-..." at the gpt-4o rate. The longest matching
        # prefix picks the most specific known model instead.
        known = [
            name for name in rates
            if isinstance(model, str) and model.startswith(name)
        ]
        r = rates[max(known, key=len)] if known else rates["gpt-4o"]
    input_cost = input_tokens / 1_000_000 * r["input"]
    output_cost = output_tokens / 1_000_000 * r["output"]
    return input_cost + output_cost

滑动窗口压缩

def compress_history(messages, max_tokens=8000):
    """Drop the oldest messages until the history fits within `max_tokens`.

    The first message (expected to be the system prompt) is always kept,
    and at least two messages remain. The list is trimmed in place and
    also returned.

    Args:
        messages: List of chat message dicts. A missing or ``None``
            "content" counts as zero tokens.
        max_tokens: Token budget for the whole history.

    Returns:
        The same list object, after trimming.
    """
    # Tokenize every message once so dropped messages are not re-encoded.
    counts = [count_tokens(m.get("content") or "") for m in messages]
    total = sum(counts)

    # Count how many messages directly after the first one have to go.
    drop = 0
    while total > max_tokens and len(messages) - drop > 2:
        total -= counts[1 + drop]
        drop += 1

    # One slice deletion instead of repeated pop(1) calls.
    del messages[1:1 + drop]
    return messages

摘要压缩

对于长对话历史,可以用 LLM 自动摘要旧内容。

def summarize_history(old_messages):
    """Condense older messages into one system message holding an LLM summary.

    Args:
        old_messages: Chat message dicts to summarize. A missing or
            ``None`` "content" is treated as an empty line.

    Returns:
        A one-element list with a system message containing the summary.
    """
    summary_prompt = "请用 100 字以内总结以下对话内容:"
    text = "\n".join(m.get("content") or "" for m in old_messages)
    response = call_llm([{"role": "user", "content": summary_prompt + "\n" + text}])
    # call_llm returns the whole completion object. The summary text is on
    # the first choice; formatting the object itself would embed its repr.
    summary = response.choices[0].message.content
    return [{"role": "system", "content": f"对话历史摘要:{summary}"}]

成本控制

模型分级路由

根据任务复杂度选择不同模型,可以大幅降低成本。

def route_request(task_type, messages):
    """Send `messages` to a model tier chosen from `task_type`.

    "reasoning" and "creative" tasks go to gpt-4o. "simple" and any
    unrecognised task type go to gpt-4o-mini.

    Args:
        task_type: Label describing the kind of task.
        messages: Chat messages forwarded to the completions endpoint.

    Returns:
        The completion object returned by the client.
    """
    # Only the two heavier task types need the standard model.
    needs_standard_model = task_type in ("reasoning", "creative")
    model = "gpt-4o" if needs_standard_model else "gpt-4o-mini"
    return client.chat.completions.create(model=model, messages=messages)

Batch API

对于非实时场景,使用 Batch API 可以获得 50% 的价格折扣。

def submit_batch_jobs(jobs):
    """Submit non-realtime chat requests through the Batch API.

    Args:
        jobs: Iterable of ``(job_id, messages)`` pairs. Each ``job_id`` is
            converted to a string and used as the request's ``custom_id``.

    Returns:
        The id of the created batch.

    Raises:
        ValueError: If `jobs` contains no requests.
    """
    import json

    lines = [
        json.dumps(
            {
                "custom_id": str(job_id),
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "messages": messages,
                    "max_tokens": 500,
                },
            },
            ensure_ascii=False,
        )
        for job_id, messages in jobs
    ]
    if not lines:
        raise ValueError("jobs must contain at least one request")

    # The Files API takes an uploaded JSONL document (one request per
    # line), not a Python list, so serialize before uploading.
    payload = "\n".join(lines).encode("utf-8")
    input_file = client.files.create(
        file=("batch_input.jsonl", payload), purpose="batch"
    )
    batch = client.batches.create(
        input_file_id=input_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h"
    )
    return batch.id

输出长度控制

合理限制 max_tokens 可以避免模型过度生成。

# Cap the completion length with an explicit token limit sized to the task.
# NOTE(review): `messages` and `token_budget` are not defined in this
# snippet; presumably the caller supplies them.
response = client.chat.completions.create(
    model="gpt-4o",
    messages=messages,
    max_tokens=token_budget,
    temperature=0.7
)

监控与日志

import logging
import time

# Named logger that monitored_call writes its per-request metrics to.
logger = logging.getLogger("llm_api")

def monitored_call(messages, **kwargs):
    """Run a chat completion and log its model, token usage, latency and cost.

    Args:
        messages: Chat messages forwarded to the completions endpoint.
        **kwargs: Extra keyword arguments passed through to the client
            call (for example ``model``).

    Returns:
        The completion object returned by the client.
    """
    # perf_counter is monotonic, so the measured latency is not skewed by
    # wall-clock adjustments the way time.time() can be.
    start = time.perf_counter()
    response = client.chat.completions.create(messages=messages, **kwargs)
    elapsed = time.perf_counter() - start

    # Usage can be absent on a response; log None rather than raising
    # AttributeError after the request has already succeeded.
    usage = response.usage
    input_tokens = usage.prompt_tokens if usage else None
    output_tokens = usage.completion_tokens if usage else None
    cost = (
        estimate_cost(input_tokens, output_tokens, response.model)
        if usage
        else None
    )

    logger.info({
        "model": response.model,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "latency_ms": round(elapsed * 1000),
        "cost": cost,
    })
    return response