¥
立即购买

库与API示例

473 浏览
46 试用
10 购买
Nov 24, 2025更新

指导用户在指定编程语言中使用库或API完成特定任务,提供分步骤示例代码和可选说明,帮助开发者快速实现功能并掌握调用方法。

下面给出一份从零到一的示例,展示如何用 Python 3.11 + requests 分页拉取用户列表,合并所有页结果,提取所需字段,按 id 去重并导出为 UTF-8 CSV。同时演示 Bearer 鉴权、全局 Session 复用、请求超时、指数退避重试(含 429/5xx 处理)与自定义 User-Agent。

  1. 安装与准备
  • 安装依赖 pip install requests
  • 设置令牌(以环境变量为例)

    macOS/Linux:

    export EXAMPLE_API_KEY="demo-token"

    Windows PowerShell:

    $env:EXAMPLE_API_KEY="demo-token"

    Windows CMD:

    set EXAMPLE_API_KEY=demo-token
  1. 目录结构与文件
  • 在项目根目录新建文件: main.py
  1. 编写完整示例代码(可直接运行,需将 BASE_URL 换成真实 API) 注意:api.example.com 为占位示例,实际运行请替换为你的真实 REST API。 代码示例: import os import csv import time import random import requests

BASE_URL = "https://api.example.com/v1/users" # 替换为你的真实用户列表 API API_KEY = os.environ.get("EXAMPLE_API_KEY", "demo-token")

def get_with_retry(session, url, params=None, retries=5, base_delay=0.5, timeout=10): """ 带指数退避与抖动的 GET 请求。 - 对 429/5xx 进行重试,尊重 Retry-After(秒)头。 - 其他网络错误也会按指数退避重试。 """ for attempt in range(retries): try: resp = session.get(url, params=params, timeout=timeout) # 针对 429(限流)优先处理 if resp.status_code == 429: if attempt == retries - 1: resp.raise_for_status() retry_after = resp.headers.get("Retry-After") if retry_after and retry_after.isdigit(): delay = float(retry_after) else: delay = base_delay * (2 ** attempt) delay += random.uniform(0, 0.25) # 抖动 time.sleep(delay) continue

        # 对 5xx 进行重试
        if 500 <= resp.status_code < 600:
            if attempt == retries - 1:
                resp.raise_for_status()
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.25)
            time.sleep(delay)
            continue

        resp.raise_for_status()
        return resp
    except requests.RequestException:
        if attempt == retries - 1:
            raise
        delay = base_delay * (2 ** attempt) + random.uniform(0, 0.25)
        time.sleep(delay)

def get_page(session, page, limit=50): """ 拉取单页。假定分页参数为 page/limit,返回 JSON。 """ params = {"page": page, "limit": limit} resp = get_with_retry(session, BASE_URL, params=params) return resp.json()

def main(): # 全局会话,复用连接 session = requests.Session() session.headers.update({ "Authorization": f"Bearer {API_KEY}", "Accept": "application/json", "User-Agent": "example-client/1.0 (+https://your-domain.example)" })

all_users = []
page = 1
per_page = 50

while True:
    data = get_page(session, page, per_page)
    # 约定服务端返回 { "items": [ ... ] };若你的 API 字段名不同,请相应调整
    items = data.get("items", [])
    if not items:
        break

    for u in items:
        all_users.append({
            "id": u.get("id"),
            "name": u.get("name"),
            "email": u.get("email", ""),
            "created_at": u.get("created_at", "")
        })

    page += 1
    # 轻微限速,避免触发限流。若服务端限流严格可适当增加间隔
    time.sleep(0.2)

# 按 id 去重:保留首个出现的记录
seen = set()
deduped = []
for u in all_users:
    uid = u.get("id")
    if uid is None:
        continue
    if uid not in seen:
        seen.add(uid)
        deduped.append(u)

# 导出为 UTF-8 CSV(含表头)
out_path = "users.csv"
with open(out_path, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["id", "name", "email", "created_at"])
    writer.writeheader()
    writer.writerows(deduped)

print(f"Wrote {len(deduped)} users to {out_path}")

if name == "main": main()

  1. 运行与验证
  • 执行 python main.py
  • 预期输出 控制台显示导出数量,并在当前目录生成 users.csv
  1. 结果检查
  • 打开 users.csv 确认列头为 id,name,email,created_at 且编码为 UTF-8
  • 需要进一步分析可在电子表格或 Pandas 中加载

