Automated Script Generation

Updated Nov 24, 2025

This prompt generates scripts in a specified language from user requirements: it implements the requested functionality, reads data from a designated input source, writes output in the target format to a specified location, and honors user-defined library dependencies or parameter constraints. It is suited to automation tasks and script development scenarios.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import glob
import json
import logging
import os
import sqlite3
import sys
from datetime import datetime
from logging.handlers import RotatingFileHandler

import pandas as pd
import yaml


# ----------------------------
# Configuration & Logging
# ----------------------------

def setup_logging():
    os.makedirs("logs", exist_ok=True)
    logger = logging.getLogger("app")
    logger.setLevel(logging.INFO)

    # Rotating file handler: 5MB per file, keep 3 backups
    file_handler = RotatingFileHandler("logs/app.log", maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8")
    file_handler.setLevel(logging.INFO)
    file_fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
    file_handler.setFormatter(file_fmt)

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(logging.Formatter("%(message)s"))

    logger.handlers.clear()
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger


def load_config(config_path: str, logger: logging.Logger):
    default = {
        "field_mapping": {},  # old_name -> new_name; should map to standard names: id, name, status, score
        "filters": {
            "status": "active",
            "min_score": 80
        }
    }
    if not os.path.exists(config_path):
        logger.warning(f"Config file '{config_path}' not found. Using defaults: {default}")
        return default
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            cfg = yaml.safe_load(f) or {}
        # Merge defaults
        fm = cfg.get("field_mapping", {}) or {}
        filters = cfg.get("filters", {}) or {}
        status = filters.get("status", default["filters"]["status"])
        min_score = filters.get("min_score", default["filters"]["min_score"])
        return {"field_mapping": fm, "filters": {"status": status, "min_score": int(min_score)}}
    except Exception as e:
        logger.error(f"Failed to read config '{config_path}': {e}. Using defaults.")
        return default


# ----------------------------
# Database
# ----------------------------

def init_db(db_path: str, logger: logging.Logger):
    db_dir = os.path.dirname(db_path)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL;")
    conn.execute("PRAGMA synchronous=NORMAL;")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS cleaned_records (
            id TEXT PRIMARY KEY,
            name TEXT,
            status TEXT,
            score REAL,
            score_rank INTEGER,
            normalized_score REAL,
            source_file TEXT,
            processed_date TEXT
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_cleaned_records_processed_date ON cleaned_records(processed_date)")
    conn.commit()
    logger.info(f"SQLite initialized at {db_path}")
    return conn


# ----------------------------
# Processing
# ----------------------------

def fetch_existing_ids(conn: sqlite3.Connection, ids):
    # SQLite has a default parameter limit ~999; chunk the IN query
    existing = set()
    ids = list(set(ids))  # unique
    CHUNK = 900
    for i in range(0, len(ids), CHUNK):
        chunk = ids[i:i + CHUNK]
        placeholders = ",".join(["?"] * len(chunk))
        cur = conn.execute(f"SELECT id FROM cleaned_records WHERE id IN ({placeholders})", chunk)
        existing.update([r[0] for r in cur.fetchall()])
    return existing


def process_files(files, conn, date_str, encoding, field_mapping, status_filter, min_score, logger):
    total_rows = 0
    filtered_rows = 0
    duplicate_count = 0
    input_files_used = []

    # Ensure output partition directory exists
    out_dir = os.path.join("output", date_str)
    os.makedirs(out_dir, exist_ok=True)

    # Read the CSV in chunks of 20k rows to keep memory usage bounded
    CHUNKSIZE = 20000

    for fp in files:
        if not os.path.exists(fp):
            logger.warning(f"Input file not found, skipping: {fp}")
            continue
        input_files_used.append(fp)
        logger.info(f"Processing file: {fp}")
        try:
            reader = pd.read_csv(
                fp,
                chunksize=CHUNKSIZE,
                encoding=encoding,
                dtype=str,
                sep=",",
                on_bad_lines="skip"
            )
        except Exception as e:
            logger.error(f"Failed to open CSV '{fp}': {e}")
            continue

        for chunk in reader:
            total_rows += len(chunk)

            # Rename columns according to config mapping
            if field_mapping:
                chunk = chunk.rename(columns=field_mapping)

            # Ensure required columns exist
            required_cols = ["id", "name", "status", "score"]
            missing = [c for c in required_cols if c not in chunk.columns]
            if missing:
                logger.error(f"Chunk missing required columns {missing} in file {fp}. Skipping this chunk.")
                continue

            # Drop rows with a missing id up front (astype(str) below would turn NaN into the string "nan")
            chunk = chunk.dropna(subset=["id"])

            # Canonicalize types
            chunk["id"] = chunk["id"].astype(str).str.strip()
            chunk["name"] = chunk["name"].astype(str).str.strip()
            chunk["status"] = chunk["status"].astype(str).str.strip().str.lower()
            # score to numeric; unparseable values become NaN
            chunk["score"] = pd.to_numeric(chunk["score"], errors="coerce")

            # Drop rows without a usable id or score, then apply the status/score filters
            chunk = chunk[chunk["id"] != ""]
            chunk = chunk.dropna(subset=["score"])
            chunk = chunk[(chunk["status"] == status_filter) & (chunk["score"] >= float(min_score))]

            filtered_rows += len(chunk)

            if len(chunk) == 0:
                continue

            # Dedup within chunk by id: keep last occurrence
            dedup_chunk = chunk.drop_duplicates(subset=["id"], keep="last")
            dups_within_chunk = len(chunk) - len(dedup_chunk)
            duplicate_count += dups_within_chunk

            # Add metadata
            dedup_chunk["source_file"] = fp
            dedup_chunk["processed_date"] = date_str

            # Split into new vs existing in DB
            ids = dedup_chunk["id"].tolist()
            existing_ids = fetch_existing_ids(conn, ids)
            new_rows = dedup_chunk[~dedup_chunk["id"].isin(existing_ids)]
            existing_rows = dedup_chunk[dedup_chunk["id"].isin(existing_ids)]

            # Duplicates across DB
            duplicate_count += len(existing_rows)

            # Insert new rows
            if len(new_rows) > 0:
                conn.executemany(
                    """
                    INSERT INTO cleaned_records (id, name, status, score, source_file, processed_date)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    list(zip(
                        new_rows["id"].tolist(),
                        new_rows["name"].tolist(),
                        new_rows["status"].tolist(),
                        new_rows["score"].astype(float).tolist(),
                        new_rows["source_file"].tolist(),
                        new_rows["processed_date"].tolist()
                    ))
                )

            # Update existing rows to keep the latest encountered values
            if len(existing_rows) > 0:
                conn.executemany(
                    """
                    UPDATE cleaned_records
                    SET name = ?, status = ?, score = ?, source_file = ?, processed_date = ?
                    WHERE id = ?
                    """,
                    list(zip(
                        existing_rows["name"].tolist(),
                        existing_rows["status"].tolist(),
                        existing_rows["score"].astype(float).tolist(),
                        existing_rows["source_file"].tolist(),
                        existing_rows["processed_date"].tolist(),
                        existing_rows["id"].tolist()
                    ))
                )

            conn.commit()

    return {
        "total_rows": total_rows,
        "filtered_rows": filtered_rows,
        "duplicate_count": duplicate_count,
        "input_files": input_files_used
    }


def sqlite_supports_window_functions(conn: sqlite3.Connection) -> bool:
    try:
        v = conn.execute("SELECT sqlite_version()").fetchone()[0]
        major, minor = [int(x) for x in v.split(".")[:2]]
        # Window functions are available since SQLite 3.25.0
        return (major, minor) >= (3, 25)
    except Exception:
        return False


def compute_derived_and_write_outputs(conn, date_str, logger):
    # Stats for today's partition
    cur = conn.execute(
        "SELECT COUNT(*), MIN(score), MAX(score) FROM cleaned_records WHERE processed_date = ?",
        (date_str,)
    )
    count, min_score_val, max_score_val = cur.fetchone()
    if count == 0:
        logger.info("No records in today's partition; nothing to write.")
        return {"final_count": 0, "min_score": None, "max_score": None}

    if min_score_val is None or max_score_val is None:
        # Should not happen since records have score
        min_score_val, max_score_val = 0.0, 0.0

    # Update normalized_score for today's records
    if float(min_score_val) == float(max_score_val):
        conn.execute(
            "UPDATE cleaned_records SET normalized_score = 1.0 WHERE processed_date = ?",
            (date_str,)
        )
    else:
        diff = float(max_score_val) - float(min_score_val)
        conn.execute(
            """
            UPDATE cleaned_records
            SET normalized_score = (score - ?) / ?
            WHERE processed_date = ?
            """,
            (float(min_score_val), diff, date_str)
        )
    conn.commit()

    # Compute score_rank for today's partition
    if sqlite_supports_window_functions(conn):
        conn.execute("DROP TABLE IF EXISTS tmp_rank")
        conn.execute(
            f"""
            CREATE TEMP TABLE tmp_rank AS
            SELECT id, ROW_NUMBER() OVER (ORDER BY score DESC) AS score_rank
            FROM cleaned_records
            WHERE processed_date = ?
            """,
            (date_str,)
        )
        conn.execute(
            """
            UPDATE cleaned_records
            SET score_rank = (SELECT score_rank FROM tmp_rank WHERE tmp_rank.id = cleaned_records.id)
            WHERE processed_date = ?
            """,
            (date_str,)
        )
        conn.execute("DROP TABLE IF EXISTS tmp_rank")
        conn.commit()
    else:
        # Fallback: stream ordered ids and assign ranks
        cur = conn.execute(
            "SELECT id FROM cleaned_records WHERE processed_date = ? ORDER BY score DESC",
            (date_str,)
        )
        ranks = []
        rank = 1
        while True:
            rows = cur.fetchmany(1000)
            if not rows:
                break
            for (rid,) in rows:
                ranks.append((rank, rid))
                rank += 1
        conn.executemany(
            "UPDATE cleaned_records SET score_rank = ? WHERE id = ?",
            ranks
        )
        conn.commit()

    # Write NDJSON for today's partition
    out_dir = os.path.join("output", date_str)
    os.makedirs(out_dir, exist_ok=True)
    ndjson_path = os.path.join(out_dir, "result.ndjson")
    with open(ndjson_path, "w", encoding="utf-8", newline="\n") as f:
        cur = conn.execute(
            """
            SELECT id, name, status, score, score_rank, normalized_score, source_file, processed_date
            FROM cleaned_records
            WHERE processed_date = ?
            """,
            (date_str,)
        )
        while True:
            rows = cur.fetchmany(2000)
            if not rows:
                break
            for r in rows:
                obj = {
                    "id": r[0],
                    "name": r[1],
                    "status": r[2],
                    "score": r[3],
                    "score_rank": r[4],
                    "normalized_score": r[5],
                    "source_file": r[6],
                    "processed_date": r[7]
                }
                f.write(json.dumps(obj, ensure_ascii=False) + "\n")

    logger.info(f"NDJSON written: {ndjson_path}")
    return {"final_count": count, "min_score": float(min_score_val), "max_score": float(max_score_val), "ndjson_path": ndjson_path}


def write_summary(date_str, summary_stats, process_stats, args, config_used, logger, start_time, end_time):
    out_dir = os.path.join("output", date_str)
    os.makedirs(out_dir, exist_ok=True)
    summary_path = os.path.join(out_dir, "summary.md")

    duration_sec = (end_time - start_time).total_seconds()
    total_rows = process_stats["total_rows"]
    filtered_rows = process_stats["filtered_rows"]
    duplicate_count = process_stats["duplicate_count"]
    final_count = summary_stats["final_count"]
    filter_ratio = (filtered_rows / total_rows) if total_rows > 0 else 0.0

    lines = []
    lines.append(f"# Summary Report {date_str}")
    lines.append("")
    lines.append(f"- Processing time: start={start_time.isoformat()}, end={end_time.isoformat()}, duration={duration_sec:.2f}s")
    lines.append(f"- Input encoding: {args.encoding}")
    lines.append(f"- Glob pattern: {args.glob}")
    lines.append(f"- Status filter: {args.status or config_used['filters']['status']}")
    lines.append(f"- Min score: {args.min_score if args.min_score is not None else config_used['filters']['min_score']}")
    lines.append("")
    lines.append("## Input files")
    for fp in process_stats["input_files"]:
        lines.append(f"- {fp}")
    if not process_stats["input_files"]:
        lines.append("- (none)")
    lines.append("")
    lines.append("## Statistics")
    lines.append(f"- Total rows read: {total_rows}")
    lines.append(f"- Rows after filter: {filtered_rows}")
    lines.append(f"- Filter ratio: {filter_ratio:.4f}")
    lines.append(f"- Duplicates encountered: {duplicate_count}")
    lines.append(f"- Final records (today's partition): {final_count}")
    lines.append(f"- Min score (today): {summary_stats['min_score']}")
    lines.append(f"- Max score (today): {summary_stats['max_score']}")
    lines.append("")
    lines.append("## Outputs")
    lines.append(f"- NDJSON: output/{date_str}/result.ndjson")
    lines.append("- SQLite DB: output/db.sqlite (table cleaned_records)")
    lines.append("- Log: logs/app.log")
    lines.append("")
    lines.append("## Config (field mapping)")
    fm = config_used.get("field_mapping", {})
    if fm:
        for k, v in fm.items():
            lines.append(f"- {k} -> {v}")
    else:
        lines.append("- (identity mapping)")

    with open(summary_path, "w", encoding="utf-8", newline="\n") as f:
        f.write("\n".join(lines))
    logger.info(f"Summary written: {summary_path}")


# ----------------------------
# Main
# ----------------------------

def main():
    logger = setup_logging()
    parser = argparse.ArgumentParser(description="Batch CSV cleaner: rename fields, filter, dedup, derive, output NDJSON & SQLite, summary.")
    parser.add_argument("--glob", default="data/*.csv", help="Glob pattern for input CSV files (default: data/*.csv)")
    parser.add_argument("--encoding", default="utf-8", help="File encoding (default: utf-8)")
    parser.add_argument("--min-score", type=int, help="Minimum score threshold (integer)")
    parser.add_argument("--status", choices=["active", "inactive"], help="Status filter value")
    parser.add_argument("--config", default="config.yaml", help="Path to config.yaml (default: ./config.yaml)")
    args = parser.parse_args()

    start_time = datetime.now()
    date_str = start_time.strftime("%Y%m%d")

    # Load config
    config = load_config(args.config, logger)

    # Resolve filters: CLI overrides config
    status_filter = args.status if args.status is not None else config["filters"]["status"]
    min_score = args.min_score if args.min_score is not None else int(config["filters"]["min_score"])

    # Init DB
    conn = init_db(os.path.join("output", "db.sqlite"), logger)

    # Resolve files
    files = sorted(glob.glob(args.glob))
    if not files:
        logger.warning(f"No files matched pattern: {args.glob}")

    # Process
    process_stats = process_files(
        files=files,
        conn=conn,
        date_str=date_str,
        encoding=args.encoding,
        field_mapping=config["field_mapping"],
        status_filter=status_filter,
        min_score=min_score,
        logger=logger
    )

    # Derive metrics & write NDJSON
    summary_stats = compute_derived_and_write_outputs(conn, date_str, logger)

    # Write summary
    end_time = datetime.now()
    write_summary(date_str, summary_stats, process_stats, args, config, logger, start_time, end_time)

    logger.info("Done.")


if __name__ == "__main__":
    # Pandas setting to reduce copies; not mandatory
    try:
        pd.options.mode.copy_on_write = True
    except Exception:
        pass
    main()
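
For reference, below is a minimal sketch of the config.yaml structure that load_config() above expects, generated with PyYAML. The source column names (user_id, full_name, state, points) and the script filename in the sample command are illustrative placeholders, not something defined by the script itself.

# Sketch only: writes a sample config.yaml with the structure load_config() reads.
import yaml

sample_config = {
    "field_mapping": {       # old_name -> standard name (id, name, status, score)
        "user_id": "id",
        "full_name": "name",
        "state": "status",
        "points": "score",
    },
    "filters": {
        "status": "active",  # keep rows with this status only
        "min_score": 80,     # keep rows with score >= this value
    },
}

with open("config.yaml", "w", encoding="utf-8") as f:
    yaml.safe_dump(sample_config, f, allow_unicode=True, sort_keys=False)

# Typical invocation (CLI flags override config values); "clean_csv.py" is a placeholder filename:
#   python clean_csv.py --glob "data/*.csv" --encoding utf-8 --status active --min-score 80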
/**
 * Event ETL Script
 * Node >= 18
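 * Run as an ES module (save as .mjs or set "type": "module" in package.json), since the file uses import syntax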
 * Dependencies: node-fetch@3, p-limit, ajv
 *
 * Features:
 * - Pull paginated events from REST API with cursor pagination and rate limit handling
 * - CLI args: --start-date, --end-date, --types (enum: login,purchase,error; comma-separated)
 * - Filters from filters.json (e.g. { "types": ["login","error"], "minSeverity": 2 })
 * - Normalize fields (timestamp to ISO8601; fill required fields from common aliases)
 * - Validate against strict JSON Schema; invalid records -> output/errors/invalid.jsonl
 * - Split into per-type JSONL files: output/events/{type}/YYYYMMDD.jsonl
 * - Support resume via checkpoint: output/state/checkpoint.json
 * - Exponential backoff retries (max 5)
 * - Concurrency limit <= 5 (processing)
 * - Zip per-day output files after processing: output/archive/events_YYYYMMDD.zip
 * - Generate manifest.json with batch metadata
 */

import fs from 'node:fs';
import fsp from 'node:fs/promises';
import path from 'node:path';
import { execFile } from 'node:child_process';
import Ajv from 'ajv';
import pLimit from 'p-limit';

const API_URL = 'https://api.example.test/v1/events';
const PAGE_SIZE = 100;
const MAX_RETRIES = 5;
const CONCURRENCY = 5;

const OUTPUT_BASE = 'output';
const OUTPUT_EVENTS_DIR = path.join(OUTPUT_BASE, 'events');
const OUTPUT_ERRORS_DIR = path.join(OUTPUT_BASE, 'errors');
const OUTPUT_ARCHIVE_DIR = path.join(OUTPUT_BASE, 'archive');
const OUTPUT_STATE_DIR = path.join(OUTPUT_BASE, 'state');
const INVALID_FILE = path.join(OUTPUT_ERRORS_DIR, 'invalid.jsonl');
const CHECKPOINT_FILE = path.join(OUTPUT_STATE_DIR, 'checkpoint.json');
const MANIFEST_FILE = path.join(OUTPUT_BASE, 'manifest.json');

const TYPES_ENUM = ['login', 'purchase', 'error'];

async function main() {
  const args = parseArgs(process.argv.slice(2));
  validateArgs(args);
  const token = process.env.EVENT_TOKEN;
  if (!token) {
    console.error('ERROR: Missing EVENT_TOKEN environment variable.');
    process.exit(1);
  }

  const filters = await readFilters('filters.json');
  const { types: cliTypes } = args;
  const typesFilter = filters.types && filters.types.length ? filters.types : null;
  const finalTypes = intersectTypes(cliTypes, typesFilter);
  if (!finalTypes.length) {
    console.error('ERROR: No types to process after applying filters.');
    process.exit(1);
  }
  const minSeverity = Number.isFinite(filters.minSeverity) ? Number(filters.minSeverity) : null;

  await ensureDirs([
    OUTPUT_BASE,
    OUTPUT_EVENTS_DIR,
    ...finalTypes.map(t => path.join(OUTPUT_EVENTS_DIR, t)),
    OUTPUT_ERRORS_DIR,
    OUTPUT_ARCHIVE_DIR,
    OUTPUT_STATE_DIR,
  ]);

  const batchId = cryptoRandomUUID();
  const checkpoint = await loadCheckpoint();
  const resume =
    checkpoint &&
    checkpoint.startDate === args.startDate &&
    checkpoint.endDate === args.endDate &&
    arrayEqual(checkpoint.types, finalTypes);
  let cursor = resume ? checkpoint.cursor : null;

  const ajv = new Ajv({ allErrors: true, strict: true });
  ajv.addFormat('date-time', (str) => !Number.isNaN(Date.parse(str)));
  const validate = ajv.compile(eventSchema());

  // Streams per type/day
  const streamMap = new Map(); // key: `${type}:${day}`, value: WriteStream
  const invalidStream = fs.createWriteStream(INVALID_FILE, { flags: 'a', encoding: 'utf-8' });

  const limit = pLimit(CONCURRENCY);

  let totalRequests = 0;
  let totalDurationMs = 0;
  let retryCount = 0;
  let totalPages = 0;
  let invalidCount = 0;
  const perTypeCounts = Object.fromEntries(finalTypes.map(t => [t, 0]));
  const perDayFiles = new Map(); // day -> Set of files produced

  const fetchFn = await loadFetch();

  console.log(`Batch ${batchId} starting: ${args.startDate} to ${args.endDate}; types=${finalTypes.join(',')}; cursor=${cursor || 'N/A'}`);

  const startedAt = Date.now();

  try {
    while (true) {
      const url = buildUrl(API_URL, {
        start_date: args.startDate,
        end_date: args.endDate,
        page_size: String(PAGE_SIZE),
        types: finalTypes.join(','),
        cursor: cursor || '',
      });

      const t0 = Date.now();
      const { json, headers, retriesUsed } = await fetchWithRetry(fetchFn, url, {
        headers: {
          Authorization: `Bearer ${token}`,
          'Accept': 'application/json',
        },
      });
      const t1 = Date.now();

      totalRequests += 1;
      totalDurationMs += (t1 - t0);
      retryCount += retriesUsed;

      const pageData = normalizePage(json);
      const events = Array.isArray(pageData.data) ? pageData.data : [];
      const nextCursor = pageData.next_cursor || null;
      totalPages += 1;

      // Process events (normalize, filter, validate, write)
      const tasks = events.map(ev => limit(async () => {
        const normalized = normalizeEvent(ev);
        if (!normalized || !normalized.type || !normalized.timestamp) {
          invalidCount += 1;
          invalidStream.write(JSON.stringify({ error: 'missing required fields', raw: ev }) + '\n');
          return;
        }

        if (!finalTypes.includes(normalized.type)) {
          // filtered by types
          return;
        }
        if (normalized.type === 'error' && minSeverity !== null) {
          const sev = Number(normalized.severity ?? 0);
          if (!Number.isFinite(sev) || sev < minSeverity) {
            return; // filtered out by severity
          }
        }

        const valid = validate(normalized);
        if (!valid) {
          invalidCount += 1;
          invalidStream.write(JSON.stringify({ error: ajv.errorsText(validate.errors), event: sanitized(normalized) }) + '\n');
          return;
        }

        const day = formatYYYYMMDD(normalized.timestamp);
        const filePath = path.join(OUTPUT_EVENTS_DIR, normalized.type, `${day}.jsonl`);
        const stream = getStream(streamMap, normalized.type, day, filePath);
        stream.write(JSON.stringify(normalized) + '\n');

        perTypeCounts[normalized.type] += 1;
        addFile(perDayFiles, day, filePath);
      }));

      await Promise.all(tasks);

      // Update checkpoint after each page
      cursor = nextCursor;
      await saveCheckpoint({
        batchId,
        cursor,
        startDate: args.startDate,
        endDate: args.endDate,
        types: finalTypes,
        updatedAt: new Date().toISOString(),
        perTypeCounts,
        invalidCount,
      });

      // Rate limit handling based on headers
      await maybeRespectRateLimit(headers);

      if (!nextCursor) break;
    }
  } catch (err) {
    console.error('FATAL:', err?.message || err);
    // Keep checkpoint for resume
    await closeStreams(streamMap, invalidStream);
    process.exit(1);
  }

  await closeStreams(streamMap, invalidStream);

  // Zip per-day files
  const archives = [];
  for (const [day, filesSet] of perDayFiles.entries()) {
    const files = Array.from(filesSet);
    if (!files.length) continue;
    const zipName = `events_${day}.zip`;
    const zipPath = path.join(OUTPUT_ARCHIVE_DIR, zipName);
    try {
      await zipFiles(zipPath, files);
      archives.push({ day, path: zipPath, files });
      console.log(`Archived ${files.length} files to ${zipPath}`);
    } catch (zipErr) {
      console.error(`WARN: Failed to create zip ${zipPath}:`, zipErr?.message || zipErr);
    }
  }

  const finishedAt = Date.now();

  // Write manifest
  const manifest = {
    batchId,
    startedAt: new Date(startedAt).toISOString(),
    finishedAt: new Date(finishedAt).toISOString(),
    timeWindow: { startDate: args.startDate, endDate: args.endDate },
    pageSize: PAGE_SIZE,
    types: finalTypes,
    filtersApplied: { minSeverity },
    cursor,
    totalPages,
    totalRequests,
    totalDurationMs,
    perTypeCounts,
    invalidCount,
    retryCount,
    archives,
  };
  await fsp.writeFile(MANIFEST_FILE, JSON.stringify(manifest, null, 2), 'utf-8');
  // Clear checkpoint after success
  await clearCheckpoint();

  console.log(`Done. Success counts: ${JSON.stringify(perTypeCounts)}; invalid=${invalidCount}; retries=${retryCount}`);
}

// ---- Helpers ----

function parseArgs(argv) {
  const out = { startDate: null, endDate: null, types: [] };
  for (let i = 0; i < argv.length; i++) {
    const a = argv[i];
    if (a === '--start-date') out.startDate = argv[++i];
    else if (a === '--end-date') out.endDate = argv[++i];
    else if (a === '--types') out.types = (argv[++i] || '').split(',').map(s => s.trim()).filter(Boolean);
  }
  return out;
}

function validateArgs({ startDate, endDate, types }) {
  if (!isValidDate(startDate) || !isValidDate(endDate)) {
    console.error('ERROR: --start-date and --end-date must be in YYYY-MM-DD format.');
    process.exit(1);
  }
  if (new Date(startDate) > new Date(endDate)) {
    console.error('ERROR: --start-date cannot be after --end-date.');
    process.exit(1);
  }
  if (!types.length) {
    console.error('ERROR: --types is required, comma-separated values: login,purchase,error');
    process.exit(1);
  }
  const invalid = types.filter(t => !TYPES_ENUM.includes(t));
  if (invalid.length) {
    console.error(`ERROR: invalid --types: ${invalid.join(',')}. Allowed: ${TYPES_ENUM.join(',')}`);
    process.exit(1);
  }
}

async function readFilters(filepath) {
  try {
    const raw = await fsp.readFile(filepath, 'utf-8');
    const json = JSON.parse(raw);
    const out = {};
    if (Array.isArray(json.types)) out.types = json.types.filter(t => TYPES_ENUM.includes(t));
    if (json.minSeverity !== undefined) out.minSeverity = Number(json.minSeverity);
    return out;
  } catch {
    return {};
  }
}

function intersectTypes(cliTypes, filterTypes) {
  if (!filterTypes) return cliTypes;
  const set = new Set(filterTypes);
  return cliTypes.filter(t => set.has(t));
}

async function ensureDirs(dirs) {
  for (const d of dirs) {
    await fsp.mkdir(d, { recursive: true });
  }
}

function eventSchema() {
  return {
    $id: 'Event',
    type: 'object',
    additionalProperties: true,
    required: ['id', 'type', 'timestamp'],
    properties: {
      id: { type: 'string' },
      type: { enum: TYPES_ENUM },
      timestamp: { type: 'string', format: 'date-time' },
      message: { type: 'string' },
      cursor: { type: 'string' },
      // login
      userId: { type: 'string' },
      ip: { type: 'string' },
      // purchase
      orderId: { type: 'string' },
      amount: { type: 'number' },
      currency: { type: 'string' },
      // error
      errorCode: { type: 'string' },
      severity: { type: 'integer', minimum: 0 },
      stack: { type: 'string' },
    },
    allOf: [
      {
        if: { properties: { type: { const: 'login' } }, required: ['type'] },
        then: { required: ['userId', 'ip'] },
      },
      {
        if: { properties: { type: { const: 'purchase' } }, required: ['type'] },
        then: { required: ['orderId', 'amount'] },
      },
      {
        if: { properties: { type: { const: 'error' } }, required: ['type'] },
        then: { required: ['errorCode', 'severity'] },
      },
    ],
  };
}

function normalizeEvent(ev) {
  if (!ev || typeof ev !== 'object') return null;
  const type = ev.type || ev.eventType || ev.category;
  if (!TYPES_ENUM.includes(type)) return null;

  const id = ev.id || ev.eventId || ev.uuid || String(ev._id || '');
  const timestamp = normalizeTimestamp(ev.timestamp ?? ev.ts ?? ev.time ?? ev.created_at ?? ev.occurred_at);
  if (!id || !timestamp) return null;

  const base = {
    id: String(id),
    type,
    timestamp,
    message: ev.message || ev.msg || '',
  };

  if (type === 'login') {
    base.userId = ev.userId || ev.user_id || ev.uid || '';
    base.ip = ev.ip || ev.ip_address || ev.remote_ip || '';
  } else if (type === 'purchase') {
    base.orderId = ev.orderId || ev.order_id || ev.oid || '';
    const amount = ev.amount ?? ev.total ?? ev.price;
    base.amount = amount !== undefined ? Number(amount) : NaN;
    base.currency = ev.currency || ev.curr || 'USD';
  } else if (type === 'error') {
    base.errorCode = ev.errorCode || ev.code || ev.err || '';
    const sev = ev.severity ?? ev.level ?? ev.priority;
    base.severity = sev !== undefined ? parseInt(sev, 10) : NaN;
    base.stack = ev.stack || ev.stacktrace || '';
  }

  // Ensure required fields present and sensible
  if (type === 'purchase' && !Number.isFinite(base.amount)) return null;
  if (type === 'error' && !Number.isInteger(base.severity)) return null;

  return base;
}

function normalizeTimestamp(ts) {
  if (!ts) return null;
  // ts could be ISO string, epoch seconds, epoch ms
  if (typeof ts === 'string') {
    // try parse
    const num = Number(ts);
    if (Number.isFinite(num) && ts.trim() !== '') {
      return toISOFromEpoch(num);
    }
    const d = new Date(ts);
    if (Number.isNaN(d.getTime())) return null;
    return d.toISOString();
  }
  if (typeof ts === 'number') {
    return toISOFromEpoch(ts);
  }
  return null;
}

function toISOFromEpoch(num) {
  // Heuristic: if num < 1e12 treat as seconds; else ms
  const ms = num < 1e12 ? num * 1000 : num;
  const d = new Date(ms);
  if (Number.isNaN(d.getTime())) return null;
  return d.toISOString();
}

function sanitized(obj) {
  try {
    return obj && JSON.parse(JSON.stringify(obj));
  } catch {
    return obj;
  }
}

function formatYYYYMMDD(isoTs) {
  const d = new Date(isoTs);
  const y = d.getUTCFullYear();
  const m = String(d.getUTCMonth() + 1).padStart(2, '0');
  const day = String(d.getUTCDate()).padStart(2, '0');
  return `${y}${m}${day}`;
}

function getStream(map, type, day, filePath) {
  const key = `${type}:${day}`;
  let s = map.get(key);
  if (!s) {
    s = fs.createWriteStream(filePath, { flags: 'a', encoding: 'utf-8' });
    map.set(key, s);
  }
  return s;
}

async function closeStreams(map, invalidStream) {
  const closers = [];
  for (const s of map.values()) {
    closers.push(new Promise((resolve) => s.end(resolve)));
  }
  if (invalidStream) {
    closers.push(new Promise((resolve) => invalidStream.end(resolve)));
  }
  await Promise.all(closers);
}

function addFile(map, day, filePath) {
  let set = map.get(day);
  if (!set) {
    set = new Set();
    map.set(day, set);
  }
  set.add(filePath);
}

function buildUrl(base, params) {
  const url = new URL(base);
  for (const [k, v] of Object.entries(params)) {
    if (v === undefined || v === null || v === '') continue;
    url.searchParams.set(k, v);
  }
  return url.toString();
}

async function maybeRespectRateLimit(headers) {
  try {
    const remaining = headers.get('X-RateLimit-Remaining');
    const reset = headers.get('X-RateLimit-Reset');
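    // Assumes X-RateLimit-Reset is the number of seconds until the rate-limit window resets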
    if (remaining !== null && Number(remaining) <= 0) {
      const waitMs = reset ? Math.max(0, (Number(reset) || 0) * 1000) : 1000;
      await sleep(waitMs);
    }
  } catch {
    // ignore
  }
}

async function fetchWithRetry(fetchFn, url, options) {
  let attempt = 0;
  let lastErr = null;
  while (attempt <= MAX_RETRIES) {
    try {
      const res = await fetchFn(url, options);
      if (res.status === 429) {
        // Retry-After may be an HTTP-date rather than seconds; fall back to backoff if it is not numeric
        const retryAfter = Number(res.headers.get('Retry-After'));
        const waitMs = Number.isFinite(retryAfter) && retryAfter > 0 ? Math.ceil(retryAfter * 1000) : backoffMs(attempt);
        await sleep(waitMs);
        attempt += 1;
        continue;
      }
      if (res.status >= 500) {
        await sleep(backoffMs(attempt));
        attempt += 1;
        continue;
      }
      if (res.status >= 400) {
        // Client errors (4xx other than 429) are not retryable; mark and rethrow past the retry loop
        const text = await res.text();
        const clientErr = new Error(`HTTP ${res.status}: ${text}`);
        clientErr.noRetry = true;
        throw clientErr;
      }
      const json = await res.json();
      return { json, headers: res.headers, retriesUsed: attempt };
    } catch (err) {
      if (err && err.noRetry) throw err;
      lastErr = err;
      await sleep(backoffMs(attempt));
      attempt += 1;
    }
  }
  throw lastErr || new Error('Request failed after retries');
}

function backoffMs(attempt) {
  const base = 500; // ms
  const jitter = Math.floor(Math.random() * 250);
  return Math.min(30000, base * Math.pow(2, attempt)) + jitter;
}

function normalizePage(json) {
  // Assume API: { data: [...], next_cursor: '...' }
  if (!json || typeof json !== 'object') return { data: [], next_cursor: null };
  const data = Array.isArray(json.data) ? json.data : (Array.isArray(json.events) ? json.events : []);
  const next_cursor = json.next_cursor || json.nextCursor || null;
  return { data, next_cursor };
}

async function loadFetch() {
  try {
    const mod = await import('node-fetch');
    return mod.default;
  } catch {
    if (typeof fetch !== 'function') {
      throw new Error('Fetch API is not available and node-fetch@3 could not be imported.');
    }
    return fetch;
  }
}

async function zipFiles(zipPath, files) {
  await fsp.mkdir(path.dirname(zipPath), { recursive: true });
  const platform = process.platform;
  if (platform === 'win32') {
    // Use PowerShell Compress-Archive; pass -NoProfile/-Command as separate arguments so they are parsed as parameters
    const psScript = [
      `$files = @(${files.map(f => `'${escapePwsh(f)}'`).join(',')});`,
      `$zip = '${escapePwsh(zipPath)}';`,
      `if (Test-Path $zip) { Remove-Item $zip };`,
      `Compress-Archive -Path $files -DestinationPath $zip`,
    ].join(' ');
    await execFileAsync('powershell.exe', ['-NoProfile', '-Command', psScript]);
  } else {
    // Use zip -j
    await execFileAsync('zip', ['-j', zipPath, ...files]);
  }
}

function escapePwsh(p) {
  return p.replace(/'/g, "''");
}

function execFileAsync(cmd, args) {
  return new Promise((resolve, reject) => {
    execFile(cmd, args, (err, stdout, stderr) => {
      if (err) {
        err.message = `${err.message}\nSTDOUT: ${stdout}\nSTDERR: ${stderr}`;
        return reject(err);
      }
      resolve({ stdout, stderr });
    });
  });
}

async function loadCheckpoint() {
  try {
    const raw = await fsp.readFile(CHECKPOINT_FILE, 'utf-8');
    return JSON.parse(raw);
  } catch {
    return null;
  }
}

async function saveCheckpoint(obj) {
  await fsp.writeFile(CHECKPOINT_FILE, JSON.stringify(obj, null, 2), 'utf-8');
}

async function clearCheckpoint() {
  try {
    await fsp.unlink(CHECKPOINT_FILE);
  } catch {
    // ignore
  }
}

function isValidDate(s) {
  if (typeof s !== 'string') return false;
  if (!/^\d{4}-\d{2}-\d{2}$/.test(s)) return false;
  const d = new Date(s + 'T00:00:00Z');
  return !Number.isNaN(d.getTime());
}

function arrayEqual(a, b) {
  if (!Array.isArray(a) || !Array.isArray(b)) return false;
  if (a.length !== b.length) return false;
  const sa = [...a].sort();
  const sb = [...b].sort();
  for (let i = 0; i < sa.length; i++) {
    if (sa[i] !== sb[i]) return false;
  }
  return true;
}

function cryptoRandomUUID() {
  if (typeof crypto !== 'undefined' && crypto.randomUUID) return crypto.randomUUID();
  // Fallback
  return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
    const r = (Math.random() * 16) | 0;
    const v = c === 'x' ? r : (r & 0x3) | 0x8;
    return v.toString(16);
  });
}

function sleep(ms) {
  return new Promise(res => setTimeout(res, ms));
}

// Entry
main().catch(err => {
  console.error('Unexpected error:', err);
  process.exit(1);
});
#!/usr/bin/env pwsh
# Requires PowerShell 7+
# Purpose: Parse combined access logs (with duration), aggregate per-minute metrics, generate CSV/Markdown/JSON reports.

[CmdletBinding()]
param(
    [string]$Window,                                 # ISO8601: "start,end" e.g. "2025-11-01T00:00:00Z,2025-11-01T23:59:59Z"
    [ValidateSet('2xx','3xx','4xx','5xx')]
    [string[]]$StatusCodes,                          # Enumerated categories to include in status distribution
    [string]$LogsDir = "logs",
    [string]$ExcludePathsFile = "exclude_paths.txt",
    [string]$SettingsFile = "settings.json",
    [string]$ReportsDir = "reports"
)

# --- Color logging via PSWriteColor ---
try {
    Import-Module -Name PSWriteColor -ErrorAction Stop
} catch {
    function Write-Color {
        param(
            [Parameter(Mandatory=$true)][string[]]$Text,
            [Parameter()][System.ConsoleColor[]]$Color = [System.ConsoleColor]::White
        )
        # Basic fallback: join text and print single color
        $joined = ($Text -join "")
        $fg = $Color[0]
        Write-Host $joined -ForegroundColor $fg
    }
}

# --- Helpers ---
function Get-TimeZoneFromSettings {
    param([string]$Path)
    $tzInfo = $null
    if (Test-Path -Path $Path) {
        try {
            $json = Get-Content -Path $Path -Raw | ConvertFrom-Json
            if ($json.timezone) {
                try {
                    $tzInfo = [System.TimeZoneInfo]::FindSystemTimeZoneById($json.timezone)
                } catch {
                    Write-Color -Text @("Warning: timezone '", $json.timezone, "' not found; using Local.") -Color Yellow
                }
            }
        } catch {
            Write-Color -Text @("Warning: failed to parse settings.json; using Local timezone.") -Color Yellow
        }
    }
    if (-not $tzInfo) { $tzInfo = [System.TimeZoneInfo]::Local }
    return $tzInfo
}

function Convert-CombinedTimeToDTO {
    param([string]$timeStr) # e.g. 10/Oct/2000:13:55:36 -0700
    # Normalize offset "-0700" -> "-07:00"
    $m = [regex]::Match($timeStr, '^(?<date>\d{2}/[A-Za-z]{3}/\d{4}:\d{2}:\d{2}:\d{2}) (?<off>[+-]\d{4})$')
    if (-not $m.Success) { return $null }
    $date = $m.Groups['date'].Value
    $off = $m.Groups['off'].Value
    $norm = "{0} {1}:{2}" -f $date, $off.Substring(0,3), $off.Substring(3,2)
    try {
        $culture = [System.Globalization.CultureInfo]::InvariantCulture
        return [DateTimeOffset]::ParseExact($norm, 'dd/MMM/yyyy:HH:mm:ss zzz', $culture)
    } catch {
        return $null
    }
}

function Get-StatusCategory {
    param([int]$code)
    switch ($code) {
        {$_ -ge 200 -and $_ -lt 300} { '2xx'; break }
        {$_ -ge 300 -and $_ -lt 400} { '3xx'; break }
        {$_ -ge 400 -and $_ -lt 500} { '4xx'; break }
        default { '5xx' }
    }
}

function Compute-Percentile {
    param([System.Collections.Generic.List[double]]$Values, [double]$p = 0.95)
    if (-not $Values -or $Values.Count -eq 0) { return $null }
    $arr = $Values.ToArray()
    [Array]::Sort($arr)
    $n = $arr.Length
    $idx = [math]::Floor($p * ($n - 1))
    return [double]$arr[$idx]
}

function Should-ExcludePath {
    param([string]$path, [string[]]$patterns)
    if (-not $patterns -or $patterns.Count -eq 0) { return $false }
    foreach ($pat in $patterns) {
        if (-not $pat) { continue }
        # wildcard-like match
        if ($path -like $pat) { return $true }
    }
    return $false
}

# Maintain Top N slow requests without storing all
function Update-TopSlow {
    param(
        [System.Collections.Generic.List[object]]$TopList,
        [pscustomobject]$record,
        [int]$N = 10
    )
    if ($null -eq $record.Duration) { return }
    if ($TopList.Count -lt $N) {
        $TopList.Add($record) | Out-Null
        return
    }
    # Replace the current minimum if the new record is slower
    $minIdx = 0
    $minVal = $TopList[0].Duration
    for ($i=1; $i -lt $TopList.Count; $i++) {
        if ($TopList[$i].Duration -lt $minVal) { $minVal = $TopList[$i].Duration; $minIdx = $i }
    }
    if ($record.Duration -gt $minVal) {
        $TopList[$minIdx] = $record
    }
}

# --- Initialization ---
$tz = Get-TimeZoneFromSettings -Path $SettingsFile
Write-Color -Text @("Using timezone: ", $tz.Id) -Color Green

# Window parsing
$winStart = $null
$winEnd = $null
$winSpecified = $false
if ($Window) {
    $parts = $Window.Split(',', 2, [System.StringSplitOptions]::RemoveEmptyEntries)
    if ($parts.Count -eq 2) {
        try {
            $winStart = [DateTimeOffset]::Parse($parts[0])
            $winEnd   = [DateTimeOffset]::Parse($parts[1])
            # normalize window to target timezone for consistent comparisons
            $winStart = [System.TimeZoneInfo]::ConvertTime($winStart, $tz)
            $winEnd   = [System.TimeZoneInfo]::ConvertTime($winEnd, $tz)
            $winSpecified = $true
            Write-Color -Text @("Window: ", $winStart.ToString("u"), " -> ", $winEnd.ToString("u")) -Color Cyan
        } catch {
            Write-Color -Text @("Warning: invalid --window format; ignoring.") -Color Yellow
        }
    } else {
        Write-Color -Text @("Warning: --window must be 'start,end' in ISO8601; ignoring.") -Color Yellow
    }
}

# Status categories selection
$selectedCats = if ($StatusCodes -and $StatusCodes.Count -gt 0) { $StatusCodes } else { @('2xx','3xx','4xx','5xx') }

# Exclude paths
$excludePatterns = @()
if (Test-Path -Path $ExcludePathsFile) {
    $excludePatterns = Get-Content -Path $ExcludePathsFile | Where-Object { $_ -and (-not $_.Trim().StartsWith('#')) } | ForEach-Object { $_.Trim() }
    if ($excludePatterns.Count -gt 0) {
        Write-Color -Text @("Loaded ", $excludePatterns.Count.ToString(), " exclude pattern(s).") -Color DarkGray
    }
}

# Output directory and permission check
if (-not (Test-Path -Path $ReportsDir)) {
    New-Item -ItemType Directory -Path $ReportsDir -Force | Out-Null
}
try {
    $testFile = Join-Path $ReportsDir "__write_test.tmp"
    Set-Content -Path $testFile -Value "ok" -Encoding utf8 -Force -ErrorAction Stop  # Stop makes a write failure catchable
    Remove-Item -Path $testFile -Force
} catch {
    Write-Color -Text @("Error: cannot write to reports directory '", $ReportsDir, "'. Exiting.") -Color Red
    exit 1
}

# Filenames by processing date in target timezone
$processDate = [System.TimeZoneInfo]::ConvertTime([DateTimeOffset]::Now, $tz).ToString('yyyyMMdd')
$metricsPath = Join-Path $ReportsDir "metrics-$processDate.csv"
$reportPath  = Join-Path $ReportsDir "report-$processDate.md"
$chartPath   = Join-Path $ReportsDir "chart-data-$processDate.json"

# Aggregators
$minuteAgg = @{}  # key: minute string 'yyyy-MM-dd HH:mm' -> {requests:int, durations:List[double], status: @{2xx,3xx,4xx,5xx}}
$pathCounts = New-Object 'System.Collections.Generic.Dictionary[string,int]'
$topSlow = New-Object 'System.Collections.Generic.List[object]'
$totalRequests = 0
$badLines = 0
$processedFiles = 0

# Precompiled regex for combined logs (with optional trailing duration)
$pattern = '^(?<ip>\S+)\s+\S+\s+\S+\s+\[(?<time>[^\]]+)\]\s+"(?<method>[A-Z]+)\s+(?<url>\S+)(?:\s+\S+)?"\s+(?<status>\d{3})\s+\S+\s+"(?<ref>.*?)"\s+"(?<ua>.*?)"(?:\s+(?<duration>[0-9]*\.?[0-9]+))?'
$rx = [regex]::new($pattern, [System.Text.RegularExpressions.RegexOptions]::Compiled)

# Process log files
$files = Get-ChildItem -Path $LogsDir -Filter *.log -File -ErrorAction SilentlyContinue
if (-not $files -or $files.Count -eq 0) {
    Write-Color -Text @("No log files found in '", $LogsDir, "'. Generating empty reports.") -Color Yellow
} else {
    foreach ($f in $files) {
        $processedFiles++
        $sizeLimit = 200MB
        if ($f.Length -gt $sizeLimit) {
            Write-Color -Text @("Warning: file '", $f.Name, "' is larger than 200MB (", [math]::Round($f.Length / 1MB, 2).ToString(), " MB). Parsing may be slow.") -Color Yellow
        }
        Write-Color -Text @("Processing ", $f.FullName) -Color DarkCyan

        # Stream parse to avoid high memory
        $reader = $null
        try {
            $reader = [System.IO.StreamReader]::new($f.FullName, [System.Text.Encoding]::UTF8, $true)
            while (-not $reader.EndOfStream) {
                $line = $reader.ReadLine()
                if ([string]::IsNullOrWhiteSpace($line)) { continue }
                $m = $rx.Match($line)
                if (-not $m.Success) {
                    # Line does not match the combined log format; count it and move on
                    $badLines++
                    continue
                }

                $timeStr = $m.Groups['time'].Value
                $dto = Convert-CombinedTimeToDTO -timeStr $timeStr
                if ($null -eq $dto) { $badLines++; continue }

                # Convert to target timezone
                $dtTz = [System.TimeZoneInfo]::ConvertTime($dto, $tz)

                # Window filter
                if ($winSpecified) {
                    if ($dtTz -lt $winStart -or $dtTz -gt $winEnd) { continue }
                }

                $method = $m.Groups['method'].Value
                $url = $m.Groups['url'].Value
                # Extract path without query
                $path = $url.Split('?', 2)[0]
                if (Should-ExcludePath -path $path -patterns $excludePatterns) { continue }

                $status = [int]$m.Groups['status'].Value
                # Duration from regex or known tokens in line
                $duration = $null
                if ($m.Groups['duration'].Success -and $m.Groups['duration'].Value) {
                    [double]::TryParse($m.Groups['duration'].Value, [ref]$duration) | Out-Null
                } else {
                    $dm = [regex]::Match($line, '(?:rt|request_time|response_time|time_taken)=(?<dur>[0-9]*\.?[0-9]+)')
                    if ($dm.Success) {
                        [double]::TryParse($dm.Groups['dur'].Value, [ref]$duration) | Out-Null
                    }
                }

                # Minute bucket
                $minuteKey = $dtTz.ToString('yyyy-MM-dd HH:mm')

                if (-not $minuteAgg.ContainsKey($minuteKey)) {
                    $minuteAgg[$minuteKey] = [pscustomobject]@{
                        requests  = 0
                        durations = New-Object 'System.Collections.Generic.List[double]'
                        status    = @{
                            '2xx' = 0; '3xx' = 0; '4xx' = 0; '5xx' = 0
                        }
                    }
                }

                $bucket = $minuteAgg[$minuteKey]
                $bucket.requests++
                $totalRequests++

                $cat = Get-StatusCategory -code $status
                $bucket.status[$cat]++

                if ($duration -ne $null) {
                    $bucket.durations.Add([double]$duration) | Out-Null
                    Update-TopSlow -TopList $topSlow -record ([pscustomobject]@{
                        Time     = $dtTz
                        Method   = $method
                        Path     = $path
                        Status   = $status
                        Duration = [double]$duration
                    }) -N 10
                }

                # Hot paths count
                if ($pathCounts.ContainsKey($path)) { $pathCounts[$path]++ } else { $pathCounts[$path] = 1 }
            }
        } finally {
            if ($reader) { $reader.Dispose() }
        }
    }
}

# --- Prepare outputs ---
# Sort minutes; force an array with @() so indexing and .Count behave even with a single bucket
$sortedMinutes = @($minuteAgg.Keys | Sort-Object)

# Metrics CSV
$csvEncoding = 'utf8' # PS7 default is UTF-8 (no BOM)
"minute,requests,status_counts,p95" | Set-Content -Path $metricsPath -Encoding $csvEncoding
foreach ($mk in $sortedMinutes) {
    $bucket = $minuteAgg[$mk]
    $p95 = Compute-Percentile -Values $bucket.durations -p 0.95
    $statusPairs = @()
    foreach ($c in $selectedCats) {
        $statusPairs += ("{0}={1}" -f $c, $bucket.status[$c])
    }
    $statusStr = ($statusPairs -join ';')
    $p95Str = if ($p95 -ne $null) { [string]::Format("{0:F3}", $p95) } else { "" }
    $line = "{0},{1},{2},{3}" -f $mk, $bucket.requests, $statusStr, $p95Str
    Add-Content -Path $metricsPath -Value $line -Encoding $csvEncoding
}

Write-Color -Text @("Written metrics CSV: ", $metricsPath) -Color Green

# Chart data JSON
$series = @()
foreach ($mk in $sortedMinutes) {
    $bucket = $minuteAgg[$mk]
    $p95 = Compute-Percentile -Values $bucket.durations -p 0.95
    $obj = [ordered]@{
        minute   = $mk
        requests = $bucket.requests
        p95      = if ($p95 -ne $null) { [math]::Round($p95,3) } else { $null }
        status   = [ordered]@{}
    }
    foreach ($c in $selectedCats) {
        $obj.status[$c] = $bucket.status[$c]
    }
    $series += $obj
}

$overallDurations = New-Object 'System.Collections.Generic.List[double]'
foreach ($mk in $sortedMinutes) {
    foreach ($d in $minuteAgg[$mk].durations) { $overallDurations.Add($d) | Out-Null }
}
$overallP95 = Compute-Percentile -Values $overallDurations -p 0.95

# Overall status distribution
$overallStatus = @{
    '2xx' = 0; '3xx' = 0; '4xx' = 0; '5xx' = 0
}
foreach ($mk in $sortedMinutes) {
    # Enumerate a snapshot of the keys; modifying values while enumerating .Keys directly throws an error
    foreach ($k in @($overallStatus.Keys)) {
        $overallStatus[$k] += $minuteAgg[$mk].status[$k]
    }
}

$chart = [ordered]@{
    timezone     = $tz.Id
    processDate  = [System.TimeZoneInfo]::ConvertTime([DateTimeOffset]::Now, $tz).ToString('yyyy-MM-dd')
    window       = if ($winSpecified) { @{ start = $winStart.ToString('o'); end = $winEnd.ToString('o') } } else { $null }
    minutes      = $series
    summary      = @{
        totalRequests = $totalRequests
        overallP95    = if ($overallP95 -ne $null) { [math]::Round($overallP95,3) } else { $null }
        status        = $overallStatus
        filesProcessed= $processedFiles
        badLines      = $badLines
    }
}

$chartJson = $chart | ConvertTo-Json -Depth 6
Set-Content -Path $chartPath -Value $chartJson -Encoding $csvEncoding
Write-Color -Text @("Written chart data JSON: ", $chartPath) -Color Green

# Report.md
$reportLines = New-Object System.Collections.Generic.List[string]
$reportLines.Add("# Access Log Daily Report") | Out-Null
$reportLines.Add("") | Out-Null
$reportLines.Add(("Process Date: {0}" -f ([System.TimeZoneInfo]::ConvertTime([DateTimeOffset]::Now, $tz).ToString('yyyy-MM-dd')))) | Out-Null
$reportLines.Add(("Timezone: {0}" -f $tz.Id)) | Out-Null
$reportLines.Add(("Files Processed: {0}" -f $processedFiles)) | Out-Null
if ($winSpecified) { $reportLines.Add(("Window: {0} -> {1}" -f $winStart.ToString('u'), $winEnd.ToString('u'))) | Out-Null }
$reportLines.Add("") | Out-Null

$reportLines.Add("## Key Metrics") | Out-Null
$reportLines.Add(("- Total Requests: {0}" -f $totalRequests)) | Out-Null
$reportLines.Add(("- Overall P95 (s): {0}" -f $(if ($null -ne $overallP95) { [string]::Format("{0:F3}", $overallP95) } else { "N/A" }))) | Out-Null
$reportLines.Add(("- Status Distribution: 2xx={0}, 3xx={1}, 4xx={2}, 5xx={3}" -f $overallStatus['2xx'],$overallStatus['3xx'],$overallStatus['4xx'],$overallStatus['5xx'])) | Out-Null
$reportLines.Add(("- Bad Lines Skipped: {0}" -f $badLines)) | Out-Null
$reportLines.Add("") | Out-Null

# Trend overview
$reportLines.Add("## Trend Overview") | Out-Null
if ($sortedMinutes.Count -gt 0) {
    $firstMin = $sortedMinutes[0]
    $lastMin = $sortedMinutes[-1]
    $peak = $sortedMinutes | ForEach-Object { [pscustomobject]@{ minute=$_; req=$minuteAgg[$_].requests } } | Sort-Object -Property req -Descending | Select-Object -First 1
    $avgReqPerMin = [math]::Round(($totalRequests / $sortedMinutes.Count), 2)
    $reportLines.Add(("- Range: {0} -> {1}" -f $firstMin, $lastMin)) | Out-Null
    $reportLines.Add(("- Avg Requests/Min: {0}" -f $avgReqPerMin)) | Out-Null
    $reportLines.Add(("- Peak: {0} requests at {1}" -f $peak.req, $peak.minute)) | Out-Null
} else {
    $reportLines.Add("- No data.") | Out-Null
}
$reportLines.Add("") | Out-Null

# Top 10 slow requests
$reportLines.Add("## Top 10 Slow Requests") | Out-Null
if ($topSlow.Count -gt 0) {
    $topSlowSorted = $topSlow | Sort-Object -Property Duration -Descending | Select-Object -First 10
    $reportLines.Add("| Time | Method | Path | Status | Duration (s) |") | Out-Null
    $reportLines.Add("|---|---|---|---|---:|") | Out-Null
    foreach ($r in $topSlowSorted) {
        $reportLines.Add(("| {0} | {1} | {2} | {3} | {4:F3} |" -f $r.Time.ToString('u'), $r.Method, $r.Path, $r.Status, $r.Duration)) | Out-Null
    }
} else {
    $reportLines.Add("- No slow requests captured.") | Out-Null
}
$reportLines.Add("") | Out-Null

# Hot paths
$reportLines.Add("## Top 10 Hot Paths") | Out-Null
if ($pathCounts.Count -gt 0) {
    $topPaths = $pathCounts.GetEnumerator() | Sort-Object -Property Value -Descending | Select-Object -First 10
    $reportLines.Add("| Path | Requests |") | Out-Null
    $reportLines.Add("|---|---:|") | Out-Null
    foreach ($p in $topPaths) {
        $reportLines.Add(("| {0} | {1} |" -f $p.Key, $p.Value)) | Out-Null
    }
} else {
    $reportLines.Add("- No paths found.") | Out-Null
}
$reportLines.Add("") | Out-Null

# Write report.md
Set-Content -Path $reportPath -Value ($reportLines -join [Environment]::NewLine) -Encoding utf8
Write-Color -Text @("Written Markdown report: ", $reportPath) -Color Green

# Final note
Write-Color -Text @("Done. Reports in '", $ReportsDir, "'") -Color DarkGreen

Example Details

Problem Solved

Helps users quickly generate high-quality task scripts, covering input and output design and explicit operational constraints, to improve development efficiency and accuracy.

Target Users

Development engineers

Developers who need to implement scripts for specific functionality quickly; the prompt generates high-quality code in one step, saving substantial time and effort.

Product managers without a technical background

Use the prompt to produce workable sample scripts for prototype validation or for conveying concrete requirements, bridging the gap in technical communication.

Data analysts

Users who need simple scripts to process data efficiently; the prompt saves manual coding time so they can focus on the analysis itself.

Feature Summary

Quickly generates script code from requirements, supporting multiple programming languages and making automation development more efficient.
Reads the input-source information the user provides and generates a script tailored to the scenario, improving development efficiency.
Outputs code that matches the specified format precisely, ensuring results fit business needs and reducing manual rework.
Flexibly adapts to constraints, including required dependency libraries, restrictions, or complex parameter configurations, so the generated script is more precise.
Generates complete script logic in one pass, covering input, output, and flow control, cutting the time spent scaffolding code.
Supports script development across scenarios, from data processing to business-process automation, helping users put ideas into practice quickly.
Lowers the technical barrier: newcomers and developers unfamiliar with a domain can still create practical scripts through guided prompts.
Provides intelligent assistance for complex programming tasks, shortening development cycles and improving code quality.

How to Use the Purchased Prompt Template

1. Use it directly in an external chat application

Copy the prompt generated from the template into your usual chat application (such as ChatGPT or Claude) and use it in conversation directly, with no extra development. Suitable for quick personal trials and lightweight use.

2. Publish it as an API endpoint

Turn the prompt template into an API: your program can adjust the template parameters freely and call the endpoint directly, enabling automation and batch processing (see the sketch below). Suitable for developer integration and embedding into business systems.
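
A minimal sketch of what that integration might look like, assuming a hypothetical HTTP endpoint and JSON payload; the URL, token variable, and field names below are illustrative only and are not part of this product's documented API.

# Hypothetical sketch: the endpoint URL, auth scheme, and payload field names are assumptions.
import os

import requests  # third-party HTTP client; pip install requests

payload = {
    # The five adjustable template parameters, with illustrative values
    "script_language": "Python",
    "script_description": "Clean CSV files and load the results into SQLite",
    "input_source": "data/*.csv",
    "output_target_format": "NDJSON files under output/<date>/",
    "constraints": "pandas and PyYAML only; chunked reads; rotating log files",
}

resp = requests.post(
    "https://api.example.com/v1/prompt-templates/<template-id>/run",  # placeholder URL
    headers={"Authorization": f"Bearer {os.environ.get('API_TOKEN', '')}"},
    json=payload,
    timeout=60,
)
resp.raise_for_status()
print(resp.json())  # the generated script would be returned in the response body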

3. Configure it in an MCP client

Configure the corresponding server address in an MCP client so that your AI application can invoke the prompt template automatically. Suitable for advanced users and team collaboration, letting the prompt move seamlessly between AI tools.

AI Prompt Price
¥20.00
Try before you buy: pay only after it works for you.

What you get after purchase

The complete prompt template
- 153 tokens in total
- 5 adjustable parameters
{ Script Language } { Script Function Description } { Input Source } { Output Target and Format } { Constraints }
Usage rights to community-contributed content
- Curated, high-quality community examples to help you get started with the prompt quickly