×
¥
查看详情
🔥 会员专享 文生文 数据处理

数据转换逻辑设计

👁️ 378 次查看
📅 Nov 5, 2025
💡 核心价值: 提供数据从一种格式转换为另一种格式的逻辑解决方案。

🎯 可自定义参数(3个)

当前数据格式
当前数据格式,例如:JSON。
目标数据格式
目标数据格式,例如:CSV。
输出语言
输出语言,例如:中文。

🎨 效果示例

以下为将 NDJSON(每行一个 JSON 对象)转换为 CSV 的标准化转换逻辑及参考实现。重点覆盖模式推断、嵌套展开、数组处理、缺失值、类型与转义、错误与性能。

一、输入与输出定义

  • 输入:UTF-8 编码 NDJSON 文件;每行一个合法 JSON 对象。
  • 输出:UTF-8 编码 CSV 文件;第一行为表头;后续为数据行。

二、模式与列顺序(Schema)

  • 优先使用预定义模式(推荐):提前声明要输出的列及其路径映射,确保一次扫描即可输出。
  • 无预定义模式时:
    • 两次扫描策略:第一次扫描收集所有行扁平化后的键集合,生成表头;第二次扫描写数据。
    • 列顺序建议:稳定排序(例如字典序),或按照业务优先级固定顺序。
  • 键名规范化:若原始键含有分隔符(如“.”),建议统一替换为“_”,避免与路径分隔符冲突。

三、嵌套结构展开规则(对象/字典)

  • 扁平化策略(dot path,默认):
    • 嵌套对象按 “父.子” 路径生成列名。例如 {"a":{"b":1}} → 列 "a.b" = 1。
  • 键冲突处理:
    • 若不同路径产生相同列名(因键名重复或替换规则),需加前缀或使用枚举策略(如 "a.b" 与 "a_b" 冲突时,优先原始 dot path,并将替换后的冲突键重命名为 "a_b__dup")。

四、数组处理策略 数组是 CSV 转换的关键,需明确策略(可按字段配置):

  • 连接(join,单行):将数组按分隔符连接为字符串。
    • 原子类型数组(string/number/bool)可直接连接,如 tags=["a","b"] → "a|b"。
    • 对象数组:可选择将每个元素序列化为紧凑 JSON 再连接,如 items=[{"id":1},{"id":2}] → {"id":1}|{"id":2}。
  • 展开(explode,多行):数组产生多行;如一个字段配置为 explode,则每个元素生成一行。
    • 多数组字段同时 explode 时需生成笛卡尔积(行数可能急剧膨胀,需审慎)。
  • 选择策略建议:
    • 明确指定需要分析明细的数组字段 explode;其它字段 join。
    • 若存在多个数组字段,仅允许一个 explode,或限制在小规模的笛卡尔积范围(如设定最大乘积阈值)。

五、数据类型与格式

  • 字符串保留原样,必要时进行转义。
  • 数字以十进制字符串输出;避免科学计数法(如需要,可格式化)。
  • 布尔转为 true/false。
  • 时间戳:若有标准格式,统一输出为 ISO 8601(建议在预处理/映射层完成);否则原样。
  • 二进制/超长字段:建议 base64 或摘要化(取决于业务),避免污染 CSV。

六、空值与缺失

  • JSON null → CSV 空单元格。
  • 缺失字段(行中不存在某列) → CSV 空单元格。
  • 明确区分空字符串 "" 与缺失/NULL。

七、CSV 写出与转义

  • 分隔符:逗号,或根据业务使用制表符(TSV)。
  • 引号策略:最小引号(仅当字段包含分隔符、引号或换行时加双引号),内部双引号转义为两个双引号。
  • 换行:禁止字段跨行;若数据包含换行,需被引号包裹。
  • 编码:UTF-8,无 BOM;必要时统一规范化为 NFC。
  • 表头固定且写一次。

八、错误与数据质量

  • 每行独立解析;遇到无效 JSON 行:
    • 记录至错误文件(包含行号与内容),跳过或中止(可配置)。
  • 类型不一致(同一列出现多种类型):
    • 默认转字符串;可记录类型冲突警告。
  • 超长字段或笛卡尔膨胀:
    • 设置行长度和最大展开行数的上限;超过则丢入隔离区。

九、性能与可扩展性

  • 流式处理:按行读取与写出,降低内存占用。
  • 压缩:支持 .gz 输入;输出按需压缩。
  • 大文件:两次扫描时避免将整文件载入内存;使用临时文件缓存或先收集 schema 再二次读取。
  • 并行:可按文件分片或行块并行处理,合并时确保表头一致。
  • 大数据场景:优先使用分布式引擎(Spark/Flink),通过 DataFrame 展开与写出 CSV。

十、参考实现(Python,流式,支持 join 和可选单字段 explode) 说明:

  • 默认扁平化对象并将数组 join 为字符串。
  • 可配置一个字段进行 explode(多字段 explode 需扩展笛卡尔逻辑)。
  • 两次扫描推断表头;若有预定义 schema,可跳过第一次扫描。