可选说明与最佳实践

  • 连接复用:requests.Session() 在进程内全程复用,减少 TCP/TLS 握手成本。
  • 超时与退避:根据 API SLA 调整 timeout、retries、base_delay;强限流下可增大页面间 sleep 或按 Retry-After 精确等待。
  • 鉴权与密钥:生产环境使用安全的凭据管理(如环境变量管理器、密钥库),避免硬编码令牌。
  • 大数据量:数据量很大时(>10 万),建议边拉取边写出(增量写 CSV),避免一次性把所有数据放内存;或启用分页游标/服务端导出。
  • 字段与顺序:跨系统传输前固定 CSV 字段顺序与编码(UTF-8)并在文档中明确说明。
  • 分页协议差异:若你的 API 使用游标分页(如 next_cursor/next_url),将 while 循环改为基于游标/链接的迭代并停止于无 next 的条件。

下面给出一套可直接复制运行的示例,使用 Node.js 18 + axios + form-data 在服务端以 multipart/form-data 上传本地图片,包含 Bearer 鉴权、30 秒超时、流式进度统计、最大体积控制与指数退避重试(3 次),成功后打印文件 ID。

  1. 初始化与安装
  • 初始化项目 npm init -y
  • 安装依赖 npm i axios form-data
  1. 新建 upload.js 并编写完整代码 将以下完整代码粘贴到 upload.js:

const fs = require('fs'); const path = require('path'); const axios = require('axios'); const FormData = require('form-data');

const FILE_PATH = path.resolve(__dirname, 'sample.png'); const API_URL = 'https://api.example.com/v1/files'; const TOKEN = process.env.EXAMPLE_TOKEN || 'demo-token';

// 最大体积控制(可通过环境变量覆盖),默认 20MB const MAX_SIZE_MB = Number(process.env.MAX_SIZE_MB || 20);

function assertFileOK() { if (!fs.existsSync(FILE_PATH)) { throw new Error(File not found: ${FILE_PATH}); } const stat = fs.statSync(FILE_PATH); const maxBytes = MAX_SIZE_MB * 1024 * 1024; if (stat.size > maxBytes) { throw new Error(File too large: ${(stat.size / 1024 / 1024).toFixed(2)}MB > ${MAX_SIZE_MB}MB); } return stat; }

