×
¥
查看详情
🔥 会员专享 文生代码 工具

API示例指引生成

👁️ 542 次查看
📅 Nov 19, 2025
💡 核心价值: 该提示词可根据用户输入的编程语言、API或库名称、目标任务与关键参数,生成清晰可复现的使用指引,包括完整代码示例与关键参数解释。所有内容均基于用户显式输入信息,不依赖外部自动推断,确保可控与准确。适用于快速理解 API 功能、完成集成开发或编写内部技术文档,帮助开发者高效实现目标功能。

🎯 可自定义参数(4个)

编程语言
编程语言选择
API或库名称与用途说明
API或库的名称及其用途说明
目标任务说明
目标任务的具体说明
关键参数列表
关键参数列表的详细说明

🎨 效果示例

下面是一份可直接运行的 Python 指南与示例代码,演示如何集成“示例支付网关 REST API”完成签名、下单、回调验签、日志、超时/重试以及幂等键配置。示例默认读取环境变量并提供可运行的本地演示(DRY_RUN=1 时不发起真实 HTTP 请求,仅打印与校验)。

代码示例(Python 3.9+):

import os
import hmac
import hashlib
import json
import time
import uuid
import string
import random
import logging
from datetime import datetime, timezone
from typing import Tuple, Dict, Any, Optional

try:
    import requests
except ImportError:
    raise SystemExit("请先安装 requests :pip install requests")

# ---------------------------
# 配置与工具
# ---------------------------

def iso8601_utc_now() -> str:
    # ISO8601,UTC 带 Z,精度到秒
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

def rand_nonce(n: int = 16) -> str:
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choice(alphabet) for _ in range(n))

def json_bytes(data: Dict[str, Any]) -> bytes:
    # 与请求体一致性:UTF-8,去空格(不影响语义,利于签名稳定)
    return json.dumps(data, ensure_ascii=False, separators=(',', ':')).encode('utf-8')

def hmac_sha256_hex(secret: str, message: str) -> str:
    return hmac.new(secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).hexdigest()

# ---------------------------
# 客户端实现
# ---------------------------

class PaymentClient:
    def __init__(self,
                 base_url: str,
                 merchant_id: str,
                 key_id: str,
                 secret: str,
                 timeout_ms: int = 8000,
                 retry: int = 3,
                 dry_run: bool = True):
        self.base_url = base_url.rstrip('/')
        self.merchant_id = merchant_id
        self.key_id = key_id
        self.secret = secret
        self.timeout_ms = timeout_ms
        self.retry = retry
        self.dry_run = dry_run

        self.session = requests.Session()
        self.logger = logging.getLogger("pay.client")

    @staticmethod
    def _build_signature_string(method: str, path: str, query: str, body_bytes: bytes, timestamp: str, nonce: str) -> str:
        # 签名原文:method+path+query+body+timestamp+nonce(UTF-8)
        body_str = body_bytes.decode('utf-8') if body_bytes else ''
        return f"{method.upper()}{path}{query}{body_str}{timestamp}{nonce}"

    def _sign_request(self, method: str, path: str, query: str, body_bytes: bytes, timestamp: str, nonce: str) -> str:
        message = self._build_signature_string(method, path, query, body_bytes, timestamp, nonce)
        return hmac_sha256_hex(self.secret, message)

    def create_order(self,
                     endpoint: str,
                     body: Dict[str, Any],
                     idempotency_key: str) -> Tuple[int, Dict[str, Any], Dict[str, str]]:
        method = "POST"
        path = endpoint  # 示例:/v1/orders/create
        query = ""       # 本例无查询参数
        ts = iso8601_utc_now()
        nonce = rand_nonce()

        body_b = json_bytes(body)
        signature = self._sign_request(method, path, query, body_b, ts, nonce)

        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "X-Merchant-Id": self.merchant_id,
            "X-Key-Id": self.key_id,
            "X-Timestamp": ts,
            "X-Nonce": nonce,
            "X-Signature": signature,
            "Idempotency-Key": idempotency_key,
        }

        url = f"{self.base_url}{path}"
        timeout_sec = self.timeout_ms / 1000.0

        # 简单指数退避:0.5, 1.0, 2.0(秒),带少量抖动
        backoffs = [0.5 * (2 ** i) for i in range(max(self.retry, 1))]
        last_exc = None

        for attempt, backoff in enumerate(backoffs, start=1):
            start = time.time()
            req_id = str(uuid.uuid4())  # 本地生成,若服务端返回 X-Request-Id 会覆盖
            try:
                if self.dry_run:
                    # 演示模式:打印并构造一个模拟响应
                    self.logger.info("DRY_RUN: POST %s attempt=%d idem=%s", url, attempt, idempotency_key)
                    self.logger.info("Headers: %s", {k: headers[k] for k in ['X-Merchant-Id','X-Key-Id','X-Timestamp','X-Nonce','X-Signature','Idempotency-Key']})
                    self.logger.info("Body: %s", body)
                    # 模拟服务端响应
                    elapsed_ms = int((time.time() - start) * 1000)
                    mock_status = 201
                    mock_resp = {
                        "request_id": req_id,
                        "code": "SUCCESS",
                        "message": "created",
                        "data": {
                            "order_id": body.get("order_id"),
                            "pay_url": "https://sandbox.pay.example.com/pay/" + body.get("order_id", ""),
                            "status": "PENDING"
                        }
                    }
                    self.logger.info("request_id=%s status=%d elapsed_ms=%d", mock_resp["request_id"], mock_status, elapsed_ms)
                    return mock_status, mock_resp, headers

                resp = self.session.post(url, data=body_b, headers=headers, timeout=timeout_sec)
                elapsed_ms = int((time.time() - start) * 1000)

                # 提取服务端 request_id(如存在)
                req_id = resp.headers.get("X-Request-Id", req_id)

                # 记录日志
                err_summary = None if resp.ok else f"http_status={resp.status_code}"
                self.logger.info("request_id=%s status=%d elapsed_ms=%d err=%s",
                                 req_id, resp.status_code, elapsed_ms, err_summary)

                # 5xx 走重试;2xx/4xx 直接返回
                if 500 <= resp.status_code < 600 and attempt < self.retry:
                    time.sleep(backoff + random.uniform(0, 0.1))
                    continue

                # 尝试解析 JSON
                try:
                    resp_json = resp.json()
                except Exception:
                    resp_json = {"raw": resp.text}

                return resp.status_code, resp_json, headers

            except (requests.Timeout, requests.ConnectionError) as e:
                elapsed_ms = int((time.time() - start) * 1000)
                self.logger.warning("request_id=%s timeout/conn error on attempt=%d elapsed_ms=%d err=%s",
                                    req_id, attempt, elapsed_ms, str(e))
                last_exc = e
                if attempt < self.retry:
                    time.sleep(backoff + random.uniform(0, 0.1))
                    continue
                else:
                    # 达到最大重试次数
                    return 0, {"code": "NETWORK_ERROR", "message": str(e)}, headers

        # 正常不会到这里
        raise RuntimeError(f"unreachable, last_exc={last_exc}")

