热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
该提示词可根据用户输入的编程语言、API或库名称、目标任务与关键参数,生成清晰可复现的使用指引,包括完整代码示例与关键参数解释。所有内容均基于用户显式输入信息,不依赖外部自动推断,确保可控与准确。适用于快速理解 API 功能、完成集成开发或编写内部技术文档,帮助开发者高效实现目标功能。
下面是一份可直接运行的 Python 指南与示例代码,演示如何集成“示例支付网关 REST API”完成签名、下单、回调验签、日志、超时/重试以及幂等键配置。示例默认读取环境变量并提供可运行的本地演示(DRY_RUN=1 时不发起真实 HTTP 请求,仅打印与校验)。
代码示例(Python 3.9+):
import os
import hmac
import hashlib
import json
import time
import uuid
import string
import random
import logging
from datetime import datetime, timezone
from typing import Tuple, Dict, Any, Optional
try:
import requests
except ImportError:
raise SystemExit("请先安装 requests :pip install requests")
# ---------------------------
# 配置与工具
# ---------------------------
def iso8601_utc_now() -> str:
# ISO8601,UTC 带 Z,精度到秒
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def rand_nonce(n: int = 16) -> str:
alphabet = string.ascii_letters + string.digits
return ''.join(random.choice(alphabet) for _ in range(n))
def json_bytes(data: Dict[str, Any]) -> bytes:
# 与请求体一致性:UTF-8,去空格(不影响语义,利于签名稳定)
return json.dumps(data, ensure_ascii=False, separators=(',', ':')).encode('utf-8')
def hmac_sha256_hex(secret: str, message: str) -> str:
return hmac.new(secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).hexdigest()
# ---------------------------
# 客户端实现
# ---------------------------
class PaymentClient:
def __init__(self,
base_url: str,
merchant_id: str,
key_id: str,
secret: str,
timeout_ms: int = 8000,
retry: int = 3,
dry_run: bool = True):
self.base_url = base_url.rstrip('/')
self.merchant_id = merchant_id
self.key_id = key_id
self.secret = secret
self.timeout_ms = timeout_ms
self.retry = retry
self.dry_run = dry_run
self.session = requests.Session()
self.logger = logging.getLogger("pay.client")
@staticmethod
def _build_signature_string(method: str, path: str, query: str, body_bytes: bytes, timestamp: str, nonce: str) -> str:
# 签名原文:method+path+query+body+timestamp+nonce(UTF-8)
body_str = body_bytes.decode('utf-8') if body_bytes else ''
return f"{method.upper()}{path}{query}{body_str}{timestamp}{nonce}"
def _sign_request(self, method: str, path: str, query: str, body_bytes: bytes, timestamp: str, nonce: str) -> str:
message = self._build_signature_string(method, path, query, body_bytes, timestamp, nonce)
return hmac_sha256_hex(self.secret, message)
def create_order(self,
endpoint: str,
body: Dict[str, Any],
idempotency_key: str) -> Tuple[int, Dict[str, Any], Dict[str, str]]:
method = "POST"
path = endpoint # 示例:/v1/orders/create
query = "" # 本例无查询参数
ts = iso8601_utc_now()
nonce = rand_nonce()
body_b = json_bytes(body)
signature = self._sign_request(method, path, query, body_b, ts, nonce)
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"X-Merchant-Id": self.merchant_id,
"X-Key-Id": self.key_id,
"X-Timestamp": ts,
"X-Nonce": nonce,
"X-Signature": signature,
"Idempotency-Key": idempotency_key,
}
url = f"{self.base_url}{path}"
timeout_sec = self.timeout_ms / 1000.0
# 简单指数退避:0.5, 1.0, 2.0(秒),带少量抖动
backoffs = [0.5 * (2 ** i) for i in range(max(self.retry, 1))]
last_exc = None
for attempt, backoff in enumerate(backoffs, start=1):
start = time.time()
req_id = str(uuid.uuid4()) # 本地生成,若服务端返回 X-Request-Id 会覆盖
try:
if self.dry_run:
# 演示模式:打印并构造一个模拟响应
self.logger.info("DRY_RUN: POST %s attempt=%d idem=%s", url, attempt, idempotency_key)
self.logger.info("Headers: %s", {k: headers[k] for k in ['X-Merchant-Id','X-Key-Id','X-Timestamp','X-Nonce','X-Signature','Idempotency-Key']})
self.logger.info("Body: %s", body)
# 模拟服务端响应
elapsed_ms = int((time.time() - start) * 1000)
mock_status = 201
mock_resp = {
"request_id": req_id,
"code": "SUCCESS",
"message": "created",
"data": {
"order_id": body.get("order_id"),
"pay_url": "https://sandbox.pay.example.com/pay/" + body.get("order_id", ""),
"status": "PENDING"
}
}
self.logger.info("request_id=%s status=%d elapsed_ms=%d", mock_resp["request_id"], mock_status, elapsed_ms)
return mock_status, mock_resp, headers
resp = self.session.post(url, data=body_b, headers=headers, timeout=timeout_sec)
elapsed_ms = int((time.time() - start) * 1000)
# 提取服务端 request_id(如存在)
req_id = resp.headers.get("X-Request-Id", req_id)
# 记录日志
err_summary = None if resp.ok else f"http_status={resp.status_code}"
self.logger.info("request_id=%s status=%d elapsed_ms=%d err=%s",
req_id, resp.status_code, elapsed_ms, err_summary)
# 5xx 走重试;2xx/4xx 直接返回
if 500 <= resp.status_code < 600 and attempt < self.retry:
time.sleep(backoff + random.uniform(0, 0.1))
continue
# 尝试解析 JSON
try:
resp_json = resp.json()
except Exception:
resp_json = {"raw": resp.text}
return resp.status_code, resp_json, headers
except (requests.Timeout, requests.ConnectionError) as e:
elapsed_ms = int((time.time() - start) * 1000)
self.logger.warning("request_id=%s timeout/conn error on attempt=%d elapsed_ms=%d err=%s",
req_id, attempt, elapsed_ms, str(e))
last_exc = e
if attempt < self.retry:
time.sleep(backoff + random.uniform(0, 0.1))
continue
else:
# 达到最大重试次数
return 0, {"code": "NETWORK_ERROR", "message": str(e)}, headers
# 正常不会到这里
raise RuntimeError(f"unreachable, last_exc={last_exc}")
# ---------------------------
# 回调验签与防重放
# ---------------------------
class CallbackVerifier:
def __init__(self, secret: str, replay_ttl_sec: int = 300):
self.secret = secret
self.replay_ttl = replay_ttl_sec
self.nonce_seen: Dict[str, float] = {} # nonce -> first_seen_unix
def _prune(self, now: float):
# 清理过期 nonce
expired_keys = [k for k, ts in self.nonce_seen.items() if now - ts > self.replay_ttl]
for k in expired_keys:
self.nonce_seen.pop(k, None)
def _sign_callback_payload(self, body_bytes: bytes, timestamp: str, nonce: str) -> str:
# 回调验签无需额外路径:按照“回包原文 + timestamp + nonce”
base = f"{body_bytes.decode('utf-8')}{timestamp}{nonce}"
return hmac_sha256_hex(self.secret, base)
def verify(self,
headers: Dict[str, str],
raw_body: bytes,
expected_order_id: str,
expected_amount: int) -> Tuple[bool, str]:
# 1) 取回调头
ts = headers.get("X-Timestamp", "")
nonce = headers.get("X-Nonce", "")
sig = headers.get("X-Signature", "")
if not ts or not nonce or not sig:
return False, "missing required callback headers"
# 2) 计算签名并比对
calc = self._sign_callback_payload(raw_body, ts, nonce)
if calc != sig:
return False, "signature mismatch"
# 3) 防重放(nonce 去重)
now = time.time()
self._prune(now)
if nonce in self.nonce_seen:
return False, "replay detected (nonce used)"
self.nonce_seen[nonce] = now
# 4) 业务一致性校验
try:
obj = json.loads(raw_body.decode('utf-8'))
except Exception:
return False, "invalid JSON body"
if obj.get("order_id") != expected_order_id:
return False, "order_id mismatch"
if int(obj.get("amount", -1)) != int(expected_amount):
return False, "amount mismatch"
return True, "ok"
# ---------------------------
# 演示主流程
# ---------------------------
def main():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s %(message)s"
)
# 1) 环境变量读取(提供默认值便于直接运行)
base_url = os.getenv("PAY_BASE_URL", "https://sandbox.pay.example.com")
merchant_id = os.getenv("PAY_MERCHANT_ID", "MCH_10001")
key_id = os.getenv("PAY_KEY_ID", "KEY_MAIN")
secret = os.getenv("PAY_SECRET", "secret_demo_123456") # 请在生产使用安全管理
dry_run = os.getenv("DRY_RUN", "1") == "1"
# 超时与重试(可通过环境变量覆盖)
timeout_ms = int(os.getenv("PAY_TIMEOUT_MS", "8000"))
retry = int(os.getenv("PAY_RETRY", "3"))
client = PaymentClient(
base_url=base_url,
merchant_id=merchant_id,
key_id=key_id,
secret=secret,
timeout_ms=timeout_ms,
retry=retry,
dry_run=dry_run
)
# 2) 构造请求体与幂等键
endpoint = "/v1/orders/create"
body = {
"order_id": "ORD_20250118_001",
"amount": 25900,
"currency": "CNY",
"subject": "蓝牙耳机-黑色",
"description": "含运费,7日内发货",
"notify_url": "https://shop.example.com/pay/notify",
"return_url": "https://shop.example.com/pay/return",
"expire_minutes": 15,
"metadata": {"buyer": "u_1288", "channel": "app"},
}
idem_key = "IDEM-ORD-20250118-001"
# 3) 发起创建订单请求(支持超时/重试/幂等)
status, resp_json, req_headers = client.create_order(endpoint, body, idem_key)
print("\n=== Create Order Result ===")
print("HTTP Status:", status)
print("Response JSON:", json.dumps(resp_json, ensure_ascii=False))
print("Used Idempotency-Key:", req_headers["Idempotency-Key"])
# 4) 模拟接收异步回调并校验签名、防重放、数据一致性
verifier = CallbackVerifier(secret=secret, replay_ttl_sec=300)
callback_body = {
"event": "payment.success",
"order_id": body["order_id"],
"amount": body["amount"],
"currency": body["currency"],
"paid_at": iso8601_utc_now(),
"transaction_id": "TXN_" + uuid.uuid4().hex[:12]
}
cb_raw = json_bytes(callback_body)
cb_ts = iso8601_utc_now()
cb_nonce = rand_nonce()
cb_sig = hmac_sha256_hex(secret, f"{cb_raw.decode('utf-8')}{cb_ts}{cb_nonce}")
cb_headers = {
"X-Timestamp": cb_ts,
"X-Nonce": cb_nonce,
"X-Signature": cb_sig
}
ok, reason = verifier.verify(cb_headers, cb_raw, expected_order_id=body["order_id"], expected_amount=body["amount"])
print("\n=== Callback Verify Result ===")
print("Valid:", ok, "Reason:", reason)
# 5) 模拟重放(应失败)
ok2, reason2 = verifier.verify(cb_headers, cb_raw, expected_order_id=body["order_id"], expected_amount=body["amount"])
print("Replay Valid:", ok2, "Reason:", reason2)
if __name__ == "__main__":
main()
运行方式:
关键参数与规则说明:
提示:
下面给出一个最小可用的 TypeScript 示例与参数说明,演示如何用 fetch 调用 CodeGenX HTTP API 的 /v1/code/generate 端点,以 SSE 流式消费增量代码,增量拼接并按 stop 规则截断,最终写入文件;同时实现超时、重试(指数退避+抖动)、网络中断重连(复用相同 user 与 seed)与 X-Request-Id 记录。示例完全基于你提供的信息,不做额外推断。
示例代码(Node 18+ 或任意带 fetch 的环境)
// tsconfig 请确保 "moduleResolution": "node", "module": "esnext"(或适配你的环境)
import { writeFile } from "node:fs/promises";
type RetryPolicy = {
max_attempts: number; // 示例: 3
backoff_ms: number; // 示例: 500
jitter: boolean; // 示例: true
};
type GenerateBody = {
model: string; // 示例: "code-medium"
prompt: string; // 示例: "实现一个LRU缓存,含get/put与过期"
language: string; // 示例: "typescript"
style?: string; // 示例: "functional" | "oop"
temperature?: number; // 浮点 0~1(示例: 0.2)
max_tokens?: number; // 示例: 800
stream?: boolean; // true 表示 SSE
stop?: string[]; // 示例: ["```", "\n\n#"]
seed?: number; // 示例: 42
user?: string; // 示例: "dev_2001"
timeout_ms?: number; // 示例: 15000(服务端可见字段,按提供信息原样传)
retry?: RetryPolicy; // 放入 body(按提供信息原样传)
};
type StreamDelta = {
delta?: string;
finish_reason?: string; // "stop" 时结束
};
const ENDPOINT = "https://api.codegenx.example/v1/code/generate";
type GenerateOptions = {
token: string; // Authorization: Bearer <token>
requestId?: string; // 可选 X-Request-Id
outFile: string; // 输出文件路径
body: GenerateBody; // 上述 body
// 客户端侧控制(与 body.timeout_ms/retry 相互独立,二者都会生效):
clientTimeoutMs?: number; // 客户端超时,使用 AbortController
clientRetry?: RetryPolicy; // 客户端重试策略(指数退避+抖动)
};
function sleep(ms: number) {
return new Promise((r) => setTimeout(r, ms));
}
function backoffDelay(base: number, attemptIndex: number, jitter: boolean) {
// attemptIndex 从 0 开始:0->base, 1->base*2, 2->base*4 ...
const baseDelay = base * Math.pow(2, attemptIndex);
if (!jitter) return baseDelay;
const rand = Math.random() * baseDelay;
return Math.floor(baseDelay / 2 + rand / 2); // 简单抖动:±50%
}
// 将增量安全拼接,处理重连导致的重复前缀
function appendWithOverlapDedup(acc: string, delta: string): string {
if (!delta) return acc;
if (acc.endsWith(delta)) return acc;
const maxOverlap = Math.min(acc.length, delta.length);
let overlap = 0;
for (let n = maxOverlap; n > 0; n--) {
if (acc.slice(-n) === delta.slice(0, n)) {
overlap = n;
break;
}
}
return acc + delta.slice(overlap);
}
// 基于 stop 规则截断:返回 [截断后的文本, 是否命中stop]
function applyStopRules(text: string, stops?: string[]): [string, boolean] {
if (!stops || stops.length === 0) return [text, false];
let cut = -1;
for (const s of stops) {
const i = text.indexOf(s);
if (i !== -1) cut = cut === -1 ? i : Math.min(cut, i);
}
if (cut !== -1) return [text.slice(0, cut), true];
return [text, false];
}
// 解析 SSE: 将 response 的 text/event-stream 流拆分为事件,提取 data: 行
async function consumeSSE(
res: Response,
onData: (payload: string) => void,
onFinish: (reason: string | null, requestId: string | null) => void
) {
const reader = res.body?.getReader();
if (!reader) throw new Error("ReadableStream not available");
const decoder = new TextDecoder("utf-8");
let buffer = "";
let finished = false;
const requestId = res.headers.get("x-request-id"); // 记录服务端回传的 Request-Id
while (!finished) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// SSE 事件用空行分隔
let idx: number;
while ((idx = buffer.indexOf("\n\n")) !== -1) {
const rawEvent = buffer.slice(0, idx);
buffer = buffer.slice(idx + 2);
// 只拼接 data: 行
const dataLines = rawEvent
.split("\n")
.map((l) => l.trim())
.filter((l) => l.startsWith("data:"))
.map((l) => l.slice(5).trim());
if (dataLines.length === 0) continue;
const dataPayload = dataLines.join("\n");
if (dataPayload === "[DONE]") {
finished = true;
onFinish(null, requestId);
break;
}
try {
const json: StreamDelta = JSON.parse(dataPayload);
if (typeof json.delta === "string") {
onData(json.delta);
}
if (json.finish_reason === "stop") {
finished = true;
onFinish("stop", requestId);
break;
}
} catch {
// 若非 JSON(不符合协议),作为原始文本增量尝试处理
onData(dataPayload);
}
}
}
if (!finished) onFinish(null, requestId);
}
// 以 SSE 方式请求并将代码写入文件(含客户端超时/重试、网络中断重连、stop 规则)
export async function generateCodeToFile(opts: GenerateOptions) {
const {
token,
requestId,
outFile,
body,
clientTimeoutMs = body.timeout_ms ?? 15000, // 客户端超时;若 body 里也传了 timeout_ms,这里仅作为客户端控制
clientRetry = body.retry ?? { max_attempts: 3, backoff_ms: 500, jitter: true },
} = opts;
// 强制流式
const reqBody: GenerateBody = { ...body, stream: true };
let attempt = 0;
let code = "";
let stopHit = false;
let lastServerRequestId: string | null = null;
while (attempt < Math.max(1, clientRetry.max_attempts)) {
const controller = new AbortController();
const t = setTimeout(() => controller.abort(), clientTimeoutMs);
try {
const res = await fetch(ENDPOINT, {
method: "POST",
signal: controller.signal,
headers: {
"Authorization": `Bearer ${token}`,
"Content-Type": "application/json",
...(requestId ? { "X-Request-Id": requestId } : {}),
},
body: JSON.stringify(reqBody),
});
if (!res.ok) {
// 仅在 5xx 上重试;其他状态直接抛出(可按需调整)
const text = await res.text().catch(() => "");
if (res.status >= 500 && res.status <= 599) {
throw new Error(`Server ${res.status}: ${text}`);
} else {
throw new Error(`HTTP ${res.status}: ${text}`);
}
}
// 开始消费 SSE
await consumeSSE(
res,
(delta) => {
// 重连时可能重复,做去重拼接
code = appendWithOverlapDedup(code, delta);
// 按 stop 规则截断(命中后终止后续处理)
const [trunc, hit] = applyStopRules(code, reqBody.stop);
if (hit && !stopHit) {
stopHit = true;
code = trunc;
// 客户端主动不再处理后续片段:不需要额外动作,等 consumeSSE 结束或超时
// 如需更快停止,可调用 controller.abort() 取消
controller.abort();
}
},
(reason, reqId) => {
lastServerRequestId = reqId ?? lastServerRequestId;
}
);
clearTimeout(t);
// 成功完成(无异常抛出) -> 退出重试循环
break;
} catch (err) {
clearTimeout(t);
attempt += 1;
// 网络中断/超时/5xx:按策略重试,复用相同 user 与 seed(已在 reqBody 保持不变)
if (attempt >= Math.max(1, clientRetry.max_attempts)) {
// 记录最后一次 X-Request-Id(如果有)
if (lastServerRequestId) {
console.error(`Failed with X-Request-Id: ${lastServerRequestId}`);
}
throw err;
}
const delay = backoffDelay(clientRetry.backoff_ms, attempt - 1, clientRetry.jitter);
await sleep(delay);
// 继续下一次尝试(code 已累积,appendWithOverlapDedup 将尽量去重)
}
}
// 简单“格式化”:去除两端空白并以换行结束(外部格式化工具可自行接入)
const finalCode = code.trimEnd() + "\n";
await writeFile(outFile, finalCode, "utf-8");
return { outFile, bytes: Buffer.byteLength(finalCode, "utf-8"), requestId: lastServerRequestId ?? requestId ?? null };
}
// 使用示例
async function demo() {
const token = process.env.CODEGENX_TOKEN || "<token>";
const requestId = "req-" + Math.random().toString(36).slice(2);
await generateCodeToFile({
token,
requestId,
outFile: "lru.ts",
body: {
model: "code-medium",
prompt: "实现一个LRU缓存,含get/put与过期",
language: "typescript",
style: "oop",
temperature: 0.2,
max_tokens: 800,
stream: true,
stop: ["```", "\n\n#"],
seed: 42,
user: "dev_2001",
timeout_ms: 15000,
retry: { max_attempts: 3, backoff_ms: 500, jitter: true },
},
// 客户端控制(可与 body 中 timeout_ms/retry 并存)
clientTimeoutMs: 15000,
clientRetry: { max_attempts: 3, backoff_ms: 500, jitter: true },
});
console.log("Code generated to lru.ts");
}
// demo().catch(console.error);
非流式用法(仅参考,设置 stream: false,将一次性返回完整 JSON)
async function generateNonStream(token: string) {
const res = await fetch("https://api.codegenx.example/v1/code/generate", {
method: "POST",
headers: {
"Authorization": `Bearer ${token}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: "code-medium",
prompt: "实现一个LRU缓存,含get/put与过期",
language: "typescript",
temperature: 0.2,
max_tokens: 800,
stream: false,
seed: 42,
user: "dev_2001",
timeout_ms: 15000,
retry: { max_attempts: 3, backoff_ms: 500, jitter: true },
}),
});
const json = await res.json() as { code: string; usage?: { prompt_tokens: number; completion_tokens: number } };
await writeFile("lru.ts", (json.code ?? "").trimEnd() + "\n", "utf-8");
}
关键参数说明(基于你提供的信息)
备注
下面给出一个面向 Go 开发者的简明指南与示例,演示如何调用 SQLCraft REST API,把自然语言意图转为安全的只读 SQL,并支持非流式与流式增量输出、解析 reasoning 字段与安全拦截。
一、关键参数与最佳实践
二、Go 最小可用示例:非流式一次性生成与解析 reasoning
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
)
const Endpoint = "https://api.sqlcraft.example/v1/sql/generate"
type Constraints struct {
OnlySelect bool `json:"only_select"`
TableWhitelist []string `json:"table_whitelist"`
UseCTE bool `json:"use_cte"`
}
type GenerateRequest struct {
Dialect string `json:"dialect"`
Schema string `json:"schema"`
Intent string `json:"intent"`
Constraints Constraints `json:"constraints"`
Temperature float64 `json:"temperature"`
MaxTokens int `json:"max_tokens"`
Stream bool `json:"stream"`
SafetyCheck bool `json:"safety_check"`
AuditTag string `json:"audit_tag"`
TimeoutMS int `json:"timeout_ms"`
}
type GenerateResponse struct {
SQL string `json:"sql"`
Reasoning string `json:"reasoning"`
TokensUsed int `json:"tokens_used"`
Warnings []string `json:"warnings"`
}
// 仅演示用途的简单安全校验(依赖字符串扫描/正则易漏检,生产可用更健壮的 SQL 解析器)
func ValidateReadOnlySQL(sql string, whitelist []string) (ok bool, reasons []string) {
s := strings.ToLower(stripSQLComments(sql))
s = strings.TrimSpace(s)
if !(strings.HasPrefix(s, "select") || strings.HasPrefix(s, "with")) {
reasons = append(reasons, "仅允许 SELECT/CTE(SQL 应以 WITH 或 SELECT 开头)")
}
// 禁词(服务端已做安全检查,这里本地再兜底)
for _, kw := range []string{" drop ", " alter ", " truncate ", " update ", " insert "} {
if strings.Contains(" "+s+" ", kw) {
reasons = append(reasons, "检测到禁止关键字: "+strings.TrimSpace(kw))
}
}
// 白名单表限制(非常粗略的 from/join 扫描)
used := extractTableNames(s)
if len(used) > 0 && len(whitelist) > 0 {
allow := map[string]struct{}{}
for _, t := range whitelist {
allow[strings.ToLower(t)] = struct{}{}
}
var notAllowed []string
for _, t := range used {
if _, ok := allow[t]; !ok {
notAllowed = append(notAllowed, t)
}
}
if len(notAllowed) > 0 {
reasons = append(reasons, "包含非白名单表: "+strings.Join(notAllowed, ", "))
}
}
return len(reasons) == 0, reasons
}
func stripSQLComments(s string) string {
// 简单移除行注释与块注释(不覆盖所有情况)
out := s
// 行注释 --
lines := strings.Split(out, "\n")
for i, l := range lines {
if idx := strings.Index(l, "--"); idx >= 0 {
lines[i] = l[:idx]
}
}
out = strings.Join(lines, "\n")
// 块注释 /* */
for {
start := strings.Index(out, "/*")
if start < 0 {
break
}
end := strings.Index(out[start+2:], "*/")
if end < 0 {
out = out[:start]
break
}
out = out[:start] + out[start+2+end+2:]
}
return out
}
func extractTableNames(s string) []string {
// 非严格提取 from/join 后第一个标识符(忽略 schema/alias 细节)
var tables []string
tokens := strings.Fields(s)
for i := 0; i < len(tokens); i++ {
t := strings.ToLower(tokens[i])
if t == "from" || t == "join" {
if i+1 < len(tokens) {
raw := tokens[i+1]
// 去掉逗号/括号/引号
raw = strings.Trim(raw, ",()\"`")
// 如果是 schema.table 取表名部分
parts := strings.Split(raw, ".")
base := parts[len(parts)-1]
// 去掉别名语法中尾随的关键字
base = strings.Trim(base, ",")
base = strings.TrimSpace(base)
// 过滤子查询起始 "("
if base != "" && base != "(" && base != "select" {
tables = append(tables, strings.ToLower(base))
}
}
}
}
// 去重
seen := map[string]struct{}{}
var uniq []string
for _, t := range tables {
if _, ok := seen[t]; !ok {
seen[t] = struct{}{}
uniq = append(uniq, t)
}
}
return uniq
}
func GenerateSQL(ctx context.Context, token, traceID string, req GenerateRequest) (*GenerateResponse, error) {
buf, _ := json.Marshal(req)
httpClient := &http.Client{
Timeout: time.Duration(req.TimeoutMS) * time.Millisecond,
}
httpReq, _ := http.NewRequestWithContext(ctx, http.MethodPost, Endpoint, bytes.NewReader(buf))
httpReq.Header.Set("Authorization", "Bearer "+token)
httpReq.Header.Set("Content-Type", "application/json")
if traceID != "" {
httpReq.Header.Set("X-Trace-Id", traceID)
}
resp, err := httpClient.Do(httpReq)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
}
var out GenerateResponse
if err := json.Unmarshal(body, &out); err != nil {
return nil, err
}
return &out, nil
}
func main() {
token := os.Getenv("SQLCRAFT_TOKEN")
if token == "" {
fmt.Println("please set SQLCRAFT_TOKEN")
return
}
req := GenerateRequest{
Dialect: "postgres",
Schema: "orders(id uuid, user_id uuid, status text, amount int, created_at timestamp); users(id uuid, email text)",
Intent: "统计近30天已支付订单金额与笔数,按天汇总",
Constraints: Constraints{
OnlySelect: true,
TableWhitelist: []string{"orders", "users"},
UseCTE: true,
},
Temperature: 0.1,
MaxTokens: 512,
Stream: false,
SafetyCheck: true,
AuditTag: "rpt_monthly",
TimeoutMS: 12000,
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(req.TimeoutMS)*time.Millisecond)
defer cancel()
out, err := GenerateSQL(ctx, token, "trace-abc-001", req)
if err != nil {
fmt.Println("generate error:", err)
return
}
// 解析 sql 与 reasoning 字段
fmt.Println("reasoning:", out.Reasoning)
fmt.Println("warnings:", out.Warnings)
// 本地安全拦截与建议
if ok, reasons := ValidateReadOnlySQL(out.SQL, req.Constraints.TableWhitelist); !ok {
fmt.Println("SQL 安全校验未通过:")
for _, r := range reasons {
fmt.Println(" -", r)
}
fmt.Println("替代建议:")
for _, s := range SuggestSafer(out.SQL, req) {
fmt.Println(" -", s)
}
return
}
fmt.Println("generated SQL:")
fmt.Println(out.SQL)
}
// 生成替代建议(基于既定约束与期望范式)
func SuggestSafer(sql string, req GenerateRequest) []string {
var tips []string
tips = append(tips,
"仅使用 SELECT/CTE:以 WITH 或 SELECT 开头,移除任何 DDL/DML 关键字(drop/alter/truncate/update/insert)",
fmt.Sprintf("限制表到白名单: %v(必要的 JOIN 仅在此范围内)", req.Constraints.TableWhitelist),
"启用 constraints.only_select=true、constraints.use_cte=true(已在示例中开启)",
"需要日期维度时,使用 CTE 构造日期序列,对 orders 按天聚合,最后可 LEFT JOIN users 以补充邮箱字段",
)
return tips
}
三、Go 流式示例:接收增量 sql 片段(SSE)
package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
)
type StreamDelta struct {
SQLDelta string `json:"sql_delta"`
FinishReason string `json:"finish_reason"`
}
func GenerateSQLStream(ctx context.Context, token, traceID string, req GenerateRequest, onDelta func(fragment string)) (finalSQL string, err error) {
req.Stream = true
payload, _ := json.Marshal(req)
httpClient := &http.Client{
Timeout: time.Duration(req.TimeoutMS) * time.Millisecond,
}
r, _ := http.NewRequestWithContext(ctx, http.MethodPost, Endpoint, bytes.NewReader(payload))
r.Header.Set("Authorization", "Bearer "+token)
r.Header.Set("Content-Type", "application/json")
r.Header.Set("Accept", "text/event-stream")
if traceID != "" {
r.Header.Set("X-Trace-Id", traceID)
}
resp, err := httpClient.Do(r)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("HTTP %d", resp.StatusCode)
}
sc := bufio.NewScanner(resp.Body)
for sc.Scan() {
line := sc.Text()
// SSE: 只处理以 "data:" 开头的行
if !strings.HasPrefix(line, "data:") {
continue
}
data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
if data == "" {
continue
}
var delta StreamDelta
if err := json.Unmarshal([]byte(data), &delta); err != nil {
continue
}
if delta.SQLDelta != "" {
onDelta(delta.SQLDelta)
finalSQL += delta.SQLDelta
}
if delta.FinishReason == "stop" {
break
}
}
return finalSQL, sc.Err()
}
使用方式示例:
// ...
req.Stream = true
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(req.TimeoutMS)*time.Millisecond)
defer cancel()
var built strings.Builder
finalSQL, err := GenerateSQLStream(ctx, token, "trace-abc-002", req, func(fragment string) {
// 这里可将 fragment 增量展示到前端
built.WriteString(fragment)
})
if err != nil {
fmt.Println("stream error:", err)
return
}
// 结束后进行安全校验
if ok, reasons := ValidateReadOnlySQL(finalSQL, req.Constraints.TableWhitelist); !ok {
fmt.Println("SQL 流式结果安全校验未通过:", reasons)
for _, s := range SuggestSafer(finalSQL, req) {
fmt.Println(" -", s)
}
return
}
fmt.Println("final SQL:", finalSQL)
四、示例生成期望(样式参考,用于测试验证)
五、错误处理与重试建议
六、小结
帮助开发者快速掌握目标API或库的使用方法,通过清晰的描述和代码示例,降低学习门槛,提升开发效率。
通过自动生成的代码示例与参数说明,快速理解API或库的基本功能,提升学习和实操能力。
在复杂项目中快速找到高效解决方案,降低技术文档查阅时间,更专注于核心业务逻辑开发。
借助自动生成的内容,快速编写清晰专业的API使用文档,提升文档撰写效率与质量。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
免费获取高级提示词-优惠即将到期