function shouldRetry(err) { // 网络类错误 if (err && err.code && ['ECONNRESET', 'ETIMEDOUT', 'EAI_AGAIN', 'ENOTFOUND', 'ECONNREFUSED', 'EPIPE'].includes(err.code)) { return true; } // HTTP 状态码:5xx 或 429 重试 const status = err && err.response && err.response.status; return status >= 500 || status === 429; }

async function uploadOnce() { const stat = assertFileOK();

const stream = fs.createReadStream(FILE_PATH); let loaded = 0; stream.on('data', chunk => { loaded += chunk.length; const percent = stat.size ? ((loaded / stat.size) * 100).toFixed(1) : '...'; process.stdout.write(\rUploading ${percent}%); });

const form = new FormData(); form.append('file', stream, { filename: path.basename(FILE_PATH) }); form.append('meta', JSON.stringify({ folder: 'images', tags: ['guide', 'api'] }));

// 尝试计算 Content-Length(部分服务端要求) const contentLength = await new Promise(resolve => { form.getLength((err, length) => { if (err) return resolve(undefined); resolve(length); }); });

const headers = { ...form.getHeaders(), 'Authorization': Bearer ${TOKEN}, }; if (contentLength !== undefined) headers['Content-Length'] = contentLength;

const res = await axios.post(API_URL, form, { headers, maxBodyLength: Infinity, // 避免 axios 在大文件时提前拦截 timeout: 30000, // 30 秒超时 validateStatus: s => s >= 200 && s < 300 });

console.log('\nFile ID:', res.data && res.data.id); return res.data; }

async function uploadWithRetry(max = 3) { let lastErr; for (let i = 0; i < max; i++) { try { const data = await uploadOnce(); return data; } catch (err) { lastErr = err; const canRetry = shouldRetry(err); const wait = 500 * Math.pow(2, i) + Math.floor(Math.random() * 250); // 指数退避 + 抖动 const statusMsg = err && err.response ? (HTTP ${err.response.status}) : ''; console.warn(\nAttempt ${i + 1} failed${statusMsg}${canRetry ? , retrying in ${wait}ms... : ', not retryable.'}); if (!canRetry || i === max - 1) break; await new Promise(r => setTimeout(r, wait)); } } if (lastErr && lastErr.response && lastErr.response.data) { console.error('Server response:', lastErr.response.data); } else { console.error(lastErr && lastErr.message ? lastErr.message : lastErr); } throw new Error('All retries failed'); }

uploadWithRetry(3) .then(() => console.log('Done')) .catch(() => process.exit(1));

  1. 放置待上传文件
  • 将一张本地图片命名为 sample.png 放在与 upload.js 同级目录。
  1. 运行与验证
  • 执行 node upload.js
  • 观察控制台实时上传进度,成功后应打印 File ID 和 Done。
  1. 结果检查(可选的状态验证示例)
  • 如果接口支持查询文件详情(以文件 ID 为例),可在上传成功后追加验证:
    • 示例(添加在 then 分支里): const fileId = data.id; const check = await axios.get(${API_URL}/${fileId}, { headers: { Authorization: Bearer ${TOKEN} }, timeout: 10000 }); console.log('File status:', check.data.status, 'meta:', check.data.meta);

补充说明

  • 服务端通常限制单文件大小与速率:本示例在上传前做了本地体积校验(MAX_SIZE_MB)。如果还需要限速,可在读取流时引入限速库或采用分片/分段上传策略。
  • 代理:在企业网络中可使用 HTTP_PROXY/HTTPS_PROXY/NO_PROXY 环境变量,或通过 axios 的 proxy 配置进行代理访问。
  • 进度统计:本示例基于文件读取流长度统计,若中间有压缩或变更内容长度的中间件,进度可能与服务器已接收的字节数存在偏差。
  • 安全性:令牌请使用安全配置管理(环境变量、密钥管理服务等),避免将 Token 明文提交到代码仓库。

下面给出一套可直接复用的高性能调用模板,展示如何在 Go 1.21 中使用 net/http + context 并发拉取商品详情,开启全局连接复用与 HTTP/2,设置整体与单请求超时,JSON 解码后按价格排序并稳定打印。

步骤 1:初始化项目

  • 在项目根目录执行
  • go mod init example.com/apiguide

步骤 2:编写 main.go 将以下代码保存为 main.go。注意:示例 URL 为占位,请替换为你的真实 API 基址;如需鉴权,请设置环境变量 EXAMPLE_TOKEN。

package main

import ( "context" "encoding/json" "fmt" "net" "net/http" "os" "sort" "sync" "time" )

type Product struct { ID int json:"id" Name string json:"name" Price float64 json:"price" }

func fetch(ctx context.Context, client *http.Client, id int) (Product, error) { url := fmt.Sprintf("https://api.example.com/v1/products/%d", id) // TODO: 替换为真实接口 req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return Product{}, err }

// 常用请求头:JSON、鉴权、UA
req.Header.Set("Accept", "application/json")
req.Header.Set("User-Agent", "apiguide/1.0 (+https://example.com)")
if token := os.Getenv("EXAMPLE_TOKEN"); token != "" {
    req.Header.Set("Authorization", "Bearer "+token)
}

resp, err := client.Do(req)
if err != nil {
    return Product{}, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
    return Product{}, fmt.Errorf("status %d", resp.StatusCode)
}

var p Product
if err := json.NewDecoder(resp.Body).Decode(&p); err != nil {
    return Product{}, err
}
return p, nil

}

func main() { // 全局 Transport:连接池、Keep-Alive、HTTP/2、超时参数 tr := &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{ Timeout: 3 * time.Second, // TCP 连接超时 KeepAlive: 30 * time.Second, // 保持连接心跳 }).DialContext, ForceAttemptHTTP2: true, // 对支持 ALPN 的 TLS 主机启用 HTTP/2 MaxIdleConns: 100, // 全局最大空闲连接 MaxIdleConnsPerHost: 20, // 每主机最大空闲连接(按并发规模调优) MaxConnsPerHost: 0, // 0 表示不限制 IdleConnTimeout: 90 * time.Second, TLSHandshakeTimeout: 5 * time.Second, ExpectContinueTimeout: 1 * time.Second, // ResponseHeaderTimeout 可选:上限等待首包时间(也可依赖每请求的 context) // ResponseHeaderTimeout: 4 * time.Second, }

// 复用单例 http.Client(不要为每个请求创建新 client)
client := &http.Client{
    Transport: tr,
    // 不设置 Client.Timeout,改用 context 控制单请求和整体超时,避免双重超时冲突
}

// 需要拉取的商品 ID
ids := []int{101, 205, 309, 412, 518}

// 整体超时(所有请求的总预算时间)
ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
defer cancel()

var (
    wg       sync.WaitGroup
    mu       sync.Mutex
    products = make([]Product, 0, len(ids))
)

for _, id := range ids {
    id := id // 闭包捕获
    wg.Add(1)
    go func() {
        defer wg.Done()
        // 单请求超时(连接、TLS、首包、读取等都受此 ctx 控制)
        cctx, ccancel := context.WithTimeout(ctx, 3*time.Second)
        defer ccancel()

        if p, err := fetch(cctx, client, id); err == nil {
            mu.Lock()
            products = append(products, p)
            mu.Unlock()
        } else {
            // 需求:单请求失败或超时忽略;如需排查可打印到 stderr
            // fmt.Fprintf(os.Stderr, "fetch %d error: %v\n", id, err)
        }
    }()
}

wg.Wait()

// 稳定排序与稳定打印:价格降序;同价时按 ID 升序再按名称升序,确保结果稳定
sort.Slice(products, func(i, j int) bool {
    if products[i].Price == products[j].Price {
        if products[i].ID == products[j].ID {
            return products[i].Name < products[j].Name
        }
        return products[i].ID < products[j].ID
    }
    return products[i].Price > products[j].Price
})

for _, p := range products {
    fmt.Printf("%d\t%s\t%.2f\n", p.ID, p.Name, p.Price)
}

// 可选:程序退出前显式关闭空闲连接
// tr.CloseIdleConnections()

}

步骤 3:运行与验证

  • go run .
  • 控制台将输出成功请求到的商品,按价格降序排列(同价按 ID、名称保证稳定顺序)。
  • 注意:示例域名为占位。若未替换为真实 API,将无法返回数据。你可以:
    • 替换为你的线上 API 并设置 EXAMPLE_TOKEN;
    • 或使用本地/Mock 服务返回 JSON 验证流程。

步骤 4:行为说明与关键点

  • 全局连接复用与 HTTP/2:
    • 通过复用单例 http.Client 与自定义 Transport 打开连接池、Keep-Alive、HTTP/2(ForceAttemptHTTP2)。
    • MaxIdleConns/MaxIdleConnsPerHost 根据并发与目标服务调优,降低握手与慢启动成本。
  • 超时控制:
    • 整体超时:context.WithTimeout(ctx, 8s) 控制整批请求的上限耗时。
    • 单请求超时:每个 goroutine 内再包一层 3s 超时,独立控制单个请求生命周期。
    • Dial/TLS/首包等由请求 context 与 Transport 超时共同约束。
  • 并发与聚合:
    • 使用 goroutine + WaitGroup 合并结果,失败或超时的请求被忽略,仅输出成功数据。
    • 使用互斥锁安全地向切片追加结果。
  • JSON 解码与稳定输出:
    • json.NewDecoder 流式解码单个响应体。
    • 排序采用稳定且确定的多键排序:价格降序,ID 升序,名称升序,保证输出顺序稳定可复现。
  • 请求构造与鉴权:
    • 新建带 ctx 的请求、设置 Accept、User-Agent 与可选 Authorization: Bearer
    • token 从环境变量 EXAMPLE_TOKEN 读取,避免硬编码。

可选优化与扩展

  • 限制并发度:使用带缓冲通道或令牌桶实现限流,例如 semaphore := make(chan struct{}, 50)。
  • 重试与退避:对可重试的幂等请求引入指数退避重试,并在 ctx 剩余时间内尝试。
  • 统一日志与指标:记录失败原因、时延分布、超时比率;导出 Prometheus 指标。
  • 缓存:对热门商品加本地/分布式缓存,降低下游压力。
  • 大响应优化:若响应极大,可用 json.Decoder 增量读取或切换到 jsoniter 以提升吞吐(权衡兼容性)。
  • Header 标准化:统一 User-Agent、X-Request-ID、Trace-Context 便于跨服务追踪。

示例详情

解决的问题

帮助开发者快速掌握如何在特定编程语言中调用指定的库或API,以高效完成具体任务。这一过程中,重点在于提供清晰的指导与代码示例,降低学习曲线并提升开发效率。

适用用户

软件开发初学者

帮助入门开发者快速掌握特定编程语言中的库或API使用技巧,从基础任务入手提升实践能力。

资深开发者

为经验丰富的程序员提供复杂任务的清晰实现路径,节省时间、提升技术应用效率。

技术培训讲师

为讲师设计课程内容时提供可靠开发案例,构建结构清晰的教学素材。

特征总结

快速生成编程指导,帮助用户高效掌握库或API的使用方法。
清晰的代码示例,逐步展示任务实现过程,降低技术学习门槛。
支持多种编程语言,让开发者轻松适配不同开发环境。
自动化任务分解,将复杂问题拆解为简单步骤,易于理解和执行。
灵活定制任务需求,根据目标场景个性化调整指导方案。
对接学习与开发场景,适合初学者与资深开发者共同使用。
明确重点法则,提供必要的注意事项,避免技术踩坑。

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥20.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 111 tokens
- 5 个可调节参数
{ 编程语言 } { 库或API名称 } { 任务描述 } { 操作步骤 } { 可选说明 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
使用提示词兑换券,低至 ¥ 9.9
了解兑换券 →
限时半价

不要错过!

半价获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59