# ---------------------------
# 回调验签与防重放
# ---------------------------

class CallbackVerifier:
    def __init__(self, secret: str, replay_ttl_sec: int = 300):
        self.secret = secret
        self.replay_ttl = replay_ttl_sec
        self.nonce_seen: Dict[str, float] = {}  # nonce -> first_seen_unix

    def _prune(self, now: float):
        # 清理过期 nonce
        expired_keys = [k for k, ts in self.nonce_seen.items() if now - ts > self.replay_ttl]
        for k in expired_keys:
            self.nonce_seen.pop(k, None)

    def _sign_callback_payload(self, body_bytes: bytes, timestamp: str, nonce: str) -> str:
        # 回调验签无需额外路径:按照“回包原文 + timestamp + nonce”
        base = f"{body_bytes.decode('utf-8')}{timestamp}{nonce}"
        return hmac_sha256_hex(self.secret, base)

    def verify(self,
               headers: Dict[str, str],
               raw_body: bytes,
               expected_order_id: str,
               expected_amount: int) -> Tuple[bool, str]:
        # 1) 取回调头
        ts = headers.get("X-Timestamp", "")
        nonce = headers.get("X-Nonce", "")
        sig = headers.get("X-Signature", "")

        if not ts or not nonce or not sig:
            return False, "missing required callback headers"

        # 2) 计算签名并比对
        calc = self._sign_callback_payload(raw_body, ts, nonce)
        if calc != sig:
            return False, "signature mismatch"

        # 3) 防重放(nonce 去重)
        now = time.time()
        self._prune(now)
        if nonce in self.nonce_seen:
            return False, "replay detected (nonce used)"
        self.nonce_seen[nonce] = now

        # 4) 业务一致性校验
        try:
            obj = json.loads(raw_body.decode('utf-8'))
        except Exception:
            return False, "invalid JSON body"
        if obj.get("order_id") != expected_order_id:
            return False, "order_id mismatch"
        if int(obj.get("amount", -1)) != int(expected_amount):
            return False, "amount mismatch"

        return True, "ok"

# ---------------------------
# 演示主流程
# ---------------------------

def main():
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s"
    )

    # 1) 环境变量读取(提供默认值便于直接运行)
    base_url = os.getenv("PAY_BASE_URL", "https://sandbox.pay.example.com")
    merchant_id = os.getenv("PAY_MERCHANT_ID", "MCH_10001")
    key_id = os.getenv("PAY_KEY_ID", "KEY_MAIN")
    secret = os.getenv("PAY_SECRET", "secret_demo_123456")  # 请在生产使用安全管理
    dry_run = os.getenv("DRY_RUN", "1") == "1"

    # 超时与重试(可通过环境变量覆盖)
    timeout_ms = int(os.getenv("PAY_TIMEOUT_MS", "8000"))
    retry = int(os.getenv("PAY_RETRY", "3"))

    client = PaymentClient(
        base_url=base_url,
        merchant_id=merchant_id,
        key_id=key_id,
        secret=secret,
        timeout_ms=timeout_ms,
        retry=retry,
        dry_run=dry_run
    )

    # 2) 构造请求体与幂等键
    endpoint = "/v1/orders/create"
    body = {
        "order_id": "ORD_20250118_001",
        "amount": 25900,
        "currency": "CNY",
        "subject": "蓝牙耳机-黑色",
        "description": "含运费,7日内发货",
        "notify_url": "https://shop.example.com/pay/notify",
        "return_url": "https://shop.example.com/pay/return",
        "expire_minutes": 15,
        "metadata": {"buyer": "u_1288", "channel": "app"},
    }
    idem_key = "IDEM-ORD-20250118-001"

    # 3) 发起创建订单请求(支持超时/重试/幂等)
    status, resp_json, req_headers = client.create_order(endpoint, body, idem_key)

    print("\n=== Create Order Result ===")
    print("HTTP Status:", status)
    print("Response JSON:", json.dumps(resp_json, ensure_ascii=False))
    print("Used Idempotency-Key:", req_headers["Idempotency-Key"])

    # 4) 模拟接收异步回调并校验签名、防重放、数据一致性
    verifier = CallbackVerifier(secret=secret, replay_ttl_sec=300)

    callback_body = {
        "event": "payment.success",
        "order_id": body["order_id"],
        "amount": body["amount"],
        "currency": body["currency"],
        "paid_at": iso8601_utc_now(),
        "transaction_id": "TXN_" + uuid.uuid4().hex[:12]
    }
    cb_raw = json_bytes(callback_body)
    cb_ts = iso8601_utc_now()
    cb_nonce = rand_nonce()
    cb_sig = hmac_sha256_hex(secret, f"{cb_raw.decode('utf-8')}{cb_ts}{cb_nonce}")

    cb_headers = {
        "X-Timestamp": cb_ts,
        "X-Nonce": cb_nonce,
        "X-Signature": cb_sig
    }

    ok, reason = verifier.verify(cb_headers, cb_raw, expected_order_id=body["order_id"], expected_amount=body["amount"])
    print("\n=== Callback Verify Result ===")
    print("Valid:", ok, "Reason:", reason)

    # 5) 模拟重放(应失败)
    ok2, reason2 = verifier.verify(cb_headers, cb_raw, expected_order_id=body["order_id"], expected_amount=body["amount"])
    print("Replay Valid:", ok2, "Reason:", reason2)

