算法伪代码生成专家

8 浏览
1 试用
0 购买
Nov 29, 2025更新

本提示词专为AI/ML工程师设计,能够根据用户提供的算法名称和具体需求,生成结构清晰、逻辑严谨的伪代码实现方案。通过系统化的任务分解和逐步推理,确保生成的伪代码既符合算法原理又具备实际可操作性,同时严格遵循技术文档的规范要求,避免冗余信息和错误表述,为算法实现提供可靠的参考依据。

算法名称和简介

基于优先队列的 Dijkstra 最短路径(邻接表,非负权,支持单源到终点早停与路径重构)。
算法利用二叉堆/优先队列维护当前最小的临时距离条目,按出堆顺序逐步“定稿”节点的最短距离;当终点首次出堆时可安全早停。输出包含全局距离表与源到终点的实际路径。


输入输出说明

  • 输入
    • G:图的邻接表,类型为字典/映射,键为节点,值为列表[(邻接点, 边权重)]。允许有节点无出边(需在 G 中显式给出或通过扫描邻接收集)。
    • s:源点
    • t:终点(可选;本实现支持单源到终点早停与路径重构,t 必填以启用早停)
    • 可选参数
      • early_stop(默认 true):若为 true,t 首次出堆即停止
      • check_input(默认 true):运行前检查非负权与节点存在性
  • 输出
    • dist:字典/映射,dist[v] 表示 s 到 v 的最短距离(不可达则为 +∞)
    • parent:字典/映射,用于路径重构,parent[v] 为使 dist[v] 最优时的前驱节点(源点 parent[s] = NIL)
    • path:列表,s 到 t 的最短路径节点序列;若 t 不可达,则为空列表

伪代码实现

PROCEDURE DIJKSTRA_PQ(G, s, t, early_stop = true, check_input = true):
    # 关键变量:
    #   V            —— 节点集合(从 G 的键与所有邻接点联合得到)
    #   INF          —— 正无穷(表示不可达)
    #   dist[v]      —— s 到 v 的当前最短已知距离(初始化为 INF)
    #   parent[v]    —— 最优路径上 v 的前驱(初始化为 NIL)
    #   PQ           —— 最小堆/优先队列,保存条目 (key=dist, value=node)
    #   settled      —— 已“定稿”的节点集合(可选,仅作逻辑明确;使用懒惰删除时非必需)

    # -------- 输入合法性与前置处理 --------
    V ← EMPTY_SET
    FOR EACH u IN KEYS(G): 
        ADD u TO V
        FOR EACH (v, w) IN G[u]:
            ADD v TO V
    IF s NOT IN V: RAISE Error("源点 s 不在图中")
    IF t NOT IN V: RAISE Error("终点 t 不在图中")

    IF check_input == true:
        FOR EACH u IN V:
            FOR EACH (v, w) IN G.get(u, []):
                IF w < 0: RAISE Error("检测到负权边,Dijkstra 不适用")

    # 确保对无出边节点也能访问邻接(返回空列表)
    FUNCTION ADJ(u):
        RETURN G.get(u, [])

    # -------- 初始化 --------
    INF ← +∞
    FOR EACH v IN V:
        dist[v] ← INF
        parent[v] ← NIL
    dist[s] ← 0

    PQ ← MinHeap()                        # 支持 push((key,node)) 与 pop_min() → (key,node)
    PQ.push((0, s))
    settled ← EMPTY_SET                   # 可选

    # -------- 主循环 --------
    WHILE PQ IS NOT EMPTY:
        (d, u) ← PQ.pop_min()

        # 懒惰删除(关键边界):若该条目已过期(比 dist[u] 大),跳过
        IF d ≠ dist[u]:
            CONTINUE

        # 可选:标记 u 已定稿(非必须,因有 d==dist[u] 的检查)
        IF u IN settled:
            CONTINUE
        ADD u TO settled

        # 早停条件(关键):首次将 t 出堆时,dist[t] 已为全局最短,可安全停止
        IF early_stop == true AND u == t:
            BREAK

        # 松弛(关键步骤)
        FOR EACH (v, w) IN ADJ(u):
            # 此处假设 w ≥ 0,若需健壮可再次断言
            alt ← dist[u] + w
            IF alt < dist[v]:
                dist[v] ← alt
                parent[v] ← u
                PQ.push((alt, v))        # 懒惰更新,无需 decrease-key

    # -------- 路径重构 --------
    path ← EMPTY_LIST
    IF dist[t] ≠ INF:
        cur ← t
        WHILE cur ≠ NIL:
            APPEND cur TO path
            cur ← parent[cur]
        REVERSE(path)                     # s → ... → t
    ELSE:
        path ← EMPTY_LIST                 # t 不可达

    RETURN (dist, parent, path)

