热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
本提示词专为JavaScript开发场景设计,能够根据具体异步任务需求生成符合最佳实践的async/await函数代码。通过结构化参数输入,自动构建包含错误处理、性能优化和技术说明的高质量异步解决方案,适用于数据获取、文件操作、API调用等多种异步场景,帮助开发者快速实现可靠且易于维护的异步编程逻辑。
函数定义 async function fetchProducts(endpoint, { pageSize = 50, maxPages = 5, headers = {}, signal, onProgress } = {})
代码实现
// Simple module-level (in-process) ETag cache, keyed by request URL.
const ETagCache = new Map(); // url -> { etag: string, body: any }
/**
 * Works out how long to wait before the next retry, in milliseconds.
 *
 * A `Retry-After` response header takes priority and may be either a number of
 * seconds or an HTTP date. Without a usable header the delay is exponential
 * backoff plus jitter: `baseMs * 2^(attempt - 1) + [0, 250)`.
 *
 * @param {{ get?: (name: string) => (string | null) } | null | undefined} headers
 *   Response headers (anything exposing `get`); may be null/undefined.
 * @param {number} attempt 1-based index of the retry being scheduled.
 * @param {number} [baseMs=400] Base delay for the exponential backoff.
 * @returns {number} Delay in milliseconds (never negative).
 */
function computeRetryDelayMs(headers, attempt, baseMs = 400) {
  const headerValue = headers?.get?.('Retry-After');

  if (headerValue) {
    // Form 1: delta-seconds.
    const asSeconds = Number(headerValue);
    if (Number.isFinite(asSeconds) && asSeconds >= 0) {
      return Math.max(0, asSeconds * 1000);
    }
    // Form 2: HTTP date; a date in the past means "retry now".
    const asTimestamp = Date.parse(headerValue);
    if (!Number.isNaN(asTimestamp)) {
      return Math.max(0, asTimestamp - Date.now());
    }
  }

  const backoff = baseMs * 2 ** (attempt - 1);
  return backoff + Math.floor(Math.random() * 250);
}
/**
 * Cancellable delay.
 *
 * @param {number} ms Time to wait in milliseconds.
 * @param {AbortSignal} [signal] When it aborts, the timer is cleared and the
 *   promise rejects with `signal.reason` (or an `AbortError` DOMException).
 * @returns {Promise<void>} Resolves once the delay has elapsed.
 */
function sleep(ms, signal) {
  return new Promise((resolve, reject) => {
    if (signal?.aborted) {
      reject(signal.reason || new DOMException('Aborted', 'AbortError'));
      return;
    }

    let timer = null;

    // Stops the timer and detaches the abort listener, whichever path finishes first.
    function release() {
      clearTimeout(timer);
      if (signal) signal.removeEventListener('abort', handleAbort);
    }

    function handleAbort() {
      release();
      reject(signal.reason || new DOMException('Aborted', 'AbortError'));
    }

    timer = setTimeout(() => {
      release();
      resolve();
    }, ms);

    if (signal) signal.addEventListener('abort', handleAbort, { once: true });
  });
}
/**
 * Makes sure a relative or absolute URL carries the query parameter `key`.
 *
 * The parameter is only added when it is missing; an existing value is never
 * overridden. Works on the raw string so relative URLs are supported.
 * The `#fragment` is split off first and re-appended last, so the parameter
 * always lands in the query and the fragment is never percent-encoded into it.
 * Only the first `?` separates path from query; later `?` characters stay part
 * of the query string.
 *
 * @param {string} url Relative or absolute URL.
 * @param {string} key Query parameter name.
 * @param {*} value Value to set when the parameter is absent (stringified).
 * @returns {string} URL whose query has been re-serialized by URLSearchParams.
 */
function ensureQueryParam(url, key, value) {
  const hashAt = url.indexOf('#');
  const fragment = hashAt === -1 ? '' : url.slice(hashAt);
  const withoutFragment = hashAt === -1 ? url : url.slice(0, hashAt);

  const queryAt = withoutFragment.indexOf('?');
  const path = queryAt === -1 ? withoutFragment : withoutFragment.slice(0, queryAt);
  const query = queryAt === -1 ? '' : withoutFragment.slice(queryAt + 1);

  const params = new URLSearchParams(query);
  if (!params.has(key)) params.set(key, String(value));
  return `${path}?${params.toString()}${fragment}`;
}
/**
 * Finds the URL of the next page.
 *
 * Preference order: a non-empty string `body.next`, then the first
 * `<...>; rel=next` entry of the `Link` response header.
 *
 * @param {*} body Parsed response body.
 * @param {{ get?: (name: string) => (string | null) } | null | undefined} headers
 * @returns {string | null} Next-page URL as given by the server, or null.
 */
function getNextUrl(body, headers) {
  const fromBody = body ? body.next : undefined;
  if (typeof fromBody === 'string' && fromBody !== '') {
    return fromBody;
  }

  const linkHeader = headers?.get?.('Link');
  if (!linkHeader) return null;

  const found = linkHeader.match(/<([^>]+)>\s*;\s*rel="?next"?/i);
  return found ? found[1] : null;
}
/**
 * Maps a raw product record onto the canonical `{ id, name, price, updatedAt }` shape.
 *
 * - id: first of `id`, `productId`, `_id` (null when none is present)
 * - name: `name`, else `title`, else ''
 * - price: numeric price (strings go through parseFloat); null when not finite
 * - updatedAt: ISO string built from the first of `updatedAt`, `updated_at`,
 *   `modifiedAt`, `updated`, `modified`; missing or unparsable values become
 *   the Unix epoch so such items sort last in a newest-first ordering.
 *
 * @param {*} raw Raw record from the API (null/undefined tolerated).
 * @returns {{ id: *, name: string, price: (number | null), updatedAt: string }}
 */
function normalizeItem(raw) {
  const epochIso = new Date(0).toISOString();

  const rawPrice = raw?.price;
  const numericPrice = typeof rawPrice === 'number' ? rawPrice : parseFloat(rawPrice);

  const stamp =
    raw?.updatedAt ?? raw?.updated_at ?? raw?.modifiedAt ?? raw?.updated ?? raw?.modified ?? null;

  let updatedAt = epochIso;
  if (stamp) {
    const parsed = new Date(stamp);
    if (Number.isFinite(parsed.getTime())) {
      updatedAt = parsed.toISOString();
    }
  }

  return {
    id: raw?.id ?? raw?.productId ?? raw?._id ?? null,
    name: raw?.name ?? raw?.title ?? '',
    price: Number.isFinite(numericPrice) ? numericPrice : null,
    updatedAt,
  };
}
/**
 * Performs one GET request with ETag caching, a per-attempt timeout and retries.
 *
 * - Sends `If-None-Match` when the URL has a cached ETag; a 304 is answered
 *   from the cache.
 * - Retries network errors, timeouts, JSON parse failures, 429 and 5xx up to
 *   `maxRetries` times, honouring `Retry-After` and otherwise backing off
 *   exponentially (see `computeRetryDelayMs`).
 * - Other non-2xx statuses (e.g. 400/401/404) fail immediately, without retry.
 * - Aborting `signal` cancels the in-flight request and any backoff wait.
 *
 * @param {string} url Request URL.
 * @param {object} [options]
 * @param {HeadersInit} [options.headers] Extra request headers.
 * @param {AbortSignal} [options.signal] External cancellation signal.
 * @param {number} [options.timeoutMs=8000] Timeout applied to each attempt.
 * @param {number} [options.maxRetries=3] Retries after the initial attempt.
 * @returns {Promise<{ body: *, status: number, headers: Headers, fromCache: boolean, retries: number }>}
 * @throws {Error} The abort reason on cancellation, `Request failed: <status> ...`
 *   for non-retryable statuses, or the last error once retries are exhausted.
 */