if __name__ == "__main__":
    main()

运行方式:

  • 默认 DRY_RUN=1 只打印请求与模拟响应,便于本地演示:python demo.py
  • 若你已有可用网关或代理服务,将 DRY_RUN=0 且设置环境变量:

关键参数与规则说明:

  • base_url: API 基础地址。示例 https://pay.example.com(沙箱 https://sandbox.pay.example.com)
  • endpoint:
    • 下单路径 /v1/orders/create
    • 回调验签无需额外路径(使用回包原文 + X-Timestamp + X-Nonce 进行 HMAC 校验)
  • auth.headers:
    • X-Merchant-Id: 商户号(示例 MCH_10001)
    • X-Key-Id: 密钥标识(示例 KEY_MAIN)
    • X-Timestamp: ISO8601(UTC,示例 2025-01-18T08:00:00Z)
    • X-Nonce: 随机字串(防重放)
    • X-Signature: HMAC-SHA256 十六进制小写;请求侧计算规则见 signature
  • idempotency:
    • Idempotency-Key: 幂等键(示例 IDEM-ORD-20250118-001),相同键+相同请求体保证服务端只处理一次
  • request.body 字段:
    • order_id: 订单号(示例 ORD_20250118_001)
    • amount: 金额(单位分,示例 25900)
    • currency: 货币代码(示例 CNY)
    • subject: 简要主题(示例 “蓝牙耳机-黑色”)
    • description: 订单描述(示例 “含运费,7日内发货”)
    • notify_url: 异步回调地址(示例 https://shop.example.com/pay/notify)
    • return_url: 同步返回地址(示例 https://shop.example.com/pay/return)
    • expire_minutes: 超时时长(示例 15)
    • metadata: 自定义扩展(示例 {"buyer":"u_1288","channel":"app"})
  • signature:
    • 算法(请求):HMAC-SHA256(secret, method+path+query+body+timestamp+nonce)
    • 字符集:UTF-8;输出十六进制小写
    • 回调验签:按“回包原文 + X-Timestamp + X-Nonce”与 X-Signature 比对(本示例依题设“回调验签无需额外路径”)
  • http:
    • Content-Type: application/json
    • Accept: application/json
    • timeout_ms: 8000(示例)
    • retry: 3 次(指数退避,基于幂等键)
  • callback 验签与重放保护:
    • 使用回调头 X-Timestamp、X-Nonce、X-Signature 与回包原文计算签名并校验
    • 校验业务一致性:order_id、amount 必须与下单一致
    • nonce 去重并设置 TTL(示例 5 分钟)防重放
  • 日志:
    • 记录 request_id(若服务端返回 X-Request-Id 则使用;否则本地生成)、HTTP 响应码、耗时(ms)与失败原因摘要(如网络错误、5xx 状态)

提示:

  • 生产环境请通过安全方式管理 PAY_SECRET(如密钥管理服务/环境注入),并将 DRY_RUN 设为 0。
  • 若服务端的回调签名算法与示例不同,请按网关文档调整 verify 逻辑(本示例严格按题设实现)。

下面给出一个最小可用的 TypeScript 示例与参数说明,演示如何用 fetch 调用 CodeGenX HTTP API 的 /v1/code/generate 端点,以 SSE 流式消费增量代码,增量拼接并按 stop 规则截断,最终写入文件;同时实现超时、重试(指数退避+抖动)、网络中断重连(复用相同 user 与 seed)与 X-Request-Id 记录。示例完全基于你提供的信息,不做额外推断。

示例代码(Node 18+ 或任意带 fetch 的环境)

// tsconfig 请确保 "moduleResolution": "node", "module": "esnext"(或适配你的环境)
import { writeFile } from "node:fs/promises";

type RetryPolicy = {
  max_attempts: number;        // 示例: 3
  backoff_ms: number;          // 示例: 500
  jitter: boolean;             // 示例: true
};

type GenerateBody = {
  model: string;               // 示例: "code-medium"
  prompt: string;              // 示例: "实现一个LRU缓存,含get/put与过期"
  language: string;            // 示例: "typescript"
  style?: string;              // 示例: "functional" | "oop"
  temperature?: number;        // 浮点 0~1(示例: 0.2)
  max_tokens?: number;         // 示例: 800
  stream?: boolean;            // true 表示 SSE
  stop?: string[];             // 示例: ["```", "\n\n#"]
  seed?: number;               // 示例: 42
  user?: string;               // 示例: "dev_2001"
  timeout_ms?: number;         // 示例: 15000(服务端可见字段,按提供信息原样传)
  retry?: RetryPolicy;         // 放入 body(按提供信息原样传)
};

type StreamDelta = {
  delta?: string;
  finish_reason?: string;      // "stop" 时结束
};

const ENDPOINT = "https://api.codegenx.example/v1/code/generate";

type GenerateOptions = {
  token: string;               // Authorization: Bearer <token>
  requestId?: string;          // 可选 X-Request-Id
  outFile: string;             // 输出文件路径
  body: GenerateBody;          // 上述 body
  // 客户端侧控制(与 body.timeout_ms/retry 相互独立,二者都会生效):
  clientTimeoutMs?: number;    // 客户端超时,使用 AbortController
  clientRetry?: RetryPolicy;   // 客户端重试策略(指数退避+抖动)
};

function sleep(ms: number) {
  return new Promise((r) => setTimeout(r, ms));
}

function backoffDelay(base: number, attemptIndex: number, jitter: boolean) {
  // attemptIndex 从 0 开始:0->base, 1->base*2, 2->base*4 ...
  const baseDelay = base * Math.pow(2, attemptIndex);
  if (!jitter) return baseDelay;
  const rand = Math.random() * baseDelay;
  return Math.floor(baseDelay / 2 + rand / 2); // 简单抖动:±50%
}

// 将增量安全拼接,处理重连导致的重复前缀
function appendWithOverlapDedup(acc: string, delta: string): string {
  if (!delta) return acc;
  if (acc.endsWith(delta)) return acc;
  const maxOverlap = Math.min(acc.length, delta.length);
  let overlap = 0;
  for (let n = maxOverlap; n > 0; n--) {
    if (acc.slice(-n) === delta.slice(0, n)) {
      overlap = n;
      break;
    }
  }
  return acc + delta.slice(overlap);
}

// 基于 stop 规则截断:返回 [截断后的文本, 是否命中stop]
function applyStopRules(text: string, stops?: string[]): [string, boolean] {
  if (!stops || stops.length === 0) return [text, false];
  let cut = -1;
  for (const s of stops) {
    const i = text.indexOf(s);
    if (i !== -1) cut = cut === -1 ? i : Math.min(cut, i);
  }
  if (cut !== -1) return [text.slice(0, cut), true];
  return [text, false];
}

// 解析 SSE: 将 response 的 text/event-stream 流拆分为事件,提取 data: 行
async function consumeSSE(
  res: Response,
  onData: (payload: string) => void,
  onFinish: (reason: string | null, requestId: string | null) => void
) {
  const reader = res.body?.getReader();
  if (!reader) throw new Error("ReadableStream not available");
  const decoder = new TextDecoder("utf-8");
  let buffer = "";
  let finished = false;

  const requestId = res.headers.get("x-request-id"); // 记录服务端回传的 Request-Id

  while (!finished) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE 事件用空行分隔
    let idx: number;
    while ((idx = buffer.indexOf("\n\n")) !== -1) {
      const rawEvent = buffer.slice(0, idx);
      buffer = buffer.slice(idx + 2);

      // 只拼接 data: 行
      const dataLines = rawEvent
        .split("\n")
        .map((l) => l.trim())
        .filter((l) => l.startsWith("data:"))
        .map((l) => l.slice(5).trim());
      if (dataLines.length === 0) continue;

      const dataPayload = dataLines.join("\n");
      if (dataPayload === "[DONE]") {
        finished = true;
        onFinish(null, requestId);
        break;
      }

      try {
        const json: StreamDelta = JSON.parse(dataPayload);
        if (typeof json.delta === "string") {
          onData(json.delta);
        }
        if (json.finish_reason === "stop") {
          finished = true;
          onFinish("stop", requestId);
          break;
        }
      } catch {
        // 若非 JSON(不符合协议),作为原始文本增量尝试处理
        onData(dataPayload);
      }
    }
  }

  if (!finished) onFinish(null, requestId);
}

// 以 SSE 方式请求并将代码写入文件(含客户端超时/重试、网络中断重连、stop 规则)
export async function generateCodeToFile(opts: GenerateOptions) {
  const {
    token,
    requestId,
    outFile,
    body,
    clientTimeoutMs = body.timeout_ms ?? 15000, // 客户端超时;若 body 里也传了 timeout_ms,这里仅作为客户端控制
    clientRetry = body.retry ?? { max_attempts: 3, backoff_ms: 500, jitter: true },
  } = opts;

  // 强制流式
  const reqBody: GenerateBody = { ...body, stream: true };

  let attempt = 0;
  let code = "";
  let stopHit = false;
  let lastServerRequestId: string | null = null;

  while (attempt < Math.max(1, clientRetry.max_attempts)) {
    const controller = new AbortController();
    const t = setTimeout(() => controller.abort(), clientTimeoutMs);

    try {
      const res = await fetch(ENDPOINT, {
        method: "POST",
        signal: controller.signal,
        headers: {
          "Authorization": `Bearer ${token}`,
          "Content-Type": "application/json",
          ...(requestId ? { "X-Request-Id": requestId } : {}),
        },
        body: JSON.stringify(reqBody),
      });

      if (!res.ok) {
        // 仅在 5xx 上重试;其他状态直接抛出(可按需调整)
        const text = await res.text().catch(() => "");
        if (res.status >= 500 && res.status <= 599) {
          throw new Error(`Server ${res.status}: ${text}`);
        } else {
          throw new Error(`HTTP ${res.status}: ${text}`);
        }
      }

      // 开始消费 SSE
      await consumeSSE(
        res,
        (delta) => {
          // 重连时可能重复,做去重拼接
          code = appendWithOverlapDedup(code, delta);
          // 按 stop 规则截断(命中后终止后续处理)
          const [trunc, hit] = applyStopRules(code, reqBody.stop);
          if (hit && !stopHit) {
            stopHit = true;
            code = trunc;
            // 客户端主动不再处理后续片段:不需要额外动作,等 consumeSSE 结束或超时
            // 如需更快停止,可调用 controller.abort() 取消
            controller.abort();
          }
        },
        (reason, reqId) => {
          lastServerRequestId = reqId ?? lastServerRequestId;
        }
      );

      clearTimeout(t);
      // 成功完成(无异常抛出) -> 退出重试循环
      break;
    } catch (err) {
      clearTimeout(t);
      attempt += 1;

      // 网络中断/超时/5xx:按策略重试,复用相同 user 与 seed(已在 reqBody 保持不变)
      if (attempt >= Math.max(1, clientRetry.max_attempts)) {
        // 记录最后一次 X-Request-Id(如果有)
        if (lastServerRequestId) {
          console.error(`Failed with X-Request-Id: ${lastServerRequestId}`);
        }
        throw err;
      }
      const delay = backoffDelay(clientRetry.backoff_ms, attempt - 1, clientRetry.jitter);
      await sleep(delay);
      // 继续下一次尝试(code 已累积,appendWithOverlapDedup 将尽量去重)
    }
  }

  // 简单“格式化”:去除两端空白并以换行结束(外部格式化工具可自行接入)
  const finalCode = code.trimEnd() + "\n";
  await writeFile(outFile, finalCode, "utf-8");
  return { outFile, bytes: Buffer.byteLength(finalCode, "utf-8"), requestId: lastServerRequestId ?? requestId ?? null };
}

// 使用示例
async function demo() {
  const token = process.env.CODEGENX_TOKEN || "<token>";
  const requestId = "req-" + Math.random().toString(36).slice(2);

  await generateCodeToFile({
    token,
    requestId,
    outFile: "lru.ts",
    body: {
      model: "code-medium",
      prompt: "实现一个LRU缓存,含get/put与过期",
      language: "typescript",
      style: "oop",
      temperature: 0.2,
      max_tokens: 800,
      stream: true,
      stop: ["```", "\n\n#"],
      seed: 42,
      user: "dev_2001",
      timeout_ms: 15000,
      retry: { max_attempts: 3, backoff_ms: 500, jitter: true },
    },
    // 客户端控制(可与 body 中 timeout_ms/retry 并存)
    clientTimeoutMs: 15000,
    clientRetry: { max_attempts: 3, backoff_ms: 500, jitter: true },
  });

  console.log("Code generated to lru.ts");
}

// demo().catch(console.error);

非流式用法(仅参考,设置 stream: false,将一次性返回完整 JSON)

async function generateNonStream(token: string) {
  const res = await fetch("https://api.codegenx.example/v1/code/generate", {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${token}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      model: "code-medium",
      prompt: "实现一个LRU缓存,含get/put与过期",
      language: "typescript",
      temperature: 0.2,
      max_tokens: 800,
      stream: false,
      seed: 42,
      user: "dev_2001",
      timeout_ms: 15000,
      retry: { max_attempts: 3, backoff_ms: 500, jitter: true },
    }),
  });
  const json = await res.json() as { code: string; usage?: { prompt_tokens: number; completion_tokens: number } };
  await writeFile("lru.ts", (json.code ?? "").trimEnd() + "\n", "utf-8");
}