# -------- 结果自检(可选,但推荐在开发/调试中启用) --------
PROCEDURE VALIDATE_DIJKSTRA_RESULT(G, s, t, dist, parent):
    # 1) 源点距离检查
    ASSERT dist[s] == 0

    # 2) 约束检查:对每条边 (u,v,w) 有 dist[v] ≤ dist[u] + w (若 dist[u] = ∞ 则跳过)
    FOR EACH u IN KEYS(G):
        FOR EACH (v, w) IN G[u]:
            IF dist[u] ≠ +∞:
                ASSERT dist[v] ≤ dist[u] + w

    # 3) 前驱一致性:若 parent[v] = u,则存在边 (u,v,w),且 dist[v] = dist[u] + w
    #    (该检查对检测实现错误非常有用)
    H ← MAP()  # H[(u,v)] = w,便于 O(1) 查询
    FOR EACH u IN KEYS(G):
        FOR EACH (v, w) IN G[u]:
            H[(u, v)] ← w
    FOR EACH v IN dist.Keys():
        IF v == s: CONTINUE
        u ← parent[v]
        IF u ≠ NIL:
            ASSERT (u, v) IN H
            ASSERT dist[v] == dist[u] + H[(u, v)]

    # 4) 路径正确性(若 t 可达):累加 path 上边权应等于 dist[t]
    IF dist[t] ≠ +∞:
        (dist2, _, path2) ← (dist, parent, EMPTY_LIST)  # 逻辑占位
        # 重构一次
        cur ← t
        sum_cost ← 0
        WHILE parent[cur] ≠ NIL:
            u ← parent[cur]
            sum_cost ← sum_cost + H[(u, cur)]
            cur ← u
        ASSERT sum_cost == dist[t]

关键参数说明

  • G:邻接表,若为无向图需同时存储双向边(u,v,w)与(v,u,w)
  • s, t:源点与终点。t 必填用于早停;若不早停可令 t = NIL 并遍历至队列空
  • early_stop:
    • true:t 首次出堆终止,构造 s→t 的路径,未必计算所有节点的最终最短路
    • false:计算到所有可达节点的最短路(不建议用于大图一对一点询问)
  • check_input:运行前是否检查负权与节点存在性
  • INF:建议使用语言内置的大数或浮点正无穷
  • 优先队列实现:
    • 推荐二叉堆(push/pop O(log V)),采用“懒惰删除”(重复入堆,出堆比对 d 与 dist[u])
    • 如使用可减键堆(decrease-key),可避免重复入堆,但实现复杂

边界条件与注意事项:

  • s 不在图或 t 不在图:抛出错误
  • s == t:初始化后可直接早停,path = [s],dist[s] = 0
  • 不可达:dist[t] = +∞,path = []
  • 允许零权边与自环(自环不影响最短路)
  • 禁止负权边(否则算法不正确)

算法复杂度分析

  • 时间复杂度(使用二叉堆,懒惰删除):
    • 最坏情况:O((V + E) log V),稀疏图(E ≈ O(V))约为 O(V log V)
    • 启用早停:平均可显著减少出堆与松弛次数,但最坏界不变
  • 空间复杂度:
    • O(V + E) 存储图
    • O(V) 存储 dist、parent、堆条目(懒惰删除可能导致堆大小 O(E) 的常数因子增大,但阶同)

适用场景说明

  • 适用:非负权、稀疏型大图的一对一最短路径(如路网导航、时延路由、成本最小任务流转)
  • 不适用:存在负权边/负环(需 Bellman-Ford / Johnson),或需要启发式引导(可用 A*)

伪随机示例(含早停、路径重构与自检)

为便于复现实验,给出一个固定示例(可视作使用固定种子生成的伪随机稀疏图)。

  • 节点集:{A, B, C, D, E, F}
  • 邻接表 G(有向图,非负权):
    • A: (B, 4), (C, 2)
    • B: (C, 1), (D, 5)
    • C: (B, 1), (D, 8), (E, 10)
    • D: (E, 2), (F, 6)
    • E: (F, 3)
    • F: ( ) # 无出边
  • 源点 s = A,终点 t = F,early_stop = true

运行 DIJKSTRA_PQ(G, A, F):

  • 出堆/松弛过程(略),首次出堆 F 时 dist[F] = 13,早停
  • 路径重构:
    • parent 映射形成:C←A, B←C, D←B, E←D, F←E
    • path = [A, C, B, D, E, F]
    • 路径代价 = 2 + 1 + 5 + 2 + 3 = 13
  • 距离表 dist(其余节点为 +∞ 则未列出,本例均可达):
    • dist[A]=0, dist[C]=2, dist[B]=3, dist[D]=8, dist[E]=10, dist[F]=13

自检 VALIDATE_DIJKSTRA_RESULT(G, A, F, dist, parent):

  • 通过:对所有边均满足 dist[v] ≤ dist[u] + w;parent 一致性与路径代价检查均成立

说明:

  • 由于启用早停,算法未必访问所有节点的最终状态,但本例中除 F 外均已定稿(不影响 dist 表正确性与 s→t 路径正确性)。

以上方案、伪代码与示例可直接作为工程实现与技术文档的参考模板。

  • Algorithm Name and Overview K-Means++ Clustering with Reproducible Seeding, Empty-Cluster Repair, and SSE-based Stopping