async function fetchWithRetry(url, { headers = {}, signal, timeoutMs = 8000, maxRetries = 3 } = {}) {
  const abortReason = () => signal.reason || new DOMException('Aborted', 'AbortError');

  let attempt = 0; // counts the initial request too
  let lastErr = null;

  while (attempt <= maxRetries) {
    if (signal?.aborted) throw abortReason();

    // Per-attempt controller so a timeout only cancels this attempt, while an
    // external abort is forwarded to it.
    const controller = new AbortController();
    const onAbort = () => controller.abort(abortReason());
    if (signal) signal.addEventListener('abort', onAbort, { once: true });
    const timeoutId = setTimeout(() => {
      controller.abort(new DOMException('Timeout', 'TimeoutError'));
    }, timeoutMs);

    attempt++;
    let retryHeaders = null; // response headers consulted for Retry-After
    let fatalErr = null; // set for statuses that must not be retried

    try {
      const reqHeaders = new Headers(headers);
      const cached = ETagCache.get(url);
      if (cached?.etag) {
        reqHeaders.set('If-None-Match', cached.etag);
      }

      const res = await fetch(url, { method: 'GET', headers: reqHeaders, signal: controller.signal });
      const { status, headers: resHeaders } = res;

      if (status === 304) {
        if (cached?.body != null) {
          return { body: cached.body, status, headers: resHeaders, fromCache: true, retries: attempt - 1 };
        }
        // Cache and server are out of sync: drop the stale entry so the retry
        // is unconditional instead of provoking the same 304 again.
        ETagCache.delete(url);
        lastErr = new Error('304 received but cache body missing');
      } else if (status >= 200 && status < 300) {
        let body;
        try {
          body = await res.json();
        } catch (cause) {
          throw new Error('Failed to parse JSON', { cause });
        }
        // Headers#get is case-insensitive, one lookup is enough.
        const etag = resHeaders.get('ETag');
        if (etag) {
          ETagCache.set(url, { etag, body });
        }
        return { body, status, headers: resHeaders, fromCache: false, retries: attempt - 1 };
      } else if (status === 429 || (status >= 500 && status < 600)) {
        retryHeaders = resHeaders;
        // The body is only needed for the final error message.
        const text = attempt > maxRetries ? await res.text().catch(() => '') : '';
        lastErr = new Error(`Request failed after retries: ${status} ${text.slice(0, 160)}`);
      } else {
        // Remaining statuses (mostly 4xx) will not succeed on retry. The error
        // is raised after the try block so the catch below cannot retry it.
        const text = await res.text().catch(() => '');
        fatalErr = new Error(`Request failed: ${status} ${text.slice(0, 160)}`);
      }
    } catch (err) {
      // Cancellation is never retried.
      if (signal?.aborted) throw abortReason();
      if (err?.name === 'AbortError') throw err;
      lastErr = err;
    } finally {
      clearTimeout(timeoutId);
      if (signal) signal.removeEventListener('abort', onAbort);
    }

    if (fatalErr) throw fatalErr;
    if (attempt > maxRetries) throw lastErr;
    await sleep(computeRetryDelayMs(retryHeaders, attempt), signal);
  }

  // Only reachable when maxRetries is negative.
  throw lastErr || new Error('Unknown fetch error');
}
/**
 * Fetches a paginated product listing and returns a normalized, de-duplicated
 * list sorted by `updatedAt`, newest first.
 *
 * Pages are discovered through `body.next` / `Link: rel=next` (see
 * `getNextUrl`), requested through `fetchWithRetry`, and at most `maxPages`
 * pages are processed. The item list is read from `body.items`,
 * `body.products` or `body.data`, or from the body itself when it is an array;
 * any other shape counts as an empty page instead of aborting pagination.
 * Page failures are collected in `errors` rather than thrown.
 *
 * @param {string} endpoint URL of the first page; `pageSize` is appended when absent.
 * @param {object} [options]
 * @param {number} [options.pageSize=50]
 * @param {number} [options.maxPages=5]
 * @param {HeadersInit} [options.headers]
 * @param {AbortSignal} [options.signal] Stops scheduling pages and cancels requests.
 * @param {(evt: { page: number, url: string, received: number, total: (number | null),
 *   fromCache: boolean, retries: number, status: number }) => void} [options.onProgress]
 *   Called after each processed page; exceptions it throws are ignored.
 * @returns {Promise<{ items: object[], total: number, fromCache: boolean,
 *   durationMs: number, errors: { url: string, message: string, name: string }[] }>}
 */