示例代码:

  • 依赖标准库;如需更高性能,json 可替换为 orjson。
import json
import csv
import gzip
import os
from typing import Any, Dict, List, Iterable, Tuple, Optional

def open_maybe_gzip(path: str, mode: str = 'rt', encoding: str = 'utf-8'):
    if path.endswith('.gz'):
        return gzip.open(path, mode, encoding=encoding)
    return open(path, mode, encoding=encoding)

def sanitize_key(key: str) -> str:
    # 避免路径分隔冲突
    return key.replace('.', '_')

def flatten(obj: Any, parent_key: str = '', sep: str = '.') -> Dict[str, Any]:
    """对象扁平化;数组按原样保留为 Python list(后续统一处理)。"""
    items: Dict[str, Any] = {}
    if isinstance(obj, dict):
        for k, v in obj.items():
            k2 = sanitize_key(k)
            new_key = f"{parent_key}{sep}{k2}" if parent_key else k2
            items.update(flatten(v, new_key, sep))
    elif isinstance(obj, list):
        # 暂存 list,后续根据策略处理
        items[parent_key] = obj
    else:
        items[parent_key] = obj
    return items

def to_str(value: Any) -> str:
    if value is None:
        return ''
    if isinstance(value, (int, float, bool)):
        return str(value)
    if isinstance(value, str):
        return value
    # 其他类型(如对象、列表)统一序列化为紧凑 JSON
    return json.dumps(value, ensure_ascii=False, separators=(',', ':'))

def process_arrays(row: Dict[str, Any],
                   join_delim: str = '|',
                   explode_field: Optional[str] = None) -> Iterable[Dict[str, Any]]:
    """
    将数组处理为 join 或对指定字段进行 explode。
    explode_field 为扁平化后的列名(例如 'items' 或 'a.b.items')。
    如果 explode_field 不存在或不是数组,则按 join 输出单行。
    """
    # 准备一个可变副本
    base = dict(row)

    # 先对非 explode 的数组进行 join
    for key, val in list(base.items()):
        if isinstance(val, list):
            if explode_field and key == explode_field:
                # 留待后续处理
                continue
            # 原子类型数组直接 join;对象数组先序列化每个元素再 join
            joined = []
            for el in val:
                if isinstance(el, (str, int, float, bool)) or el is None:
                    joined.append(to_str(el))
                else:
                    joined.append(json.dumps(el, ensure_ascii=False, separators=(',', ':')))
            base[key] = join_delim.join(joined)

    # 处理 explode 字段
    if explode_field and isinstance(row.get(explode_field), list):
        arr = row[explode_field]
        if len(arr) == 0:
            # 空数组 → 生成一行,对该列留空
            out = dict(base)
            out[explode_field] = ''
            yield out
            return
        for el in arr:
            out = dict(base)
            # 元素为对象时可进一步扁平化到子列;否则直接字符串化
            if isinstance(el, dict):
                sub = flatten(el, explode_field)
                out.update({k: to_str(v) for k, v in sub.items()})
                # 同时保留原 explode 列的摘要(可选)
                out[explode_field] = json.dumps(el, ensure_ascii=False, separators=(',', ':'))
            else:
                out[explode_field] = to_str(el)
            yield out
    else:
        yield base

def infer_header(ndjson_path: str,
                 explode_field: Optional[str] = None) -> List[str]:
    cols = set()
    with open_maybe_gzip(ndjson_path, 'rt') as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception:
                # 记录错误,跳过
                # 生产环境应写入错误通道
                continue
            flat = flatten(obj)
            # 先进行数组处理(为了捕获因对象数组展开产生的子列)
            for processed in process_arrays(flat, explode_field=explode_field):
                for k in processed.keys():
                    cols.add(k)
    return sorted(cols)