关键参数说明(基于你提供的信息)

  • endpoint 与方法
  • headers
    • Authorization: Bearer (必填,认证)
    • Content-Type: application/json(必填)
    • X-Request-Id(可选,便于日志与问题排查;服务端也可能回传该头)
  • body 参数
    • model: 模型名称,例如 code-medium。默认值:未在提供信息中指定。
    • prompt: 生成意图/描述。默认值:未在提供信息中指定。
    • language: 目标语言,例如 typescript、python、go。默认值:未在提供信息中指定。
    • style: 代码风格,例如 functional、oop。默认值:未在提供信息中指定。
    • temperature: 浮点数 0~1,越低越保守,示例 0.2。默认值:未在提供信息中指定。
    • max_tokens: 最大生成长度,示例 800。默认值与上限:未在提供信息中指定。
    • stream: 是否流式,示例 true。默认值:未在提供信息中指定。
    • stop: 终止符列表,示例 ["```", "\n\n#"]。命中即应停止拼接与/或主动取消请求。默认值:未在提供信息中指定。
    • seed: 固定随机种(可复现),示例 42。默认值:未在提供信息中指定。
    • user: 追踪用户/会话 ID,示例 dev_2001。默认值:未在提供信息中指定。
    • timeout_ms: 客户端超时(按你提供的信息,该字段也在 body 中给出),示例 15000。默认值:未在提供信息中指定。
    • retry: { max_attempts, backoff_ms, jitter },示例 {max_attempts:3, backoff_ms:500, jitter:true}。默认值:未在提供信息中指定。
  • 响应
    • 流式:SSE,每个事件的 data 行携带 JSON { delta, finish_reason? }。当收到 data: [DONE] 或 finish_reason=stop 时结束。
    • 非流式:一次性 JSON { code: string, usage: { prompt_tokens, completion_tokens } }。
  • 增量与文件输出
    • 客户端将 delta 依序拼接;若命中 stop 则截断并可主动取消请求;结束后对结果进行简单格式化(示例中为 trimEnd + 换行),写入文件。
  • 错误与健壮性
    • 网络中断/超时:客户端按 retry(指数退避+抖动)重试;重试时复用相同 user 与 seed 以获得可重复输出;若重连产生重复前缀,示例代码通过重叠去重避免重复拼接。
    • 超时取消:AbortController 基于 clientTimeoutMs;同时保留 body.timeout_ms 供服务端参考(二者互不冲突)。
    • 记录 X-Request-Id:读取服务端响应头 x-request-id,失败时打印便于排查。