async function fetchProducts(endpoint, { pageSize = 50, maxPages = 5, headers = {}, signal, onProgress } = {}) {
  const now = () =>
    (typeof performance !== 'undefined' && performance.now ? performance.now() : Date.now());
  const start = now();

  const MAX_CONCURRENCY = 3;

  // First page: inject pageSize unless the caller already specified one.
  const firstUrl = ensureQueryParam(endpoint, 'pageSize', pageSize);

  const byId = new Map(); // id -> normalized item with the newest updatedAt
  const errors = [];
  const queue = [firstUrl];
  const visited = new Set([firstUrl]); // guards against pagination loops
  let anyFromCache = false;
  let serverTotal = null;
  let pagesFetched = 0;

  // Picks the item array out of the response body; unexpected shapes yield [].
  function extractList(body) {
    if (Array.isArray(body)) return body;
    const candidate = body?.items ?? body?.products ?? body?.data;
    return Array.isArray(candidate) ? candidate : [];
  }

  async function processPage(url, pageIndex) {
    const { body, status, headers: resHeaders, fromCache, retries } = await fetchWithRetry(url, {
      headers,
      signal,
      timeoutMs: 8000,
      maxRetries: 3,
    });
    anyFromCache = anyFromCache || Boolean(fromCache);

    const list = extractList(body);
    const total = typeof body?.total === 'number' && body.total >= 0 ? body.total : null;
    if (serverTotal == null && total != null) serverTotal = total;

    // Normalize and de-duplicate, keeping the most recently updated copy.
    for (const raw of list) {
      const item = normalizeItem(raw);
      if (item.id == null) continue; // items without an id cannot be de-duplicated
      const prev = byId.get(item.id);
      if (!prev || Date.parse(item.updatedAt) >= Date.parse(prev.updatedAt)) {
        byId.set(item.id, item);
      }
    }

    if (typeof onProgress === 'function') {
      try {
        onProgress({
          page: pageIndex,
          url,
          received: list.length,
          total,
          fromCache: Boolean(fromCache),
          retries,
          status,
        });
      } catch {
        // A faulty progress callback must not break the download.
      }
    }

    return { nextUrl: getNextUrl(body, resHeaders) };
  }

  // Pulls URLs off the queue until it is empty, the page budget is used up or
  // the caller aborts.
  async function worker() {
    while (!signal?.aborted && queue.length > 0 && pagesFetched < maxPages) {
      const url = queue.shift();
      const pageIndex = pagesFetched + 1;
      try {
        const { nextUrl } = await processPage(url, pageIndex);
        pagesFetched++;
        if (nextUrl && !visited.has(nextUrl) && pagesFetched < maxPages) {
          visited.add(nextUrl);
          queue.push(nextUrl);
        }
      } catch (err) {
        // A failed page is recorded but not counted, and yields no next link.
        errors.push({
          url,
          message: String(err?.message || err),
          name: err?.name || 'Error',
        });
      }
    }
  }

  // With a single linked chain of pages the queue starts with one URL, so only
  // one worker is started and requests run one after another.
  const workers = Array.from({ length: Math.min(MAX_CONCURRENCY, queue.length) }, () => worker());
  await Promise.all(workers);

  const items = Array.from(byId.values()).sort(
    (a, b) => Date.parse(b.updatedAt) - Date.parse(a.updatedAt)
  );

  return {
    items,
    total: serverTotal != null ? serverTotal : items.length,
    fromCache: anyFromCache,
    durationMs: Math.round(now() - start),
    errors,
  };
}
错误处理
使用示例
// Cancellation control (optional): call controller.abort() to cancel the run.
const controller = new AbortController();
// Progress callback (optional): logs one line per processed page.
function onProgress(evt) {
// evt: { page, url, received, total, fromCache, retries, status }
console.log(`Page #${evt.page} status=${evt.status} items=${evt.received} total=${evt.total ?? 'n/a'} cache=${evt.fromCache} retries=${evt.retries}`);
}
// Usage example: fetch up to 5 pages of 100 products and print a summary.
(async () => {
try {
const result = await fetchProducts('/products', {
pageSize: 100,
maxPages: 5,
headers: { Accept: 'application/json' },
signal: controller.signal,
onProgress,
});
console.log('Total:', result.total);
console.log('Duration(ms):', result.durationMs);
console.log('From cache:', result.fromCache);
console.log('Errors:', result.errors);
console.log('First item:', result.items[0]); // {id,name,price,updatedAt}
} catch (e) {
console.error('Fetch failed:', e);
}
})();
技术要点
async function scrapeArticles(urls, { concurrency = 5, timeoutMs = 6000, cacheTTL = 600000, signal } = {})
参数说明
返回值
注意:若传入的 AbortSignal 被触发,将尽可能中止正在执行的请求;函数不会抛错,而是以 allSettled 的方式汇总已完成与失败的请求并返回(被中止的请求会在失败列表中标记为 Abort)。
// Module-level in-memory cache with TTL. Keys are normalized input URLs or normalized canonical URLs.
const __articleCache = new Map(); // key -> { value, expiresAt }
/**
 * Produces a canonical string form of an http/https URL so that equivalent
 * URLs compare equal:
 * - only http/https is accepted
 * - scheme and host are lower-cased, the fragment is dropped
 * - default ports (:80 / :443) are removed
 * - repeated slashes in the path are collapsed and a trailing slash is removed
 *   (the root path "/" is kept)
 * - common tracking parameters are removed and the rest are sorted by name
 *
 * @param {string} input Absolute URL.
 * @returns {string} Normalized URL.
 * @throws {Error} When the input is not a valid URL or not http/https.
 */
function normalizeUrl(input) {
  const trackingParams = new Set([
    'utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content',
    'fbclid', 'gclid', 'yclid', 'mc_cid', 'mc_eid',
  ]);

  let parsed;
  try {
    parsed = new URL(input);
  } catch {
    throw new Error(`Invalid URL: ${input}`);
  }

  const scheme = parsed.protocol;
  if (scheme !== 'http:' && scheme !== 'https:') {
    throw new Error(`Unsupported protocol (only http/https): ${input}`);
  }

  parsed.protocol = scheme.toLowerCase();
  parsed.hostname = parsed.hostname.toLowerCase();

  // Only http/https get this far, so the default port is one of two values.
  const defaultPort = parsed.protocol === 'http:' ? '80' : '443';
  if (parsed.port === defaultPort) {
    parsed.port = '';
  }

  parsed.hash = '';

  let pathname = parsed.pathname.replace(/\/{2,}/g, '/');
  if (pathname.length > 1 && pathname.endsWith('/')) {
    pathname = pathname.slice(0, -1);
  }
  parsed.pathname = pathname;

  const kept = [...parsed.searchParams].filter(
    ([name]) => !trackingParams.has(name.toLowerCase())
  );
  kept.sort(([left], [right]) => left.localeCompare(right));

  parsed.search = '';
  kept.forEach(([name, value]) => parsed.searchParams.append(name, value));

  return parsed.href;
}
/**
 * Minimal HTML entity decoder: handles the named entities amp, lt, gt, quot,
 * apos and nbsp plus decimal (`&#65;`) and hexadecimal (`&#x41;` / `&#X41;`)
 * numeric references. Anything it does not recognize is left untouched, and
 * the result is trimmed.
 *
 * @param {string} str Text that may contain entities; falsy input is returned as is.
 * @returns {string} Decoded, trimmed text.
 */
function decodeHtmlEntities(str) {
  if (!str) return str;

  // A Map (not an object literal) so names such as "constructor" or
  // "toString" cannot resolve to inherited Object.prototype members.
  const named = new Map([
    ['amp', '&'],
    ['lt', '<'],
    ['gt', '>'],
    ['quot', '"'],
    ['apos', "'"],
    ['nbsp', ' '],
  ]);

  return str
    .replace(/&(#\d+|#[xX][0-9a-fA-F]+|[a-zA-Z]+);/g, (match, token) => {
      if (token[0] !== '#') {
        return named.get(token) ?? match;
      }
      const isHex = token[1] === 'x' || token[1] === 'X';
      const code = Number.parseInt(token.slice(isHex ? 2 : 1), isHex ? 16 : 10);
      // String.fromCodePoint throws a RangeError beyond U+10FFFF; keep such
      // references verbatim instead of failing the whole parse.
      if (!Number.isInteger(code) || code > 0x10ffff) return match;
      return String.fromCodePoint(code);
    })
    .trim();
}
/**
 * Reduces an HTML fragment to plain text (rough, regex based): removes
 * <script> and <style> blocks, replaces every remaining tag with a space,
 * collapses whitespace, trims, and finally decodes HTML entities.
 *
 * @param {string} html HTML fragment.
 * @returns {string} Plain text.
 */
function stripTags(html) {
  const withoutScripts = html.replace(/<script[\s\S]*?<\/script>/gi, '');
  const withoutStyles = withoutScripts.replace(/<style[\s\S]*?<\/style>/gi, '');
  const tagsAsSpaces = withoutStyles.replace(/<[^>]+>/g, ' ');
  const collapsed = tagsAsSpaces.replace(/\s+/g, ' ').trim();
  return decodeHtmlEntities(collapsed);
}
/**
 * Returns the markup between the first `<tag ...>` and the nearest following
 * `</tag>` (case-insensitive, non-greedy).
 *
 * @param {string} html Document or fragment to search.
 * @param {string} tag Tag name; it is inserted into a regular expression as is.
 * @returns {string | null} Inner HTML of the first match, or null when absent.
 */
