API 使用笔记
API 选择
主要 API 提供商
| 提供商 | 模型 | 特点 | 适用场景 |
|---|---|---|---|
| OpenAI | GPT-4, GPT-3.5, DALL-E, Whisper | 功能全面,性能强 | 通用任务 |
| Anthropic | Claude 3 | 长上下文,安全性高 | 长文档分析 |
| Google | Gemini | 多模态能力强 | 多模态任务 |
| Cohere | Command | 企业级,安全性 | 企业应用 |
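不同提供商的 SDK 接口相近但不完全相同。下面是一个对照草图(仅为示意,假设已安装 openai 与 anthropic 官方 SDK,模型名以各家最新文档为准):
from openai import OpenAI
from anthropic import Anthropic
openai_client = OpenAI()        # 默认读取 OPENAI_API_KEY 环境变量
anthropic_client = Anthropic()  # 默认读取 ANTHROPIC_API_KEY 环境变量
# OpenAI:chat.completions 接口
r1 = openai_client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hello"}]
)
print(r1.choices[0].message.content)
# Anthropic:messages 接口,max_tokens 为必填参数
r2 = anthropic_client.messages.create(
    model="claude-3-opus-20240229",
    max_tokens=500,
    messages=[{"role": "user", "content": "Hello"}]
)
print(r2.content[0].text)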
API 调用基础
以下是我总结的 API 调用基础知识:
基本结构
import openai
# 初始化
client = openai.OpenAI(api_key="your-api-key")
# 调用
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hello"}]
)
# 获取结果
print(response.choices[0].message.content)
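chat.completions.create 还支持一些常用的可选参数(示意,取值仅为例子):
response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hello"}],
    temperature=0.7,  # 控制随机性,越低输出越确定
    max_tokens=500    # 限制输出长度,便于控制成本
)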
错误处理
import openai
try:
    response = client.chat.completions.create(...)
except openai.RateLimitError:
    print("速率限制,请稍后重试")
except openai.AuthenticationError:
    print("API 密钥无效")
except openai.OpenAIError as e:
    # 其他 openai 异常统一兜底处理
    print(f"错误: {e}")
速率限制处理
以下是我处理速率限制的方法:
指数退避重试
import time
import random
from functools import wraps
def retry_with_backoff(max_retries=5, base_delay=1):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if "rate limit" not in str(e).lower():
raise
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
time.sleep(delay)
return wrapper
return decorator
@retry_with_backoff()
def api_call():
return client.chat.completions.create(...)
速率限制监控
class RateLimiter:
def __init__(self, max_calls, time_window):
self.max_calls = max_calls
self.time_window = time_window
self.calls = []
def wait_if_needed(self):
now = time.time()
# 移除过期的调用记录
self.calls = [t for t in self.calls if now - t < self.time_window]
if len(self.calls) >= self.max_calls:
sleep_time = self.time_window - (now - self.calls[0])
if sleep_time > 0:
time.sleep(sleep_time)
self.calls.append(time.time())
# 使用
limiter = RateLimiter(max_calls=60, time_window=60) # 每分钟 60 次
def api_call():
limiter.wait_if_needed()
return client.chat.completions.create(...)
成本控制
以下是我控制成本的方法:
Token 使用监控
class TokenTracker:
def __init__(self):
self.total_input_tokens = 0
self.total_output_tokens = 0
self.calls = []
def track(self, response):
usage = response.usage
self.total_input_tokens += usage.prompt_tokens
self.total_output_tokens += usage.completion_tokens
self.calls.append({
"input_tokens": usage.prompt_tokens,
"output_tokens": usage.completion_tokens,
"total_tokens": usage.total_tokens
})
def get_stats(self):
return {
"total_input": self.total_input_tokens,
"total_output": self.total_output_tokens,
"total": self.total_input_tokens + self.total_output_tokens,
"calls": len(self.calls)
}
def estimate_cost(self, pricing):
input_cost = self.total_input_tokens * pricing["input"] / 1000
output_cost = self.total_output_tokens * pricing["output"] / 1000
return input_cost + output_cost
# 使用
tracker = TokenTracker()
response = client.chat.completions.create(...)
tracker.track(response)
pricing = {"input": 0.0015, "output": 0.002} # GPT-3.5-turbo
cost = tracker.estimate_cost(pricing)
print(f"总成本: ${cost:.4f}")
Prompt 优化
def optimize_prompt(prompt, max_length=1000):
    # 压缩多余空白、去掉冗余词,并截断超长输入以减少 token 消耗
    optimized = " ".join(prompt.split())
    return optimized[:max_length]
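要验证精简是否真正减少了 token,可以用 tiktoken 在本地估算(草图,假设已安装 tiktoken):
import tiktoken
def count_tokens(text, model="gpt-3.5-turbo"):
    # 获取模型对应的编码器,在本地统计 token 数
    enc = tiktoken.encoding_for_model(model)
    return len(enc.encode(text))
before = "请你帮我写一段   非常非常详细的   产品介绍"
after = optimize_prompt(before)
print(count_tokens(before), "->", count_tokens(after))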
批量处理
以下是我进行批量处理的方法:
并发调用
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI(api_key="your-api-key")
async def async_chat(prompt):
response = await async_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
async def batch_chat(prompts, max_concurrent=10):
semaphore = asyncio.Semaphore(max_concurrent)
async def limited_chat(prompt):
async with semaphore:
return await async_chat(prompt)
tasks = [limited_chat(prompt) for prompt in prompts]
return await asyncio.gather(*tasks)
# 使用
prompts = ["prompt1", "prompt2", "prompt3"]
results = asyncio.run(batch_chat(prompts))
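并发批量调用时,单个请求失败不应让整批中断。一个常见做法是给 gather 传 return_exceptions=True,失败项以异常对象返回,事后逐个检查(示意,沿用上文的 async_chat):
async def batch_chat_safe(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    async def limited_chat(prompt):
        async with semaphore:
            return await async_chat(prompt)
    tasks = [limited_chat(p) for p in prompts]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # 失败的任务返回异常对象,这里统一替换为 None,也可改为记录日志后重试
    return [None if isinstance(r, Exception) else r for r in results]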
批量 API
import time
# process_batch 为自定义函数:接收一个子列表,返回对应的结果列表
def batch_api_calls(items, batch_size=10):
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        batch_results = process_batch(batch)
        results.extend(batch_results)
        time.sleep(1)  # 批次间暂停,避免触发速率限制
    return results
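除了客户端自行分批,OpenAI 还提供异步的 Batch API:把请求写成 JSONL 文件上传,在 24 小时窗口内离线完成,适合对延迟不敏感的大批量任务。下面是提交批任务的草图(基于 openai v1 SDK,文件格式与参数以官方文档为准):
from openai import OpenAI
client = OpenAI()
# requests.jsonl 每行一个请求,形如:
# {"custom_id": "1", "method": "POST", "url": "/v1/chat/completions",
#  "body": {"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "Hello"}]}}
batch_file = client.files.create(file=open("requests.jsonl", "rb"), purpose="batch")
batch = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h"
)
print(batch.id, batch.status)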
流式响应
以下是我处理流式响应的方法:
处理流式输出
def stream_chat(prompt):
stream = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
stream=True
)
full_response = ""
for chunk in stream:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
print(content, end="", flush=True)
full_response += content
print() # 换行
return full_response
WebSocket 实时通信
from fastapi import FastAPI, WebSocket
from openai import AsyncOpenAI
app = FastAPI()
async_client = AsyncOpenAI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    # 异步端点中使用 AsyncOpenAI,避免同步调用阻塞事件循环
    stream = await async_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[...],
        stream=True
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            await websocket.send_text(chunk.choices[0].delta.content)
配置管理
环境变量
import os
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
model = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
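读取后最好立即校验密钥是否存在,避免到第一次调用时才暴露配置问题(示意,沿用上文的 openai 导入):
if not api_key:
    raise RuntimeError("未设置 OPENAI_API_KEY 环境变量")
client = openai.OpenAI(api_key=api_key)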
配置文件
# config.yaml
api:
openai:
api_key: ${OPENAI_API_KEY}
model: gpt-3.5-turbo
temperature: 0.7
max_tokens: 500
anthropic:
api_key: ${ANTHROPIC_API_KEY}
model: claude-3-opus-20240229
import os
import yaml
from openai import OpenAI
with open("config.yaml") as f:
    # yaml.safe_load 不会展开 ${...} 占位符,先用环境变量替换再解析
    config = yaml.safe_load(os.path.expandvars(f.read()))
client = OpenAI(api_key=config["api"]["openai"]["api_key"])
安全最佳实践
以下是我遵循的安全最佳实践:
API 密钥管理
# ❌ 不要这样做
api_key = "sk-..."
# ✅ 使用环境变量
api_key = os.getenv("OPENAI_API_KEY")
# ✅ 使用密钥管理服务
import boto3
secrets = boto3.client("secretsmanager")
api_key = secrets.get_secret_value(SecretId="openai-key")["SecretString"]
输入验证
def validate_input(prompt, max_length=1000):
    if not prompt or len(prompt) > max_length:
        raise ValueError("Invalid prompt")
    # contains_sensitive_info 为自定义的敏感内容检测函数,需自行实现
    if contains_sensitive_info(prompt):
        raise ValueError("Prompt contains sensitive information")
    return True
输出过滤
import re
def filter_output(text):
    # 示例:用正则掩盖邮箱等敏感信息,实际可再接入内容审核与格式验证
    filtered_text = re.sub(r"[\w.+-]+@[\w-]+\.[\w.]+", "[EMAIL]", text)
    return filtered_text
监控和日志
以下是我进行监控和日志记录的方法:
日志记录
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def logged_api_call(prompt):
logger.info(f"API call: {prompt[:50]}...")
try:
response = client.chat.completions.create(...)
logger.info("API call successful")
return response
except Exception as e:
logger.error(f"API call failed: {e}")
raise
性能监控
import time
from functools import wraps
def monitor_performance(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
duration = time.time() - start
logger.info(f"{func.__name__} took {duration:.2f}s")
return result
return wrapper
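性能监控装饰器可以与前面的重试装饰器叠加使用,functools.wraps 保证日志里仍显示原函数名(示意,沿用上文定义):
@monitor_performance
@retry_with_backoff()
def api_call():
    return client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hello"}]
    )
api_call()  # 日志输出形如 "api_call took 1.23s"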
最佳实践总结
- 错误处理: 实现完善的错误处理和重试机制
- 速率限制: 遵守 API 速率限制,实现退避策略
- 成本控制: 监控 token 使用和成本
- 安全性: 保护 API 密钥,验证输入输出
- 性能优化: 使用异步、批量处理提升效率
- 监控日志: 记录调用日志,监控性能
- 配置管理: 使用配置文件,便于管理