备注

  • 示例严格使用你提供的字段与协议细节;未给出明确默认值或范围的字段均标注为“未在提供信息中指定”。如果需要更严格的去重或断点续传,需要 API 额外提供事件序号或游标等能力(本示例未做外推)。

下面给出一个面向 Go 开发者的简明指南与示例,演示如何调用 SQLCraft REST API,把自然语言意图转为安全的只读 SQL,并支持非流式与流式增量输出、解析 reasoning 字段与安全拦截。

一、关键参数与最佳实践

  • endpoint: https://api.sqlcraft.example/v1/sql/generate
  • method: POST
  • headers:
    • Authorization: Bearer (必填)
    • Content-Type: application/json(必填)
    • X-Trace-Id: 可选,建议传业务侧请求链路 ID,便于审计与排障
  • body 参数与最佳实践
    • dialect: 数据库方言。postgres、mysql、sqlite。示例 postgres。与 schema 保持一致
    • schema: 表结构上下文。建议提供字段类型与关键索引字段,越完整越好
    • intent: 自然语言意图。尽量明确时间范围/过滤条件/聚合粒度
    • constraints:
      • only_select: true(强制只读)
      • table_whitelist: 限定允许使用的表,如 ["orders","users"]
      • use_cte: true(鼓励生成可读性更好的 CTE)
    • temperature: 0~1。推荐生产环境取低值(如 0.1)保证稳定性
    • max_tokens: 上限,控制响应大小(如 512)
    • stream: 是否启用流式增量 sql 片段(SSE)。需要前端实时展示时设为 true
    • safety_check: true 启用服务端安全检查(与本地拦截配合提升稳健性)
    • audit_tag: 审计标签,如 rpt_monthly,便于归档
    • timeout_ms: 客户端超时(毫秒),同时在 http.Client 也设定一致超时
  • 响应
    • 非流式:{sql:string, reasoning:string, tokens_used:int, warnings:[string]}
    • 流式:SSE data 行载荷 {sql_delta:string},最终一条 {finish_reason:"stop"}
  • 安全约束
    • 拒绝包含 drop/alter/truncate/update/insert
    • 仅允许 select(含 with/cte)
    • 跨表 join 仅限于白名单中的表