function getFirstTagInnerHTML(html, tag) {
  const opening = `<${tag}\\b[^>]*>`;
  const closing = `<\\/${tag}>`;
  const pattern = new RegExp(`${opening}([\\s\\S]*?)${closing}`, 'i');
  const found = html.match(pattern);
  if (found === null) return null;
  return found[1];
}
/**
 * Determines the page title: the text of the first <title>, falling back to
 * the first <h1> when the title is missing or empty after tag stripping.
 *
 * @param {string} html Page markup.
 * @returns {string | null} Title text, or null when neither tag yields text.
 */
function extractTitle(html) {
  for (const tag of ['title', 'h1']) {
    const inner = getFirstTagInnerHTML(html, tag);
    if (!inner) continue;
    const text = stripTags(inner);
    if (text) return text;
  }
  return null;
}
/**
 * Finds the href of the first `<link>` whose `rel` token list contains
 * "canonical".
 *
 * @param {string} html Page markup.
 * @returns {string | null} The href exactly as written (relative or absolute),
 *   trimmed; null when no canonical link with an href exists.
 */
function extractCanonical(html) {
  // name = "double quoted" | 'single quoted' | bare value
  const attrPattern = /(\w[\w:-]*)\s*=\s*("([^"]*)"|'([^']*)'|([^\s"'>]+))/g;
  const linkTags = html.match(/<link\b[^>]*>/gi) || [];

  for (const tag of linkTags) {
    const attrs = {};
    for (const [, name, , doubleQuoted, singleQuoted, bare] of tag.matchAll(attrPattern)) {
      attrs[name.toLowerCase()] = (doubleQuoted ?? singleQuoted ?? bare ?? '').trim();
    }

    const relTokens = (attrs.rel || '').toLowerCase().split(/\s+/);
    if (relTokens.includes('canonical') && attrs.href) {
      return attrs.href.trim();
    }
  }
  return null;
}
/**
 * Reads the publication time from the first `<meta>` whose `property` (or,
 * when that is absent, `name`) equals "article:published_time".
 *
 * @param {string} html Page markup.
 * @returns {string | null} The raw, trimmed `content` value, or null.
 */
function extractPublishedTimeFromMeta(html) {
  // name = "double quoted" | 'single quoted' | bare value
  const attrPattern = /(\w[\w:-]*)\s*=\s*("([^"]*)"|'([^']*)'|([^\s"'>]+))/g;
  const metaTags = html.match(/<meta\b[^>]*>/gi) || [];

  for (const tag of metaTags) {
    const attrs = {};
    for (const [, name, , doubleQuoted, singleQuoted, bare] of tag.matchAll(attrPattern)) {
      attrs[name.toLowerCase()] = (doubleQuoted ?? singleQuoted ?? bare ?? '').trim();
    }

    const key = (attrs.property || attrs.name || '').toLowerCase();
    if (key === 'article:published_time' && attrs.content) {
      return attrs.content.trim();
    }
  }
  return null;
}
/**
 * Best-effort extraction of a publication date from the text of the first <p>.
 *
 * Tried in order:
 * 1. the whole paragraph text through `Date.parse` (ISO / RFC style strings);
 * 2. an embedded `YYYY-MM-DD[ HH:MM[:SS]][Z|±HH[:]MM]` — read as UTC unless an
 *    explicit offset is given, in which case the offset is applied;
 * 3. an embedded Chinese date `YYYY年MM月DD日[ HH:MM[:SS]]` — read in the host's
 *    local time zone.
 *
 * @param {string} html Page markup.
 * @returns {string | null} ISO-8601 UTC timestamp, or null when nothing matches.
 */
function extractPublishedTimeFromFirstParagraph(html) {
  const p = getFirstTagInnerHTML(html, 'p');
  if (!p) return null;
  const text = stripTags(p);

  // 1) Formats Date.parse understands directly.
  const direct = Date.parse(text);
  if (!Number.isNaN(direct)) return new Date(direct).toISOString();

  // 2) ISO-like date embedded in surrounding text.
  const isoLike = text.match(
    /(\d{4})-(\d{1,2})-(\d{1,2})(?:[ T](\d{1,2}):(\d{2})(?::(\d{2}))?(?:Z|([+-]\d{2}:?\d{2}))?)?/
  );
  if (isoLike) {
    const [, Y, M, D, h = '0', m = '0', s = '0', tz] = isoLike;
    let utcMs = Date.UTC(+Y, +M - 1, +D, +h, +m, +s);
    if (tz) {
      // The sign applies to hours AND minutes: "-05:30" is -330 minutes, not
      // -300 + 30. UTC = wall-clock time - offset.
      const [, sign, hh, mm] = tz.match(/([+-])(\d{2}):?(\d{2})/);
      const offsetMin = (sign === '-' ? -1 : 1) * (Number(hh) * 60 + Number(mm));
      utcMs -= offsetMin * 60000;
    }
    return new Date(utcMs).toISOString();
  }

  // 3) Chinese date: YYYY年MM月DD日 HH:MM(:SS)?
  const zh = text.match(/(\d{4})年(\d{1,2})月(\d{1,2})日(?:\s+(\d{1,2}):(\d{2})(?::(\d{2}))?)?/);
  if (zh) {
    const [, Y, M, D, h = '0', m = '0', s = '0'] = zh;
    return new Date(+Y, +M - 1, +D, +h, +m, +s).toISOString();
  }

  return null;
}
/**
 * Extracts article metadata from a page.
 *
 * - title: see `extractTitle`
 * - publishedAt: the `article:published_time` meta value converted to ISO (null
 *   when present but unparsable); only when that meta is absent is the first
 *   paragraph inspected instead
 * - canonical: the canonical link resolved against `baseUrl` (null when absent
 *   or not resolvable)
 *
 * @param {string} html Page markup.
 * @param {string} baseUrl URL used to resolve a relative canonical href.
 * @returns {{ title: (string | null), canonical: (string | null), publishedAt: (string | null) }}
 */
function parseHtml(html, baseUrl) {
  const title = extractTitle(html);

  const metaTime = extractPublishedTimeFromMeta(html);
  let publishedAt;
  if (metaTime) {
    const parsedMs = Date.parse(metaTime);
    publishedAt = Number.isNaN(parsedMs) ? null : new Date(parsedMs).toISOString();
  } else {
    publishedAt = extractPublishedTimeFromFirstParagraph(html);
  }

  let canonical = null;
  const href = extractCanonical(html);
  if (href) {
    try {
      canonical = new URL(href, baseUrl).toString();
    } catch {
      // A malformed canonical href is ignored; canonical stays null.
    }
  }

  return { title, canonical, publishedAt };
}
/**
 * TTL cache lookup.
 *
 * @param {string} key Cache key.
 * @returns {*} The cached value while it is still fresh, otherwise null; an
 *   expired entry is evicted as a side effect.
 */
function cacheGet(key) {
  const entry = __articleCache.get(key);
  if (!entry) return null;
  if (entry.expiresAt > Date.now()) return entry.value;
  __articleCache.delete(key);
  return null;
}
/**
 * TTL cache store.
 *
 * @param {string} key Cache key.
 * @param {*} value Value to remember.
 * @param {number} ttl Lifetime in milliseconds, counted from now.
 */