This algorithm partitions n data points into k clusters by minimizing the within-cluster Sum of Squared Errors (SSE). It uses K-Means++ for center initialization to improve convergence and reproducibility via a fixed random seed. During iterations, it repairs empty clusters by reassigning the farthest points. The loop stops when the maximum number of iterations is reached or when the SSE improvement falls below a tolerance.

  • Inputs and Outputs Inputs:
  • X: n-by-d matrix of data points (real-valued)
  • k: number of clusters (integer, 1 <= k <= n_unique)
  • max_iter: maximum iterations for Lloyd’s steps (default 300)
  • tol: relative tolerance on SSE improvement, i.e., stop if (SSE_prev - SSE_curr) <= tol * SSE_prev (default 1e-4)
  • random_state: integer seed for reproducible RNG (default 42)

Assumptions:

  • Distance metric: squared Euclidean distance
  • Indices are 0-based
  • Deterministic tie-breaking by smallest index

Outputs:

  • centers: k-by-d matrix of final cluster centers

  • labels: length-n vector of cluster assignments in {0, …, k-1}

  • SSE: final Sum of Squared Errors with respect to centers and labels

  • Pseudocode Implementation Note: Uses only squared distances; no square roots.

FUNCTION KMEANS_PP_CLUSTER(X, k, max_iter=300, tol=1e-4, random_state=42):
    INPUT:
        X ∈ R^{n×d}, k ∈ N, max_iter ∈ N, tol ∈ R≥0, random_state ∈ Z
    OUTPUT:
        centers ∈ R^{k×d}, labels ∈ {0..k-1}^n, SSE ∈ R≥0

    REQUIRE: 1 ≤ k ≤ number_of_unique_rows(X)
    RNG ← SeededRandom(random_state)

    centers ← INIT_KMEANS_PLUS_PLUS(X, k, RNG)

    prev_SSE ← +∞
    FOR iter FROM 1 TO max_iter:
        # Assignment step: compute nearest center for each point
        labels, best_d2 ← ASSIGN_TO_NEAREST_CENTERS(X, centers)

        # Update step (compute means) with empty-cluster detection
        centers_new, sizes, empty_list ← COMPUTE_MEANS_AND_EMPTY(X, labels, k)

        IF empty_list IS NOT EMPTY:
            # Repair empty clusters by reassigning farthest points
            REPAIR_EMPTY_CLUSTERS(X, labels, sizes, centers_new, best_d2, empty_list, RNG)

            # After repair, recompute centers_new (means) for non-singleton clusters
            centers_new, sizes, _ ← COMPUTE_MEANS_AND_EMPTY(X, labels, k)
            # Note: After repair, clusters are guaranteed non-empty if feasible.

        # Compute SSE relative to updated centers
        SSE ← COMPUTE_SSE(X, labels, centers_new)

        # Check relative improvement on SSE
        IF prev_SSE - SSE ≤ tol * prev_SSE:
            centers ← centers_new
            BREAK
        ELSE:
            centers ← centers_new
            prev_SSE ← SSE
            CONTINUE
    END FOR

    # Final labels aligned to final centers (ensure consistency)
    labels, _ ← ASSIGN_TO_NEAREST_CENTERS(X, centers)
    SSE ← COMPUTE_SSE(X, labels, centers)

    RETURN centers, labels, SSE


FUNCTION INIT_KMEANS_PLUS_PLUS(X, k, RNG):
    n, d ← shape(X)
    centers ← empty list
    # Choose first center uniformly at random
    c0 ← RNG.randint(0, n-1)
    centers.append(X[c0])

    # Maintain D2: squared distance to nearest chosen center
    D2[0..n-1] ← +∞
    FOR i IN 0..n-1:
        D2[i] ← SQUARED_DIST(X[i], centers[0])

    WHILE length(centers) < k:
        total ← sum(D2)
        IF total == 0:
            # All points coincide with chosen centers; pick deterministically for reproducibility
            # Choose the smallest index not yet in centers by index uniqueness, or random fallback
            idx ← smallest i with D2[i] == 0 and X[i] not in centers by value; 
            IF no such idx:
                idx ← RNG.randint(0, n-1)
        ELSE:
            # Sample next center with probability proportional to D2
            probs[i] ← D2[i] / total for i=0..n-1
            idx ← RNG.categorical(probs)  # reproducible given RNG
        centers.append(X[idx])

        # Update D2: distance to nearest of the chosen centers so far
        FOR i IN 0..n-1:
            d2 ← SQUARED_DIST(X[i], X[idx])
            IF d2 < D2[i]:
                D2[i] ← d2
    RETURN stack(centers)  # k×d


FUNCTION ASSIGN_TO_NEAREST_CENTERS(X, centers):
    n, d ← shape(X); k ← number of rows in centers
    labels[0..n-1], best_d2[0..n-1]
    FOR i IN 0..n-1:
        best_j ← 0
        best_val ← SQUARED_DIST(X[i], centers[0])
        FOR j IN 1..k-1:
            val ← SQUARED_DIST(X[i], centers[j])
            IF val < best_val OR (val == best_val AND j < best_j):  # deterministic tie-break
                best_val ← val
                best_j ← j
        labels[i] ← best_j
        best_d2[i] ← best_val
    RETURN labels, best_d2


