星辰大海 AI星辰大海 AI
首页
  • ChatGPT
  • Claude
  • Midjourney
  • Stable Diffusion
  • 大语言模型
  • 图像生成模型
  • 语音模型
Demo 示例
开发笔记
GitHub
首页
  • ChatGPT
  • Claude
  • Midjourney
  • Stable Diffusion
  • 大语言模型
  • 图像生成模型
  • 语音模型
Demo 示例
开发笔记
GitHub
  • Demo 示例

    • Demo 示例
    • 聊天机器人 Demo
    • 图像生成 Demo
    • API 集成示例

API 集成示例

这里是我集成各种 AI API 的示例代码。

支持的 API

  • OpenAI API (ChatGPT, DALL-E, Whisper)
  • Anthropic API (Claude)
  • Stability AI API
  • Hugging Face API

OpenAI API 集成

ChatGPT API

import openai

openai.api_key = "your-api-key"

def chat_completion(prompt):
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "你是一个有用的助手。"},
            {"role": "user", "content": prompt}
        ],
        temperature=0.7,
        max_tokens=500
    )
    return response.choices[0].message.content

# 使用
result = chat_completion("解释一下什么是机器学习")
print(result)

DALL-E API

import openai
from PIL import Image
import requests
from io import BytesIO

openai.api_key = "your-api-key"

def generate_image(prompt, size="1024x1024"):
    response = openai.Image.create(
        prompt=prompt,
        n=1,
        size=size
    )
    image_url = response['data'][0]['url']
    
    # 下载图像
    img_response = requests.get(image_url)
    img = Image.open(BytesIO(img_response.content))
    return img

# 使用
image = generate_image("a cat sitting on a windowsill")
image.save("generated_image.png")

Whisper API

import openai

openai.api_key = "your-api-key"

def transcribe_audio(audio_file_path):
    with open(audio_file_path, "rb") as audio_file:
        transcript = openai.Audio.transcribe("whisper-1", audio_file)
    return transcript["text"]

# 使用
text = transcribe_audio("audio.mp3")
print(text)

Anthropic API 集成

import anthropic

client = anthropic.Anthropic(api_key="your-api-key")

def claude_chat(prompt):
    message = client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": prompt}
        ]
    )
    return message.content[0].text

# 使用
result = claude_chat("解释一下量子计算")
print(result)

统一接口封装

class AIAPIWrapper:
    def __init__(self, provider="openai", api_key=None):
        self.provider = provider
        if provider == "openai":
            import openai
            openai.api_key = api_key
            self.client = openai
        elif provider == "anthropic":
            import anthropic
            self.client = anthropic.Anthropic(api_key=api_key)
    
    def chat(self, prompt, model=None):
        if self.provider == "openai":
            model = model or "gpt-3.5-turbo"
            response = self.client.ChatCompletion.create(
                model=model,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.choices[0].message.content
        elif self.provider == "anthropic":
            model = model or "claude-3-opus-20240229"
            message = self.client.messages.create(
                model=model,
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            )
            return message.content[0].text

# 使用
openai_wrapper = AIAPIWrapper("openai", api_key="your-key")
claude_wrapper = AIAPIWrapper("anthropic", api_key="your-key")

result1 = openai_wrapper.chat("Hello")
result2 = claude_wrapper.chat("Hello")

错误处理和重试

import time
import random
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    time.sleep(delay)
                    print(f"重试 {attempt + 1}/{max_retries}...")
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3)
def chat_with_retry(prompt):
    return chat_completion(prompt)

异步调用

import asyncio
import openai
from openai import AsyncOpenAI

client = AsyncOpenAI(api_key="your-api-key")

async def async_chat(prompt):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# 批量异步调用
async def batch_chat(prompts):
    tasks = [async_chat(prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    return results

# 使用
prompts = ["什么是 AI", "什么是 ML", "什么是 DL"]
results = asyncio.run(batch_chat(prompts))

流式响应

def stream_chat(prompt):
    stream = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    for chunk in stream:
        if chunk.choices[0].delta.get("content"):
            yield chunk.choices[0].delta["content"]

# 使用
for chunk in stream_chat("写一个 Python 函数"):
    print(chunk, end="", flush=True)

成本监控

class CostTracker:
    def __init__(self):
        self.total_tokens = 0
        self.total_cost = 0.0
        self.pricing = {
            "gpt-3.5-turbo": {"input": 0.0015/1000, "output": 0.002/1000},
            "gpt-4": {"input": 0.03/1000, "output": 0.06/1000},
        }
    
    def track(self, model, input_tokens, output_tokens):
        self.total_tokens += input_tokens + output_tokens
        cost = (
            input_tokens * self.pricing[model]["input"] +
            output_tokens * self.pricing[model]["output"]
        )
        self.total_cost += cost
        return cost
    
    def get_stats(self):
        return {
            "total_tokens": self.total_tokens,
            "total_cost": self.total_cost
        }

# 使用
tracker = CostTracker()
cost = tracker.track("gpt-3.5-turbo", 100, 50)
print(f"本次调用成本: ${cost:.4f}")
print(f"总统计: {tracker.get_stats()}")

配置管理

import os
from dataclasses import dataclass

@dataclass
class APIConfig:
    provider: str
    api_key: str
    model: str = None
    temperature: float = 0.7
    max_tokens: int = 500
    
    @classmethod
    def from_env(cls):
        return cls(
            provider=os.getenv("AI_PROVIDER", "openai"),
            api_key=os.getenv("AI_API_KEY"),
            model=os.getenv("AI_MODEL"),
            temperature=float(os.getenv("AI_TEMPERATURE", "0.7")),
            max_tokens=int(os.getenv("AI_MAX_TOKENS", "500"))
        )

# 使用环境变量
config = APIConfig.from_env()

完整示例

import openai
import os
from typing import List, Dict

class AIService:
    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        openai.api_key = self.api_key
    
    def chat(
        self,
        prompt: str,
        system_prompt: str = None,
        model: str = "gpt-3.5-turbo",
        temperature: float = 0.7,
        max_tokens: int = 500
    ) -> str:
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})
        
        response = openai.ChatCompletion.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens
        )
        
        return response.choices[0].message.content
    
    def batch_chat(self, prompts: List[str], **kwargs) -> List[str]:
        return [self.chat(prompt, **kwargs) for prompt in prompts]

# 使用
service = AIService()
result = service.chat("解释一下 Python")
print(result)

运行说明

安装依赖

pip install openai anthropic python-dotenv

设置环境变量

export OPENAI_API_KEY="your-key"
export ANTHROPIC_API_KEY="your-key"

运行示例

python api_integration.py

最佳实践

  1. API 密钥安全: 使用环境变量,不要硬编码
  2. 错误处理: 实现重试机制和错误处理
  3. 成本控制: 监控 token 使用和成本
  4. 速率限制: 遵守 API 速率限制
  5. 日志记录: 记录 API 调用日志用于调试
在 GitHub 上编辑此页
Prev
图像生成 Demo