function cacheSet(key, value, ttl) {
  const expiresAt = Date.now() + ttl;
  __articleCache.set(key, { value, expiresAt });
}
/**
 * Merges several AbortSignals into one that aborts as soon as any input aborts.
 *
 * Uses the native `AbortSignal.any` when the runtime offers it; otherwise a
 * fresh AbortController is wired to the inputs by hand.
 *
 * @param {(AbortSignal | null | undefined)[]} signals Falsy entries are ignored.
 * @returns {AbortSignal | undefined} Combined signal, or undefined when no
 *   usable signal was given.
 */
function combineSignals(signals) {
  const sources = signals.filter(Boolean);
  if (sources.length === 0) return undefined;

  const hasNativeAny = typeof AbortSignal !== 'undefined' && typeof AbortSignal.any === 'function';
  if (hasNativeAny) {
    return AbortSignal.any(sources);
  }

  const merged = new AbortController();

  // Older runtimes may not accept a reason argument; retry without one.
  const abortWith = (reason) => {
    try {
      merged.abort(reason);
    } catch {
      merged.abort();
    }
  };

  const alreadyAborted = sources.find((source) => source.aborted);
  if (alreadyAborted) {
    abortWith(alreadyAborted.reason);
    return merged.signal;
  }

  function relay(event) {
    abortWith(event?.target?.reason);
    // One abort is enough: detach from every source.
    for (const source of sources) source.removeEventListener('abort', relay);
  }
  for (const source of sources) source.addEventListener('abort', relay, { once: true });

  return merged.signal;
}
/**
 * Creates a concurrency limiter.
 *
 * The returned function schedules `fn`; at most `limit` scheduled functions
 * run at the same time. The promise it returns never rejects: it resolves with
 * `fn`'s result, or with the thrown/rejected error, so callers can aggregate
 * outcomes allSettled-style.
 *
 * Queued tasks are always started, even after `outerSignal` has aborted.
 * Skipping them would leave their promises pending forever and hang any
 * `Promise.all` over them. Tasks are expected to observe the signal themselves
 * (e.g. by passing it to `fetch`) and fail fast.
 *
 * @param {number} limit Maximum number of tasks in flight.
 * @param {AbortSignal} [outerSignal] Accepted for interface compatibility; not
 *   consulted by the limiter itself (see above).
 * @returns {(fn: () => *) => Promise<*>} Scheduler function.
 */
function createLimiter(limit, outerSignal) {
  let active = 0;
  const queue = [];

  const runNext = () => {
    if (active >= limit || queue.length === 0) return;
    const task = queue.shift();
    active++;
    task().finally(() => {
      active--;
      runNext();
    });
  };

  return (fn) =>
    new Promise((resolve) => {
      // Calling fn inside a promise executor turns a synchronous throw into a
      // rejection, so the slot is always released and the caller always settles.
      const wrapped = () => new Promise((settle) => settle(fn())).then(resolve, resolve);
      queue.push(wrapped);
      runNext();
    });
}
/**
 * Downloads one article page and extracts its metadata.
 *
 * The result is cached under both the normalized input URL and the normalized
 * final URL. The final URL is the page's canonical link when it is a usable
 * http/https URL, otherwise the URL after redirects. An unusable canonical
 * (e.g. `mailto:` or `javascript:`) is ignored instead of failing the request.
 *
 * @param {string} originalUrl URL to fetch (must be a valid http/https URL).
 * @param {object} options
 * @param {number} options.timeoutMs Per-request timeout in milliseconds.
 * @param {number} options.cacheTTL Cache lifetime in milliseconds.
 * @param {AbortSignal} [options.outerSignal] Caller's cancellation signal.
 * @returns {Promise<{ url: string, title: (string | null), publishedAt: (string | null), length: number }>}
 * @throws {Error} On an invalid URL, a non-2xx response (`HTTP <status> ...`),
 *   a network failure, a timeout or cancellation.
 */
async function fetchOne(originalUrl, { timeoutMs, cacheTTL, outerSignal }) {
  const normInput = normalizeUrl(originalUrl);

  const cached = cacheGet(normInput);
  if (cached) return cached;

  // Each request gets its own timeout, combined with the caller's signal.
  const timeoutCtrl = new AbortController();
  const tId = setTimeout(() => {
    try {
      timeoutCtrl.abort(new Error('Timeout'));
    } catch {
      timeoutCtrl.abort();
    }
  }, timeoutMs);
  const combinedSignal = combineSignals([timeoutCtrl.signal, outerSignal]);

  try {
    const res = await fetch(originalUrl, {
      method: 'GET',
      headers: {
        'Accept': 'text/html,application/xhtml+xml;q=0.9,*/*;q=0.8',
        // Optional UA; some sites are strict about the default one.
        'User-Agent': 'scrapeArticles/1.0 (+https://example.com)',
      },
      redirect: 'follow',
      // Content-encoding negotiation and decompression are left to fetch.
      signal: combinedSignal,
    });
    if (!res.ok) {
      throw new Error(`HTTP ${res.status} ${res.statusText}`);
    }

    const finalUrl = res.url || originalUrl;
    const html = await res.text();
    const { title, canonical, publishedAt } = parseHtml(html, finalUrl);

    // Prefer the canonical URL, but never let a bad canonical sink the result.
    let finalNorm = null;
    if (canonical) {
      try {
        finalNorm = normalizeUrl(new URL(canonical, finalUrl).toString());
      } catch {
        // Unusable canonical: fall back to the post-redirect URL below.
      }
    }
    finalNorm ??= normalizeUrl(finalUrl);

    const result = {
      url: finalNorm,
      title: title || null,
      publishedAt: publishedAt || null,
      // Measured in UTF-16 code units; use Buffer.byteLength(html, 'utf8') for bytes (Node).
      length: html.length,
    };

    // Store under both keys to raise the hit rate.
    cacheSet(normInput, result, cacheTTL);
    cacheSet(finalNorm, result, cacheTTL);
    return result;
  } finally {
    clearTimeout(tId);
  }
}
/**
 * Batch-scrapes article title and publication time for a list of URLs.
 *
 * Inputs that are not non-blank strings are dropped silently; syntactically
 * invalid URLs are reported in `failed`; the rest are de-duplicated by
 * normalized URL and fetched through the concurrency limiter. Successful
 * results are de-duplicated once more by their final (canonical) URL.
 *
 * @param {string[]} urls URLs to scrape.
 * @param {object} [options]
 * @param {number} [options.concurrency=5] Maximum parallel requests (> 0).
 * @param {number} [options.timeoutMs=6000] Per-request timeout.
 * @param {number} [options.cacheTTL=600000] Cache lifetime in milliseconds.
 * @param {AbortSignal} [options.signal] Cancellation signal.
 * @returns {Promise<{ list: object[], failed: { url: string, reason: string }[] }>}
 * @throws {Error} Only for invalid arguments.
 */