FUNCTION COMPUTE_MEANS_AND_EMPTY(X, labels, k):
    n, d ← shape(X)
    centers_new ← zeros(k, d)
    sizes[0..k-1] ← 0
    FOR i IN 0..n-1:
        j ← labels[i]
        centers_new[j] ← centers_new[j] + X[i]
        sizes[j] ← sizes[j] + 1
    empty_list ← empty list
    FOR j IN 0..k-1:
        IF sizes[j] > 0:
            centers_new[j] ← centers_new[j] / sizes[j]
        ELSE:
            empty_list.append(j)
    RETURN centers_new, sizes, empty_list


FUNCTION REPAIR_EMPTY_CLUSTERS(X, labels, sizes, centers_new, best_d2, empty_list, RNG):
    # Reassign farthest points (by squared distance to their current assigned center)
    # to fill empty clusters. Prefer donors from clusters with size ≥ 2 to avoid creating new empties.
    n ← length(labels)

    # Create a priority ordering of candidate points by decreasing best_d2, deterministic ties by smallest index
    idxs ← [0,1,...,n-1]
    sort idxs by (-best_d2[i], i)  # descending distance, then ascending index

    queue ← copy(empty_list)  # process one empty cluster at a time
    q_ptr ← 0
    WHILE q_ptr < length(queue):
        j_empty ← queue[q_ptr]
        q_ptr ← q_ptr + 1

        # Find farthest eligible donor
        donor_i ← -1
        FOR t IN 0..n-1:
            i ← idxs[t]
            j_old ← labels[i]
            IF sizes[j_old] >= 2:
                donor_i ← i
                BREAK

        IF donor_i == -1:
            # No multi-point clusters exist; pick global farthest (might create a new empty)
            donor_i ← idxs[0]  # farthest overall

        j_old ← labels[donor_i]

        # Reassign donor_i to the empty cluster
        labels[donor_i] ← j_empty
        sizes[j_old] ← sizes[j_old] - 1
        sizes[j_empty] ← sizes[j_empty] + 1

        # Set the empty center to the donor point (seed); mean will be recomputed later
        centers_new[j_empty] ← X[donor_i]

        # If we created a new empty cluster, append it to the queue
        IF sizes[j_old] == 0 AND j_old != j_empty:
            append j_old to queue

    # Note: After this routine, labels/sizes reflect no empties if feasible.
    RETURN  # labels, sizes, centers_new are modified in-place


FUNCTION COMPUTE_SSE(X, labels, centers):
    n ← length(labels)
    SSE ← 0
    FOR i IN 0..n-1:
        j ← labels[i]
        SSE ← SSE + SQUARED_DIST(X[i], centers[j])
    RETURN SSE


FUNCTION SQUARED_DIST(a, b):
    # Returns ||a - b||_2^2
    s ← 0
    FOR t IN 0..length(a)-1:
        d ← a[t] - b[t]
        s ← s + d*d
    RETURN s
  • Key Parameter Descriptions

  • k (required): number of clusters. Should satisfy 1 ≤ k ≤ number of unique samples; otherwise, some clusters cannot be populated meaningfully.

  • max_iter (default 300): maximum number of Lloyd iterations (assignment + update).

  • tol (default 1e-4): relative SSE improvement threshold. The loop stops when prev_SSE - SSE ≤ tol * prev_SSE. Use tol = 0 to require strict monotonic improvement until max_iter.

  • random_state (default 42): integer seed for reproducible K-Means++ sampling and any tie-breaking that uses RNG.

  • Distance metric: squared Euclidean distance; avoids square roots and is standard for K-Means.

  • Tie-breaking: when distances are equal, the smallest center index is chosen to ensure determinism.

  • Algorithm Complexity Analysis Let:

  • n: number of points

  • d: dimensionality

  • k: number of clusters

  • T: number of iterations until convergence (T ≤ max_iter)

Time:

  • Initialization (K-Means++): O(n k d). Each new center requires scanning all points to update the nearest-center squared distances.
  • Each iteration:
    • Assignment: O(n k d).
    • Empty-cluster repair: sorting indices by best_d2 can be avoided by tracking the farthest online; if sorted once per iteration it is O(n log n), but you can achieve O(n) selection by single pass with a max-heap or argmax. The reassignment itself is O(k) lookups. Overall O(n) to O(n log n), negligible vs O(n k d) for typical k and d.
    • Center update (means): O(n d).
    • SSE computation: O(n d). Dominated by O(n k d) per iteration.

Total: O((k + T·k) n d) ≈ O(T n k d), since initialization also scales as O(n k d).

Space:

  • Data and centers: O(n d + k d)

  • Labels and per-point best squared distances: O(n)

  • No need to store a full n×k distance matrix.

  • Applicable Scenarios

  • General-purpose clustering for numerical data when k is known or chosen via a model selection criterion.

  • Large datasets where improved initialization (K-Means++) accelerates convergence and reduces poor local minima.

  • Workflows requiring deterministic, reproducible results (fixed random_state and deterministic tie-breaking).

  • Production systems where empty clusters can occur; the farthest-point reassignment ensures stable progress without failures.