二、Go 最小可用示例:非流式一次性生成与解析 reasoning

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"
	"time"
)

const Endpoint = "https://api.sqlcraft.example/v1/sql/generate"

type Constraints struct {
	OnlySelect     bool     `json:"only_select"`
	TableWhitelist []string `json:"table_whitelist"`
	UseCTE         bool     `json:"use_cte"`
}

type GenerateRequest struct {
	Dialect     string      `json:"dialect"`
	Schema      string      `json:"schema"`
	Intent      string      `json:"intent"`
	Constraints Constraints `json:"constraints"`
	Temperature float64     `json:"temperature"`
	MaxTokens   int         `json:"max_tokens"`
	Stream      bool        `json:"stream"`
	SafetyCheck bool        `json:"safety_check"`
	AuditTag    string      `json:"audit_tag"`
	TimeoutMS   int         `json:"timeout_ms"`
}

type GenerateResponse struct {
	SQL        string   `json:"sql"`
	Reasoning  string   `json:"reasoning"`
	TokensUsed int      `json:"tokens_used"`
	Warnings   []string `json:"warnings"`
}

// 仅演示用途的简单安全校验(依赖字符串扫描/正则易漏检,生产可用更健壮的 SQL 解析器)
func ValidateReadOnlySQL(sql string, whitelist []string) (ok bool, reasons []string) {
	s := strings.ToLower(stripSQLComments(sql))
	s = strings.TrimSpace(s)
	if !(strings.HasPrefix(s, "select") || strings.HasPrefix(s, "with")) {
		reasons = append(reasons, "仅允许 SELECT/CTE(SQL 应以 WITH 或 SELECT 开头)")
	}
	// 禁词(服务端已做安全检查,这里本地再兜底)
	for _, kw := range []string{" drop ", " alter ", " truncate ", " update ", " insert "} {
		if strings.Contains(" "+s+" ", kw) {
			reasons = append(reasons, "检测到禁止关键字: "+strings.TrimSpace(kw))
		}
	}
	// 白名单表限制(非常粗略的 from/join 扫描)
	used := extractTableNames(s)
	if len(used) > 0 && len(whitelist) > 0 {
		allow := map[string]struct{}{}
		for _, t := range whitelist {
			allow[strings.ToLower(t)] = struct{}{}
		}
		var notAllowed []string
		for _, t := range used {
			if _, ok := allow[t]; !ok {
				notAllowed = append(notAllowed, t)
			}
		}
		if len(notAllowed) > 0 {
			reasons = append(reasons, "包含非白名单表: "+strings.Join(notAllowed, ", "))
		}
	}
	return len(reasons) == 0, reasons
}

func stripSQLComments(s string) string {
	// 简单移除行注释与块注释(不覆盖所有情况)
	out := s
	// 行注释 --
	lines := strings.Split(out, "\n")
	for i, l := range lines {
		if idx := strings.Index(l, "--"); idx >= 0 {
			lines[i] = l[:idx]
		}
	}
	out = strings.Join(lines, "\n")
	// 块注释 /* */
	for {
		start := strings.Index(out, "/*")
		if start < 0 {
			break
		}
		end := strings.Index(out[start+2:], "*/")
		if end < 0 {
			out = out[:start]
			break
		}
		out = out[:start] + out[start+2+end+2:]
	}
	return out
}

func extractTableNames(s string) []string {
	// 非严格提取 from/join 后第一个标识符(忽略 schema/alias 细节)
	var tables []string
	tokens := strings.Fields(s)
	for i := 0; i < len(tokens); i++ {
		t := strings.ToLower(tokens[i])
		if t == "from" || t == "join" {
			if i+1 < len(tokens) {
				raw := tokens[i+1]
				// 去掉逗号/括号/引号
				raw = strings.Trim(raw, ",()\"`")
				// 如果是 schema.table 取表名部分
				parts := strings.Split(raw, ".")
				base := parts[len(parts)-1]
				// 去掉别名语法中尾随的关键字
				base = strings.Trim(base, ",")
				base = strings.TrimSpace(base)
				// 过滤子查询起始 "("
				if base != "" && base != "(" && base != "select" {
					tables = append(tables, strings.ToLower(base))
				}
			}
		}
	}
	// 去重
	seen := map[string]struct{}{}
	var uniq []string
	for _, t := range tables {
		if _, ok := seen[t]; !ok {
			seen[t] = struct{}{}
			uniq = append(uniq, t)
		}
	}
	return uniq
}