async function scrapeArticles(urls, { concurrency = 5, timeoutMs = 6000, cacheTTL = 600000, signal } = {}) {
  if (!Array.isArray(urls)) throw new Error('urls must be an array of strings');
  if (concurrency <= 0) throw new Error('concurrency must be > 0');

  // Step 1: filter and de-duplicate the inputs by normalized URL.
  const seenNormalized = new Set();
  const inputs = [];
  for (const candidate of urls) {
    if (typeof candidate !== 'string' || !candidate.trim()) continue;

    let normalized;
    try {
      normalized = normalizeUrl(candidate);
    } catch {
      // Keep invalid URLs so they show up in `failed`.
      inputs.push({ original: candidate, normalized: null });
      continue;
    }
    if (seenNormalized.has(normalized)) continue;
    seenNormalized.add(normalized);
    inputs.push({ original: candidate, normalized });
  }

  // Step 2: fetch under the concurrency limit; every task reports its own
  // outcome in an allSettled-like record and never rejects.
  const schedule = createLimiter(concurrency, signal);
  const runOne = async ({ original, normalized }) => {
    if (!normalized) {
      return { status: 'rejected', reason: new Error('Invalid URL'), original };
    }
    try {
      const value = await fetchOne(original, { timeoutMs, cacheTTL, outerSignal: signal });
      return { status: 'fulfilled', value, original };
    } catch (err) {
      return { status: 'rejected', reason: err, original };
    }
  };
  const settled = await Promise.all(inputs.map((input) => schedule(() => runOne(input))));

  // Step 3: split into successes (unique by final URL) and failures.
  const list = [];
  const failed = [];
  const emitted = new Set();
  for (const outcome of settled) {
    if (outcome.status !== 'fulfilled') {
      const { reason } = outcome;
      failed.push({
        url: outcome.original,
        reason: (reason && (reason.message || String(reason))) || 'Unknown error',
      });
      continue;
    }
    const key = normalizeUrl(outcome.value.url);
    if (emitted.has(key)) continue;
    emitted.add(key);
    list.push(outcome.value);
  }

  return { list, failed };
}
注意:函数最终使用 allSettled 风格汇总,无论个别请求失败或被中止,均返回 { list, failed },不会整体抛错(除非参数校验失败)。
// Usage example: scrape a mixed list (duplicates, a fragment, an invalid URL) and print both result lists.
(async () => {
const urls = [
'https://example.com/article/123',
'https://example.com/article/123?utm_source=twitter',
'https://blog.example.org/post/abc',
'https://news.example.com/path#section',
'invalid-url'
];
const controller = new AbortController();
// Optional: cancel the whole batch after 10 seconds
// setTimeout(() => controller.abort(new Error('User aborted')), 10000);
const { list, failed } = await scrapeArticles(urls, {
concurrency: 4,
timeoutMs: 6000,
cacheTTL: 10 * 60 * 1000,
signal: controller.signal
});
console.log('Success:', list);
console.log('Failed:', failed);
})();
如果你需要扩展:支持重试策略、更多日期格式、内容字节长度统计、或将缓存替换为 LRU 等,我可以进一步完善。
函数定义 async function processCsvAndUpload(csvPath, endpoint, { batchSize = 100, concurrency = 4, signal, onProgress } = {})
代码实现 注意:该实现仅依赖 Node.js 内置模块(Node.js >= 18,内置 fetch),无第三方依赖。
// Node.js >= 18
import fs from 'node:fs';
import path from 'node:path';
import { once } from 'node:events';
/**
 * Streams a CSV file, groups its data rows into batches and POSTs each batch
 * to `endpoint` as a JSON array of `{ header: value }` objects.
 *
 * Two NDJSON files are written next to the CSV file:
 * - `success.ndjson`: one line per row of every accepted batch
 * - `errors.ndjson`: skipped rows (empty line, field count mismatch) and every
 *   row of a batch that ultimately failed (dead-letter records)
 *
 * Reading applies backpressure: the next batch is only started once one of the
 * `concurrency` upload slots is free. Uploads retry network errors, 5xx and
 * 429 with exponential backoff (honouring `Retry-After`); other statuses fail
 * the batch immediately. A failed batch does not fail the run.
 *
 * @param {string} csvPath Path of the CSV file; the first record is the header.
 * @param {string} endpoint Absolute http/https URL that receives the batches.
 * @param {object} [options]
 * @param {number} [options.batchSize=100] Rows per request (positive integer).
 * @param {number} [options.concurrency=4] Parallel uploads (positive integer).
 * @param {AbortSignal} [options.signal] Cancels reading, requests and waits.
 * @param {(p: { linesRead: number, batchesDispatched: number, batchesCompleted: number,
 *   batchesSucceeded: number, batchesFailed: number, recordsProcessed: number }) => void}
 *   [options.onProgress] Progress callback; exceptions it throws are ignored.
 * @returns {Promise<{ total: number, batchesSucceeded: number, batchesFailed: number,
 *   skipped: number, elapsedMs: number, files: { success: string, errors: string } }>}
 * @throws {TypeError} For invalid `batchSize`, `concurrency` or `endpoint`.
 * @throws {Error} An `AbortError` on cancellation, or the first read/write
 *   failure. Output streams are always closed before the error is rethrown.
 */