Notes and Practical Tips:

  • Standardize or normalize features if scales differ significantly.
  • If k > n_unique, true k-way separation is impossible; the repair strategy will still run but multiple centers may collapse on identical points.
  • To reduce iteration cost, reuse best_d2 during assignment and only recompute distances to candidate centers when necessary (not shown for clarity).
  • 算法名称和简介 带长度惩罚的Beam Search解码(批量版) 在自回归生成任务(机器翻译、文本生成、语音识别等)中,使用Beam Search在每步扩展多个候选,结合长度惩罚α缓解短句偏好,支持重复n元阻断(no-repeat n-gram)避免循环,结合温度缩放与top-k过滤控制多样性与计算开销。算法支持批量解码与EOS早停,返回每个样本的前N个候选序列及其对数概率。

  • 输入输出说明 输入

  • ModelStep: 增量解码接口,签名: logits, new_cache = ModelStep(prev_tokens, cache, batch_beam_mask)

    • prev_tokens: [B_active],当前每个活跃beam的最后一个token(或用于增量的当步输入)
    • cache: 模型缓存(见“关键状态缓存”)
    • batch_beam_mask: [B_active],可选,用于指示哪些beam是有效(便于一次性批处理)
    • 返回 logits: [B_active, V](未归一化logits),new_cache 为更新后的缓存
  • prompts: List[List[int]],长度为batch_size,每项为初始提示序列(可含BOS)

  • max_steps: 最大解码步数T_max(不含prompt长度)

  • beam_size: 每个样本的beam数B

  • n_best: 返回的候选数N(N ≤ B)

  • alpha: 长度惩罚系数α(GNMT样式)

  • no_repeat_ngram_size: n元阻断大小,n=0表示关闭

  • temperature: 温度τ > 0,τ=1为不缩放

  • top_k: k值(1 ≤ k ≤ V),k=V表示不裁剪

  • eos_token_id, bos_token_id, pad_token_id: 特殊符号ID

  • early_stop: 是否启用EOS早停(默认true)

输出

  • 对于batch中每个样本,返回长度为N的候选列表:

    • tokens: 生成的完整token序列(含EOS,必要时pad到相同长度)
    • sum_logprob: 原始对数概率之和(未归一化)
    • score: 长度惩罚后的排序分数(sum_logprob / length_penalty)
    • length: 生成长度(含EOS)
  • 伪代码实现 注意:以下伪代码采用GNMT长度惩罚;对数域累积概率;在扩展时对logits按温度与top-k处理,并应用n元阻断;批量与beam维度合并后一次调用ModelStep;每个样本独立维护完成候选集合并支持早停。

函数定义 function LengthPenalty(L, alpha): // GNMT长度惩罚 // lp(L) = ((5 + L) ^ alpha) / (6 ^ alpha) return ((5 + L) ^ alpha) / (6 ^ alpha)

function TopKFilter(logits_vec, k): // logits_vec: [V] if k >= V: return logits_vec indices = argsort_desc(logits_vec) // 从大到小 threshold_index = indices[k-1] threshold = logits_vec[threshold_index] for v in 0..V-1: if logits_vec[v] < threshold: logits_vec[v] = -INF return logits_vec

function NoRepeatNGramMask(seq, n, V): // 返回 banned_mask: [V],true表示禁止 if n <= 0 or length(seq) < n - 1: return BoolVector(V, false) prefix = seq[(length(seq) - (n - 1)) : length(seq)] // 长度n-1 banned = Set() // 将与该prefix构成重复n-gram的下一个token // 扫描历史,收集所有prefix后续token for i in 0 .. (length(seq) - n): if seq[i : i + (n - 1)] == prefix: next_token = seq[i + (n - 1)] banned.add(next_token) banned_mask = BoolVector(V, false) for t in banned: banned_mask[t] = true return banned_mask

数据结构 struct BeamState: tokens: List[int] // 已生成(不含prompt);用于输出可与prompt拼接 last_token: int sum_logprob: float // 累积对数概率(含当前tokens) length: int // 已生成长度(含EOS时计入) cache: Any // 模型增量缓存(KV缓存/隐藏状态等) finished: bool // 是否已生成EOS score: float // 用于排序:sum_logprob / LengthPenalty(length, alpha)

主过程 function BeamSearchBatch(ModelStep, prompts, max_steps, beam_size, n_best, alpha, no_repeat_ngram_size, temperature, top_k, eos_token_id, bos_token_id, pad_token_id, early_stop):

batch_size = length(prompts)
V = vocabulary_size()

// 初始化每个样本的beam
beams = List[batch_size] of List[BeamState]
finished_lists = List[batch_size] of PriorityQueue(desc_by_score) // 存储完成的候选
for b in 0 .. batch_size-1:
    // 初始化cache:将prompt编码至cache。具体取决于ModelStep/模型结构
    init_cache = InitCacheFromPrompt(prompts[b])
    init_last = prompts[b][-1]  // prompt的最后一个token作为第一步输入
    beams[b] = []
    // 将单一初始beam复制为beam_size份(共享或拷贝cache)
    for k in 0 .. beam_size-1:
        beams[b].append(BeamState(tokens=[], last_token=init_last,
                                  sum_logprob=0.0, length=0, cache=Clone(init_cache),
                                  finished=false, score=0.0))

