Intelligent Code Completion

Updated Nov 24, 2025

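Intelligently recognizes the code snippets and business goals a user provides and completes them into a full, runnable code solution. It supports multiple languages and development scenarios, improves development efficiency, reduces debugging time, and suits rapid prototyping and feature implementation.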

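Below is a complete example that can be run directly. It includes the main service code, a startup script (with a built-in uvicorn entry point), structured logging and middleware, exception handling and input validation, requirements, a Dockerfile, example requests, and unit tests.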

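File: main.py. Description: a FastAPI text analysis service that exposes /health and /analyze, with a structured-logging JSONFormatter, request-logging middleware, exception handling and input validation, plus a uvicorn entry point.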

# main.py
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from pydantic import BaseModel, Field
import logging
import json
import os
import re
import time
import uuid
from datetime import datetime, timezone

# -------------------------
# Basic configuration and constants
# -------------------------
APP_NAME = "text-analyze-service"
APP_VERSION = os.getenv("APP_VERSION", "1.0.0")
MAX_TEXT_LEN = int(os.getenv("MAX_TEXT_LEN", "100000"))  # Business limit: maximum text length after stripping leading/trailing whitespace
START_TIME = time.time()

# -------------------------
# Structured logging configuration (JSON)
# -------------------------
class JSONFormatter(logging.Formatter):
    """Minimal JSON log formatter: emits structured records that are easy to collect and search."""
    def format(self, record: logging.LogRecord) -> str:
        base = {
            "time": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
        }
        # Use the log message as the event name
        base["event"] = record.getMessage()

        # Copy over the extra fields we care about
        for key in [
            "request_id", "path", "method", "status_code", "duration_ms",
            "chars", "lines", "unique_words", "detail"
        ]:
            if hasattr(record, key):
                base[key] = getattr(record, key)

        # If an exception is attached, record its stack trace
        if record.exc_info:
            base["exc_info"] = self.formatException(record.exc_info)

        return json.dumps(base, ensure_ascii=False)

logger = logging.getLogger(APP_NAME)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.handlers = [handler]
logger.propagate = False

# -------------------------
# FastAPI application and middleware
# -------------------------
app = FastAPI(title=APP_NAME, version=APP_VERSION)

@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    """Request-level middleware: generates or passes through request_id, and records duration and status code."""
    request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
    start = time.perf_counter()
    try:
        response = await call_next(request)
        duration_ms = round((time.perf_counter() - start) * 1000, 2)
        response.headers["X-Request-ID"] = request_id
        logger.info(
            "request_completed",
            extra={
                "request_id": request_id,
                "path": request.url.path,
                "method": request.method,
                "status_code": response.status_code,
                "duration_ms": duration_ms,
            },
        )
        return response
    except Exception:
        duration_ms = round((time.perf_counter() - start) * 1000, 2)
        logger.exception(
            "request_failed",
            extra={
                "request_id": request_id,
                "path": request.url.path,
                "method": request.method,
                "duration_ms": duration_ms,
            },
        )
        # Re-raise so the unified exception handlers can take over
        raise

# -------------------------
# Data models
# -------------------------
class AnalyzeReq(BaseModel):
    # Length is deliberately not limited in Pydantic so the business logic can check it after trimming (empty text / length).
    text: str = Field(..., description="Text to analyze")

class AnalyzeResp(BaseModel):
    chars: int
    lines: int
    unique_words: int

# -------------------------
# Exception handling
# -------------------------
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Return a uniform response structure for HTTP exceptions."""
    request_id = request.headers.get("X-Request-ID")
    logger.warning(
        "http_error",
        extra={
            "request_id": request_id,
            "path": request.url.path,
            "status_code": exc.status_code,
            "detail": exc.detail,
        },
    )
    return JSONResponse(
        status_code=exc.status_code,
        content={"error": {"code": exc.status_code, "detail": exc.detail}},
    )

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Request body validation errors (for example a missing field or a wrong type)."""
    request_id = request.headers.get("X-Request-ID")
    logger.warning(
        "validation_error",
        extra={
            "request_id": request_id,
            "path": request.url.path,
            "status_code": 422,
            "detail": exc.errors(),
        },
    )
    return JSONResponse(
        status_code=422,
        content={"error": {"code": 422, "detail": exc.errors()}},
    )

@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
    """Catch-all handler so that uncaught errors do not leak internal details."""
    request_id = request.headers.get("X-Request-ID")
    logger.exception(
        "unhandled_error",
        extra={"request_id": request_id, "path": request.url.path, "detail": str(exc)},
    )
    return JSONResponse(
        status_code=500,
        content={"error": {"code": 500, "detail": "Internal server error"}},
    )

# -------------------------
# Routes
# -------------------------
@app.get("/health")
def health():
    """Health check endpoint: returns service status and version information."""
    uptime = round(time.time() - START_TIME, 2)
    return {
        "status": "ok",
        "version": APP_VERSION,
        "name": APP_NAME,
        "uptime_seconds": uptime,
    }

_word_pattern = re.compile(r"\b\w+\b", flags=re.UNICODE)

@app.post("/analyze", response_model=AnalyzeResp)
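def analyze(req: AnalyzeReq, request: Request):
    """
    Text analysis:
    - Statistics are computed after stripping leading/trailing whitespace
    - Character count includes inner spaces and newlines (the stripped leading/trailing whitespace is not counted)
    - Line count uses splitlines
    - Words are extracted with Unicode word boundaries, case-insensitively, and unique words are counted
    - Empty text: returns 400
    - Overlong text: returns 413
    """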
    request_id = request.headers.get("X-Request-ID")

    # Strip leading/trailing whitespace
    text = (req.text or "").strip()

    # Empty text
    if not text:
        raise HTTPException(status_code=400, detail="Text is empty after trimming")

    # Overlong text (business limit)
    if len(text) > MAX_TEXT_LEN:
        raise HTTPException(
            status_code=413,
            detail=f"Text is too long (>{MAX_TEXT_LEN} chars after trimming)",
        )

    # Character count (length after stripping)
    chars = len(text)

    # Line count: splitlines handles \n, \r\n and \r (text is known to be non-empty here)
    lines = len(text.splitlines())

    # Unique words: match on Unicode word boundaries, ignoring case
    words = _word_pattern.findall(text.lower())
    unique_words = len(set(words))

    logger.info(
        "analyze_success",
        extra={
            "request_id": request_id,
            "chars": chars,
            "lines": lines,
            "unique_words": unique_words,
        },
    )

    return AnalyzeResp(chars=chars, lines=lines, unique_words=unique_words)

# -------------------------
# Entry point (startup script)
# -------------------------
if __name__ == "__main__":
    # The port and hot reload (development mode) can be configured through environment variables
    import uvicorn
    port = int(os.getenv("PORT", "8000"))
    reload = os.getenv("RELOAD", "false").lower() in ("1", "true", "yes")
    uvicorn.run("main:app", host="0.0.0.0", port=port, reload=reload)
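
How the `extra=` fields used by the middleware reach the JSON output can be seen in isolation. The sketch below is a cut-down stand-in for `JSONFormatter`, not the service's formatter itself: `MiniJSONFormatter`, `render_log_line`, and the logger name `demo-json-logging` are hypothetical names introduced for illustration, and only three of the whitelisted fields are kept.

```python
import io
import json
import logging


class MiniJSONFormatter(logging.Formatter):
    """Cut-down stand-in for JSONFormatter: event name plus whitelisted extras."""

    FIELDS = ("request_id", "status_code", "duration_ms")

    def format(self, record: logging.LogRecord) -> str:
        entry = {"level": record.levelname, "event": record.getMessage()}
        for key in self.FIELDS:
            # Keys passed via `extra=` become attributes on the LogRecord.
            if hasattr(record, key):
                entry[key] = getattr(record, key)
        return json.dumps(entry, ensure_ascii=False)


def render_log_line(event: str, **extra) -> str:
    """Log one event to an in-memory stream and return the emitted line."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(MiniJSONFormatter())
    log = logging.getLogger("demo-json-logging")  # hypothetical logger name
    log.handlers = [handler]
    log.propagate = False
    log.setLevel(logging.INFO)
    log.info(event, extra=extra)
    return stream.getvalue().strip()


line = render_log_line(
    "request_completed", request_id="demo-123", status_code=200, ignored="x"
)
print(line)
# {"level": "INFO", "event": "request_completed", "request_id": "demo-123", "status_code": 200}
```

The `ignored` key is attached to the record but never printed, because the formatter only copies fields named in its whitelist. A field that is logged but missing from the output usually means the whitelist needs extending.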

File: requirements.txt

fastapi>=0.110
uvicorn[standard]>=0.23
pydantic>=2.5
pytest>=7.0
httpx>=0.24

File: Dockerfile

# Dockerfile
FROM python:3.11-slim

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the source code
COPY . .

EXPOSE 8000

# Start the service with uvicorn by default
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

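File: README.md

# Text Analysis Service

A simple FastAPI-based text analysis service that provides:
- POST /analyze: returns the character count, line count, and unique word count
- GET /health: health check
- Structured logging (JSON) and request middleware (records duration and request_id)
- Input validation: empty text returns 400; overlong text (more than 100000 characters by default) returns 413
- Unified exception handling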

## Running locally

```bash
# Install dependencies
pip install -r requirements.txt

# Start the service (port 8000 by default)
python main.py

# Optional environment variables
# APP_VERSION=1.0.0 MAX_TEXT_LEN=200000 PORT=8000 RELOAD=true
```

## Running with Docker

```bash
docker build -t text-analyze-service .
docker run -p 8000:8000 -e APP_VERSION=1.0.0 text-analyze-service
```

## Example requests

- Health check

```bash
curl -s http://localhost:8000/health
```

- Text analysis

```bash
curl -s -X POST http://localhost:8000/analyze \
  -H 'Content-Type: application/json' \
  -H 'X-Request-ID: demo-123' \
  -d '{"text": "Hello world\nhello"}'
# Example response: {"chars":17,"lines":2,"unique_words":2}
```

- Python example

```python
# Needs the requests package, which is not listed in requirements.txt
import requests
resp = requests.post("http://localhost:8000/analyze", json={"text": "你好 世界\n世界"})
print(resp.json())  # {'chars': 8, 'lines': 2, 'unique_words': 2}
```

## Log example (JSON)

```json
{"time":"2024-01-01T00:00:00+00:00","level":"INFO","logger":"text-analyze-service","event":"request_completed","request_id":"demo-123","path":"/analyze","method":"POST","status_code":200,"duration_ms":1.23}
{"time":"2024-01-01T00:00:00+00:00","level":"INFO","logger":"text-analyze-service","event":"analyze_success","request_id":"demo-123","chars":17,"lines":2,"unique_words":2}
```

## Unit tests

Run:

```bash
pytest -q
```

File: tests/test_app.py
```python
# tests/test_app.py
import pytest
from fastapi.testclient import TestClient
import main

client = TestClient(main.app)

def test_health():
    r = client.get("/health")
    assert r.status_code == 200
    data = r.json()
    assert data["status"] == "ok"
    assert "version" in data

def test_analyze_basic():
    payload = {"text": "Hello world\nhello"}
    r = client.post("/analyze", json=payload, headers={"X-Request-ID": "test-req-1"})
    assert r.status_code == 200
    data = r.json()
    assert data["chars"] == 17  # "Hello world"=11 + "\n"=1 + "hello"=5
    assert data["lines"] == 2
    assert data["unique_words"] == 2  # hello, world

def test_analyze_chinese():
    payload = {"text": "你好 世界\n世界"}
    r = client.post("/analyze", json=payload)
    assert r.status_code == 200
    data = r.json()
    assert data["chars"] == 8   # 2 + 1 + 2 + 1 + 2
    assert data["lines"] == 2
    assert data["unique_words"] == 2  # 你好, 世界

def test_analyze_empty_after_trim():
    payload = {"text": "   \n   "}
    r = client.post("/analyze", json=payload)
    assert r.status_code == 400
    data = r.json()
    assert data["error"]["detail"] == "Text is empty after trimming"

def test_analyze_too_long():
    long_text = "a" * (main.MAX_TEXT_LEN + 1)
    r = client.post("/analyze", json={"text": long_text})
    assert r.status_code == 413
    data = r.json()
    assert "too long" in data["error"]["detail"]

def test_bad_payload_validation():
    # Missing "text" field -> 422
    r = client.post("/analyze", json={})
    assert r.status_code == 422
    data = r.json()
    assert data["error"]["code"] == 422
```

Notes and design points:

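  • Input validation:
    • req.text is stripped first; if the result is empty, the service returns 400;
    • if the stripped length exceeds MAX_TEXT_LEN, it returns 413 (adjustable through the MAX_TEXT_LEN environment variable).
  • Unique word count:
    • Words are extracted with Unicode word boundaries (\b\w+\b), lowercased, and then deduplicated.
    • Chinese text works only approximately: the pattern matches runs of letters, digits, and underscores, so an unspaced run of Chinese characters counts as one word rather than being segmented.
  • Structured logging:
    • A custom JSONFormatter emits a standardized set of fields that is easy to collect and analyze.
    • The middleware records the request_id, duration, path, method, and status code of every request.
  • Exception handling:
    • HTTPException, RequestValidationError, and the catch-all Exception handler share one response structure.
  • Startup script:
    • main.py can be run directly, or the service can be run through Docker.
  • Unit tests:
    • They cover the health check, normal analysis, Chinese text, empty text, overlong text, and request body validation errors.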
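
The counting rules in the notes above can be checked without the HTTP layer. The snippet below is a minimal sketch that applies the same strip, `len`, `splitlines`, and `\b\w+\b` steps as `/analyze`; `analyze_text` and `WORD_PATTERN` are hypothetical names used only here, and the 400/413 checks are omitted.

```python
import re

# Same pattern as _word_pattern in main.py (str patterns are Unicode-aware in Python 3).
WORD_PATTERN = re.compile(r"\b\w+\b")


def analyze_text(raw: str) -> dict:
    """Apply the /analyze counting rules to a raw string."""
    text = raw.strip()
    words = WORD_PATTERN.findall(text.lower())
    return {
        "chars": len(text),
        "lines": len(text.splitlines()),
        "unique_words": len(set(words)),
    }


# Leading/trailing whitespace is dropped; "Hello" and "hello" collapse into one word.
print(analyze_text("  Hello world\nhello  "))
# {'chars': 17, 'lines': 2, 'unique_words': 2}

# Without spaces, a run of Chinese characters is matched as a single word.
print(analyze_text("你好世界"))
# {'chars': 4, 'lines': 1, 'unique_words': 1}

print(WORD_PATTERN.findall("你好世界 and 你好 世界"))
# ['你好世界', 'and', '你好', '世界']
```

If real word segmentation for Chinese is needed, a dedicated tokenizer would have to replace the regular expression; that is outside what this service does.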

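The following is a complete, reusable implementation that includes:

  • A RegisterForm component (field validation, throttling against duplicate submissions, loading and error/success messages, styling, and accessibility)
  • A useForm Hook (reusable, general-purpose form management)
  • Test cases based on Jest + Testing Library

The code is commented to ease later maintenance and extension.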

// RegisterForm.jsx
import React, { useRef, useState } from 'react';

/**
 * Generic useForm Hook
 * - Manages values/errors/touched/loading
 * - Field-level and form-level validation
 * - Throttling against duplicate submissions
 * - Exposes reusable form operations
 */
export function useForm({
  initialValues,
  validators = {},
  onSubmit,
  validateOnChange = true,
  validateOnBlur = true,
  throttleMs = 1500,
}) {
  const [values, setValues] = useState(initialValues || {});
  const [errors, setErrors] = useState({});
  const [touched, setTouched] = useState({});
  const [loading, setLoading] = useState(false);
  const [submitError, setSubmitError] = useState('');
  const [submitSuccess, setSubmitSuccess] = useState('');
  const lastSubmitTimeRef = useRef(0);

  // Compute all errors for the given values
  function computeErrors(vs) {
    const nextErrors = {};
    for (const name in validators) {
      const fn = validators[name];
      if (typeof fn === 'function') {
        const msg = fn(vs[name], vs);
        if (msg) nextErrors[name] = msg;
      }
    }
    return nextErrors;
  }

  const isThrottled = () => Date.now() - lastSubmitTimeRef.current < throttleMs;

  const isValid = Object.keys(computeErrors(values)).length === 0;
  const canSubmit = isValid && !loading && !isThrottled();

  function handleChange(e) {
    const { name, value } = e.target;
    // Build the next values outside the state updater so the updater stays side-effect free
    const next = { ...values, [name]: value };
    setValues(next);
    if (validateOnChange) {
      setErrors(computeErrors(next));
    }
  }

  function handleBlur(e) {
    const { name } = e.target;
    setTouched(prev => ({ ...prev, [name]: true }));
    if (validateOnBlur) {
      setErrors(computeErrors(values));
    }
  }

  function resetForm() {
    setValues(initialValues || {});
    setErrors({});
    setTouched({});
    setSubmitError('');
    setSubmitSuccess('');
  }

  async function handleSubmit(e) {
    e.preventDefault();
    setSubmitError('');
    setSubmitSuccess('');

    // Throttle against duplicate submissions
    if (loading || isThrottled()) {
      setSubmitError('提交过于频繁,请稍后再试');
      return;
    }

    const nextErrors = computeErrors(values);
    setErrors(nextErrors);

    if (Object.keys(nextErrors).length > 0) {
      setSubmitError('请修正表单错误后再提交');
      return;
    }

    lastSubmitTimeRef.current = Date.now();
    setLoading(true);

    try {
      await onSubmit(values, { resetForm, setSubmitError, setSubmitSuccess });
    } catch (err) {
      setSubmitError(err?.message || '提交失败,请稍后重试');
    } finally {
      setLoading(false);
    }
  }

  return {
    values,
    errors,
    touched,
    loading,
    submitError,
    submitSuccess,
    canSubmit,
    isValid,
    isThrottled: isThrottled(),
    handleChange,
    handleBlur,
    handleSubmit,
    resetForm,
    setSubmitError,
    setSubmitSuccess,
    setValues,
  };
}

// Simple styles (can be replaced with CSS/SCSS or Tailwind)
const styles = {
  form: {
    maxWidth: 400,
    margin: '24px auto',
    padding: 16,
    border: '1px solid #e5e7eb',
    borderRadius: 8,
    fontFamily: 'system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial',
    background: '#fff',
  },
  label: { display: 'block', fontSize: 14, marginBottom: 6, color: '#374151' },
  input: {
    width: '100%',
    padding: '8px 10px',
    marginBottom: 8,
    border: '1px solid #d1d5db',
    borderRadius: 6,
    fontSize: 14,
    outline: 'none',
  },
  inputInvalid: { borderColor: '#ef4444', background: '#fff7f7' },
  help: { fontSize: 12, color: '#6b7280', marginBottom: 8 },
  errorText: { fontSize: 12, color: '#ef4444', marginBottom: 8 },
  successText: { fontSize: 13, color: '#10b981', marginBottom: 8 },
  button: {
    width: '100%',
    padding: '10px 12px',
    fontSize: 15,
    border: 'none',
    borderRadius: 6,
    background: '#3b82f6',
    color: '#fff',
    cursor: 'pointer',
  },
  buttonDisabled: { background: '#93c5fd', cursor: 'not-allowed' },
};

/**
 * Field validation rules:
 * - Username: required, 3-20 characters; letters, digits, underscores, and hyphens
 * - Email: required, basic format check
 * - Password: required, at least 8 characters, with upper- and lowercase letters, a digit, and a special character
 */
const validators = {
  username: v => {
    if (!v || !v.trim()) return '用户名为必填项';
    if (v.length < 3 || v.length > 20) return '用户名长度需在 3-20 个字符之间';
    if (!/^[A-Za-z0-9_-]+$/.test(v)) return '用户名仅允许字母、数字、下划线或中划线';
    return '';
  },
  email: v => {
    if (!v || !v.trim()) return '邮箱为必填项';
    // Simplified email check: good enough for common cases
    if (!/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(v)) return '邮箱格式不正确';
    return '';
  },
  password: v => {
    if (!v) return '密码为必填项';
    if (v.length < 8) return '密码长度至少 8 位';
    if (!/[a-z]/.test(v)) return '密码需包含小写字母';
    if (!/[A-Z]/.test(v)) return '密码需包含大写字母';
    if (!/[0-9]/.test(v)) return '密码需包含数字';
    if (!/[!@#$%^&*()_+\-=[\]{};':"\\|,.<>\/?`~]/.test(v)) return '密码需包含特殊字符';
    return '';
  },
};

export default function RegisterForm() {
  const {
    values,
    errors,
    touched,
    loading,
    submitError,
    submitSuccess,
    canSubmit,
    isValid,
    handleChange,
    handleBlur,
    handleSubmit,
    resetForm,
    setSubmitError,
    setSubmitSuccess,
  } = useForm({
    initialValues: { username: '', email: '', password: '' },
    validators,
    throttleMs: 1500,
    onSubmit: async (formValues, helpers) => {
      try {
        const res = await fetch('/api/register', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify(formValues),
        });

        // Parse the response
        let data = null;
        try {
          data = await res.json();
        } catch {
          // Ignore non-JSON responses
        }

        if (!res.ok) {
          const msg =
            data?.error ||
            data?.message ||
            `注册失败(${res.status})`;
          helpers.setSubmitError(msg);
          return;
        }

        // Success: reset the form first (resetForm also clears the messages), then show the message
        helpers.resetForm();
        helpers.setSubmitSuccess('注册成功!');
      } catch (err) {
        helpers.setSubmitError(err?.message || '网络错误,请稍后重试');
      }
    },
  });

  // Optional helper for a button that clears the messages
  function clearMessages() {
    setSubmitError('');
    setSubmitSuccess('');
  }

  return (
    <form onSubmit={handleSubmit} style={styles.form}>
      <h2 style={{ marginBottom: 12, fontSize: 18 }}>用户注册</h2>

  <label htmlFor="username" style={styles.label}>用户名</label>
  <input
    id="username"
    name="username"
    value={values.username}
    onChange={handleChange}
    onBlur={handleBlur}
    placeholder="请输入用户名(3-20长度)"
    aria-invalid={Boolean(errors.username && touched.username)}
    aria-describedby="username-error username-help"
    required
    style={{
      ...styles.input,
      ...(errors.username && touched.username ? styles.inputInvalid : {}),
    }}
  />
  <div id="username-help" style={styles.help}>
    允许字母、数字、下划线与中划线
  </div>
  {errors.username && touched.username && (
    <div id="username-error" role="alert" style={styles.errorText}>
      {errors.username}
    </div>
  )}

  <label htmlFor="email" style={styles.label}>邮箱</label>
  <input
    id="email"
    name="email"
    type="email"
    value={values.email}
    onChange={handleChange}
    onBlur={handleBlur}
    placeholder="you@example.com"
    aria-invalid={Boolean(errors.email && touched.email)}
    aria-describedby="email-error"
    required
    style={{
      ...styles.input,
      ...(errors.email && touched.email ? styles.inputInvalid : {}),
    }}
  />
  {errors.email && touched.email && (
    <div id="email-error" role="alert" style={styles.errorText}>
      {errors.email}
    </div>
  )}

  <label htmlFor="password" style={styles.label}>密码</label>
  <input
    id="password"
    name="password"
    type="password"
    value={values.password}
    onChange={handleChange}
    onBlur={handleBlur}
    placeholder="至少8位,包含大小写、数字与特殊字符"
    aria-invalid={Boolean(errors.password && touched.password)}
    aria-describedby="password-error password-help"
    required
    style={{
      ...styles.input,
      ...(errors.password && touched.password ? styles.inputInvalid : {}),
    }}
  />
  <div id="password-help" style={styles.help}>
    示例:Aa123456!
  </div>
  {errors.password && touched.password && (
    <div id="password-error" role="alert" style={styles.errorText}>
      {errors.password}
    </div>
  )}

  {submitError && (
    <div role="alert" data-testid="error-banner" style={styles.errorText}>
      {submitError}
    </div>
  )}
  {submitSuccess && (
    <div role="status" data-testid="success-banner" style={styles.successText}>
      {submitSuccess}
    </div>
  )}

  <button
    type="submit"
    disabled={!canSubmit}
    style={{
      ...styles.button,
      ...(!canSubmit ? styles.buttonDisabled : {}),
      marginTop: 8,
    }}
    aria-busy={loading || undefined}
  >
    {loading ? '提交中…' : '注册'}
  </button>

  {(submitError || submitSuccess) && (
    <button
      type="button"
      onClick={clearMessages}
      style={{ ...styles.button, marginTop: 8, background: '#6b7280' }}
    >
      清除提示
    </button>
  )}

  {/* Expose data-testid hooks to simplify testing */}
  <div data-testid="is-valid" style={{ display: 'none' }}>{String(isValid)}</div>
  <div data-testid="can-submit" style={{ display: 'none' }}>{String(canSubmit)}</div>
  <div data-testid="loading" style={{ display: 'none' }}>{String(loading)}</div>
</form>

  );
}

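// tests/RegisterForm.test.jsx
import React from 'react';
import '@testing-library/jest-dom';
import { render, screen, fireEvent, waitFor } from '@testing-library/react';
import RegisterForm from '../RegisterForm';

describe('RegisterForm', () => {
  beforeEach(() => {
    jest.restoreAllMocks();
    // jsdom may not define fetch; stub it so jest.spyOn(global, 'fetch') has something to wrap
    if (typeof global.fetch !== 'function') {
      global.fetch = jest.fn();
    }
  });

  test('renders the basic form controls and the disabled state', () => {
    render(<RegisterForm />);
    expect(screen.getByLabelText('用户名')).toBeInTheDocument();
    expect(screen.getByLabelText('邮箱')).toBeInTheDocument();
    expect(screen.getByLabelText('密码')).toBeInTheDocument();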

const submitBtn = screen.getByRole('button', { name: '注册' });
const canSubmitHidden = screen.getByTestId('can-submit');
expect(submitBtn).toBeDisabled();
expect(canSubmitHidden.textContent).toBe('false');

  });

  test('shows an error for an invalid username', () => {
    render(<RegisterForm />);
    const username = screen.getByLabelText('用户名');
    fireEvent.change(username, { target: { value: 'a' } });
    fireEvent.blur(username);
    expect(screen.getByText('用户名长度需在 3-20 个字符之间')).toBeInTheDocument();
  });

  test('shows an error for a malformed email', () => {
    render(<RegisterForm />);
    const email = screen.getByLabelText('邮箱');
    fireEvent.change(email, { target: { value: 'not-an-email' } });
    fireEvent.blur(email);
    expect(screen.getByText('邮箱格式不正确')).toBeInTheDocument();
  });

  test('shows password strength errors', () => {
    render(<RegisterForm />);
    const password = screen.getByLabelText('密码');
    fireEvent.change(password, { target: { value: 'abc' } });
    fireEvent.blur(password);
    expect(screen.getByText('密码长度至少 8 位')).toBeInTheDocument();

fireEvent.change(password, { target: { value: 'abcdefgh' } });
fireEvent.blur(password);
expect(screen.getByText('密码需包含大写字母')).toBeInTheDocument();

  });

  test('successful registration clears the form and shows a message', async () => {
    jest.spyOn(global, 'fetch').mockResolvedValue({
      ok: true,
      json: async () => ({ message: 'ok' }),
    });

render(<RegisterForm />);

fireEvent.change(screen.getByLabelText('用户名'), { target: { value: 'User_123' } });
fireEvent.blur(screen.getByLabelText('用户名'));
fireEvent.change(screen.getByLabelText('邮箱'), { target: { value: 'user@example.com' } });
fireEvent.blur(screen.getByLabelText('邮箱'));
fireEvent.change(screen.getByLabelText('密码'), { target: { value: 'Aa123456!' } });
fireEvent.blur(screen.getByLabelText('密码'));

const submitBtn = screen.getByRole('button', { name: '注册' });
expect(submitBtn).not.toBeDisabled();

fireEvent.click(submitBtn);

await waitFor(() => {
  expect(screen.getByTestId('success-banner')).toHaveTextContent('注册成功!');
});

// After success the form should be reset
expect(screen.getByLabelText('用户名')).toHaveValue('');
expect(screen.getByLabelText('邮箱')).toHaveValue('');
expect(screen.getByLabelText('密码')).toHaveValue('');

  });

  test('shows an error message when the API returns an error', async () => {
    jest.spyOn(global, 'fetch').mockResolvedValue({
      ok: false,
      status: 400,
      json: async () => ({ error: '邮箱已存在' }),
    });

render(<RegisterForm />);

fireEvent.change(screen.getByLabelText('用户名'), { target: { value: 'User_123' } });
fireEvent.change(screen.getByLabelText('邮箱'), { target: { value: 'user@example.com' } });
fireEvent.change(screen.getByLabelText('密码'), { target: { value: 'Aa123456!' } });

fireEvent.click(screen.getByRole('button', { name: '注册' }));

await waitFor(() => {
  expect(screen.getByTestId('error-banner')).toHaveTextContent('邮箱已存在');
});

  });

  test('throttling: rapid clicks trigger only one request', async () => {
    const fetchMock = jest.spyOn(global, 'fetch').mockResolvedValue({
      ok: true,
      json: async () => ({ message: 'ok' }),
    });

render(<RegisterForm />);

fireEvent.change(screen.getByLabelText('用户名'), { target: { value: 'User_123' } });
fireEvent.change(screen.getByLabelText('邮箱'), { target: { value: 'user@example.com' } });
fireEvent.change(screen.getByLabelText('密码'), { target: { value: 'Aa123456!' } });

const submitBtn = screen.getByRole('button', { name: '注册' });
fireEvent.click(submitBtn);
fireEvent.click(submitBtn); // second click in quick succession

await waitFor(() => {
  expect(fetchMock).toHaveBeenCalledTimes(1);
});

  });
});

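Notes and key points:

  • The useForm Hook encapsulates form values, errors, touched state, loading, throttling, and submission logic. Each validator receives the field value and returns an error message string, or an empty string when the value is valid.
  • canSubmit depends on isValid, loading, and the throttle state together, so the button is disabled whenever any of these conditions is unmet.
  • handleSubmit first runs the loading/throttle check, then validates all fields, and finally calls onSubmit. onSubmit handles the /api/register response: on success it resets the form and shows a message; on failure it shows the error.
  • Simple inline styles and ARIA attributes provide basic accessibility and clear messages.
  • Test coverage: rendering, validation messages, reset after success, API errors, and throttling against duplicate submissions.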
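
The throttle rule itself does not depend on React. As a rough sketch only, the snippet below restates the order of checks in Python with an explicit clock value so the timing is easy to follow. `SubmitGate` and `try_submit` are hypothetical names, timestamps are plain millisecond integers, and the `loading` flag that the hook also checks is left out.

```python
from typing import Optional


class SubmitGate:
    """Hypothetical analogue of the hook's throttle-then-validate order."""

    def __init__(self, throttle_ms: int = 1500) -> None:
        self.throttle_ms = throttle_ms
        self.last_submit_ms: Optional[int] = None  # no accepted submission yet

    def is_throttled(self, now_ms: int) -> bool:
        if self.last_submit_ms is None:
            return False
        return now_ms - self.last_submit_ms < self.throttle_ms

    def try_submit(self, now_ms: int, is_valid: bool) -> str:
        # 1) Throttle check comes first, as in handleSubmit.
        if self.is_throttled(now_ms):
            return "throttled"
        # 2) Validation; a rejected form does not start the throttle window.
        if not is_valid:
            return "invalid"
        # 3) Only an accepted submission records the timestamp.
        self.last_submit_ms = now_ms
        return "accepted"


gate = SubmitGate(throttle_ms=1500)
outcomes = [
    gate.try_submit(0, is_valid=False),    # invalid form, window not started
    gate.try_submit(10, is_valid=True),    # accepted, window starts at t=10
    gate.try_submit(500, is_valid=True),   # 490 ms later: inside the window
    gate.try_submit(1510, is_valid=True),  # exactly 1500 ms later: allowed again
]
print(outcomes)
# ['invalid', 'accepted', 'throttled', 'accepted']
```

Because only accepted submissions update the timestamp, fixing a validation error and resubmitting immediately is never blocked by the throttle.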

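File: go.mod

module github.com/example/concurrent-fetch

go 1.21

// parser.go imports golang.org/x/net/html; run `go mod tidy` to add the matching require directive.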

File: main.go

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"path/filepath"
	"sort"
	"strings"
	"syscall"
	"time"
)

type URLResult struct {
	URL        string `json:"url"`
	Bytes      int    `json:"bytes"`
	Headings   int    `json:"headings"`
	Attempts   int    `json:"attempts"`
	Cached     bool   `json:"cached"`
	DurationMS int64  `json:"duration_ms"`
	Error      string `json:"error,omitempty"`
}

type Totals struct {
	URLs      int `json:"urls"`
	Successes int `json:"successes"`
	Failures  int `json:"failures"`
	Bytes     int `json:"bytes"`
	Headings  int `json:"headings"`
}

type Meta struct {
	Workers  int    `json:"workers"`
	Timeout  string `json:"timeout"`
	Retries  int    `json:"retries"`
	Backoff  string `json:"backoff"`
	CacheDir string `json:"cache_dir"`
	CacheTTL string `json:"cache_ttl"`
	Started  string `json:"started"`
	Duration string `json:"duration"`
}

type Summary struct {
	Results []URLResult `json:"results"`
	Totals  Totals      `json:"totals"`
	Meta    Meta        `json:"meta"`
}

// Example usage:
//
//	go run ./cmd -workers=8 -timeout=4s -retries=2 -backoff=300ms https://example.com https://golang.org
//	go run . -urls-file=urls.txt
func main() {
	workers := flag.Int("workers", 4, "number of concurrent workers")
	timeout := flag.Duration("timeout", 5*time.Second, "per-request timeout")
	retries := flag.Int("retries", 2, "retries on transient errors")
	backoff := flag.Duration("backoff", 300*time.Millisecond, "base backoff between retries")
	cacheDir := flag.String("cache-dir", "", "cache directory (default: $TMP/concurrent-fetch-cache)")
	cacheTTL := flag.Duration("cache-ttl", 10*time.Minute, "use cached content if not older than this duration")
	urlsFile := flag.String("urls-file", "", "path to file with URLs (one per line); if empty, takes URLs from args")
	logLevel := flag.String("log-level", "info", "log level: debug, info, warn, error")
	flag.Parse()

urls := []string{}
if *urlsFile != "" {
	b, err := os.ReadFile(*urlsFile)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error reading urls file: %v\n", err)
		os.Exit(1)
	}
	for _, l := range strings.Split(string(b), "\n") {
		l = strings.TrimSpace(l)
		if l != "" && !strings.HasPrefix(l, "#") {
			urls = append(urls, l)
		}
	}
} else if flag.NArg() > 0 {
	urls = flag.Args()
} else {
	urls = []string{
		"https://example.com",
		"https://example.com/",
	}
}

if len(urls) == 0 {
	fmt.Fprintln(os.Stderr, "no URLs provided")
	os.Exit(1)
}

if *cacheDir == "" {
	*cacheDir = filepath.Join(os.TempDir(), "concurrent-fetch-cache")
}

logger := NewLogger(os.Stderr, ParseLevel(*logLevel))
logger.Info("starting", Fields{
	"workers":   *workers,
	"timeout":   timeout.String(),
	"retries":   *retries,
	"backoff":   backoff.String(),
	"cache_dir": *cacheDir,
	"cache_ttl": cacheTTL.String(),
	"urls":      len(urls),
})

if err := os.MkdirAll(*cacheDir, 0o755); err != nil {
	logger.Error("failed to create cache dir", Fields{"err": err.Error(), "dir": *cacheDir})
	os.Exit(1)
}

cache := NewFileCache(*cacheDir)
fetcher := &Fetcher{
	Timeout: *timeout,
	Retries: *retries,
	Backoff: *backoff,
	Cache:   cache,
	TTL:     *cacheTTL,
	Logger:  logger,
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sigCh := make(chan os.Signal, 2)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
go func() {
	s := <-sigCh
	logger.Warn("signal received, shutting down", Fields{"signal": s.String()})
	cancel()
	// Second signal forces exit
	s = <-sigCh
	logger.Error("second signal received, forcing exit", Fields{"signal": s.String()})
	os.Exit(2)
}()

start := time.Now()

jobs := make(chan Job)
resultsCh := make(chan URLResult)

// Start workers
go func() {
	RunWorkers(ctx, *workers, jobs, func(ctx context.Context, j Job) URLResult {
		t0 := time.Now()
		data, attempts, cached, err := fetcher.Fetch(ctx, j.URL)
		res := URLResult{
			URL:        j.URL,
			Attempts:   attempts,
			Cached:     cached,
			DurationMS: time.Since(t0).Milliseconds(),
		}
		if err != nil {
			res.Error = err.Error()
			return res
		}
		res.Bytes = len(data)
		res.Headings = CountHeadings(data)
		return res
	}, resultsCh)
}()

// Feed jobs
go func() {
	defer close(jobs)
	for _, u := range urls {
		select {
		case <-ctx.Done():
			return
		case jobs <- Job{URL: strings.TrimSpace(u)}:
		}
	}
}()

// Collect results
collected := make([]URLResult, 0, len(urls))
for r := range resultsCh {
	collected = append(collected, r)
	if len(collected) == len(urls) {
		break
	}
}
// Ensure workers stop
cancel()

// Aggregate
sort.Slice(collected, func(i, j int) bool { return collected[i].URL < collected[j].URL })
var totals Totals
totals.URLs = len(collected)
for _, r := range collected {
	if r.Error == "" {
		totals.Successes++
		totals.Bytes += r.Bytes
		totals.Headings += r.Headings
	} else {
		totals.Failures++
	}
}
sum := Summary{
	Results: collected,
	Totals:  totals,
	Meta: Meta{
		Workers:  *workers,
		Timeout:  timeout.String(),
		Retries:  *retries,
		Backoff:  backoff.String(),
		CacheDir: *cacheDir,
		CacheTTL: cacheTTL.String(),
		Started:  start.Format(time.RFC3339Nano),
		Duration: time.Since(start).String(),
	},
}

enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", "  ")
if err := enc.Encode(sum); err != nil {
	logger.Error("failed to write summary", Fields{"err": err.Error()})
	os.Exit(1)
}
logger.Info("done", Fields{"duration": time.Since(start).String(), "successes": totals.Successes, "failures": totals.Failures})

}

File: logger.go

package main

import (
	"encoding/json"
	"io"
	"strings"
	"sync"
	"time"
)

type Level int

const (
	LevelDebug Level = iota
	LevelInfo
	LevelWarn
	LevelError
)

func ParseLevel(s string) Level {
	switch strings.ToLower(strings.TrimSpace(s)) {
	case "debug":
		return LevelDebug
	case "warn", "warning":
		return LevelWarn
	case "error":
		return LevelError
	default:
		return LevelInfo
	}
}

type Fields map[string]any

type Logger struct {
	mu    sync.Mutex
	enc   *json.Encoder
	level Level
}

func NewLogger(w io.Writer, level Level) *Logger {
	return &Logger{enc: json.NewEncoder(w), level: level}
}

func (l *Logger) log(level Level, msg string, fields Fields) {
	if level < l.level {
		return
	}
	entry := map[string]any{
		"ts":   time.Now().Format(time.RFC3339Nano),
		"lvl":  [...]string{"debug", "info", "warn", "error"}[level],
		"msg":  msg,
		"pid":  getPID(),
		"prog": getProg(),
	}
	for k, v := range fields {
		entry[k] = v
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	_ = l.enc.Encode(entry)
}

func (l *Logger) Debug(msg string, f Fields) { l.log(LevelDebug, msg, f) }
func (l *Logger) Info(msg string, f Fields)  { l.log(LevelInfo, msg, f) }
func (l *Logger) Warn(msg string, f Fields)  { l.log(LevelWarn, msg, f) }
func (l *Logger) Error(msg string, f Fields) { l.log(LevelError, msg, f) }

func getPID() int { return osGetpid() }

func getProg() string { return osArgs0() }

File: logger_os.go

package main

import "os"

func osGetpid() int { return os.Getpid() }

func osArgs0() string {
	if len(os.Args) > 0 {
		return os.Args[0]
	}
	return "app"
}

File: worker.go

package main

import (
	"context"
	"sync"
)

type Job struct {
	URL string
}

type Handler func(context.Context, Job) URLResult

// RunWorkers starts n workers that read from jobs and send results to out.
// It returns immediately; out is closed once every worker has exited.
func RunWorkers(ctx context.Context, n int, jobs <-chan Job, handler Handler, out chan<- URLResult) {
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case j, ok := <-jobs:
					if !ok {
						return
					}
					out <- handler(ctx, j)
				}
			}
		}(i)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
}

File: fetcher.go

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net"
	"net/http"
	"time"
)

type Cache interface {
	Get(url string, ttl time.Duration) ([]byte, bool, error)
	Put(url string, data []byte) error
}

type Fetcher struct {
	Timeout time.Duration
	Retries int
	Backoff time.Duration
	Cache   Cache
	TTL     time.Duration
	Logger  *Logger
	Client  *http.Client // optional; if nil, constructed per request
}

func (f *Fetcher) httpClient() *http.Client {
	if f.Client != nil {
		return f.Client
	}
	return &http.Client{
		Timeout: f.Timeout,
		Transport: &http.Transport{
			Proxy:               http.ProxyFromEnvironment,
			MaxIdleConns:        100,
			MaxConnsPerHost:     20,
			MaxIdleConnsPerHost: 20,
			IdleConnTimeout:     90 * time.Second,
			DialContext: (&net.Dialer{
				Timeout:   5 * time.Second,
				KeepAlive: 30 * time.Second,
			}).DialContext,
		},
	}
}

func (f *Fetcher) Fetch(ctx context.Context, url string) ([]byte, int, bool, error) {
	if f.Cache != nil && f.TTL > 0 {
		if data, ok, err := f.Cache.Get(url, f.TTL); err == nil && ok {
			f.Logger.Debug("cache hit", Fields{"url": url})
			return data, 0, true, nil
		}
	}

attempts := 0
var lastErr error
for {
	select {
	case <-ctx.Done():
		return nil, attempts, false, ctx.Err()
	default:
	}
	attempts++
	data, err := f.fetchOnce(ctx, url)
	if err == nil {
		if f.Cache != nil {
			_ = f.Cache.Put(url, data)
		}
		return data, attempts, false, nil
	}
	lastErr = err
	if attempts > f.Retries {
		break
	}
	// backoff with jitter
	delay := f.Backoff + time.Duration(rand.Int63n(int64(f.Backoff)/2+1))
	if delay <= 0 {
		delay = 100 * time.Millisecond
	}
	f.Logger.Warn("fetch failed, retrying", Fields{"url": url, "attempt": attempts, "err": err.Error(), "sleep": delay.String()})
	t := time.NewTimer(delay)
	select {
	case <-ctx.Done():
		t.Stop()
		return nil, attempts, false, ctx.Err()
	case <-t.C:
	}
}
return nil, attempts, false, fmt.Errorf("fetch failed after %d attempt(s): %w", attempts, lastErr)

}

func (f *Fetcher) fetchOnce(ctx context.Context, url string) ([]byte, error) { client := f.httpClient() req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, err } req.Header.Set("User-Agent", "concurrent-fetch/1.0 (+https://example)") resp, err := client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode >= 500 { return nil, fmt.Errorf("server error: %d", resp.StatusCode) } if resp.StatusCode >= 400 { // treat 4xx as non-retryable error return nil, fmt.Errorf("client error: %d", resp.StatusCode) } limited := &io.LimitedReader{R: resp.Body, N: 10 << 20} // 10MB safety limit data, err := io.ReadAll(limited) if err != nil { return nil, err } if limited.N == 0 { return nil, errors.New("response too large") } return data, nil }
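The delay used between retries in Fetch is the configured backoff plus a random extra of up to half that backoff. The sketch below isolates that calculation so its bounds can be checked. `jitteredDelay` and `withinBounds` are hypothetical helper names, not part of the Fetcher, and the 100 ms fallback mirrors the default used in the retry loop.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitteredDelay returns base plus a random extra in [0, base/2].
// A non-positive base falls back to 100ms; the check comes first because
// rand.Int63n panics when its argument is not positive.
func jitteredDelay(base time.Duration) time.Duration {
	if base <= 0 {
		return 100 * time.Millisecond
	}
	return base + time.Duration(rand.Int63n(int64(base)/2+1))
}

// withinBounds draws the given number of delays and reports whether all of
// them fall inside [base, base + base/2]. It assumes base > 0.
func withinBounds(base time.Duration, samples int) bool {
	for i := 0; i < samples; i++ {
		d := jitteredDelay(base)
		if d < base || d > base+base/2 {
			return false
		}
	}
	return true
}

func main() {
	base := 200 * time.Millisecond
	fmt.Println("sample delay:", jitteredDelay(base))
	fmt.Println("all samples in range:", withinBounds(base, 1000))
}
```

This is a fixed backoff with jitter. An exponential variant would multiply `base` by a factor on each attempt before adding the jitter.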

parser.go

package main

import (
	"bytes"
	"strings"

	"golang.org/x/net/html"
)

func CountHeadings(content []byte) int {
	// Try HTML first. html.Parse accepts almost any input without returning an
	// error, so a successful parse alone proves nothing: use the HTML count
	// only when it actually found headings.
	if n, ok := countHTMLHeadings(content); ok && n > 0 {
		return n
	}
	// Fallback: Markdown-style headings
	return countMarkdownHeadings(content)
}

func countHTMLHeadings(content []byte) (int, bool) {
	doc, err := html.Parse(bytes.NewReader(content))
	if err != nil {
		return 0, false
	}
	count := 0
	var walk func(*html.Node)
	walk = func(n *html.Node) {
		if n.Type == html.ElementNode {
			switch strings.ToLower(n.Data) {
			case "h1", "h2", "h3", "h4", "h5", "h6":
				count++
			}
		}
		for c := n.FirstChild; c != nil; c = c.NextSibling {
			walk(c)
		}
	}
	walk(doc)
	return count, true
}

func countMarkdownHeadings(content []byte) int {
	lines := strings.Split(string(content), "\n")
	count := 0
	for _, l := range lines {
		lt := strings.TrimSpace(l)
		if lt == "" || strings.HasPrefix(lt, "<") {
			// probably HTML; skip noisy matches
			continue
		}
		if strings.HasPrefix(lt, "#") {
			count++
		}
	}
	return count
}
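countMarkdownHeadings counts every line that starts with `#`, which also matches lines such as `#hashtag` or a shebang. If stricter matching is wanted, one option is to follow the CommonMark rule for ATX headings: at most three leading spaces, one to six `#` characters, then a space, a tab, or the end of the line. The sketch below is an alternative under that assumption, not the parser used above. `countATXHeadings` is a hypothetical name, and fenced code blocks are not handled.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// atxHeading matches the opening of an ATX heading: up to three spaces of
// indentation, one to six '#', then a space, a tab, or end of line.
var atxHeading = regexp.MustCompile(`^ {0,3}#{1,6}(?:[ \t]|$)`)

// countATXHeadings counts the lines of text that open an ATX heading.
func countATXHeadings(text string) int {
	count := 0
	for _, line := range strings.Split(text, "\n") {
		line = strings.TrimRight(line, "\r") // tolerate CRLF input
		if atxHeading.MatchString(line) {
			count++
		}
	}
	return count
}

func main() {
	doc := "# Title\n## Sub\n#hashtag\n####### seven\ntext # no"
	fmt.Println(countATXHeadings(doc))
}
```

Only the first two lines of `doc` qualify: `#hashtag` has no space after the marker, seven `#` characters exceed the limit, and the last line does not start with `#`.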

cache.go

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"os"
	"path/filepath"
	"time"
)

type FileCache struct {
	dir string
}

func NewFileCache(dir string) *FileCache { return &FileCache{dir: dir} }

// key returns the full path of the cache file for url (SHA-256 of the URL under dir).
func (c *FileCache) key(url string) string {
	sum := sha256.Sum256([]byte(url))
	return filepath.Join(c.dir, hex.EncodeToString(sum[:])+".cache")
}

func (c *FileCache) Get(url string, ttl time.Duration) ([]byte, bool, error) {
	path := c.key(url)
	st, err := os.Stat(path)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return nil, false, nil
		}
		return nil, false, err
	}
	if ttl > 0 && time.Since(st.ModTime()) > ttl {
		return nil, false, nil
	}
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, false, err
	}
	return data, true, nil
}

// Put writes to a temporary file and renames it so readers never see a partial entry.
func (c *FileCache) Put(url string, data []byte) error {
	path := c.key(url)
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}
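Put relies on the write-then-rename pattern, but it uses a fixed `.tmp` name, so two workers caching the same URL at the same moment would share one temporary file. A possible refinement, sketched below with the hypothetical helpers `writeAtomic` and `roundTrip`, gives each writer its own temporary file through `os.CreateTemp` in the destination directory.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeAtomic writes data to a uniquely named temporary file next to path and
// then renames it over path, so readers see either the old or the new content.
func writeAtomic(path string, data []byte) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), filepath.Base(path)+".*.tmp")
	if err != nil {
		return err
	}
	// Clean up on failure; after a successful rename this Remove finds nothing
	// and its error is ignored.
	defer os.Remove(tmp.Name())

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), path)
}

// roundTrip writes payload into a scratch directory and reads it back.
func roundTrip(payload string) (string, error) {
	dir, err := os.MkdirTemp("", "cache-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "entry.cache")
	if err := writeAtomic(path, []byte(payload)); err != nil {
		return "", err
	}
	got, err := os.ReadFile(path)
	return string(got), err
}

func main() {
	got, err := roundTrip("hello")
	fmt.Println(got, err)
}
```

Creating the temporary file in the same directory as the target keeps the rename on one filesystem, which is what makes it a single atomic step on POSIX systems.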

go.sum

// Generated, not empty: parser.go imports golang.org/x/net/html, so run `go mod tidy` (or the `go get` command below) to record the checksums for golang.org/x/net.

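parser_test.go

package main

import "testing"

func TestCountHeadingsHTML(t *testing.T) {
	// Three heading elements, plus a title and a paragraph that must not be counted.
	html := `
<html><head><title>x</title></head>
<body>
<h1>A</h1>
<h2>B</h2>
<h3>C</h3>
<p>no</p>
</body></html>
`
	got := CountHeadings([]byte(html))
	if got != 3 {
		t.Fatalf("expected 3 headings, got %d", got)
	}
}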

func TestCountHeadingsMarkdown(t *testing.T) {
	md := `
# Title

some text

## Subtitle

normal

### Third

not a heading # text `
	got := CountHeadings([]byte(md))
	if got != 3 {
		t.Fatalf("expected 3 headings, got %d", got)
	}
}

fetcher_test.go

package main

import (
	"context"
	"net/http"
	"net/http/httptest"
	"os"
	"sync/atomic"
	"testing"
	"time"
)

func TestFetcherRetry(t *testing.T) {
	var hits int32
	// The server fails the first two requests, then succeeds.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		n := atomic.AddInt32(&hits, 1)
		if n <= 2 {
			http.Error(w, "temporary", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/plain")
		w.Write([]byte("<h1>ok</h1>"))
	}))
	defer srv.Close()

	logger := NewLogger(os.Stderr, LevelError)
	f := &Fetcher{
		Timeout: 2 * time.Second,
		Retries: 3,
		Backoff: 10 * time.Millisecond,
		TTL:     0,
		Logger:  logger,
		Client:  srv.Client(),
	}

	ctx := context.Background()
	data, attempts, cached, err := f.Fetch(ctx, srv.URL)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if cached {
		t.Fatalf("unexpected cached result")
	}
	if attempts < 3 {
		t.Fatalf("expected at least 3 attempts, got %d", attempts)
	}
	if string(data) != "<h1>ok</h1>" {
		t.Fatalf("unexpected body: %q", string(data))
	}
}

func TestFetcherCache(t *testing.T) {
	dir := t.TempDir()
	cache := NewFileCache(dir)
	logger := NewLogger(os.Stderr, LevelError)
	f := &Fetcher{
		Timeout: 1 * time.Second,
		Retries: 0,
		Backoff: 10 * time.Millisecond,
		TTL:     10 * time.Minute,
		Cache:   cache,
		Logger:  logger,
		Client:  &http.Client{Timeout: 1 * time.Second},
	}

	url := "https://example.com/cached"
	// Seed cache. cache.key already returns the full path under dir, so it
	// must not be joined with dir a second time.
	data := []byte("hello # world")
	if err := os.WriteFile(cache.key(url), data, 0o644); err != nil {
		t.Fatalf("write cache: %v", err)
	}

	got, attempts, cached, err := f.Fetch(context.Background(), url)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if !cached {
		t.Fatalf("expected cached")
	}
	if attempts != 0 {
		t.Fatalf("expected 0 attempts when cached, got %d", attempts)
	}
	if string(got) != "hello # world" {
		t.Fatalf("unexpected cached content: %q", string(got))
	}
}

integration_test.go

package main

import (
	"context"
	"encoding/json"
	"net/http"
	"net/http/httptest"
	"os"
	"sort"
	"testing"
	"time"
)

func TestEndToEnd(t *testing.T) {
	mux := http.NewServeMux()
	// /a serves HTML with two headings; /b serves Markdown with two headings.
	mux.HandleFunc("/a", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("<h1>a</h1><h2>x</h2>"))
	})
	mux.HandleFunc("/b", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("# md\n## md2\ntext"))
	})
	srv := httptest.NewServer(mux)
	defer srv.Close()

	urls := []string{srv.URL + "/a", srv.URL + "/b"}

	logger := NewLogger(os.Stderr, LevelError)
	cache := NewFileCache(t.TempDir())
	fetcher := &Fetcher{
		Timeout: 1 * time.Second,
		Retries: 1,
		Backoff: 10 * time.Millisecond,
		TTL:     0,
		Cache:   cache,
		Logger:  logger,
		Client:  srv.Client(),
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	jobs := make(chan Job)
	out := make(chan URLResult)
	go func() {
		RunWorkers(ctx, 2, jobs, func(ctx context.Context, j Job) URLResult {
			t0 := time.Now()
			data, attempts, cached, err := fetcher.Fetch(ctx, j.URL)
			r := URLResult{URL: j.URL, Attempts: attempts, Cached: cached, DurationMS: time.Since(t0).Milliseconds()}
			if err != nil {
				r.Error = err.Error()
				return r
			}
			r.Bytes = len(data)
			r.Headings = CountHeadings(data)
			return r
		}, out)
	}()

	go func() {
		defer close(jobs)
		for _, u := range urls {
			jobs <- Job{URL: u}
		}
	}()

	var results []URLResult
	for r := range out {
		results = append(results, r)
		if len(results) == len(urls) {
			break
		}
	}

	if len(results) != 2 {
		t.Fatalf("expected 2 results, got %d", len(results))
	}
	sort.Slice(results, func(i, j int) bool { return results[i].URL < results[j].URL })
	if results[0].Headings != 2 || results[1].Headings != 2 {
		b, _ := json.MarshalIndent(results, "", "  ")
		t.Fatalf("unexpected headings count: %s", string(b))
	}
}

go.mod note: This code uses golang.org/x/net/html for robust HTML heading parsing. Add the following requirement if your environment needs an explicit module line:

  • run: go get golang.org/x/net@latest

Example usage:

Behavior:

  • Structured JSON logs to stderr.
  • Final aggregated JSON printed to stdout.
  • Graceful shutdown on Ctrl+C (first cancels in-flight work, second exits).
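The two-stage Ctrl+C behavior in the last bullet can be sketched as follows. This is an illustration under assumptions, not the program's actual main function: `handleSignals`, `withInterrupt`, and `simulateTwoInterrupts` are hypothetical names, and the exit code 130 is the conventional shell value for termination by SIGINT, not something the original specifies.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
)

// handleSignals cancels in-flight work on the first signal and calls exit on
// the second. exit is injected so the logic can run without killing the process.
func handleSignals(sigs <-chan os.Signal, cancel context.CancelFunc, exit func(code int)) {
	<-sigs
	cancel() // first Ctrl+C: ask workers to stop
	<-sigs
	exit(130) // second Ctrl+C: stop waiting and leave (assumed exit code)
}

// withInterrupt wires the handler to real interrupt signals and returns a
// context that is cancelled on the first one.
func withInterrupt(parent context.Context) context.Context {
	ctx, cancel := context.WithCancel(parent)
	sigs := make(chan os.Signal, 2)
	signal.Notify(sigs, os.Interrupt)
	go handleSignals(sigs, cancel, os.Exit)
	return ctx
}

// simulateTwoInterrupts feeds two fake signals through the handler and reports
// whether the context was cancelled and which exit code was requested.
func simulateTwoInterrupts() (bool, int) {
	sigs := make(chan os.Signal, 2)
	sigs <- os.Interrupt
	sigs <- os.Interrupt

	ctx, cancel := context.WithCancel(context.Background())
	code := -1
	handleSignals(sigs, cancel, func(c int) { code = c })
	return ctx.Err() != nil, code
}

func main() {
	cancelled, code := simulateTwoInterrupts()
	fmt.Println(cancelled, code)
}
```

In a real program, the context returned by `withInterrupt(context.Background())` would be the one handed to RunWorkers and Fetch, so the first interrupt stops retries and pending jobs through `ctx.Done()`.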

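Example details

Problem solved

Intelligent completion helps developers quickly generate complete, runnable code, which improves development efficiency, reduces coding errors, and speeds up delivery of the target feature.

Target users

Backend developers

Use this prompt to complete business-logic code quickly, cut repetitive work, speed up API development, and focus on core functionality.

Programming beginners

Gives newcomers working code examples and guidance so they can learn language features and common implementations quickly, with a gentler learning curve.

Full-stack developers

Generate front-end and back-end code snippets efficiently without constantly consulting documentation, improving end-to-end development efficiency.

Feature summary

Generates complete, runnable code from the supplied snippet and target feature, enabling fast development and iteration.
Supports multiple programming languages, so programmers can switch easily within multi-language projects.
Automatically tidies code structure and adds comments, making the code easier to maintain and read.
Builds the implementation from the surrounding context and the stated goal, reducing the error rate.
Generates complex functional modules in one step, so business requirements can be met without starting from scratch.
Generation logic can be tailored to the development need and adjusted flexibly for targeted output.
Improves team collaboration by keeping code style and logic consistent across developers.
Simplifies learning, so new developers can pick up implementations quickly with guidance and get productive sooner.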

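How to use a purchased prompt template

1. Use it directly in an external chat application

Copy the prompt generated from the template and paste it into the chat application you normally use (such as ChatGPT or Claude) to start a conversation right away, with no extra development. Suited to quick personal trials and lightweight use.

2. Publish it as an API endpoint

Turn the prompt template into an API. Your program can change the template parameters freely and call the endpoint directly, which makes automation and batch processing straightforward. Suited to developer integrations and embedding in business systems.

3. Configure it in an MCP client

Configure the corresponding server address in an MCP client so that your AI application calls the prompt template automatically. Suited to advanced users and team collaboration, letting the prompt move between different AI tools.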


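What you get after purchase

The complete prompt template
- 87 tokens in total
- 4 adjustable parameters
{ target feature description } { code snippet to complete } { programming language } { whether to add comments }
Access to community-contributed content
- Curated, high-quality community examples to help you get started with the prompt quickly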