export async function processCsvAndUpload(
  csvPath,
  endpoint,
  { batchSize = 100, concurrency = 4, signal, onProgress } = {}
) {
  assertPositiveInteger(batchSize, 'batchSize');
  assertPositiveInteger(concurrency, 'concurrency');
  if (typeof endpoint !== 'string' || !/^https?:\/\//i.test(endpoint)) {
    throw new TypeError('endpoint must be a valid HTTP/HTTPS URL string');
  }
  if (signal?.aborted) throw abortError();

  const start = Date.now();
  const dir = path.resolve(path.dirname(csvPath));
  const successPath = path.join(dir, 'success.ndjson');
  const errorsPath = path.join(dir, 'errors.ndjson');

  // NDJSON output streams.
  const successStream = fs.createWriteStream(successPath, { flags: 'w', encoding: 'utf8', highWaterMark: 1 << 20 });
  const errorsStream = fs.createWriteStream(errorsPath, { flags: 'w', encoding: 'utf8', highWaterMark: 1 << 20 });

  // Throwing from an 'error' listener would surface as an uncaught exception.
  // The first stream failure is remembered instead and rethrown by the write
  // helper and the main flow.
  let streamError = null;
  const rememberStreamError = (err) => {
    streamError ??= err;
  };
  successStream.on('error', rememberStreamError);
  errorsStream.on('error', rememberStreamError);

  const stats = {
    linesRead: 0, // data rows read (header excluded)
    recordsProcessed: 0, // rows that made it into a batch
    batchesDispatched: 0,
    batchesCompleted: 0,
    batchesSucceeded: 0,
    batchesFailed: 0,
    skipped: 0,
  };

  // In-flight upload tasks. Tracked promises never reject; the first task
  // failure is parked in `taskError` and rethrown by the main flow.
  const active = new Set();
  let taskError = null;

  // ---- main flow: read -> batch -> upload -> close ----
  let failure = null;
  try {
    await readAndDispatch();
  } catch (err) {
    failure = err;
  }
  // Let in-flight uploads settle before closing the files they write to. On
  // abort they finish promptly because fetch and sleep observe the signal.
  await Promise.all(active);
  await closeStreams();
  failure ??= taskError ?? streamError;
  if (failure) throw failure;
  return finalizeResult();

  // ---- helpers (function declarations are hoisted) ----

  // Reads the CSV, validates rows and hands full batches to the uploader.
  async function readAndDispatch() {
    const reader = readCsvRecords(csvPath, { signal });
    const first = await reader.next();
    if (first.done) return; // empty file: nothing to upload

    const headers = uniquifyHeaders(first.value);
    const headerLen = headers.length;
    let batch = [];
    let batchId = 0;
    let rowNumber = 0; // 1-based data row number (header excluded)

    const dispatch = async () => {
      const toSend = batch;
      batch = [];
      batchId += 1;
      const myBatchId = batchId;
      // Resolves once the batch has been started, not when it finishes.
      await runLimited(() => handleBatch(myBatchId, toSend));
      stats.batchesDispatched += 1;
      notify();
    };

    for await (const rec of reader) {
      if (signal?.aborted) throw abortError();

      // Blank lines are not data rows: record them and move on.
      if (rec.length === 1 && rec[0].trim() === '') {
        stats.skipped += 1;
        await writeNdjson(errorsStream, {
          status: 'error',
          reason: 'empty_line',
          rowNumber: rowNumber + 1, // number the next data row will get
        });
        continue;
      }

      rowNumber += 1;
      stats.linesRead += 1;

      // Rows must have exactly as many fields as the header.
      if (rec.length !== headerLen) {
        stats.skipped += 1;
        await writeNdjson(errorsStream, {
          status: 'error',
          reason: 'field_count_mismatch',
          rowNumber,
          expected: headerLen,
          actual: rec.length,
          raw: rec,
        });
        if (rowNumber % 1000 === 0) notify();
        continue;
      }

      // Object.fromEntries defines own properties, so a header literally named
      // "__proto__" is kept as data instead of being silently dropped.
      const data = Object.fromEntries(headers.map((name, i) => [name, rec[i]]));
      batch.push({ rowNumber, data });
      stats.recordsProcessed += 1;

      if (batch.length >= batchSize) {
        await dispatch();
      }
      if (rowNumber % 1000 === 0) notify();
    }

    // Trailing partial batch.
    if (batch.length > 0) {
      await dispatch();
    }
  }

  // Reports progress; a throwing callback must not affect the run.
  function notify() {
    if (typeof onProgress !== 'function') return;
    try {
      onProgress({
        linesRead: stats.linesRead,
        batchesDispatched: stats.batchesDispatched,
        batchesCompleted: stats.batchesCompleted,
        batchesSucceeded: stats.batchesSucceeded,
        batchesFailed: stats.batchesFailed,
        recordsProcessed: stats.recordsProcessed,
      });
    } catch {
      // Deliberately ignored: progress reporting is best-effort.
    }
  }

  // Rethrows the first recorded failure, or an AbortError after cancellation.
  function throwIfStopped() {
    if (taskError) throw taskError;
    if (streamError) throw streamError;
    if (signal?.aborted) throw abortError();
  }

  // Waits for a free slot, then starts `fn` without waiting for it to finish.
  async function runLimited(fn) {
    while (active.size >= concurrency) {
      throwIfStopped();
      await Promise.race(active);
    }
    throwIfStopped();
    const tracked = Promise.resolve()
      .then(fn)
      .catch((err) => {
        taskError ??= err;
      })
      .finally(() => {
        active.delete(tracked);
      });
    active.add(tracked);
  }

  // Appends one NDJSON line, waiting for 'drain' when the buffer is full.
  async function writeNdjson(stream, obj) {
    if (streamError) throw streamError;
    const line = `${JSON.stringify(obj)}\n`;
    if (!stream.write(line)) {
      // events.once also rejects if the stream emits 'error' while waiting.
      await once(stream, 'drain');
    }
  }

  // Parses Retry-After (delta-seconds or HTTP date) into milliseconds.
  function parseRetryAfter(headerValue) {
    if (!headerValue) return null;
    const secs = Number(headerValue);
    if (Number.isFinite(secs) && secs >= 0) return secs * 1000;
    const delta = new Date(headerValue).getTime() - Date.now();
    return Number.isFinite(delta) && delta > 0 ? delta : null;
  }

  // POST with exponential backoff. Resolves with a result object for HTTP
  // outcomes and exhausted retries; throws only on cancellation.
  async function postWithRetry(url, jsonBody, { signal, maxRetries = 3, baseDelayMs = 500 }) {
    const backoffFor = (attempt) =>
      Math.min(30000, baseDelayMs * 2 ** attempt) + Math.floor(Math.random() * 250);

    for (let attempt = 0; attempt <= maxRetries; attempt += 1) {
      if (signal?.aborted) throw abortError();
      const isLastAttempt = attempt === maxRetries;
      let delayMs;

      try {
        const res = await fetch(url, {
          method: 'POST',
          headers: {
            'content-type': 'application/json',
            'accept': 'application/json, text/plain, */*',
          },
          body: jsonBody,
          // Passing the signal lets a request be cancelled mid-flight.
          signal,
          keepalive: true,
        });
        const { status } = res;

        if (res.ok) {
          // A short excerpt of the response is kept for diagnostics only.
          const responseSnippet = (await safeReadText(res)).slice(0, 2048);
          return { ok: true, status, responseSnippet };
        }

        const retryable = status === 429 || (status >= 500 && status <= 599);
        if (!retryable || isLastAttempt) {
          return { ok: false, retryable, status, bodyText: await safeReadText(res) };
        }

        // Retry-After wins; otherwise exponential backoff with jitter.
        delayMs = parseRetryAfter(res.headers.get('retry-after')) ?? backoffFor(attempt);
        // Release the connection held by the unread error body (best effort).
        res.body?.cancel().catch(() => {});
      } catch (err) {
        // fetch threw: cancellation, network error or timeout.
        if (isAbortError(err)) throw err;
        if (signal?.aborted) throw abortError();
        if (isLastAttempt) {
          return { ok: false, retryable: true, status: 0, bodyText: String(err?.message ?? err) };
        }
        delayMs = backoffFor(attempt);
      }

      await sleep(delayMs, signal);
    }

    // Unreachable for maxRetries >= 0.
    return { ok: false, retryable: true, status: 0, bodyText: 'unknown error' };
  }

  function isAbortError(err) {
    return Boolean(err && (err.name === 'AbortError' || err.code === 'ABORT_ERR'));
  }

  // Builds an AbortError, falling back to a plain Error where DOMException is missing.
  function abortError() {
    try {
      // eslint-disable-next-line no-undef
      return new DOMException('The operation was aborted', 'AbortError');
    } catch {
      const e = new Error('The operation was aborted');
      e.name = 'AbortError';
      return e;
    }
  }

  // Cancellable delay; the abort listener is removed once the timer fires.
  async function sleep(ms, signal) {
    if (ms <= 0) return;
    if (signal?.aborted) throw abortError();
    await new Promise((resolve, reject) => {
      const onAbort = () => {
        clearTimeout(timer);
        reject(abortError());
      };
      const timer = setTimeout(() => {
        signal?.removeEventListener('abort', onAbort);
        resolve();
      }, ms);
      signal?.addEventListener('abort', onAbort, { once: true });
    });
  }

  // Reads at most 4096 characters of a response body; never throws.
  async function safeReadText(res) {
    try {
      const t = await res.text();
      return t.length > 4096 ? t.slice(0, 4096) : t;
    } catch {
      return '';
    }
  }

  function assertPositiveInteger(n, name) {
    if (!Number.isInteger(n) || n <= 0) {
      throw new TypeError(`${name} must be a positive integer`);
    }
  }

  // Streaming CSV parser (approximation of RFC 4180). Yields one array of
  // field strings per record. Parser state is carried across chunks, so CRLF
  // pairs and escaped quotes ("") split by a chunk boundary are handled.
  async function* readCsvRecords(filePath, { signal }) {
    const stream = fs.createReadStream(filePath, { encoding: 'utf8', highWaterMark: 1 << 20 });
    const onAbort = () => stream.destroy(abortError());
    if (signal) {
      if (signal.aborted) onAbort();
      else signal.addEventListener('abort', onAbort, { once: true });
    }

    let inQuotes = false;
    let quotePending = false; // saw '"' inside quotes; the next char decides escape vs. close
    let skipLineFeed = false; // previous char was a record-ending '\r'
    let field = '';
    let record = [];
    let sawAny = false;

    try {
      for await (const chunk of stream) {
        let text = chunk;
        if (!sawAny) {
          sawAny = true;
          // Strip the UTF-8 BOM.
          if (text.charCodeAt(0) === 0xfeff) text = text.slice(1);
        }

        for (let i = 0; i < text.length; i += 1) {
          const ch = text[i];

          if (skipLineFeed) {
            skipLineFeed = false;
            if (ch === '\n') continue; // second half of CRLF
          }

          if (inQuotes) {
            if (quotePending) {
              quotePending = false;
              if (ch === '"') {
                field += '"'; // escaped quote
                continue;
              }
              // The pending quote closed the field; handle ch as unquoted below.
              inQuotes = false;
            } else if (ch === '"') {
              quotePending = true;
              continue;
            } else {
              field += ch;
              continue;
            }
          }

          if (ch === '"') {
            inQuotes = true;
          } else if (ch === ',') {
            record.push(field);
            field = '';
          } else if (ch === '\n' || ch === '\r') {
            record.push(field);
            field = '';
            skipLineFeed = ch === '\r';
            yield record;
            record = [];
          } else {
            field += ch;
          }
        }
      }
    } finally {
      signal?.removeEventListener('abort', onAbort);
    }

    // End of file: emit the last record when the file lacks a trailing newline.
    // An unterminated quoted field is emitted as is; it normally surfaces
    // upstream as a field count mismatch.
    if (field.length > 0 || record.length > 0) {
      record.push(field);
      yield record;
    }
  }

  // Makes header names unique: repeats get the suffix _2, _3, ...
  function uniquifyHeaders(headers) {
    const seen = new Map();
    return headers.map((h0) => {
      let h = String(h0 ?? '').trim();
      if (h.startsWith('\uFEFF')) h = h.slice(1); // belt and braces: strip a BOM again
      const count = (seen.get(h) ?? 0) + 1;
      seen.set(h, count);
      return count === 1 ? h : `${h}_${count}`;
    });
  }

  // Uploads one batch and records the outcome of each of its rows.
  async function handleBatch(myBatchId, items) {
    if (signal?.aborted) throw abortError();
    // Only the data objects are sent; row numbers stay local.
    const jsonBody = JSON.stringify(items.map((it) => it.data));
    const res = await postWithRetry(endpoint, jsonBody, { signal, maxRetries: 3, baseDelayMs: 500 });
    stats.batchesCompleted += 1;

    if (res.ok) {
      stats.batchesSucceeded += 1;
      for (const it of items) {
        await writeNdjson(successStream, {
          status: 'success',
          batchId: myBatchId,
          rowNumber: it.rowNumber,
          endpoint,
          responseStatus: res.status,
        });
      }
    } else {
      stats.batchesFailed += 1;
      // Dead-letter every row of the batch together with the failure reason.
      const reason = res.retryable ? 'network_or_server' : 'client_error';
      const message = res.bodyText ? truncate(res.bodyText, 2048) : undefined;
      for (const it of items) {
        await writeNdjson(errorsStream, {
          status: 'error',
          reason,
          batchId: myBatchId,
          rowNumber: it.rowNumber,
          endpoint,
          httpStatus: res.status,
          message,
          data: it.data,
        });
      }
    }
    notify();
  }

  function truncate(s, n) {
    return s && s.length > n ? s.slice(0, n) : s;
  }

  // Ends both output streams; resolves even if a stream has already failed.
  async function closeStreams() {
    await Promise.all([
      new Promise((resolve) => successStream.end(resolve)),
      new Promise((resolve) => errorsStream.end(resolve)),
    ]);
  }

  function finalizeResult() {
    const elapsedMs = Date.now() - start;
    return {
      total: stats.recordsProcessed,
      batchesSucceeded: stats.batchesSucceeded,
      batchesFailed: stats.batchesFailed,
      skipped: stats.skipped,
      elapsedMs,
      files: {
        success: successPath,
        errors: errorsPath,
      },
    };
  }
}
错误处理
使用示例
import { processCsvAndUpload } from './processCsvAndUpload.js';
// Usage example: upload ./data/input.csv in batches of 200 with 4 parallel requests.
const ac = new AbortController();
(async () => {
try {
const result = await processCsvAndUpload(
'./data/input.csv',
'https://api.example.com/ingest/batch',
{
batchSize: 200,
concurrency: 4,
signal: ac.signal,
onProgress: ({ linesRead, batchesDispatched, batchesCompleted, batchesSucceeded, batchesFailed, recordsProcessed }) => {
console.log(
`lines=${linesRead}, dispatched=${batchesDispatched}, completed=${batchesCompleted}, ok=${batchesSucceeded}, fail=${batchesFailed}, records=${recordsProcessed}`
);
},
}
);
console.log('Done:', result);
// Expected output (values are illustrative):
// {
// total: 12345,
// batchesSucceeded: 60,
// batchesFailed: 2,
// skipped: 10,
// elapsedMs: 9876,
// files: { success: '/abs/path/success.ndjson', errors: '/abs/path/errors.ndjson' }
// }
} catch (e) {
if (e.name === 'AbortError') {
console.error('Canceled by user');
} else {
console.error('Failed:', e);
}
}
})();
// Cancel when needed
// ac.abort();
技术要点
为前端/全栈/Node 开发者与技术团队提供一键式的异步函数生成体验:基于简单的任务描述,自动产出符合最佳实践的 async/await 代码、完善的错误处理、清晰的使用示例与关键说明;显著缩短开发与评审时间、降低线上故障风险、统一团队编码风格;覆盖数据获取、文件读写与服务请求等高频场景,帮助个人快速交付、团队高质量复用,并推动从试用到持续付费的转化。
在新页面快速搭建数据请求与渲染逻辑;为列表、图片等多源数据并发加载;为上传下载加入超时与重试;统一错误提示与空态处理。
封装与外部服务的异步调用;为批量任务设置并发限制与队列;完善日志与异常分级;减少手写样板,缩短交付周期。
沉淀团队通用模板与约定;统一超时、重试、超限策略;把控代码可维护性与规范;显著降低线上故障率。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
半价获取高级提示词-优惠即将到期