step = 0
while step < max_steps:
    step += 1

    // 汇总所有未完成beam用于一次性前向
    active_indices = []   // (b, j)对
    inputs = []           // 对应last_token
    caches = []           // 对应cache
    for b in 0 .. batch_size-1:
        for j in 0 .. beam_size-1:
            if not beams[b][j].finished:
                active_indices.append((b, j))
                inputs.append(beams[b][j].last_token)
                caches.append(beams[b][j].cache)

    if length(active_indices) == 0:
        break  // 所有样本都已完成

    // 一次性模型前向
    logits_batch, new_caches = ModelStep(inputs, caches, None)  // 维度: [B_active, V]

    // 对每个活跃beam进行温度缩放、top-k、n元阻断,然后log-softmax
    log_probs_batch = Matrix(len(active_indices), V)
    for idx in 0 .. len(active_indices)-1:
        b, j = active_indices[idx]
        logits = logits_batch[idx]
        // 温度缩放
        if temperature > 0:
            logits = logits / temperature
        // top-k过滤
        logits = TopKFilter(logits, top_k)
        // n元阻断
        if no_repeat_ngram_size > 0:
            // 计算基于prompt+当前beam.tokens的序列
            prefix_full = Concat(prompts[b], beams[b][j].tokens)
            banned_mask = NoRepeatNGramMask(prefix_full, no_repeat_ngram_size, V)
            for v in 0 .. V-1:
                if banned_mask[v] == true:
                    logits[v] = -INF
        // 归一化为log概率
        log_probs = LogSoftmax(logits)  // 数值稳定
        log_probs_batch[idx] = log_probs

    // 为每个样本收集候选扩展,并选出下一个step的beam
    new_beams = List[batch_size] of empty list
    for b in 0 .. batch_size-1:
        // 汇总该样本的所有候选 (来自其未完成beam)
        candidate_scores = []  // (score, from_beam_j, token_id, new_sum_logprob, new_cache_idx)
        // 记录活跃beam在active_indices中的位置
        pos_list = []
        for idx in 0 .. len(active_indices)-1:
            bb, jj = active_indices[idx]
            if bb == b:
                pos_list.append((idx, jj))

        // 为该样本每个活跃beam枚举所有词汇的扩展(可只枚举非-INF部分)
        for (idx, jj) in pos_list:
            if beams[b][jj].finished:
                continue
            log_probs = log_probs_batch[idx]  // [V]
            for v in 0 .. V-1:
                if log_probs[v] == -INF:
                    continue
                new_sum = beams[b][jj].sum_logprob + log_probs[v]
                new_len = beams[b][jj].length + 1
                lp = LengthPenalty(new_len, alpha)
                new_score = new_sum / lp
                candidate_scores.append((new_score, jj, v, new_sum, idx))

        // 将EOS扩展放入完成列表,不进入下一步beam;非EOS扩展用于下一步beam选取
        // 拆分候选
        eos_candidates = []
        cont_candidates = []
        for c in candidate_scores:
            (sc, from_j, tok, new_sum, idx) = c
            if tok == eos_token_id:
                eos_candidates.append(c)
            else:
                cont_candidates.append(c)

        // 将EOS候选加入完成队列(按score排序)
        for c in eos_candidates:
            (sc, from_j, tok, new_sum, idx) = c
            // 生成完整tokens:旧tokens + [EOS]
            seq_tokens = Copy(beams[b][from_j].tokens)
            seq_tokens.append(tok)
            hyp = BeamState(tokens=seq_tokens,
                            last_token=tok,
                            sum_logprob=new_sum,
                            length=beams[b][from_j].length + 1,
                            cache=null, finished=true, score=sc)
            finished_lists[b].push(hyp)

        // 从非EOS候选中选出top beam_size个用于下一步
        // 注意:若非EOS候选少于beam_size,用EOS结束的候选填充将导致后续无活跃beam,依赖外层循环终止
        cont_candidates = top_k_tuples_by_first_desc(cont_candidates, beam_size)

        // 组装new_beams[b]
        for rank in 0 .. length(cont_candidates)-1:
            (sc, from_j, tok, new_sum, idx) = cont_candidates[rank]
            old = beams[b][from_j]
            new_beam = BeamState(tokens=Copy(old.tokens),
                                 last_token=tok,
                                 sum_logprob=new_sum,
                                 length=old.length + 1,
                                 cache=new_caches[idx],
                                 finished=false,
                                 score=sc)
            new_beam.tokens.append(tok)
            new_beams[b].append(new_beam)

        // 若不足beam_size,补齐占位(将其标为finished以加速结束)
        while length(new_beams[b]) < beam_size:
            // 尝试从当前最佳EOS候选复制一个“冻结beam”
            if length(eos_candidates) > 0:
                best_eos = argmax_by_first(eos_candidates)
                (sc, from_j, tok, new_sum, idx) = best_eos
                old = beams[b][from_j]
                frozen = BeamState(tokens=Copy(old.tokens), last_token=tok,
                                   sum_logprob=new_sum, length=old.length + 1,
                                   cache=null, finished=true, score=sc)
                frozen.tokens.append(tok)
                new_beams[b].append(frozen)
            else:
                // 没有可用候选时,放入一个PAD占位(不会再被使用)
                pad_beam = BeamState(tokens=[], last_token=pad_token_id,
                                     sum_logprob=-INF, length=0, cache=null,
                                     finished=true, score=-INF)
                new_beams[b].append(pad_beam)

    // 提前终止判断(每个样本独立)
    // 若已收集到n_best个完成候选,并且继续扩展的最佳上界不可能超过已完成的最差分数,则早停
    all_done = true
    for b in 0 .. batch_size-1:
        beams[b] = new_beams[b]  // 更新beam
        // 判断是否还有未完成beam
        sample_has_active = false
        best_active_upper = -INF
        for j in 0 .. beam_size-1:
            if not beams[b][j].finished:
                sample_has_active = true
                // 上界:当前score即为上界(未来对数增量≤0,且长度惩罚非减)
                upper = beams[b][j].sum_logprob / LengthPenalty(beams[b][j].length, alpha == 0 ? 0 : alpha)
                if upper > best_active_upper:
                    best_active_upper = upper
        if not sample_has_active:
            // 无活跃beam,认为此样本已完成
            // 若还没有n_best个完成候选,则后续在收尾阶段补齐
            continue

        all_done = false
        if early_stop and finished_lists[b].size() >= n_best:
            // 已完成候选中的第N名分数
            worst_needed = kth_score_desc(finished_lists[b], n_best)
            if best_active_upper <= worst_needed:
                // 将所有活跃beam丢弃,标记此样本完成
                for j in 0 .. beam_size-1:
                    beams[b][j].finished = true

    if all_done:
        break

