星辰大海 AI星辰大海 AI
首页
  • ChatGPT
  • Claude
  • Midjourney
  • Stable Diffusion
  • 大语言模型
  • 图像生成模型
  • 语音模型
Demo 示例
开发笔记
GitHub
首页
  • ChatGPT
  • Claude
  • Midjourney
  • Stable Diffusion
  • 大语言模型
  • 图像生成模型
  • 语音模型
Demo 示例
开发笔记
GitHub
  • 开发笔记

    • 开发笔记
    • Prompt 工程笔记
    • 模型微调笔记
    • API 使用笔记

API 使用笔记

API 选择

主要 API 提供商

提供商模型特点适用场景
OpenAIGPT-4, GPT-3.5, DALL-E, Whisper功能全面,性能强通用任务
AnthropicClaude 3长上下文,安全性高长文档分析
GoogleGemini多模态能力强多模态任务
CohereCommand企业级,安全性企业应用

API 调用基础

以下是我使用 API 调用的基础知识:

基本结构

import openai

# 初始化
client = openai.OpenAI(api_key="your-api-key")

# 调用
response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hello"}]
)

# 获取结果
print(response.choices[0].message.content)

错误处理

import openai
from openai import OpenAIError

try:
    response = client.chat.completions.create(...)
except OpenAIError as e:
    if e.status_code == 429:
        print("速率限制,请稍后重试")
    elif e.status_code == 401:
        print("API 密钥无效")
    else:
        print(f"错误: {e}")

速率限制处理

以下是我处理速率限制的方法:

指数退避重试

import time
import random
from functools import wraps

def retry_with_backoff(max_retries=5, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if "rate limit" not in str(e).lower():
                        raise
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff()
def api_call():
    return client.chat.completions.create(...)

速率限制监控

class RateLimiter:
    def __init__(self, max_calls, time_window):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []
    
    def wait_if_needed(self):
        now = time.time()
        # 移除过期的调用记录
        self.calls = [t for t in self.calls if now - t < self.time_window]
        
        if len(self.calls) >= self.max_calls:
            sleep_time = self.time_window - (now - self.calls[0])
            if sleep_time > 0:
                time.sleep(sleep_time)
        
        self.calls.append(time.time())

# 使用
limiter = RateLimiter(max_calls=60, time_window=60)  # 每分钟 60 次

def api_call():
    limiter.wait_if_needed()
    return client.chat.completions.create(...)

成本控制

以下是我控制成本的方法:

Token 使用监控

class TokenTracker:
    def __init__(self):
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.calls = []
    
    def track(self, response):
        usage = response.usage
        self.total_input_tokens += usage.prompt_tokens
        self.total_output_tokens += usage.completion_tokens
        
        self.calls.append({
            "input_tokens": usage.prompt_tokens,
            "output_tokens": usage.completion_tokens,
            "total_tokens": usage.total_tokens
        })
    
    def get_stats(self):
        return {
            "total_input": self.total_input_tokens,
            "total_output": self.total_output_tokens,
            "total": self.total_input_tokens + self.total_output_tokens,
            "calls": len(self.calls)
        }
    
    def estimate_cost(self, pricing):
        input_cost = self.total_input_tokens * pricing["input"] / 1000
        output_cost = self.total_output_tokens * pricing["output"] / 1000
        return input_cost + output_cost

# 使用
tracker = TokenTracker()
response = client.chat.completions.create(...)
tracker.track(response)

pricing = {"input": 0.0015, "output": 0.002}  # GPT-3.5-turbo
cost = tracker.estimate_cost(pricing)
print(f"总成本: ${cost:.4f}")

Prompt 优化

def optimize_prompt(prompt, max_tokens=100):
    # 移除不必要的词
    # 使用缩写
    # 精简描述
    return optimized_prompt

批量处理

以下是我进行批量处理的方法:

并发调用

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI(api_key="your-api-key")

async def async_chat(prompt):
    response = await async_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def batch_chat(prompts, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def limited_chat(prompt):
        async with semaphore:
            return await async_chat(prompt)
    
    tasks = [limited_chat(prompt) for prompt in prompts]
    return await asyncio.gather(*tasks)

# 使用
prompts = ["prompt1", "prompt2", "prompt3"]
results = asyncio.run(batch_chat(prompts))

批量 API

def batch_api_calls(items, batch_size=10):
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        batch_results = process_batch(batch)
        results.extend(batch_results)
        time.sleep(1)  # 避免速率限制
    return results

流式响应

以下是我处理流式响应的方法:

处理流式输出

def stream_chat(prompt):
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    full_response = ""
    for chunk in stream:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            full_response += content
    print()  # 换行
    return full_response

WebSocket 实时通信

from fastapi import WebSocket

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[...],
        stream=True
    )
    
    for chunk in stream:
        if chunk.choices[0].delta.content:
            await websocket.send_text(chunk.choices[0].delta.content)

配置管理

环境变量

import os
from dotenv import load_dotenv

load_dotenv()

api_key = os.getenv("OPENAI_API_KEY")
model = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")

配置文件

# config.yaml
api:
  openai:
    api_key: ${OPENAI_API_KEY}
    model: gpt-3.5-turbo
    temperature: 0.7
    max_tokens: 500
  anthropic:
    api_key: ${ANTHROPIC_API_KEY}
    model: claude-3-opus-20240229
import yaml

with open("config.yaml") as f:
    config = yaml.safe_load(f)

client = OpenAI(api_key=config["api"]["openai"]["api_key"])

安全最佳实践

以下是我实践的安全最佳实践:

API 密钥管理

# ❌ 不要这样做
api_key = "sk-..."

# ✅ 使用环境变量
api_key = os.getenv("OPENAI_API_KEY")

# ✅ 使用密钥管理服务
import boto3
secrets = boto3.client("secretsmanager")
api_key = secrets.get_secret_value(SecretId="openai-key")["SecretString"]

输入验证

def validate_input(prompt, max_length=1000):
    if not prompt or len(prompt) > max_length:
        raise ValueError("Invalid prompt")
    # 检查敏感内容
    if contains_sensitive_info(prompt):
        raise ValueError("Prompt contains sensitive information")
    return True

输出过滤

def filter_output(text):
    # 移除敏感信息
    # 内容审核
    # 格式验证
    return filtered_text

监控和日志

以下是我进行监控和日志记录的方法:

日志记录

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def logged_api_call(prompt):
    logger.info(f"API call: {prompt[:50]}...")
    try:
        response = client.chat.completions.create(...)
        logger.info("API call successful")
        return response
    except Exception as e:
        logger.error(f"API call failed: {e}")
        raise

性能监控

import time
from functools import wraps

def monitor_performance(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        logger.info(f"{func.__name__} took {duration:.2f}s")
        return result
    return wrapper

最佳实践总结

  1. 错误处理: 实现完善的错误处理和重试机制
  2. 速率限制: 遵守 API 速率限制,实现退避策略
  3. 成本控制: 监控 token 使用和成本
  4. 安全性: 保护 API 密钥,验证输入输出
  5. 性能优化: 使用异步、批量处理提升效率
  6. 监控日志: 记录调用日志,监控性能
  7. 配置管理: 使用配置文件,便于管理

相关资源

  • OpenAI API 文档
  • Anthropic API 文档
  • API 最佳实践
在 GitHub 上编辑此页
Prev
模型微调笔记