def ndjson_to_csv(in_path: str,
                  out_path: str,
                  predefined_header: Optional[List[str]] = None,
                  explode_field: Optional[str] = None,
                  join_delim: str = '|'):
    header = predefined_header or infer_header(in_path, explode_field=explode_field)

    # 写 CSV
    with open_maybe_gzip(in_path, 'rt') as fin, open(out_path, 'w', encoding='utf-8', newline='') as fout:
        writer = csv.DictWriter(fout, fieldnames=header, extrasaction='ignore', quoting=csv.QUOTE_MINIMAL)
        writer.writeheader()
        for lineno, line in enumerate(fin, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception:
                # 记录并跳过
                # 可将错误行写入 out_path + '.error'
                continue
            flat = flatten(obj)
            for processed in process_arrays(flat, join_delim=join_delim, explode_field=explode_field):
                # 缺失列填空
                row = {k: to_str(processed.get(k)) for k in header}
                writer.writerow(row)

# 使用示例:
# ndjson_to_csv('input.ndjson', 'output.csv', explode_field='items', join_delim='|')
# 若有预定义列:ndjson_to_csv('input.ndjson', 'output.csv', predefined_header=['id','a.b','items','tags'])

十一、Spark 转换(适用于大规模数据)

  • 读入 NDJSON:spark.read.json("path")。
  • 扁平化:对 struct 使用 selectExpr 展开为列;对数组:
    • join:使用 concat_ws('|', col('tags'))
    • explode:使用 explode(col('items')) 并按需对元素 struct 再次展开。
  • 写出:df.write.option("header", True).csv("out_path").

十二、配置建议

  • 配置文件声明:
    • 列映射:JSONPath 或 dot path → 列名。
    • 数组策略:每列选择 join/explode,join 的分隔符。
    • 键名清洗规则与冲突策略。
    • 错误行处理策略(跳过/隔离/中止)。
    • 性能阈值(最大展开乘积、最大字段长度)。

该逻辑在数据工程实践中具备可移植性与可扩展性,既能快速在单机脚本中落地,也能在分布式计算框架中保持一致的语义与质量控制。

TSV conversion logic for logs

Scope

  • Input: arbitrary application logs in either text or JSON-lines.
  • Output: tab-separated values (TSV) with a defined header. One record per log event.
  • Constraints: streaming-friendly, robust to missing fields, tabs/newlines in values, mixed formats, and occasional multi-line entries.
  1. Target TSV schema Use a fixed schema to maintain streaming compatibility. Recommended minimal columns:
  • ts_iso: normalized timestamp in ISO 8601 (UTC if tz provided; otherwise keep naive).
  • level: normalized log level (TRACE/DEBUG/INFO/WARN/ERROR/FATAL).
  • service: service or application name (if discoverable).
  • host: hostname or source (if discoverable).
  • logger: logger name/category (if present).
  • thread: thread identifier (if present).
  • request_id: request or correlation id (if present).
  • user_id: user identifier (if present).
  • client_ip: client IP (if present).
  • method: HTTP method (if present).
  • path: URL path (if present).
  • status: HTTP status (if present).
  • latency_ms: request latency in milliseconds (if parseable).
  • message: human-readable message portion.
  • extras_json: JSON-encoded string of any remaining key-value pairs.
  • raw: original log line for auditability.

Notes:

  • If your environment requires a different set of fixed columns, adjust accordingly. Keep an extras_json catch-all for forward compatibility.
  1. Detection and parsing strategy The parser operates per line and chooses one of two branches:

A) JSON log line

  • Condition: line starts with “{” after leading whitespace.
  • Action:
    • Parse JSON into a dict; if parsing fails, treat as text log (branch B).
    • Flatten to a single level: nested keys joined by dots (e.g., http.request.method → http.request.method) or use a predetermined mapping.
    • Map known fields into the target schema (see section 3).
    • Put unmapped fields into extras_json.

B) Text log line with optional key=value pairs

  • Identify timestamp:
    • Try common patterns via regex and datetime parsing:
      • YYYY-MM-DD HH:MM:SS,mmm
      • YYYY-MM-DD HH:MM:SS
      • YYYY-MM-DDTHH:MM:SSZ
      • YYYY-MM-DDTHH:MM:SS.sssZ
      • Syslog-like: MMM dd HH:MM:SS (year unknown; if no year, keep as original string)
  • Identify level token (case-insensitive): TRACE|DEBUG|INFO|WARN|WARNING|ERROR|CRITICAL|FATAL.
  • Extract key=value pairs:
    • Regex: (\w[\w.-])=("([^"\]|\.)"|\S+)
      • Supports quoted values with spaces; unescape quotes and backslashes.
      • Key may include dot or dash.
  • Message extraction:
    • Take the substring after [timestamp][optional logger/thread][level] up to the first key=value pair; trim.
    • If no key=value pairs, the remainder is the message.
  • Map known keys to schema; place remaining pairs in extras_json.
  1. Field mapping and normalization Mapping rules (examples; adapt to your environment):
  • Timestamp (ts_iso):

    • For JSON logs: prefer fields like time, timestamp, ts, @timestamp.
    • For text logs: use the first recognized timestamp token.
    • Normalization: convert to ISO 8601. If timezone provided, convert to UTC (e.g., 2025-09-26T12:05:34.123Z). If no timezone, output naive ISO 8601 without Z.
    • If parsing fails, leave ts_iso empty and retain the original in extras_json.timestamp_raw.
  • Level:

    • Map aliases: WARNING→WARN, CRITICAL→ERROR (or FATAL if preferred), VERBOSE→DEBUG.
    • Output uppercase canonical values.
  • Service, host, logger, thread:

    • From keys: service, app, application; host, hostname; logger, category; thread, tid. If not present, leave empty.
  • HTTP-related:

    • method from method, http_method, http.request.method.
    • path from path, url, uri, http.request.path.
    • status from status, http.status, response.status.
    • client_ip from client_ip, ip, remote_addr, http.client_ip.
  • Request/user identifiers:

    • request_id from request_id, req_id, correlation_id, trace_id, x_request_id.
    • user_id from user_id, uid, user.id, principal.
  • Latency:

    • Accept values like “123ms”, “1.5s”, “800us”, numeric milliseconds.
    • Normalize:
      • ms → value
      • s → value × 1000
      • us/µs → value / 1000
      • ns → value / 1_000_000
    • If unitless integer/float, treat as milliseconds.
    • If unparsable, leave latency_ms empty and persist original under extras_json.latency_raw.
  • Message:

    • If JSON logs: use message, msg, log, event as primary; else let message be empty.
    • For text logs: the extracted message substring.
  • extras_json:

    • JSON-encode remaining unmapped key-value pairs.
    • Ensure tab/newline escaping (see section 4).
  • raw:

    • The original line, unmodified (except escaping tabs/newlines when outputting TSV).
  1. Escaping and TSV output rules
  • Separator: single tab character between fields.
  • Header: emit once at the start in the specified column order.
  • Value sanitation:
    • Replace literal tab with \t.
    • Replace newline with \n and carriage return with \r.
    • Remove or escape other control characters as needed.
    • For extras_json, ensure it is a compact JSON string with the same escaping rules applied to embedded tabs/newlines.
  • Missing values: output as empty string.
  • No quoting in TSV; rely on escaping.
  1. Multi-line log handling
  • If a line lacks a timestamp and level and does not look like JSON, treat it as a continuation of the previous record’s message.
  • Append to previous message with a literal \n inserted.
  • If strict one-line-per-record is required, disable multi-line merge and include the entire raw line as a separate row with empty fields.
  1. Example