func GenerateSQL(ctx context.Context, token, traceID string, req GenerateRequest) (*GenerateResponse, error) {
	buf, _ := json.Marshal(req)
	httpClient := &http.Client{
		Timeout: time.Duration(req.TimeoutMS) * time.Millisecond,
	}
	httpReq, _ := http.NewRequestWithContext(ctx, http.MethodPost, Endpoint, bytes.NewReader(buf))
	httpReq.Header.Set("Authorization", "Bearer "+token)
	httpReq.Header.Set("Content-Type", "application/json")
	if traceID != "" {
		httpReq.Header.Set("X-Trace-Id", traceID)
	}

	resp, err := httpClient.Do(httpReq)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
	}
	var out GenerateResponse
	if err := json.Unmarshal(body, &out); err != nil {
		return nil, err
	}
	return &out, nil
}

func main() {
	token := os.Getenv("SQLCRAFT_TOKEN")
	if token == "" {
		fmt.Println("please set SQLCRAFT_TOKEN")
		return
	}

	req := GenerateRequest{
		Dialect: "postgres",
		Schema:  "orders(id uuid, user_id uuid, status text, amount int, created_at timestamp); users(id uuid, email text)",
		Intent:  "统计近30天已支付订单金额与笔数,按天汇总",
		Constraints: Constraints{
			OnlySelect:     true,
			TableWhitelist: []string{"orders", "users"},
			UseCTE:         true,
		},
		Temperature: 0.1,
		MaxTokens:   512,
		Stream:      false,
		SafetyCheck: true,
		AuditTag:    "rpt_monthly",
		TimeoutMS:   12000,
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(req.TimeoutMS)*time.Millisecond)
	defer cancel()

	out, err := GenerateSQL(ctx, token, "trace-abc-001", req)
	if err != nil {
		fmt.Println("generate error:", err)
		return
	}

	// 解析 sql 与 reasoning 字段
	fmt.Println("reasoning:", out.Reasoning)
	fmt.Println("warnings:", out.Warnings)

	// 本地安全拦截与建议
	if ok, reasons := ValidateReadOnlySQL(out.SQL, req.Constraints.TableWhitelist); !ok {
		fmt.Println("SQL 安全校验未通过:")
		for _, r := range reasons {
			fmt.Println(" -", r)
		}
		fmt.Println("替代建议:")
		for _, s := range SuggestSafer(out.SQL, req) {
			fmt.Println(" -", s)
		}
		return
	}

	fmt.Println("generated SQL:")
	fmt.Println(out.SQL)
}

// 生成替代建议(基于既定约束与期望范式)
func SuggestSafer(sql string, req GenerateRequest) []string {
	var tips []string
	tips = append(tips,
		"仅使用 SELECT/CTE:以 WITH 或 SELECT 开头,移除任何 DDL/DML 关键字(drop/alter/truncate/update/insert)",
		fmt.Sprintf("限制表到白名单: %v(必要的 JOIN 仅在此范围内)", req.Constraints.TableWhitelist),
		"启用 constraints.only_select=true、constraints.use_cte=true(已在示例中开启)",
		"需要日期维度时,使用 CTE 构造日期序列,对 orders 按天聚合,最后可 LEFT JOIN users 以补充邮箱字段",
	)
	return tips
}

三、Go 流式示例:接收增量 sql 片段(SSE)

package main

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"time"
)

type StreamDelta struct {
	SQLDelta     string `json:"sql_delta"`
	FinishReason string `json:"finish_reason"`
}

func GenerateSQLStream(ctx context.Context, token, traceID string, req GenerateRequest, onDelta func(fragment string)) (finalSQL string, err error) {
	req.Stream = true
	payload, _ := json.Marshal(req)

	httpClient := &http.Client{
		Timeout: time.Duration(req.TimeoutMS) * time.Millisecond,
	}
	r, _ := http.NewRequestWithContext(ctx, http.MethodPost, Endpoint, bytes.NewReader(payload))
	r.Header.Set("Authorization", "Bearer "+token)
	r.Header.Set("Content-Type", "application/json")
	r.Header.Set("Accept", "text/event-stream")
	if traceID != "" {
		r.Header.Set("X-Trace-Id", traceID)
	}

	resp, err := httpClient.Do(r)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("HTTP %d", resp.StatusCode)
	}

	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		line := sc.Text()
		// SSE: 只处理以 "data:" 开头的行
		if !strings.HasPrefix(line, "data:") {
			continue
		}
		data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if data == "" {
			continue
		}
		var delta StreamDelta
		if err := json.Unmarshal([]byte(data), &delta); err != nil {
			continue
		}
		if delta.SQLDelta != "" {
			onDelta(delta.SQLDelta)
			finalSQL += delta.SQLDelta
		}
		if delta.FinishReason == "stop" {
			break
		}
	}
	return finalSQL, sc.Err()
}

使用方式示例:

// ...
req.Stream = true
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(req.TimeoutMS)*time.Millisecond)
defer cancel()

var built strings.Builder
finalSQL, err := GenerateSQLStream(ctx, token, "trace-abc-002", req, func(fragment string) {
	// 这里可将 fragment 增量展示到前端
	built.WriteString(fragment)
})
if err != nil {
	fmt.Println("stream error:", err)
	return
}

// 结束后进行安全校验
if ok, reasons := ValidateReadOnlySQL(finalSQL, req.Constraints.TableWhitelist); !ok {
	fmt.Println("SQL 流式结果安全校验未通过:", reasons)
	for _, s := range SuggestSafer(finalSQL, req) {
		fmt.Println(" -", s)
	}
	return
}
fmt.Println("final SQL:", finalSQL)