// 收尾:若完成候选不足n_best,用当前活跃beam按score补足
results = List[batch_size] of empty list
for b in 0 .. batch_size-1:
    temp_list = []
    // 先取完成候选
    while finished_lists[b].size() > 0:
        temp_list.append(finished_lists[b].pop()) // 已按score降序
    // 不足则从活跃beam补足(按当前score排序)
    if length(temp_list) < n_best:
        active_sorted = sort_desc_by(beams[b], key=beam.score)
        for t in 0 .. min(n_best - length(temp_list), length(active_sorted)) - 1:
            temp_list.append(active_sorted[t])
    // 只保留前n_best
    temp_list = temp_list[0 : min(n_best, length(temp_list))]
    // tokens前可选择是否拼接prompt(依任务需求)
    results[b] = temp_list

return results
  • 关键参数说明

  • beam_size B: 每步保留的候选数,B越大搜索越广但计算更大。

  • n_best N: 返回的候选数,通常N ≤ B。

  • alpha: 长度惩罚强度,α=0时无惩罚;常用0.6左右。采用GNMT形式 lp(L)=((5+L)^α)/(6^α)。

  • no_repeat_ngram_size n: n元重复阻断大小;n=2为常用(阻断重复bigram)。

  • temperature τ: 温度缩放,τ<1使分布更尖锐,τ>1更平坦。

  • top_k k: 每步仅保留概率最高的k个token参与归一化与扩展,k减小可降计算并增多样性;k=V表示不裁剪。

  • early_stop: 启用EOS早停;当已收集到N个完成候选且活跃beam的最优上界不可能超过已完成的第N名分数时,提前终止该样本的搜索。

  • 算法复杂度分析

  • 时间复杂度(单样本): O(T · B · V)

    • 每步对B个beam计算V维logits并做log-softmax/top-k与阻断;若top-k生效且实现为截断,则有效复杂度近似为O(T · B · k)。
  • 批量下总体: O(batch_size · T · B · V)。

  • 空间复杂度(单样本):

    • 序列缓存:O(B · T)(存tokens与n-gram索引)
    • 模型状态缓存:O(B · CacheSize)
      • Transformer解码器KV缓存:O(Layers · Heads · B · T · d_head)
      • RNN隐藏态:O(Layers · B · d_hidden)
    • 临时logits/log_probs:O(B · V)
  • 关键状态缓存

    • 编码器输出(encoder memory):每样本一次性缓存,供所有beam共享。
    • 解码器增量缓存(KV缓存/隐藏态):每个beam一份,随步数累积;扩展到新beam时复制或引用增量更新的缓存。
    • Beam层面缓存:
      • sum_logprob、length、last_token、finished标志
      • tokens(或环形缓冲用于尾部n-gram计算)
      • n元阻断索引(可按prefix映射到被禁token集合,或按需即时构建)
  • 适用场景说明

  • 机器翻译、文本生成、摘要、对话、语音识别等自回归序列生成。

  • 需要在质量与速度之间折中(通过B、k、τ、α等调参),并避免重复模式(n元阻断)。

  • 批量生产环境推理与服务端在线推理(依赖增量缓存高效迭代)。

  • 自检用例(可复现实验) 目的:验证批量、长度惩罚、n元阻断、温度与top-k、EOS早停与N-best排序是否生效。