Input lines:

  1. 2025-09-26 12:05:34,123 INFO request_id=abc123 user_id=42 method=GET path=/api/v1/items latency=12ms Completed request
  2. {"time":"2025-09-26T12:05:35.001+00:00","level":"warn","service":"checkout","http":{"method":"POST","path":"/orders"},"latency_ms":75,"message":"Payment pending"}

Output TSV (header + rows): ts_iso level service host logger thread request_id user_id client_ip method path status latency_ms message extras_json raw 2025-09-26T12:05:34.123 INFO abc123 42 GET /api/v1/items 12 Completed request {} 2025-09-26 12:05:34,123 INFO request_id=abc123 user_id=42 method=GET path=/api/v1/items latency=12ms Completed request 2025-09-26T12:05:35.001Z WARN checkout {"http.method":"POST","http.path":"/orders"} 75 Payment pending {"http.method":"POST","http.path":"/orders"} {"time":"2025-09-26T12:05:35.001+00:00","level":"warn","service":"checkout","http":{"method":"POST","path":"/orders"},"latency_ms":75,"message":"Payment pending"}

Note: Tabs are shown conceptually above; actual output will use tab characters. Newlines/tabs inside fields must be escaped.

  1. Reference implementation (Python, streaming)

This implementation handles both JSON and text logs, performs normalization, and writes TSV to stdout. It avoids non-standard libraries.

import sys
import json
import re
from datetime import datetime, timezone

# Precompiled regexes
TS_PATTERNS = [
    ("%Y-%m-%d %H:%M:%S,%f", None),
    ("%Y-%m-%d %H:%M:%S", None),
    ("%Y-%m-%dT%H:%M:%S.%f%z", "utc"),
    ("%Y-%m-%dT%H:%M:%S%z", "utc"),
    ("%Y-%m-%dT%H:%M:%S.%fZ", "z"),
    ("%Y-%m-%dT%H:%M:%SZ", "z"),
]
LEVEL_RE = re.compile(r'\b(trace|debug|info|warn|warning|error|critical|fatal)\b', re.IGNORECASE)
KV_RE = re.compile(r'(\w[\w\.\-]*)=("([^"\\]|\\.)*"|\S+)')

HEADER = [
    "ts_iso","level","service","host","logger","thread","request_id","user_id","client_ip",
    "method","path","status","latency_ms","message","extras_json","raw"
]

def normalize_level(lvl):
    if not lvl: return ""
    l = lvl.upper()
    if l == "WARNING": return "WARN"
    if l == "CRITICAL": return "ERROR"  # adjust to FATAL if preferred
    return l

