Popular roles are more than a source of inspiration; they are productivity assistants. With carefully curated role prompts you can quickly generate high-quality content, spark new ideas, and find the solution that best fits your needs. Creation gets easier, and the value is immediate.
We keep the role library updated for different user needs, so you can always find the right starting point.
This prompt is designed as a debugging expert for the data-processing domain. It helps users systematically identify data-related programming errors, and provides step-by-step debugging guidance, executable code examples, and test-based verification. Its output is clearly structured and actionable, and it uses participatory feedback (levels, points) to help users locate problems quickly and build their troubleshooting skills.
import os, io, csv, re, random
from decimal import Decimal, InvalidOperation
import numpy as np
import pandas as pd
from dateutil import parser, tz
# ===== Configuration =====
CONFIG = {
    "expected_cols": 7,           # expected column count from the header; None = infer automatically
    "local_tz": "Asia/Shanghai",  # local timezone
    "user_id_col": "user_id",     # change this if your column is named differently
    "timestamp_col": None,        # None = auto-detect
    "amount_col": None,           # None = auto-detect
    "dedupe_subset": None         # e.g. ["user_id", "timestamp_utc"]; None = dedupe on all columns
}
# ===== 1) Encoding sniffing and unification (decode line by line: UTF-8 first, fall back to GBK; still failing -> quarantine) =====
def unify_encoding(src_path, utf8_path, quarantine_path):
total, good, bad = 0, 0, 0
with open(src_path, "rb") as f_in, \
open(utf8_path, "w", encoding="utf-8", newline="") as f_out, \
open(quarantine_path, "w", encoding="utf-8", newline="") as f_bad:
for raw in f_in:
total += 1
for enc in ("utf-8-sig", "gbk"):
try:
line = raw.decode(enc)
                    # strip any BOM
                    line = line.replace("\ufeff", "")
f_out.write(line)
good += 1
break
except UnicodeDecodeError:
continue
            else:
                # both encodings failed -> quarantine the line
                try:
                    # decode with replacement characters to preserve what we can
                    safe = raw.decode("utf-8", errors="replace")
                except Exception:
                    safe = str(raw)
                f_bad.write(safe)
                bad += 1
return {"total_lines": total, "encoded_ok": good, "encoded_bad": bad}
# ===== 2) Delimiter unification and bad-row quarantine (auto-switch between comma and semicolon; normalize output to comma) =====
def parse_with_delim(line, delim):
buf = io.StringIO(line)
reader = csv.reader(buf, delimiter=delim, quotechar='"', escapechar="\\")
return next(reader)
def unify_delimiters(utf8_path, unified_path, quarantine_path, expected_cols=None):
with open(utf8_path, "r", encoding="utf-8", newline="") as f_in, \
open(unified_path, "w", encoding="utf-8", newline="") as f_out, \
         open(quarantine_path, "a", encoding="utf-8", newline="") as f_bad:  # append mode
        writer = csv.writer(f_out, delimiter=",", quotechar='"', escapechar="\\")
        # read the header to determine the primary delimiter and column count
header_line = f_in.readline()
if not header_line:
return {"rows_ok": 0, "rows_bad": 0, "expected_cols": 0}
try:
h_comma = parse_with_delim(header_line, ",")
except Exception:
h_comma = []
try:
h_semi = parse_with_delim(header_line, ";")
except Exception:
h_semi = []
header = h_comma if len(h_comma) >= len(h_semi) else h_semi
main_delim = "," if header is h_comma else ";"
exp_cols = expected_cols or len(header)
writer.writerow(header)
rows_ok, rows_bad = 0, 0
        # process the remaining lines one by one
for line in f_in:
row = None
for delim in (main_delim, ";" if main_delim == "," else ","):
try:
fields = parse_with_delim(line, delim)
if len(fields) == exp_cols:
row = fields
break
except Exception:
continue
if row is None:
f_bad.write(line)
rows_bad += 1
continue
writer.writerow(row)
rows_ok += 1
return {"rows_ok": rows_ok, "rows_bad": rows_bad, "expected_cols": exp_cols}
# ===== 3) Auto-detect candidate columns =====
def detect_datetime_column(df):
candidates = [c for c in df.columns if re.search(r"(time|date)", c, flags=re.I)]
if not candidates:
candidates = list(df.select_dtypes(include=["object", "string"]).columns)
best_col, best_ratio = None, 0.0
for c in candidates:
s = df[c].dropna().astype(str).head(200)
ok = 0
for v in s:
try:
parser.parse(v, dayfirst=True, fuzzy=True)
ok += 1
except Exception:
pass
ratio = ok / max(1, len(s))
if ratio > best_ratio:
best_ratio, best_col = ratio, c
return best_col if best_ratio >= 0.6 else None
def detect_amount_column(df):
name_hits = [c for c in df.columns if re.search(r"(amount|price|total)", c, flags=re.I)]
if name_hits:
return name_hits[0]
    # fallback: pick a column whose values often contain currency symbols or two-decimal amounts
for c in df.select_dtypes(include=["object", "string"]).columns:
s = df[c].dropna().astype(str).head(50)
hit = sum(bool(re.search(r"[$€£¥]|[A-Z]{3}|^\d+[.,]\d{2}$", v)) for v in s)
if hit / max(1, len(s)) >= 0.4:
return c
return None
# ===== 4) Datetime parsing (normalize to UTC; output tz-aware datetime64[ns, UTC]) =====
LOCAL_TZ_CACHE = {}
def get_local_tz(name):
if name not in LOCAL_TZ_CACHE:
LOCAL_TZ_CACHE[name] = tz.gettz(name)
return LOCAL_TZ_CACHE[name]
def parse_datetime_utc(val, local_tz_name):
if pd.isna(val):
return pd.NaT
text = str(val)
try:
dt = parser.parse(text, dayfirst=True, fuzzy=True)
except Exception:
return pd.NaT
    # assume the local timezone when none is given, then convert to UTC
local_zone = get_local_tz(local_tz_name)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=local_zone)
return pd.Timestamp(dt).tz_convert("UTC")
# ===== 5) Amount normalization and currency extraction =====
SYM_TO_CODE = {"$": "USD", "€": "EUR", "£": "GBP", "¥": "CNY"}
EURO_PATTERN = re.compile(r"^\d{1,3}(\.\d{3})*,\d{2}$")
def normalize_amount_cell(val):
if pd.isna(val):
return (np.nan, None)
s = str(val).strip()
    # extract the currency (3-letter code or symbol)
m_code = re.search(r"(?i)\b([A-Z]{3})\b", s)
currency = m_code.group(1).upper() if m_code else None
m_sym = re.search(r"[$€£¥]", s)
if currency is None and m_sym:
currency = SYM_TO_CODE.get(m_sym.group(0))
    # strip currency codes and symbols
s = re.sub(r"(?i)\b[A-Z]{3}\b", "", s)
s = re.sub(r"[$€£¥]", "", s).strip()
    # European-style decimal (e.g. 1.234,56)
if EURO_PATTERN.fullmatch(s):
num_str = s.replace(".", "").replace(",", ".")
else:
num_str = s.replace(",", "").replace(" ", "")
num_str = re.sub(r"[^\d\.\-]", "", num_str)
if num_str == "":
return (np.nan, currency)
try:
dec = Decimal(num_str)
except InvalidOperation:
return (np.nan, currency)
return (dec, currency)
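Two worked examples, consistent with the branches above:

# normalize_amount_cell("€1.234,56")  -> (Decimal("1234.56"), "EUR")   European decimal style
# normalize_amount_cell("$1,299.00")  -> (Decimal("1299.00"), "USD")   thousands commas stripped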
# ===== 6) Main cleaning pipeline =====
def clean_events_csv(source_csv, output_clean_csv, quarantine_csv, config=CONFIG):
tmp_utf8 = source_csv + ".utf8.tmp"
tmp_unified = source_csv + ".unified.tmp"
stats_enc = unify_encoding(source_csv, tmp_utf8, quarantine_csv)
stats_sep = unify_delimiters(tmp_utf8, tmp_unified, quarantine_csv, expected_cols=config.get("expected_cols"))
    # read everything at once here; for large files, pandas supports chunked reading (see the chunked sketch after this function)
df = pd.read_csv(
tmp_unified,
engine="python",
dtype={config["user_id_col"]: "string"},
keep_default_na=False,
na_values=["N/A", "-", ""],
)
    # force user_id to string and strip whitespace
if config["user_id_col"] in df.columns:
df[config["user_id_col"]] = df[config["user_id_col"]].astype("string").str.strip()
    # auto-detect column names
ts_col = config.get("timestamp_col") or detect_datetime_column(df)
amt_col = config.get("amount_col") or detect_amount_column(df)
    # parse datetimes -> UTC
parse_ok = 0
if ts_col:
df["timestamp_utc"] = df[ts_col].map(lambda v: parse_datetime_utc(v, config["local_tz"]))
parse_ok = df["timestamp_utc"].notna().sum()
else:
df["timestamp_utc"] = pd.NaT
    # normalize amounts and split out the currency
amt_ok = 0
if amt_col:
pairs = df[amt_col].map(normalize_amount_cell)
df["amount_dec"] = pairs.map(lambda x: x[0])
df["currency"] = pairs.map(lambda x: x[1])
        # convert to float for downstream arithmetic; keep the Decimal column for auditing
df["amount"] = df["amount_dec"].astype("float64")
amt_ok = df["amount"].notna().sum()
    # deduplication strategy
before = len(df)
if config.get("dedupe_subset"):
df_clean = df.drop_duplicates(subset=config["dedupe_subset"], keep="first")
else:
df_clean = df.drop_duplicates(keep="first")
after = len(df_clean)
    # write the cleaned output
df_clean.to_csv(output_clean_csv, index=False)
    # remove temporary files
for p in (tmp_utf8, tmp_unified):
try:
os.remove(p)
except Exception:
pass
metrics = {
"total_lines": stats_enc["total_lines"],
"encoded_ok": stats_enc["encoded_ok"],
"encoded_bad": stats_enc["encoded_bad"],
"rows_ok": stats_sep["rows_ok"],
"rows_bad": stats_sep["rows_bad"],
"expected_cols": stats_sep["expected_cols"],
"parse_time_ok": parse_ok,
"parse_amount_ok": amt_ok,
"rows_before": before,
"rows_after": after,
}
return df_clean, metrics
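The comment in clean_events_csv mentions chunked reading for large files; here is a minimal sketch of that variant, assuming the timestamp column is already known and the same unified temp file is used (cross-chunk deduplication would need extra state and is omitted):

def clean_in_chunks(unified_path, output_clean_csv, ts_col, config=CONFIG, chunksize=100_000):
    first = True
    for chunk in pd.read_csv(unified_path, chunksize=chunksize,
                             dtype={config["user_id_col"]: "string"},
                             keep_default_na=False, na_values=["N/A", "-", ""]):
        chunk["timestamp_utc"] = chunk[ts_col].map(
            lambda v: parse_datetime_utc(v, config["local_tz"]))
        # write the first chunk with a header, then append
        chunk.to_csv(output_clean_csv, mode="w" if first else "a",
                     header=first, index=False)
        first = False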
# ===== 7) Validation and scoring =====
def validate(df, metrics, config=CONFIG, sample_n=20, min_year=2000, max_year=2035, max_abs_amount=1e9):
report = {}
    # read success rate
read_rate = metrics["rows_ok"] / max(1, metrics["rows_ok"] + metrics["rows_bad"])
report["read_rate"] = read_rate
    # datetime parse success rate
if "timestamp_utc" in df.columns:
time_rate = df["timestamp_utc"].notna().mean()
report["time_parse_rate"] = time_rate
        # datetime range constraint
dt_min = pd.Timestamp(f"{min_year}-01-01", tz="UTC")
dt_max = pd.Timestamp(f"{max_year}-12-31", tz="UTC")
in_range = df["timestamp_utc"].dropna().between(dt_min, dt_max).mean() if df["timestamp_utc"].notna().any() else np.nan
report["time_in_range_rate"] = in_range
    # amount constraint
if "amount" in df.columns:
amt_valid = df["amount"].dropna().apply(lambda x: abs(x) <= max_abs_amount).mean() if df["amount"].notna().any() else np.nan
report["amount_valid_rate"] = amt_valid
    # duplicate statistics
dupe_dropped = metrics["rows_before"] - metrics["rows_after"]
dupe_rate = dupe_dropped / max(1, metrics["rows_before"])
report["dedupe_drop_rate"] = dupe_rate
    # random sample spot-check
sample = df.sample(min(sample_n, len(df)), random_state=42) if len(df) else df
sample_ok = {
"timestamp_non_null": int(sample["timestamp_utc"].notna().sum()) if "timestamp_utc" in df.columns else 0,
"amount_non_null": int(sample["amount"].notna().sum()) if "amount" in df.columns else 0,
}
report["sample_ok"] = sample_ok
    # simple assertions (recorded, not raised)
    report["assertions"] = {
        "no_int_user_id": df[config["user_id_col"]].dtype.name == "string",
    }
    # interactive scoring:
    # read success rate, datetime parse rate, and dedupe drop rate each carry weight
    score = 0
    score += int(round(read_rate * 4))                          # 0-4 points
    score += int(round(report.get("time_parse_rate", 0) * 3))   # 0-3 points
    score += int(round(dupe_rate * 3))                          # 0-3 points
report["score"] = score
return report
if __name__ == "__main__":
    # example invocation: adjust the paths and config, then run
src = "user_events.csv"
out = "user_events_clean.csv"
quarantine = "quarantine.csv"
CONFIG.update({
"expected_cols": 7,
"local_tz": "Asia/Shanghai",
"user_id_col": "user_id",
"timestamp_col": None, # 自动检测
"amount_col": None, # 自动检测
"dedupe_subset": None # 或例如 ["user_id", "timestamp_utc"]
})
df_clean, metrics = clean_events_csv(src, out, quarantine, CONFIG)
report = validate(df_clean, metrics, CONFIG)
print("Metrics:", metrics)
print("Report:", report)
df_clean, metrics = clean_events_csv("user_events.csv", "user_events_clean.csv", "quarantine.csv", CONFIG)
report = validate(df_clean, metrics, CONFIG)
print(f"成功读入率: {report['read_rate']:.2%}")
print(f"日期解析成功率: {report.get('time_parse_rate', 0):.2%}")
print(f"日期范围命中率: {report.get('time_in_range_rate', np.nan)}")
print(f"金额有效率: {report.get('amount_valid_rate', np.nan)}")
print(f"去重消除率: {report['dedupe_drop_rate']:.2%}")
print(f"得分: {report['score']}/10")
Bonus suggestions 🎯
Interactive scoring and points rules 🕹️
Task complete! +10 points. Current level: Junior Debugger. Next level: Code Fixer (20 points to go)
1️⃣ Error identification 🔎 (invalid dates, timezone-dependent month boundaries, duplicated refunds, and an inconsistent aggregation dimension; each is addressed by the steps below)
2️⃣ Debugging steps 🧭 A. Enumerate and quarantine invalid dates
B. Check month boundaries across timezones
C. Identify duplicate refunds
D. Validate the aggregation dimension
3️⃣ Solution implementation 🛠 A. CTE pre-cleaning: parse dates, normalize timezones to UTC, and deduplicate
WITH params AS (
    SELECT 'Asia/Shanghai'::text AS local_tz
),
-- 1) Clean orders (order_date assumed to be text; if it is already a timestamp, simplify to AT TIME ZONE 'UTC')
orders_clean AS (
    SELECT o.id, o.user_id,
           o.total::numeric AS total,
           o.currency,
           -- normalize date separators
           regexp_replace(trim(o.order_date::text), '[./]', '-', 'g') AS s
    FROM orders o
),
orders_valid AS (
    SELECT id, user_id, total, currency,
           -- build a UTC timestamptz from the parsed text (order times carry UTC semantics);
           -- empty time parts default to 0
           make_timestamptz(
               split_part(s, '-', 1)::int,
               split_part(s, '-', 2)::int,
               split_part(split_part(s, ' ', 1), '-', 3)::int,
               coalesce(nullif(split_part(split_part(s, ' ', 2), ':', 1), ''), '0')::int,
               coalesce(nullif(split_part(split_part(s, ' ', 2), ':', 2), ''), '0')::int,
               coalesce(nullif(split_part(split_part(s, ' ', 2), ':', 3), ''), '0')::int,
               'UTC'
           ) AS order_ts_utc
    FROM orders_clean
    WHERE s ~ '^\d{4}-\d{2}-\d{2}(\s\d{2}:\d{2}(:\d{2})?)?$'
      AND split_part(s, '-', 2)::int BETWEEN 1 AND 12
      AND split_part(split_part(s, ' ', 1), '-', 3)::int BETWEEN 1 AND
          EXTRACT(day FROM (date_trunc('month',
              make_date(split_part(s, '-', 1)::int, split_part(s, '-', 2)::int, 1)
          ) + interval '1 month - 1 day'))
),
orders_dedup AS (
    -- dedupe on the primary key; if duplicate ids exist, keep the latest timestamp
    SELECT DISTINCT ON (id) id, user_id, total, currency, order_ts_utc
    FROM orders_valid
    ORDER BY id, order_ts_utc DESC
),
-- 2) Clean refunds (interpret in the local timezone, then convert to UTC)
refunds_clean AS (
    SELECT r.order_id,
           r.amount::numeric AS amount,
           regexp_replace(trim(r.refund_date::text), '[./]', '-', 'g') AS s,
           (SELECT local_tz FROM params) AS local_tz
    FROM refunds r
),
refunds_valid AS (
    SELECT order_id, amount,
           make_timestamptz(
               split_part(s, '-', 1)::int,
               split_part(s, '-', 2)::int,
               split_part(split_part(s, ' ', 1), '-', 3)::int,
               coalesce(nullif(split_part(split_part(s, ' ', 2), ':', 1), ''), '0')::int,
               coalesce(nullif(split_part(split_part(s, ' ', 2), ':', 2), ''), '0')::int,
               coalesce(nullif(split_part(split_part(s, ' ', 2), ':', 3), ''), '0')::int,
               local_tz
           ) AS refund_ts_utc
    FROM refunds_clean
    WHERE s ~ '^\d{4}-\d{2}-\d{2}(\s\d{2}:\d{2}(:\d{2})?)?$'
      AND split_part(s, '-', 2)::int BETWEEN 1 AND 12
      AND split_part(split_part(s, ' ', 1), '-', 3)::int BETWEEN 1 AND
          EXTRACT(day FROM (date_trunc('month',
              make_date(split_part(s, '-', 1)::int, split_part(s, '-', 2)::int, 1)
          ) + interval '1 month - 1 day'))
),
refunds_mark AS (
    -- window-based dedupe: keep the first row per (order_id, refund_ts_utc, amount)
    SELECT order_id, amount, refund_ts_utc,
           ROW_NUMBER() OVER (
               PARTITION BY order_id, refund_ts_utc, amount
               ORDER BY order_id
           ) AS rn
    FROM refunds_valid
),
refunds_dedup AS (
    SELECT order_id, amount, refund_ts_utc
    FROM refunds_mark
    WHERE rn = 1
),
-- 3) Aggregate on a unified UTC month
monthly_orders AS (
    SELECT DATE_TRUNC('month', order_ts_utc) AS month_utc,
           currency,
           SUM(total) AS order_total
    FROM orders_dedup
    GROUP BY DATE_TRUNC('month', order_ts_utc), currency
),
monthly_refunds AS (
    -- join orders exactly once to pick up the currency without inflating row counts
    SELECT DATE_TRUNC('month', rd.refund_ts_utc) AS month_utc,
           o.currency,
           SUM(rd.amount) AS refund_total
    FROM refunds_dedup rd
    INNER JOIN orders_dedup o ON o.id = rd.order_id
    GROUP BY DATE_TRUNC('month', rd.refund_ts_utc), o.currency
),
-- 4) Net revenue
net_revenue AS (
    SELECT mo.month_utc, mo.currency, mo.order_total,
           COALESCE(mr.refund_total, 0) AS refund_total,
           mo.order_total - COALESCE(mr.refund_total, 0) AS net_total
    FROM monthly_orders mo
    LEFT JOIN monthly_refunds mr
           ON mr.month_utc = mo.month_utc
          AND mr.currency = mo.currency
)
SELECT * FROM net_revenue
ORDER BY month_utc, currency;
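For readers who prefer to prototype in pandas before committing to the SQL, a rough equivalent of the aggregation above (a sketch assuming orders and refunds DataFrames whose timestamps are already tz-aware UTC, e.g. produced by the Python pipeline earlier):

# Sketch: pandas equivalent of the monthly net-revenue CTE chain.
# Assumes orders(id, currency, total, order_ts_utc) and refunds(order_id, amount, refund_ts_utc).
import pandas as pd

def monthly_net_revenue(orders: pd.DataFrame, refunds: pd.DataFrame) -> pd.DataFrame:
    # mirror DISTINCT ON (id) ... ORDER BY order_ts_utc DESC
    o = orders.sort_values("order_ts_utc", ascending=False).drop_duplicates(subset=["id"])
    o = o.assign(month_utc=o["order_ts_utc"].dt.to_period("M"))
    mo = (o.groupby(["month_utc", "currency"], as_index=False)["total"]
            .sum().rename(columns={"total": "order_total"}))
    # refund dedupe on (order_id, refund_ts_utc, amount), then inner-join for the currency
    r = refunds.drop_duplicates(subset=["order_id", "refund_ts_utc", "amount"])
    r = r.merge(o[["id", "currency"]], left_on="order_id", right_on="id", how="inner")
    r = r.assign(month_utc=r["refund_ts_utc"].dt.to_period("M"))
    mr = (r.groupby(["month_utc", "currency"], as_index=False)["amount"]
            .sum().rename(columns={"amount": "refund_total"}))
    net = mo.merge(mr, on=["month_utc", "currency"], how="left").fillna({"refund_total": 0})
    net["net_total"] = net["order_total"] - net["refund_total"]
    return net.sort_values(["month_utc", "currency"])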
B. Index and constraint suggestions
Action: improve aggregation and join performance, and prevent duplicate inserts
Snippet:
-- If you can migrate to proper types (strongly recommended):
-- orders(order_ts_utc timestamptz), refunds(refund_ts_utc timestamptz)
CREATE INDEX idx_orders_ts_currency ON orders (order_ts_utc, currency);
CREATE INDEX idx_refunds_ts_order ON refunds (refund_ts_utc, order_id);

-- Prevent duplicates (if the business allows this key to be unique)
CREATE UNIQUE INDEX CONCURRENTLY u_refunds_dedupe
    ON refunds (order_id, refund_date, amount);

Expected result: the monthly aggregation and the refund join can use index scans, and the unique index rejects re-inserting the same (order_id, refund_date, amount) row.
C. EXPLAIN analysis: run EXPLAIN (ANALYZE, BUFFERS) on the net-revenue query before and after adding the indexes, and compare the join strategy and buffer reads.
4️⃣ Testing and validation ✅ A. Sample data and expected-value assertions
Action: insert boundary-case and duplicate samples, then verify the net totals
Snippet:
-- Sample data (run in a test database)
INSERT INTO orders(id, user_id, order_date, total, currency) VALUES
    (1, 101, '2024-10-31 23:30', 100, 'USD'),  -- UTC
    (2, 102, '2024/11/01',        50, 'USD'),
    (3, 103, '2024/11/31',        70, 'USD');  -- invalid date; will be dropped

INSERT INTO refunds(order_id, refund_date, amount) VALUES
    (1, '2024-11-01 08:00', 30),  -- local Asia/Shanghai, i.e. 2024-11-01 00:00 UTC
    (1, '2024-11-01 08:00', 30);  -- duplicate

Expected result: 2024-10 USD net 100 (order 1 only), 2024-11 USD net 20 (order 2 minus a single 30 refund); order 3 is filtered out and the duplicate refund collapses to one row.
B. Boundary-month and cross-month order checks
C. Duplicate-key counts before and after deduplication
D. Correctness assertions
Interactive scoring 🎯
Suggestions for iteration
Task complete! +10 points. Current level: Junior Debugger. Next level: Code Fixer (20 points to go)
// Anti-pattern: text may be undefined, so calling .match on it directly throws a TypeError
const phrases = item.text.match(/[^\s.!?]+/g);
// 1) Lightweight schema validation and safe access
import { z } from "zod";
const ItemSchema = z.object({
id: z.union([z.string(), z.number()]).optional(),
text: z.string().optional(),
metadata: z.object({ lang: z.string().optional() }).optional(),
});
const PageSchema = z.object({
items: z.array(ItemSchema),
nextCursor: z.string().nullable().optional(),
});
// 2) Field-distribution logging and missing-field stats
function logItemShape(items: unknown[]) {
let missingText = 0, hasHTML = 0;
for (const it of items) {
const parsed = ItemSchema.safeParse(it);
if (!parsed.success || !parsed.data?.text) missingText++;
else if (/<[^>]+>/.test(parsed.data.text)) hasHTML++;
}
console.log({ missingText, hasHTML, total: items.length });
}
// 3) Precompiled regexes and optional chaining
const RE = {
  TAG: /<[^>]*>/g,                      // strip HTML tags
  ZERO_WIDTH: /[\u200B-\u200D\uFEFF]/g, // zero-width characters
  CONTROL: /[\p{Cc}\p{Cf}]/gu,          // control and format characters
  EMOJI: /\p{Extended_Pictographic}/gu, // emoji
  MULTI_WS: /\s{2,}/g,                  // runs of whitespace
  SPLIT: /[.!?;:,、。!?;]+/u            // sentence boundaries
};
// 4) Simple sampling: wall time and heap usage
function samplePerf(label: string) {
const start = performance.now();
const memStart = process.memoryUsage().heapUsed;
return () => {
const ms = performance.now() - start;
const memDelta = process.memoryUsage().heapUsed - memStart;
console.log(`${label}: ${ms.toFixed(1)}ms, Δheap=${(memDelta/1024/1024).toFixed(2)}MB`);
};
}
// 5) Mock pages for testing duplicate-cursor handling
function makeFakePage(cursor?: string) {
if (!cursor) return { items: [{ text: "A" }], nextCursor: "c2" };
if (cursor === "c2") return { items: [{ text: "B" }], nextCursor: null };
return { items: [], nextCursor: null };
}
// Runnable example: Node.js ≥ 18, TypeScript
// npm i zod
// run: ts-node index.ts
import { z } from "zod";
// ItemSchema and PageSchema are the ones defined in section 1 above
type Item = z.infer<typeof ItemSchema>;
type Page = z.infer<typeof PageSchema>;
const RE = {
TAG: /<[^>]*>/g,
ZERO_WIDTH: /[\u200B-\u200D\uFEFF]/g,
CONTROL: /[\p{Cc}\p{Cf}]/gu,
EMOJI: /\p{Extended_Pictographic}/gu,
MULTI_WS: /\s{2,}/g,
SPLIT: /[.!?;:,、。!?;]+/u,
};
function htmlStrip(s: string): string {
  // naive tag stripping; only a few common entities are decoded, to keep complexity down
  return s
    .replace(RE.TAG, " ")
    .replace(/&nbsp;/g, " ")
    .replace(/&amp;/g, "&")
    .replace(/&lt;/g, "<")
    .replace(/&gt;/g, ">");
}
function normalizeText(raw: string, lang?: string): string {
  // NFC-normalize, strip tags, remove control/zero-width/emoji characters, collapse whitespace
  const nfc = raw.normalize("NFC");
  const stripped = htmlStrip(nfc);
  const noZW = stripped.replace(RE.ZERO_WIDTH, "");
  const noCtrl = noZW.replace(RE.CONTROL, "");
  const noEmoji = noCtrl.replace(RE.EMOJI, ""); // skip this step if emoji should be kept
  const sp = noEmoji.replace(RE.MULTI_WS, " ").trim();
  // language-dependent case folding (example: lowercase English, leave Chinese unchanged)
  const lower = lang && /^en\b/i.test(lang) ? sp.toLowerCase() : sp;
  return lower;
}
function extractPhrases(text: string, lang?: string): string[] {
  // split on sentence boundaries, then trim each sentence cheaply to avoid heavy regex work
  const sentences = text.split(RE.SPLIT).map(s => s.trim()).filter(Boolean);
  const phrases: string[] = [];
  for (const s of sentences) {
    // tokenize on whitespace (originally intended to regroup 2-10-word phrases)
    const tokens = s.split(/\s+/).filter(Boolean);
    if (tokens.length === 0) continue;
    // simplified: use the whole sentence as the phrase, capped in length
    const maxLen = 15; // character cap to bound memory
    const p = s.length > maxLen ? s.slice(0, maxLen).trim() : s;
    phrases.push(p);
  }
  return phrases;
}
// Idempotent cursor pagination: dedupe cursors so the last page is never fetched twice (phrase-level dedupe happens downstream)
async function* fetchPages(baseUrl: string, startCursor?: string): AsyncGenerator<Item[]> {
const seenCursors = new Set<string | null>();
let cursor: string | undefined = startCursor;
while (true) {
const url = new URL(baseUrl);
if (cursor) url.searchParams.set("cursor", cursor);
const res = await fetch(url, { headers: { "accept": "application/json" } });
if (!res.ok) throw new Error(`HTTP ${res.status}`);
const json = await res.json();
const parsed = PageSchema.safeParse(json);
if (!parsed.success) throw new Error("Page schema invalid");
const page = parsed.data;
const nextCursor = page.nextCursor ?? null;
    // idempotency: a repeated cursor ends the loop; null terminates after the final page is yielded
if (seenCursors.has(nextCursor)) break;
seenCursors.add(nextCursor);
yield page.items;
if (nextCursor === null) break;
cursor = nextCursor || undefined;
}
}
// Processing pipeline: streaming, with phrase-level dedupe
async function* phrasePipeline(baseUrl: string, startCursor?: string): AsyncGenerator<string> {
const seenPhrase = new Set<string>();
for await (const items of fetchPages(baseUrl, startCursor)) {
for (const it of items) {
const raw = it?.text ?? ""; // 可选链 + 默认值
if (!raw) continue;
const lang = it?.metadata?.lang;
const norm = normalizeText(raw, lang);
if (!norm) continue;
for (const p of extractPhrases(norm, lang)) {
        const key = p; // the normalized phrase is the dedupe key
if (key && !seenPhrase.has(key)) {
seenPhrase.add(key);
yield key;
}
}
}
}
}
// Usage example
async function main() {
const stop = samplePerf("pipeline");
const baseUrl = "https://api.example.com/events";
const out: string[] = [];
for await (const phrase of phrasePipeline(baseUrl)) {
out.push(phrase);
    // replace with a file write or DB insert as needed
}
console.log(`phrases=${out.length}`);
stop();
}
// entry point
if (require.main === module) {
main().catch(err => {
console.error(err);
process.exit(1);
});
}
// Simple performance sampling
function samplePerf(label: string) {
const start = performance.now();
const m0 = process.memoryUsage().heapUsed;
return () => {
const ms = performance.now() - start;
const m1 = process.memoryUsage().heapUsed;
console.log(`${label}: ${ms.toFixed(1)}ms, heap=${(m1/1024/1024).toFixed(2)}MB, Δ=${((m1-m0)/1024/1024).toFixed(2)}MB`);
};
}
// __tests__/normalize.test.ts
import { normalizeText } from "../index";
test("removes zero-width, controls, emojis, tags", () => {
const raw = "<b>Hello </b>\u200B\uFEFF😀 world\t";
const out = normalizeText(raw, "en");
expect(out).toBe("hello world"); // 英文小写、清洗完成
expect(/[\u200B-\u200D\uFEFF]/.test(out)).toBe(false);
expect(/\p{Extended_Pictographic}/u.test(out)).toBe(false);
expect(/<[^>]*>/.test(out)).toBe(false);
});
// __tests__/phrases.test.ts
import { extractPhrases } from "../index";
test("splits sentences by multilingual punctuation", () => {
const out = extractPhrases("点击按钮。Submit form! 完成支付?");
expect(out.length).toBeGreaterThanOrEqual(3);
});
// __tests__/paging.test.ts
import { PageSchema } from "../index";
test("page schema validates items with optional fields", () => {
const ok = PageSchema.safeParse({ items: [{ text: "x" }, { metadata: { lang: "zh" } }], nextCursor: null });
expect(ok.success).toBe(true);
});
// Property-based test (fast-check)
// npm i -D fast-check jest @types/jest ts-node
import fc from "fast-check";
import { normalizeText } from "../index";
test("normalize is total and removes control chars", () => {
fc.assert(
fc.property(
fc.string(), fc.string(),
(raw, lang) => {
const out = normalizeText(raw, lang);
        // must not throw, and no control or zero-width characters remain
expect(out).not.toBeUndefined();
expect(/[\p{Cc}\p{Cf}]/u.test(out)).toBe(false);
        // output length must not blow up
expect(out.length).toBeLessThanOrEqual(Math.max(raw.length, 1000));
}
),
{ verbose: true }
);
});
// Performance and memory benchmark (example)
// __tests__/perf.test.ts
import { normalizeText, extractPhrases } from "../index";
test("perf baseline", () => {
const samples = Array.from({ length: 5000 }, (_, i) => `<p>Event ${i} 😀 点击按钮,提交表单。</p>`);
const t0 = performance.now();
const m0 = process.memoryUsage().heapUsed;
let count = 0;
for (const s of samples) {
const n = normalizeText(s);
count += extractPhrases(n).length;
}
const t1 = performance.now();
const m1 = process.memoryUsage().heapUsed;
const qps = samples.length / ((t1 - t0) / 1000);
const memDeltaMB = (m1 - m0) / 1024 / 1024;
console.log({ qps: Math.round(qps), memDeltaMB: memDeltaMB.toFixed(2), phrases: count });
  expect(qps).toBeGreaterThan(8000); // target QPS (example threshold)
  expect(memDeltaMB).toBeLessThan(30); // memory-growth ceiling (example threshold)
});
// Pagination idempotency test (mocked fetch)
// __tests__/idempotent.test.ts
import { phrasePipeline } from "../index";
const pages = [
{ items: [{ text: "A" }], nextCursor: "c2" },
{ items: [{ text: "B" }], nextCursor: null },
{ items: [{ text: "B" }], nextCursor: null }, // 重复
];
let callIdx = 0;
// @ts-ignore
global.fetch = async () => ({ ok: true, json: async () => pages[Math.min(callIdx++, pages.length - 1)] });
test("no duplicate last page", async () => {
const phrases: string[] = [];
for await (const p of phrasePipeline("https://fake")) phrases.push(p);
expect(phrases).toEqual(["a", "b"]); // 英文小写归一
});
// Metrics calculation (example)
type Metrics = {
  parseSuccessRate: number; // successfully parsed / total
  qps: number;              // items processed per second
  peakMemMB: number;        // peak memory in MB
};
function score(m: Metrics): number {
let pts = 0;
if (m.parseSuccessRate >= 0.99) pts += 4; else if (m.parseSuccessRate >= 0.97) pts += 2;
if (m.qps >= 8000) pts += 4; else if (m.qps >= 6000) pts += 2;
if (m.peakMemMB <= 256) pts += 2; else if (m.peakMemMB <= 512) pts += 1;
  return pts; // out of 10
}
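A quick sanity check with hypothetical numbers, consistent with the thresholds above:

// score({ parseSuccessRate: 0.995, qps: 8500, peakMemMB: 200 }) === 10  (4 + 4 + 2)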
Task complete! +10 points. Current level: Junior Debugger. Next level: Code Fixer (20 points to go)
Delivers a robust solution that helps users resolve data-related programming errors easily and efficiently. When a user hits a complex data-processing error, the prompt guides them to pinpoint the root cause precisely, and provides step-by-step debugging guidance plus code examples so the problem can be fixed quickly and the fix verified. It is aimed at developers, analysts, and engineers whose core work is data processing, saving debugging time and raising productivity.
Helps them quickly resolve programming errors encountered while cleaning, transforming, or analyzing data, so they can stay focused on the key insights.
Supports building and optimizing data pipelines, quickly fixing data-transfer and storage errors to keep data complete and flowing.
Diagnoses and resolves data issues in model development and training, cutting the time lost and the performance degradation caused by bad data.
Copy the prompt generated from the template into your usual chat app (ChatGPT, Claude, etc.) and use it in conversation directly, with no extra development. Suited to quick personal trials and lightweight scenarios.
Turn the prompt template into an API: your program can modify the template parameters freely and call it through the interface, enabling automation and batch processing. Suited to developer integration and embedding in business systems.
Configure the corresponding server address in your MCP client and your AI application will invoke the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly between AI tools.
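For the API mode described above, the call shape typically looks like the sketch below; every name in it is a placeholder (hypothetical endpoint, parameters, and token), so substitute the details from your provider:

import requests  # hypothetical illustration; endpoint, payload, and auth are placeholders

resp = requests.post(
    "https://api.example.com/v1/prompt-templates/data-debugger/render",  # placeholder URL
    json={"params": {"language": "python", "error_log": "..."}},         # placeholder parameters
    headers={"Authorization": "Bearer <your-token>"},
    timeout=30,
)
resp.raise_for_status()
print(resp.json())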