构造玩具模型

  • 词表V=5: {BOS=0, A=1, B=2, C=3, EOS=4}
  • 模型规则(确定性logits,方便计算):
    • 若上一个token是BOS(0): logits = [ -10, 3.0, 2.0, 1.0, 0.5 ] // A最优,其次B、C、EOS
    • 若上一个token是A(1): logits = [ -10, 0.5, 1.0, 0.0, 2.5 ]
    • 若上一个token是B(2): logits = [ -10, 1.5, 0.5, 0.0, 1.0 ]
    • 若上一个token是C(3): logits = [ -10, 0.0, 1.0, 0.5, 2.0 ]
    • 若上一个token是EOS(4): logits = [ -INF, -INF, -INF, -INF, -INF ] // 不再扩展
  • ModelStep实现:读取inputs[i]→按上表返回logits[i], new_cache原样返回。

测试1:基本功能与长度惩罚

  • 参数:batch_size=1, prompts=[[BOS]], B=3, N=2, α=0.6, n=0, τ=1.0, k=5, max_steps=4, early_stop=true
  • 期望:
    • 第一步优先扩展A、B、C
    • 后续倾向尽早生成EOS,但长度惩罚抑制过短句偏好(α>0)
    • 返回前2个候选,含tokens、sum_logprob、score,且score为降序

测试2:top-k与温度

  • 参数:同测试1,但τ=0.7, k=2
  • 期望:每步仅保留top-2(例如从BOS只保留A、B),生成多样性受τ影响;结果不含被裁掉的token扩展。

测试3:no-repeat bigram

  • 参数:batch_size=1, prompts=[[BOS, A]], B=3, N=2, α=0.0, n=2, τ=1, k=5, max_steps=5
  • 期望:若历史出现过某bigram,则阻断再次出现同bigram的后续token;验证NoRepeatNGramMask对当前prefix生效。

测试4:批量与EOS早停

  • 参数:batch_size=2, prompts=[[BOS], [BOS, A]], B=2, N=1, α=0.6, n=0, τ=1, k=5, max_steps=5, early_stop=true
  • 期望:两个样本独立早停;当已获得N个完成候选且活跃beam上界不超过已完成最差分时,提前结束该样本。

可选断言

  • 对于任意候选:score = sum_logprob / LengthPenalty(length, α)
  • 对于每步:每样本的活跃beam数≤B
  • 被n元阻断的token其log_probs应为-INF
  • 若early_stop触发:best_active_upper ≤ 第N个完成候选的分数

备注与实现提示

  • 为避免对所有V做log_softmax,可在top-k后仅对k个保留项归一化,并将其余置为-INF。
  • 为加速n元阻断,可维护哈希:key为(n-1)-gram前缀,value为其后续token集合;每步仅针对当前前缀查询而非重扫历史。
  • 对缓存复制(cache clone)可采用“写时复制”(copy-on-write)或结构化共享,避免大规模内存拷贝。
  • 若需要返回不含EOS的序列,可在收尾阶段去除尾部EOS但保留其贡献的对数概率与长度惩罚计算。

示例详情

解决的问题

把算法想法在最短时间内转化为“可讨论、可评审、可复现”的高质量伪代码方案。面向算法原型验证、模型改进、技术评审与教学演示,帮助用户基于算法名称与业务诉求,快速产出结构清晰、逻辑严谨、标准统一的伪代码与说明;内置任务分解与自检机制,减少沟通误差与实现偏差;自动补充关键参数与复杂度信息,提升评审效率与复现成功率;支持多语言输出与模板化复用,缩短交付周期,促进从试用到正式落地与团队采购的决策。

适用用户

AI/ML工程师

把需求快速转化为可执行的伪代码蓝图,明确输入输出与模块边界;在重构或迁移时先梳理流程,作为评审与编码依据。

算法研究员与博士生

将论文算法用标准伪代码复现,附带复杂度与参数说明;比较不同变体的关键差异,为实验与投稿准备清晰说明。

数据科学家

快速产出特征、训练、评估的端到端伪代码;把业务目标映射为算法流程,便于与工程同事对齐并落地。

特征总结

一键生成结构清晰的算法伪代码,自动标注输入输出与关键步骤,便于直接落地实现。
依据需求自动拆解任务,细化流程与模块边界,减少沟通成本,让团队快速对齐实现思路。
同步给出关键参数说明与取值建议,附带复杂度评估,帮助平衡精度、速度与资源消耗。
支持多场景调用:原型验证、论文复现、代码重构、教学演示,统一格式便于复用与沉淀。
自动润色术语与注释,避免冗余与歧义,确保跨部门阅读无障碍,提升文档可读性。
可按指定语言输出伪代码与说明,满足中英双语或本地规范,轻松融入现有文档体系。
为不确定需求提供澄清提问与边界假设,降低误解风险,保证生成结果可执行可验证。
内置规范校验与逻辑自检,提前发现缺漏与冲突,减少返工次数,缩短研发周期。

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥20.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 508 tokens
- 3 个可调节参数
{ 算法名称 } { 具体需求 } { 输出语言 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
限时免费

不要错过!

免费获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59