四、示例生成期望(样式参考,用于测试验证)

  • 目标:postgres,使用 CTE 生成日期维度,对 orders 近 30 天内 status=‘paid’ 的金额与笔数按天聚合;可选 LEFT JOIN users 以补充邮箱字段
  • 形态要点:
    • WITH date_dim AS (...) 生成日期序列
    • 对 orders 进行按天聚合 sum(amount), count(*)
    • 最终可 LEFT JOIN users ON orders.user_id = users.id 以带出 email(可选)
    • 全程仅 SELECT/CTE,不包含任何 DDL/DML;只出现表 orders、users

五、错误处理与重试建议

  • 建议在 http.Client 超时与 body.timeout_ms 双重控制超时
  • 非 200 状态码直接中止并记录 X-Trace-Id 便于后端排查
  • 对 warnings 字段进行记录,如包含“方言/字段不匹配”等提示时,优先完善 schema 后重试
  • 若本地安全校验未通过:直接拦截结果,不下发到数据库执行;给出上述替代建议并可重试生成

六、小结

  • 通过 constraints.only_select、table_whitelist、use_cte 与本地 ValidateReadOnlySQL 双层约束,确保只读、安全、可审计
  • 非流式适合一次性生成;流式适合前端字词级/片段级实时展示
  • 传入完整 schema 和清晰 intent,可以显著提升生成质量与稳定性

示例详情

该提示词已被收录:
“程序员必备:提升开发效率的专业AI提示词合集”
让 AI 成为你的第二双手,从代码生成到测试文档全部搞定,节省 80% 开发时间
√ 立即可用 · 零学习成本
√ 参数化批量生成
√ 专业提示词工程师打磨

📖 如何使用

30秒出活:复制 → 粘贴 → 搞定
与其花几十分钟和AI聊天、试错,不如直接复制这些经过千人验证的模板,修改几个 {{变量}} 就能立刻获得专业级输出。省下来的时间,足够你轻松享受两杯咖啡!
加载中...
💬 不会填参数?让 AI 反过来问你
不确定变量该填什么?一键转为对话模式,AI 会像资深顾问一样逐步引导你,问几个问题就能自动生成完美匹配你需求的定制结果。零门槛,开口就行。
转为对话模式
🚀 告别复制粘贴,Chat 里直接调用
无需切换,输入 / 唤醒 8000+ 专家级提示词。 插件将全站提示词库深度集成于 Chat 输入框。基于当前对话语境,系统智能推荐最契合的 Prompt 并自动完成参数化,让海量资源触手可及,从此彻底告别"手动搬运"。
即将推出
🔌 接口一调,提示词自己会进化
手动跑一次还行,跑一百次呢?通过 API 接口动态注入变量,接入批量评价引擎,让程序自动迭代出更高质量的提示词方案。Prompt 会自己进化,你只管收结果。
发布 API
🤖 一键变成你的专属 Agent 应用
不想每次都配参数?把这条提示词直接发布成独立 Agent,内嵌图片生成、参数优化等工具,分享链接就能用。给团队或客户一个"开箱即用"的完整方案。
创建 Agent

✅ 特性总结

一键生成多种编程语言的API使用指南,快速解锁跨平台开发潜力。
自动输出清晰实用的代码示例,帮助开发者快速掌握使用技巧。
直观解释关键参数及其作用,减少摸索时间,提升开发效率。
根据具体需求灵活定制内容,满足多样化场景下的开发任务。
即使是复杂功能也能用简洁的表达形式解锁,实现零门槛理解。
支持多领域API与库的使用说明,从后端开发到前端交互全覆盖。
内嵌场景化引导,帮助开发者快速实现目标功能并解决实际问题。
结合上下文智能优化输出,减少冗长表达,精准传达重点。
兼具示例与文本说明的双重优势,为技术文档编写提供标准模板。
节省学习曲线,让不同技术水平的用户都能轻松上手。

🎯 解决的问题

帮助开发者快速掌握目标API或库的使用方法,通过清晰的描述和代码示例,降低学习门槛,提升开发效率。

🕒 版本历史

当前版本
v2.1 2024-01-15
优化输出结构,增强情节连贯性
  • ✨ 新增章节节奏控制参数
  • 🔧 优化人物关系描述逻辑
  • 📝 改进主题深化引导语
  • 🎯 增强情节转折点设计
v2.0 2023-12-20
重构提示词架构,提升生成质量
  • 🚀 全新的提示词结构设计
  • 📊 增加输出格式化选项
  • 💡 优化角色塑造引导
v1.5 2023-11-10
修复已知问题,提升稳定性
  • 🐛 修复长文本处理bug
  • ⚡ 提升响应速度
v1.0 2023-10-01
首次发布
  • 🎉 初始版本上线
COMING SOON
版本历史追踪,即将启航
记录每一次提示词的进化与升级,敬请期待。

💬 用户评价

4.8
⭐⭐⭐⭐⭐
基于 28 条评价
5星
85%
4星
12%
3星
3%
👤
电商运营 - 张先生
⭐⭐⭐⭐⭐ 2025-01-15
双十一用这个提示词生成了20多张海报,效果非常好!点击率提升了35%,节省了大量设计时间。参数调整很灵活,能快速适配不同节日。
效果好 节省时间
👤
品牌设计师 - 李女士
⭐⭐⭐⭐⭐ 2025-01-10
作为设计师,这个提示词帮我快速生成创意方向,大大提升了工作效率。生成的海报氛围感很强,稍作调整就能直接使用。
创意好 专业
COMING SOON
用户评价与反馈系统,即将上线
倾听真实反馈,在这里留下您的使用心得,敬请期待。
加载中...