def parse_ts(s):
    s = s.strip()
    for fmt, mode in TS_PATTERNS:
        try:
            if "%z" in fmt:
                dt = datetime.strptime(s, fmt)
                if mode == "utc":
                    return dt.astimezone(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
                elif mode == "z":
                    # Already Zulu; ensure milliseconds where possible
                    if dt.microsecond:
                        return dt.isoformat(timespec="milliseconds").replace("+00:00", "Z")
                    return dt.replace(tzinfo=timezone.utc).isoformat().replace("+00:00","Z")
            else:
                dt = datetime.strptime(s, fmt)
                # naive timestamp
                if "%f" in fmt:
                    return dt.isoformat(timespec="milliseconds")
                else:
                    return dt.isoformat()
        except ValueError:
            continue
    return ""  # failed

def sanitize(v):
    if v is None: return ""
    s = str(v)
    return s.replace("\t","\\t").replace("\n","\\n").replace("\r","\\r")

def unquote(value):
    # Remove surrounding quotes and unescape \" and \\ inside
    if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
        inner = value[1:-1]
        inner = inner.replace('\\"', '"').replace('\\\\', '\\')
        return inner
    return value

def normalize_latency(v):
    if v is None: return ""
    s = str(v).strip().lower()
    try:
        if s.endswith("ms"):
            return str(float(s[:-2]))
        if s.endswith("s"):
            return str(float(s[:-1]) * 1000.0)
        if s.endswith("us") or s.endswith("µs"):
            return str(float(s[:-2]) / 1000.0)
        if s.endswith("ns"):
            return str(float(s[:-2]) / 1_000_000.0)
        # unitless: assume milliseconds
        return str(float(s))
    except ValueError:
        return ""  # leave raw in extras_json if needed

def parse_json_line(line):
    try:
        obj = json.loads(line)
    except json.JSONDecodeError:
        return None
    flat = {}

    def flatten(prefix, val):
        if isinstance(val, dict):
            for k, v in val.items():
                nk = f"{prefix}.{k}" if prefix else k
                flatten(nk, v)
        else:
            flat[prefix] = val

    flatten("", obj)

    row = {k: "" for k in HEADER}
    row["raw"] = line.rstrip("\n")

    # Timestamp candidates
    for key in ("@timestamp","time","timestamp","ts"):
        if key in flat:
            row["ts_iso"] = parse_ts(str(flat[key]))
            if not row["ts_iso"]:
                row["extras_json"] = json.dumps({"timestamp_raw": flat[key]}, separators=(",",":"))
            break

    row["level"]   = normalize_level(flat.get("level") or flat.get("log.level") or flat.get("severity"))
    row["service"] = str(flat.get("service") or flat.get("app") or flat.get("application") or "")

    row["host"]    = str(flat.get("host") or flat.get("hostname") or "")
    row["logger"]  = str(flat.get("logger") or flat.get("log.logger") or "")
    row["thread"]  = str(flat.get("thread") or flat.get("tid") or "")
    row["request_id"] = str(flat.get("request_id") or flat.get("req_id") or flat.get("correlation_id") or flat.get("trace_id") or "")
    row["user_id"] = str(flat.get("user_id") or flat.get("user.id") or flat.get("uid") or "")
    row["client_ip"] = str(flat.get("client_ip") or flat.get("ip") or flat.get("remote_addr") or flat.get("http.client_ip") or "")

    row["method"]  = str(flat.get("method") or flat.get("http.method") or flat.get("http.request.method") or "")
    row["path"]    = str(flat.get("path") or flat.get("url") or flat.get("uri") or flat.get("http.path") or flat.get("http.request.path") or "")
    row["status"]  = str(flat.get("status") or flat.get("http.status") or flat.get("response.status") or "")

    latency = flat.get("latency_ms") or flat.get("latency") or flat.get("http.latency") or None
    row["latency_ms"] = normalize_latency(latency)

    row["message"] = str(flat.get("message") or flat.get("msg") or flat.get("log") or flat.get("event") or "")

    # Build extras from unmapped keys
    mapped = set([
        "@timestamp","time","timestamp","ts","level","log.level","severity","service","app","application",
        "host","hostname","logger","log.logger","thread","tid","request_id","req_id","correlation_id","trace_id",
        "user_id","user.id","uid","client_ip","ip","remote_addr","http.client_ip",
        "method","http.method","http.request.method","path","url","uri","http.path","http.request.path",
        "status","http.status","response.status","latency","latency_ms","http.latency","message","msg","log","event"
    ])
    extras = {k: v for k, v in flat.items() if k not in mapped}
    row["extras_json"] = json.dumps(extras, separators=(",",":")) if extras else "{}"

    return row

def parse_text_line(line):
    row = {k: "" for k in HEADER}
    row["raw"] = line.rstrip("\n")

    s = line.strip()

    # Attempt to find timestamp at start substrings
    ts_match = None
    # Heuristic: check first ~40 chars for timestamp tokens separated by space or 'T'
    head = s[:40]
    # Try direct parsing by slicing common lengths
    for token_len in (23, 19, 20, 24):  # rough candidates
        token = head[:token_len]
        ts_iso = parse_ts(token)
        if ts_iso:
            row["ts_iso"] = ts_iso
            ts_match = token
            break

    # Level
    level_m = LEVEL_RE.search(s)
    if level_m:
        row["level"] = normalize_level(level_m.group(0))

    # KV pairs
    kvs = []
    for m in KV_RE.finditer(s):
        key = m.group(1)
        val = unquote(m.group(2))
        kvs.append((key, val))

    # Message: substring after level up to first kv pair
    msg = s
    if level_m:
        start = level_m.end()
        msg = s[start:].strip()
    if kvs:
        first_kv_start = s.find(kvs[0][0] + "=")
        if first_kv_start != -1:
            msg = s[:first_kv_start].strip()
    row["message"] = msg

    # Map common keys
    def get(klist):
        for k in klist:
            for kk, vv in kvs:
                if kk == k:
                    return vv
        return None

    row["service"]    = get(["service","app","application"]) or ""
    row["host"]       = get(["host","hostname"]) or ""
    row["logger"]     = get(["logger","category"]) or ""
    row["thread"]     = get(["thread","tid"]) or ""
    row["request_id"] = get(["request_id","req_id","correlation_id","trace_id","x_request_id"]) or ""
    row["user_id"]    = get(["user_id","uid","user.id","principal"]) or ""
    row["client_ip"]  = get(["client_ip","ip","remote_addr"]) or ""
    row["method"]     = get(["method","http_method"]) or ""
    row["path"]       = get(["path","url","uri"]) or ""
    row["status"]     = get(["status","http_status"]) or ""
    row["latency_ms"] = normalize_latency(get(["latency","latency_ms","duration","elapsed"]))

    # Build extras
    mapped_keys = set([
        "service","app","application","host","hostname","logger","category","thread","tid",
        "request_id","req_id","correlation_id","trace_id","x_request_id",
        "user_id","uid","user.id","principal","client_ip","ip","remote_addr",
        "method","http_method","path","url","uri","status","http_status","latency","latency_ms","duration","elapsed"
    ])
    extras = {k: v for (k, v) in kvs if k not in mapped_keys}
    row["extras_json"] = json.dumps(extras, separators=(",",":")) if extras else "{}"

    return row

def emit_tsv_row(row):
    fields = [sanitize(row.get(col, "")) for col in HEADER]
    sys.stdout.write("\t".join(fields) + "\n")

def main():
    # Write header
    sys.stdout.write("\t".join(HEADER) + "\n")
    for line in sys.stdin:
        stripped = line.lstrip()
        parsed = None
        if stripped.startswith("{"):
            parsed = parse_json_line(line)
        if parsed is None:
            parsed = parse_text_line(line)
        emit_tsv_row(parsed)

if __name__ == "__main__":
    main()
  1. Operational guidance
  • Run as: python log_to_tsv.py < input.log > output.tsv
  • For large files, consider:
    • Using buffered I/O (default in Python is sufficient for most cases).
    • Precompiling all regexes (done).
    • Avoiding per-line JSON dumps for empty extras (use "{}" constant as shown).
    • Parallelization: sharding input files and merging outputs; ensure identical headers.
  • Monitoring and testing:
    • Create unit tests with representative samples for each log format you encounter.
    • Validate TSV with a reader that checks column counts and escaping.
    • Track parse failure rates (e.g., empty ts_iso or level) and revisit patterns accordingly.

This logic establishes a deterministic, streaming-safe conversion of diverse logs to TSV suitable for ingestion into data warehouses or lakehouse tables. Adjust field mappings to your domain, and keep extras_json for schema evolution.

以下は、JSONをYAMLへ安全かつ忠実に変換するための実務的な変換ロジックです。目的は「JSONの型・値の意味を保ち、YAMLの解釈による型誤推測を回避する」ことです。YAML 1.2準拠を前提に記述します。

前提

  • JSONはYAML 1.2の厳密な部分集合であり、原則として型対応は1対1で可能。
  • ただしYAMLパーサ(特にYAML 1.1系)による曖昧なスカラー解釈(例: yes/no, on/off のブーリアン解釈)を避けるため、文字列の引用規則を設ける。
  • JSONのオブジェクトのキーは文字列のみ。YAMLのマッピングキーも文字列で出力する。
  • 順序は入力順を維持(一般的なJSONパーサでは挿入順を保持できる)。

型マッピング規則

  • Object(JSON)→ Mapping(YAML)
    • { "k": v } → k: v
    • インデントは2スペース推奨。
    • ドキュメント開始記号(---)は任意。複数ドキュメント扱いが不要なら省略可。
  • Array(JSON)→ Sequence(YAML)
    • [a, b] →
      • a
      • b
    • 順序はそのまま保持。
  • String(JSON)→ Scalar(YAML)
    • 文字列は「プレーン」「ダブルクォート」「リテラルブロック(|)」のいずれかで表現。
    • 判定ロジックは後述。
  • Number(JSON)→ Number(YAML)
    • 整数・浮動小数をそのまま文字列表現へ。
    • JSONにNaN/Infinityは存在しない前提(出力しない)。
  • Boolean(JSON)→ Boolean(YAML)
    • true → true、false → false(小文字)
  • Null(JSON)→ Null(YAML)
    • null → null(~は使わない方針)
  • 日付/時刻
    • JSONでは文字列。YAML側で型推測されないよう文字列のまま出力(必要に応じて引用)。

文字列の引用判定ロジック(YAML 1.2を前提に安全運用)

  • 以下のいずれかに該当する場合はダブルクォートで囲む("...")。
    • 先頭または末尾に空白がある
    • 改行を含まないが、次の文字を含む/先頭が危険文字
      • 「#」(コメント開始になりうる)
      • 「: 」(コロンに続くスペースはマッピング記号と衝突)
      • 先頭が「-」「?」「@」「`」「!」「*」「&」「%」「|」「>」
    • JSONの文字列値が次の曖昧トークンに完全一致(大小問わず)
      • "true", "false", "null", "~", "yes", "no", "on", "off"
      • 注: YAML 1.2では yes/no/on/off はブーリアンではないが、1.1系を避けるため安全に引用する
    • ISO8601風の日時文字列(例: 2024-01-01T12:00:00Z 等)はパーサ差異対策として引用推奨
    • 制御文字や非表示文字を含む(JSONのエスケープをYAMLのダブルクォート内で保持)
  • 文字列に改行が含まれる場合はリテラルブロック(|)で出力し、改行を忠実に保持
    • 末尾改行の有無も正確に維持
  • 上記に該当しない場合のみプレーンスカラー(引用なし)で出力
    • プレーン許容文字の目安: 英数字、ハイフン(-)、アンダースコア(_)、ドット(.)、スラッシュ(/)
    • 空文字はダブルクォートで "" とする

出力フォーマット規則

  • インデントは2スペース
  • シーケンスは「- 」で要素を列挙
  • マッピングは「key: value」形式
  • 必要に応じてドキュメント開始記号「---」を付与
  • アンカー(&)やエイリアス(*)、タグ(!!)は生成しない(JSON互換の維持)

変換アルゴリズム(擬似コード)

  • function to_yaml(value, indent=0):
    • switch type(value):
      • dict:
        • for (k, v) in insertion_order(value):
          • k_out = quote_key_if_needed(k) // キーも文字列。通常はプレーンだが危険時はダブルクォート
          • print(indent_spaces(indent) + k_out + ": " + inline_or_child(v, indent))
      • list:
        • for item in value:
          • print(indent_spaces(indent) + "- " + inline_or_child(item, indent))
      • string:
        • if contains_newline(value):
          • print(block_literal("|", value, indent)) // 改行をそのまま保持
        • else if needs_double_quote(value):
          • print(double_quoted(value)) // JSONエスケープはYAMLダブルクォートへ移植
        • else:
          • print(plain(value))
      • bool:
        • print("true" if value else "false")
      • null:
        • print("null")
      • number:
        • print(number_to_string(value)) // 整数/浮動小数を文字列化(指数表記等はそのまま)
  • function needs_double_quote(s):
    • if s == "" return true
    • if leading_or_trailing_space(s) return true
    • if contains_any(s, ["#"]) return true
    • if contains_substring(s, ": ") return true
    • if starts_with_any(s, ["-", "?", "@", "`", "!", "*", "&", "%", "|", ">"]) return true
    • if is_ambiguous_token_ci(s, ["true","false","null","~","yes","no","on","off"]) return true
    • if looks_like_iso8601_timestamp(s) return true
    • if contains_control_char(s) return true
    • return false

Python実装例(ruamel.yamlを使用、YAML 1.2想定)

  • ポイント
    • ruamel.yaml.scalarstring の LiteralScalarString(|)、DoubleQuotedScalarString(")を使ってスタイルを明示。
    • 再帰的にJSON値を走査してスタイル化したオブジェクトへ変換後、YAMLへダンプ。
  • コード
    • from json import loads
    • from ruamel.yaml import YAML
    • from ruamel.yaml.scalarstring import LiteralScalarString, DoubleQuotedScalarString
    • def convert_json_to_yaml_str(json_str):
      • data = loads(json_str)
      • def style_string(s):
        • if "\n" in s:
          • return LiteralScalarString(s) # 改行保持
        • if needs_double_quote(s):
          • return DoubleQuotedScalarString(s)
        • return s # プレーン
      • def transform(node):
        • if isinstance(node, dict):
          • return { (style_string(k) if isinstance(k, str) else k): transform(v) for k, v in node.items() }
        • if isinstance(node, list):
          • return [ transform(x) for x in node ]
        • if isinstance(node, str):
          • return style_string(node)
        • return node # bool, None, number はそのまま
      • yaml = YAML()
      • yaml.version = (1, 2)
      • yaml.indent(mapping=2, sequence=2, offset=0)
      • out = transform(data)
      • import io
      • buf = io.StringIO()
      • yaml.dump(out, buf)
      • return buf.getvalue()
    • 上記の needs_double_quote(s) は前述ロジックをそのままPythonで実装。

検証・テスト観点

  • 型保持テスト
    • JSONのboolean/null/numberがYAMLで同型として再解釈されることを確認。
  • 文字列の安全性テスト
    • "yes", "on", "#comment", ": value" 等が文字列として保持される(引用される)ことを確認。
    • 改行文字列がリテラルブロックで出力され、改行・末尾改行が一致することを確認。
  • 構造・順序
    • 配列順序・オブジェクト挿入順を保持。
  • エッジケース
    • 空文字列、先頭/末尾空白、長大文字列(折り畳み不要方針なら幅制限を外す)、Unicode絵文字・サロゲートペア。

このロジックにより、JSONの意味を損なわず、YAMLのパーサ差異による型誤解釈を最小化した安全な変換が可能です。

示例详情

📖 如何使用

30秒出活:复制 → 粘贴 → 搞定
与其花几十分钟和AI聊天、试错,不如直接复制这些经过千人验证的模板,修改几个 {{变量}} 就能立刻获得专业级输出。省下来的时间,足够你轻松享受两杯咖啡!
加载中...
💬 不会填参数?让 AI 反过来问你
不确定变量该填什么?一键转为对话模式,AI 会像资深顾问一样逐步引导你,问几个问题就能自动生成完美匹配你需求的定制结果。零门槛,开口就行。
转为对话模式
🚀 告别复制粘贴,Chat 里直接调用
无需切换,输入 / 唤醒 8000+ 专家级提示词。 插件将全站提示词库深度集成于 Chat 输入框。基于当前对话语境,系统智能推荐最契合的 Prompt 并自动完成参数化,让海量资源触手可及,从此彻底告别"手动搬运"。
即将推出
🔌 接口一调,提示词自己会进化
手动跑一次还行,跑一百次呢?通过 API 接口动态注入变量,接入批量评价引擎,让程序自动迭代出更高质量的提示词方案。Prompt 会自己进化,你只管收结果。
发布 API
🤖 一键变成你的专属 Agent 应用
不想每次都配参数?把这条提示词直接发布成独立 Agent,内嵌图片生成、参数优化等工具,分享链接就能用。给团队或客户一个"开箱即用"的完整方案。
创建 Agent

✅ 特性总结

一键生成跨格式数据转换思路与步骤,帮助你从现状到目标格式快速落地
自动分析源数据与目标需求,给出清晰字段对应与规则,避免遗漏与偏差
支持多语言输出说明文档,便于团队内外沟通、审阅与交付
提供可复制的转换清单与注意事项,让实施更顺畅、复用更简单
针对不同业务场景给出示例与变体,缩短试错时间,提升上线速度
内置质量校验与异常处理建议,降低数据错误风险,保障结果可靠
可按模板与参数定制转换逻辑,全面适配你独特流程与约束
结合摄取、转换、存储全流程视角,让管道设计更完整、更可维护
输出结构化技术写作风格说明,便于审阅、交接与合规备案
快速比较多种转换方案利弊与适用场景,帮助你做出更稳妥选择

🎯 解决的问题

帮助团队快速将“现有数据格式”无缝转换为“目标数据格式”,在几分钟内生成可执行的转换方案与规则清单,配套质量校验与落地步骤,支持多语言输出,显著降低人为错误与返工,提升数据接入、系统迁移与报表统一的交付速度。

🕒 版本历史

当前版本
v2.1 2024-01-15
优化输出结构,增强情节连贯性
  • ✨ 新增章节节奏控制参数
  • 🔧 优化人物关系描述逻辑
  • 📝 改进主题深化引导语
  • 🎯 增强情节转折点设计
v2.0 2023-12-20
重构提示词架构,提升生成质量
  • 🚀 全新的提示词结构设计
  • 📊 增加输出格式化选项
  • 💡 优化角色塑造引导
v1.5 2023-11-10
修复已知问题,提升稳定性
  • 🐛 修复长文本处理bug
  • ⚡ 提升响应速度
v1.0 2023-10-01
首次发布
  • 🎉 初始版本上线
COMING SOON
版本历史追踪,即将启航
记录每一次提示词的进化与升级,敬请期待。

💬 用户评价

4.8
⭐⭐⭐⭐⭐
基于 28 条评价
5星
85%
4星
12%
3星
3%
👤
电商运营 - 张先生
⭐⭐⭐⭐⭐ 2025-01-15
双十一用这个提示词生成了20多张海报,效果非常好!点击率提升了35%,节省了大量设计时间。参数调整很灵活,能快速适配不同节日。
效果好 节省时间
👤
品牌设计师 - 李女士
⭐⭐⭐⭐⭐ 2025-01-10
作为设计师,这个提示词帮我快速生成创意方向,大大提升了工作效率。生成的海报氛围感很强,稍作调整就能直接使用。
创意好 专业
COMING SOON
用户评价与反馈系统,即将上线
倾听真实反馈,在这里留下您的使用心得,敬请期待。
加载中...