数据转换逻辑设计

179 浏览
17 试用
4 购买
Nov 5, 2025更新

提供数据从一种格式转换为另一种格式的逻辑解决方案。

以下为将 NDJSON(每行一个 JSON 对象)转换为 CSV 的标准化转换逻辑及参考实现。重点覆盖模式推断、嵌套展开、数组处理、缺失值、类型与转义、错误与性能。

一、输入与输出定义

  • 输入:UTF-8 编码 NDJSON 文件;每行一个合法 JSON 对象。
  • 输出:UTF-8 编码 CSV 文件;第一行为表头;后续为数据行。

二、模式与列顺序(Schema)

  • 优先使用预定义模式(推荐):提前声明要输出的列及其路径映射,确保一次扫描即可输出。
  • 无预定义模式时:
    • 两次扫描策略:第一次扫描收集所有行扁平化后的键集合,生成表头;第二次扫描写数据。
    • 列顺序建议:稳定排序(例如字典序),或按照业务优先级固定顺序。
  • 键名规范化:若原始键含有分隔符(如“.”),建议统一替换为“_”,避免与路径分隔符冲突。

三、嵌套结构展开规则(对象/字典)

  • 扁平化策略(dot path,默认):
    • 嵌套对象按 “父.子” 路径生成列名。例如 {"a":{"b":1}} → 列 "a.b" = 1。
  • 键冲突处理:
    • 若不同路径产生相同列名(因键名重复或替换规则),需加前缀或使用枚举策略(如 "a.b" 与 "a_b" 冲突时,优先原始 dot path,并将替换后的冲突键重命名为 "a_b__dup")。

四、数组处理策略 数组是 CSV 转换的关键,需明确策略(可按字段配置):

  • 连接(join,单行):将数组按分隔符连接为字符串。
    • 原子类型数组(string/number/bool)可直接连接,如 tags=["a","b"] → "a|b"。
    • 对象数组:可选择将每个元素序列化为紧凑 JSON 再连接,如 items=[{"id":1},{"id":2}] → {"id":1}|{"id":2}。
  • 展开(explode,多行):数组产生多行;如一个字段配置为 explode,则每个元素生成一行。
    • 多数组字段同时 explode 时需生成笛卡尔积(行数可能急剧膨胀,需审慎)。
  • 选择策略建议:
    • 明确指定需要分析明细的数组字段 explode;其它字段 join。
    • 若存在多个数组字段,仅允许一个 explode,或限制在小规模的笛卡尔积范围(如设定最大乘积阈值)。

五、数据类型与格式

  • 字符串保留原样,必要时进行转义。
  • 数字以十进制字符串输出;避免科学计数法(如需要,可格式化)。
  • 布尔转为 true/false。
  • 时间戳:若有标准格式,统一输出为 ISO 8601(建议在预处理/映射层完成);否则原样。
  • 二进制/超长字段:建议 base64 或摘要化(取决于业务),避免污染 CSV。

六、空值与缺失

  • JSON null → CSV 空单元格。
  • 缺失字段(行中不存在某列) → CSV 空单元格。
  • 明确区分空字符串 "" 与缺失/NULL。

七、CSV 写出与转义

  • 分隔符:逗号,或根据业务使用制表符(TSV)。
  • 引号策略:最小引号(仅当字段包含分隔符、引号或换行时加双引号),内部双引号转义为两个双引号。
  • 换行:禁止字段跨行;若数据包含换行,需被引号包裹。
  • 编码:UTF-8,无 BOM;必要时统一规范化为 NFC。
  • 表头固定且写一次。

八、错误与数据质量

  • 每行独立解析;遇到无效 JSON 行:
    • 记录至错误文件(包含行号与内容),跳过或中止(可配置)。
  • 类型不一致(同一列出现多种类型):
    • 默认转字符串;可记录类型冲突警告。
  • 超长字段或笛卡尔膨胀:
    • 设置行长度和最大展开行数的上限;超过则丢入隔离区。

九、性能与可扩展性

  • 流式处理:按行读取与写出,降低内存占用。
  • 压缩:支持 .gz 输入;输出按需压缩。
  • 大文件:两次扫描时避免将整文件载入内存;使用临时文件缓存或先收集 schema 再二次读取。
  • 并行:可按文件分片或行块并行处理,合并时确保表头一致。
  • 大数据场景:优先使用分布式引擎(Spark/Flink),通过 DataFrame 展开与写出 CSV。

十、参考实现(Python,流式,支持 join 和可选单字段 explode) 说明:

  • 默认扁平化对象并将数组 join 为字符串。
  • 可配置一个字段进行 explode(多字段 explode 需扩展笛卡尔逻辑)。
  • 两次扫描推断表头;若有预定义 schema,可跳过第一次扫描。

示例代码:

  • 依赖标准库;如需更高性能,json 可替换为 orjson。
import json
import csv
import gzip
import os
from typing import Any, Dict, List, Iterable, Tuple, Optional

def open_maybe_gzip(path: str, mode: str = 'rt', encoding: str = 'utf-8'):
    if path.endswith('.gz'):
        return gzip.open(path, mode, encoding=encoding)
    return open(path, mode, encoding=encoding)

def sanitize_key(key: str) -> str:
    # 避免路径分隔冲突
    return key.replace('.', '_')

def flatten(obj: Any, parent_key: str = '', sep: str = '.') -> Dict[str, Any]:
    """对象扁平化;数组按原样保留为 Python list(后续统一处理)。"""
    items: Dict[str, Any] = {}
    if isinstance(obj, dict):
        for k, v in obj.items():
            k2 = sanitize_key(k)
            new_key = f"{parent_key}{sep}{k2}" if parent_key else k2
            items.update(flatten(v, new_key, sep))
    elif isinstance(obj, list):
        # 暂存 list,后续根据策略处理
        items[parent_key] = obj
    else:
        items[parent_key] = obj
    return items

def to_str(value: Any) -> str:
    if value is None:
        return ''
    if isinstance(value, (int, float, bool)):
        return str(value)
    if isinstance(value, str):
        return value
    # 其他类型(如对象、列表)统一序列化为紧凑 JSON
    return json.dumps(value, ensure_ascii=False, separators=(',', ':'))

def process_arrays(row: Dict[str, Any],
                   join_delim: str = '|',
                   explode_field: Optional[str] = None) -> Iterable[Dict[str, Any]]:
    """
    将数组处理为 join 或对指定字段进行 explode。
    explode_field 为扁平化后的列名(例如 'items' 或 'a.b.items')。
    如果 explode_field 不存在或不是数组,则按 join 输出单行。
    """
    # 准备一个可变副本
    base = dict(row)

    # 先对非 explode 的数组进行 join
    for key, val in list(base.items()):
        if isinstance(val, list):
            if explode_field and key == explode_field:
                # 留待后续处理
                continue
            # 原子类型数组直接 join;对象数组先序列化每个元素再 join
            joined = []
            for el in val:
                if isinstance(el, (str, int, float, bool)) or el is None:
                    joined.append(to_str(el))
                else:
                    joined.append(json.dumps(el, ensure_ascii=False, separators=(',', ':')))
            base[key] = join_delim.join(joined)

    # 处理 explode 字段
    if explode_field and isinstance(row.get(explode_field), list):
        arr = row[explode_field]
        if len(arr) == 0:
            # 空数组 → 生成一行,对该列留空
            out = dict(base)
            out[explode_field] = ''
            yield out
            return
        for el in arr:
            out = dict(base)
            # 元素为对象时可进一步扁平化到子列;否则直接字符串化
            if isinstance(el, dict):
                sub = flatten(el, explode_field)
                out.update({k: to_str(v) for k, v in sub.items()})
                # 同时保留原 explode 列的摘要(可选)
                out[explode_field] = json.dumps(el, ensure_ascii=False, separators=(',', ':'))
            else:
                out[explode_field] = to_str(el)
            yield out
    else:
        yield base

def infer_header(ndjson_path: str,
                 explode_field: Optional[str] = None) -> List[str]:
    cols = set()
    with open_maybe_gzip(ndjson_path, 'rt') as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception:
                # 记录错误,跳过
                # 生产环境应写入错误通道
                continue
            flat = flatten(obj)
            # 先进行数组处理(为了捕获因对象数组展开产生的子列)
            for processed in process_arrays(flat, explode_field=explode_field):
                for k in processed.keys():
                    cols.add(k)
    return sorted(cols)

def ndjson_to_csv(in_path: str,
                  out_path: str,
                  predefined_header: Optional[List[str]] = None,
                  explode_field: Optional[str] = None,
                  join_delim: str = '|'):
    header = predefined_header or infer_header(in_path, explode_field=explode_field)

    # 写 CSV
    with open_maybe_gzip(in_path, 'rt') as fin, open(out_path, 'w', encoding='utf-8', newline='') as fout:
        writer = csv.DictWriter(fout, fieldnames=header, extrasaction='ignore', quoting=csv.QUOTE_MINIMAL)
        writer.writeheader()
        for lineno, line in enumerate(fin, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception:
                # 记录并跳过
                # 可将错误行写入 out_path + '.error'
                continue
            flat = flatten(obj)
            for processed in process_arrays(flat, join_delim=join_delim, explode_field=explode_field):
                # 缺失列填空
                row = {k: to_str(processed.get(k)) for k in header}
                writer.writerow(row)

# 使用示例:
# ndjson_to_csv('input.ndjson', 'output.csv', explode_field='items', join_delim='|')
# 若有预定义列:ndjson_to_csv('input.ndjson', 'output.csv', predefined_header=['id','a.b','items','tags'])

十一、Spark 转换(适用于大规模数据)

  • 读入 NDJSON:spark.read.json("path")。
  • 扁平化:对 struct 使用 selectExpr 展开为列;对数组:
    • join:使用 concat_ws('|', col('tags'))
    • explode:使用 explode(col('items')) 并按需对元素 struct 再次展开。
  • 写出:df.write.option("header", True).csv("out_path").

十二、配置建议

  • 配置文件声明:
    • 列映射:JSONPath 或 dot path → 列名。
    • 数组策略:每列选择 join/explode,join 的分隔符。
    • 键名清洗规则与冲突策略。
    • 错误行处理策略(跳过/隔离/中止)。
    • 性能阈值(最大展开乘积、最大字段长度)。

该逻辑在数据工程实践中具备可移植性与可扩展性,既能快速在单机脚本中落地,也能在分布式计算框架中保持一致的语义与质量控制。

TSV conversion logic for logs

Scope

  • Input: arbitrary application logs in either text or JSON-lines.
  • Output: tab-separated values (TSV) with a defined header. One record per log event.
  • Constraints: streaming-friendly, robust to missing fields, tabs/newlines in values, mixed formats, and occasional multi-line entries.
  1. Target TSV schema Use a fixed schema to maintain streaming compatibility. Recommended minimal columns:
  • ts_iso: normalized timestamp in ISO 8601 (UTC if tz provided; otherwise keep naive).
  • level: normalized log level (TRACE/DEBUG/INFO/WARN/ERROR/FATAL).
  • service: service or application name (if discoverable).
  • host: hostname or source (if discoverable).
  • logger: logger name/category (if present).
  • thread: thread identifier (if present).
  • request_id: request or correlation id (if present).
  • user_id: user identifier (if present).
  • client_ip: client IP (if present).
  • method: HTTP method (if present).
  • path: URL path (if present).
  • status: HTTP status (if present).
  • latency_ms: request latency in milliseconds (if parseable).
  • message: human-readable message portion.
  • extras_json: JSON-encoded string of any remaining key-value pairs.
  • raw: original log line for auditability.

Notes:

  • If your environment requires a different set of fixed columns, adjust accordingly. Keep an extras_json catch-all for forward compatibility.
  1. Detection and parsing strategy The parser operates per line and chooses one of two branches:

A) JSON log line

  • Condition: line starts with “{” after leading whitespace.
  • Action:
    • Parse JSON into a dict; if parsing fails, treat as text log (branch B).
    • Flatten to a single level: nested keys joined by dots (e.g., http.request.method → http.request.method) or use a predetermined mapping.
    • Map known fields into the target schema (see section 3).
    • Put unmapped fields into extras_json.

B) Text log line with optional key=value pairs

  • Identify timestamp:
    • Try common patterns via regex and datetime parsing:
      • YYYY-MM-DD HH:MM:SS,mmm
      • YYYY-MM-DD HH:MM:SS
      • YYYY-MM-DDTHH:MM:SSZ
      • YYYY-MM-DDTHH:MM:SS.sssZ
      • Syslog-like: MMM dd HH:MM:SS (year unknown; if no year, keep as original string)
  • Identify level token (case-insensitive): TRACE|DEBUG|INFO|WARN|WARNING|ERROR|CRITICAL|FATAL.
  • Extract key=value pairs:
    • Regex: (\w[\w.-])=("([^"\]|\.)"|\S+)
      • Supports quoted values with spaces; unescape quotes and backslashes.
      • Key may include dot or dash.
  • Message extraction:
    • Take the substring after [timestamp][optional logger/thread][level] up to the first key=value pair; trim.
    • If no key=value pairs, the remainder is the message.
  • Map known keys to schema; place remaining pairs in extras_json.
  1. Field mapping and normalization Mapping rules (examples; adapt to your environment):
  • Timestamp (ts_iso):

    • For JSON logs: prefer fields like time, timestamp, ts, @timestamp.
    • For text logs: use the first recognized timestamp token.
    • Normalization: convert to ISO 8601. If timezone provided, convert to UTC (e.g., 2025-09-26T12:05:34.123Z). If no timezone, output naive ISO 8601 without Z.
    • If parsing fails, leave ts_iso empty and retain the original in extras_json.timestamp_raw.
  • Level:

    • Map aliases: WARNING→WARN, CRITICAL→ERROR (or FATAL if preferred), VERBOSE→DEBUG.
    • Output uppercase canonical values.
  • Service, host, logger, thread:

    • From keys: service, app, application; host, hostname; logger, category; thread, tid. If not present, leave empty.
  • HTTP-related:

    • method from method, http_method, http.request.method.
    • path from path, url, uri, http.request.path.
    • status from status, http.status, response.status.
    • client_ip from client_ip, ip, remote_addr, http.client_ip.
  • Request/user identifiers:

    • request_id from request_id, req_id, correlation_id, trace_id, x_request_id.
    • user_id from user_id, uid, user.id, principal.
  • Latency:

    • Accept values like “123ms”, “1.5s”, “800us”, numeric milliseconds.
    • Normalize:
      • ms → value
      • s → value × 1000
      • us/µs → value / 1000
      • ns → value / 1_000_000
    • If unitless integer/float, treat as milliseconds.
    • If unparsable, leave latency_ms empty and persist original under extras_json.latency_raw.
  • Message:

    • If JSON logs: use message, msg, log, event as primary; else let message be empty.
    • For text logs: the extracted message substring.
  • extras_json:

    • JSON-encode remaining unmapped key-value pairs.
    • Ensure tab/newline escaping (see section 4).
  • raw:

    • The original line, unmodified (except escaping tabs/newlines when outputting TSV).
  1. Escaping and TSV output rules
  • Separator: single tab character between fields.
  • Header: emit once at the start in the specified column order.
  • Value sanitation:
    • Replace literal tab with \t.
    • Replace newline with \n and carriage return with \r.
    • Remove or escape other control characters as needed.
    • For extras_json, ensure it is a compact JSON string with the same escaping rules applied to embedded tabs/newlines.
  • Missing values: output as empty string.
  • No quoting in TSV; rely on escaping.
  1. Multi-line log handling
  • If a line lacks a timestamp and level and does not look like JSON, treat it as a continuation of the previous record’s message.
  • Append to previous message with a literal \n inserted.
  • If strict one-line-per-record is required, disable multi-line merge and include the entire raw line as a separate row with empty fields.
  1. Example

Input lines:

  1. 2025-09-26 12:05:34,123 INFO request_id=abc123 user_id=42 method=GET path=/api/v1/items latency=12ms Completed request
  2. {"time":"2025-09-26T12:05:35.001+00:00","level":"warn","service":"checkout","http":{"method":"POST","path":"/orders"},"latency_ms":75,"message":"Payment pending"}

Output TSV (header + rows): ts_iso level service host logger thread request_id user_id client_ip method path status latency_ms message extras_json raw 2025-09-26T12:05:34.123 INFO abc123 42 GET /api/v1/items 12 Completed request {} 2025-09-26 12:05:34,123 INFO request_id=abc123 user_id=42 method=GET path=/api/v1/items latency=12ms Completed request 2025-09-26T12:05:35.001Z WARN checkout {"http.method":"POST","http.path":"/orders"} 75 Payment pending {"http.method":"POST","http.path":"/orders"} {"time":"2025-09-26T12:05:35.001+00:00","level":"warn","service":"checkout","http":{"method":"POST","path":"/orders"},"latency_ms":75,"message":"Payment pending"}

Note: Tabs are shown conceptually above; actual output will use tab characters. Newlines/tabs inside fields must be escaped.

  1. Reference implementation (Python, streaming)

This implementation handles both JSON and text logs, performs normalization, and writes TSV to stdout. It avoids non-standard libraries.

import sys
import json
import re
from datetime import datetime, timezone

# Precompiled regexes
TS_PATTERNS = [
    ("%Y-%m-%d %H:%M:%S,%f", None),
    ("%Y-%m-%d %H:%M:%S", None),
    ("%Y-%m-%dT%H:%M:%S.%f%z", "utc"),
    ("%Y-%m-%dT%H:%M:%S%z", "utc"),
    ("%Y-%m-%dT%H:%M:%S.%fZ", "z"),
    ("%Y-%m-%dT%H:%M:%SZ", "z"),
]
LEVEL_RE = re.compile(r'\b(trace|debug|info|warn|warning|error|critical|fatal)\b', re.IGNORECASE)
KV_RE = re.compile(r'(\w[\w\.\-]*)=("([^"\\]|\\.)*"|\S+)')

HEADER = [
    "ts_iso","level","service","host","logger","thread","request_id","user_id","client_ip",
    "method","path","status","latency_ms","message","extras_json","raw"
]

def normalize_level(lvl):
    if not lvl: return ""
    l = lvl.upper()
    if l == "WARNING": return "WARN"
    if l == "CRITICAL": return "ERROR"  # adjust to FATAL if preferred
    return l

def parse_ts(s):
    s = s.strip()
    for fmt, mode in TS_PATTERNS:
        try:
            if "%z" in fmt:
                dt = datetime.strptime(s, fmt)
                if mode == "utc":
                    return dt.astimezone(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
                elif mode == "z":
                    # Already Zulu; ensure milliseconds where possible
                    if dt.microsecond:
                        return dt.isoformat(timespec="milliseconds").replace("+00:00", "Z")
                    return dt.replace(tzinfo=timezone.utc).isoformat().replace("+00:00","Z")
            else:
                dt = datetime.strptime(s, fmt)
                # naive timestamp
                if "%f" in fmt:
                    return dt.isoformat(timespec="milliseconds")
                else:
                    return dt.isoformat()
        except ValueError:
            continue
    return ""  # failed

def sanitize(v):
    if v is None: return ""
    s = str(v)
    return s.replace("\t","\\t").replace("\n","\\n").replace("\r","\\r")

def unquote(value):
    # Remove surrounding quotes and unescape \" and \\ inside
    if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
        inner = value[1:-1]
        inner = inner.replace('\\"', '"').replace('\\\\', '\\')
        return inner
    return value

def normalize_latency(v):
    if v is None: return ""
    s = str(v).strip().lower()
    try:
        if s.endswith("ms"):
            return str(float(s[:-2]))
        if s.endswith("s"):
            return str(float(s[:-1]) * 1000.0)
        if s.endswith("us") or s.endswith("µs"):
            return str(float(s[:-2]) / 1000.0)
        if s.endswith("ns"):
            return str(float(s[:-2]) / 1_000_000.0)
        # unitless: assume milliseconds
        return str(float(s))
    except ValueError:
        return ""  # leave raw in extras_json if needed

def parse_json_line(line):
    try:
        obj = json.loads(line)
    except json.JSONDecodeError:
        return None
    flat = {}

    def flatten(prefix, val):
        if isinstance(val, dict):
            for k, v in val.items():
                nk = f"{prefix}.{k}" if prefix else k
                flatten(nk, v)
        else:
            flat[prefix] = val

    flatten("", obj)

    row = {k: "" for k in HEADER}
    row["raw"] = line.rstrip("\n")

    # Timestamp candidates
    for key in ("@timestamp","time","timestamp","ts"):
        if key in flat:
            row["ts_iso"] = parse_ts(str(flat[key]))
            if not row["ts_iso"]:
                row["extras_json"] = json.dumps({"timestamp_raw": flat[key]}, separators=(",",":"))
            break

    row["level"]   = normalize_level(flat.get("level") or flat.get("log.level") or flat.get("severity"))
    row["service"] = str(flat.get("service") or flat.get("app") or flat.get("application") or "")

    row["host"]    = str(flat.get("host") or flat.get("hostname") or "")
    row["logger"]  = str(flat.get("logger") or flat.get("log.logger") or "")
    row["thread"]  = str(flat.get("thread") or flat.get("tid") or "")
    row["request_id"] = str(flat.get("request_id") or flat.get("req_id") or flat.get("correlation_id") or flat.get("trace_id") or "")
    row["user_id"] = str(flat.get("user_id") or flat.get("user.id") or flat.get("uid") or "")
    row["client_ip"] = str(flat.get("client_ip") or flat.get("ip") or flat.get("remote_addr") or flat.get("http.client_ip") or "")

    row["method"]  = str(flat.get("method") or flat.get("http.method") or flat.get("http.request.method") or "")
    row["path"]    = str(flat.get("path") or flat.get("url") or flat.get("uri") or flat.get("http.path") or flat.get("http.request.path") or "")
    row["status"]  = str(flat.get("status") or flat.get("http.status") or flat.get("response.status") or "")

    latency = flat.get("latency_ms") or flat.get("latency") or flat.get("http.latency") or None
    row["latency_ms"] = normalize_latency(latency)

    row["message"] = str(flat.get("message") or flat.get("msg") or flat.get("log") or flat.get("event") or "")

    # Build extras from unmapped keys
    mapped = set([
        "@timestamp","time","timestamp","ts","level","log.level","severity","service","app","application",
        "host","hostname","logger","log.logger","thread","tid","request_id","req_id","correlation_id","trace_id",
        "user_id","user.id","uid","client_ip","ip","remote_addr","http.client_ip",
        "method","http.method","http.request.method","path","url","uri","http.path","http.request.path",
        "status","http.status","response.status","latency","latency_ms","http.latency","message","msg","log","event"
    ])
    extras = {k: v for k, v in flat.items() if k not in mapped}
    row["extras_json"] = json.dumps(extras, separators=(",",":")) if extras else "{}"

    return row

def parse_text_line(line):
    row = {k: "" for k in HEADER}
    row["raw"] = line.rstrip("\n")

    s = line.strip()

    # Attempt to find timestamp at start substrings
    ts_match = None
    # Heuristic: check first ~40 chars for timestamp tokens separated by space or 'T'
    head = s[:40]
    # Try direct parsing by slicing common lengths
    for token_len in (23, 19, 20, 24):  # rough candidates
        token = head[:token_len]
        ts_iso = parse_ts(token)
        if ts_iso:
            row["ts_iso"] = ts_iso
            ts_match = token
            break

    # Level
    level_m = LEVEL_RE.search(s)
    if level_m:
        row["level"] = normalize_level(level_m.group(0))

    # KV pairs
    kvs = []
    for m in KV_RE.finditer(s):
        key = m.group(1)
        val = unquote(m.group(2))
        kvs.append((key, val))

    # Message: substring after level up to first kv pair
    msg = s
    if level_m:
        start = level_m.end()
        msg = s[start:].strip()
    if kvs:
        first_kv_start = s.find(kvs[0][0] + "=")
        if first_kv_start != -1:
            msg = s[:first_kv_start].strip()
    row["message"] = msg

    # Map common keys
    def get(klist):
        for k in klist:
            for kk, vv in kvs:
                if kk == k:
                    return vv
        return None

    row["service"]    = get(["service","app","application"]) or ""
    row["host"]       = get(["host","hostname"]) or ""
    row["logger"]     = get(["logger","category"]) or ""
    row["thread"]     = get(["thread","tid"]) or ""
    row["request_id"] = get(["request_id","req_id","correlation_id","trace_id","x_request_id"]) or ""
    row["user_id"]    = get(["user_id","uid","user.id","principal"]) or ""
    row["client_ip"]  = get(["client_ip","ip","remote_addr"]) or ""
    row["method"]     = get(["method","http_method"]) or ""
    row["path"]       = get(["path","url","uri"]) or ""
    row["status"]     = get(["status","http_status"]) or ""
    row["latency_ms"] = normalize_latency(get(["latency","latency_ms","duration","elapsed"]))

    # Build extras
    mapped_keys = set([
        "service","app","application","host","hostname","logger","category","thread","tid",
        "request_id","req_id","correlation_id","trace_id","x_request_id",
        "user_id","uid","user.id","principal","client_ip","ip","remote_addr",
        "method","http_method","path","url","uri","status","http_status","latency","latency_ms","duration","elapsed"
    ])
    extras = {k: v for (k, v) in kvs if k not in mapped_keys}
    row["extras_json"] = json.dumps(extras, separators=(",",":")) if extras else "{}"

    return row

def emit_tsv_row(row):
    fields = [sanitize(row.get(col, "")) for col in HEADER]
    sys.stdout.write("\t".join(fields) + "\n")

def main():
    # Write header
    sys.stdout.write("\t".join(HEADER) + "\n")
    for line in sys.stdin:
        stripped = line.lstrip()
        parsed = None
        if stripped.startswith("{"):
            parsed = parse_json_line(line)
        if parsed is None:
            parsed = parse_text_line(line)
        emit_tsv_row(parsed)

if __name__ == "__main__":
    main()
  1. Operational guidance
  • Run as: python log_to_tsv.py < input.log > output.tsv
  • For large files, consider:
    • Using buffered I/O (default in Python is sufficient for most cases).
    • Precompiling all regexes (done).
    • Avoiding per-line JSON dumps for empty extras (use "{}" constant as shown).
    • Parallelization: sharding input files and merging outputs; ensure identical headers.
  • Monitoring and testing:
    • Create unit tests with representative samples for each log format you encounter.
    • Validate TSV with a reader that checks column counts and escaping.
    • Track parse failure rates (e.g., empty ts_iso or level) and revisit patterns accordingly.

This logic establishes a deterministic, streaming-safe conversion of diverse logs to TSV suitable for ingestion into data warehouses or lakehouse tables. Adjust field mappings to your domain, and keep extras_json for schema evolution.

以下は、JSONをYAMLへ安全かつ忠実に変換するための実務的な変換ロジックです。目的は「JSONの型・値の意味を保ち、YAMLの解釈による型誤推測を回避する」ことです。YAML 1.2準拠を前提に記述します。

前提

  • JSONはYAML 1.2の厳密な部分集合であり、原則として型対応は1対1で可能。
  • ただしYAMLパーサ(特にYAML 1.1系)による曖昧なスカラー解釈(例: yes/no, on/off のブーリアン解釈)を避けるため、文字列の引用規則を設ける。
  • JSONのオブジェクトのキーは文字列のみ。YAMLのマッピングキーも文字列で出力する。
  • 順序は入力順を維持(一般的なJSONパーサでは挿入順を保持できる)。

型マッピング規則

  • Object(JSON)→ Mapping(YAML)
    • { "k": v } → k: v
    • インデントは2スペース推奨。
    • ドキュメント開始記号(---)は任意。複数ドキュメント扱いが不要なら省略可。
  • Array(JSON)→ Sequence(YAML)
    • [a, b] →
      • a
      • b
    • 順序はそのまま保持。
  • String(JSON)→ Scalar(YAML)
    • 文字列は「プレーン」「ダブルクォート」「リテラルブロック(|)」のいずれかで表現。
    • 判定ロジックは後述。
  • Number(JSON)→ Number(YAML)
    • 整数・浮動小数をそのまま文字列表現へ。
    • JSONにNaN/Infinityは存在しない前提(出力しない)。
  • Boolean(JSON)→ Boolean(YAML)
    • true → true、false → false(小文字)
  • Null(JSON)→ Null(YAML)
    • null → null(~は使わない方針)
  • 日付/時刻
    • JSONでは文字列。YAML側で型推測されないよう文字列のまま出力(必要に応じて引用)。

文字列の引用判定ロジック(YAML 1.2を前提に安全運用)

  • 以下のいずれかに該当する場合はダブルクォートで囲む("...")。
    • 先頭または末尾に空白がある
    • 改行を含まないが、次の文字を含む/先頭が危険文字
      • 「#」(コメント開始になりうる)
      • 「: 」(コロンに続くスペースはマッピング記号と衝突)
      • 先頭が「-」「?」「@」「`」「!」「*」「&」「%」「|」「>」
    • JSONの文字列値が次の曖昧トークンに完全一致(大小問わず)
      • "true", "false", "null", "~", "yes", "no", "on", "off"
      • 注: YAML 1.2では yes/no/on/off はブーリアンではないが、1.1系を避けるため安全に引用する
    • ISO8601風の日時文字列(例: 2024-01-01T12:00:00Z 等)はパーサ差異対策として引用推奨
    • 制御文字や非表示文字を含む(JSONのエスケープをYAMLのダブルクォート内で保持)
  • 文字列に改行が含まれる場合はリテラルブロック(|)で出力し、改行を忠実に保持
    • 末尾改行の有無も正確に維持
  • 上記に該当しない場合のみプレーンスカラー(引用なし)で出力
    • プレーン許容文字の目安: 英数字、ハイフン(-)、アンダースコア(_)、ドット(.)、スラッシュ(/)
    • 空文字はダブルクォートで "" とする

出力フォーマット規則

  • インデントは2スペース
  • シーケンスは「- 」で要素を列挙
  • マッピングは「key: value」形式
  • 必要に応じてドキュメント開始記号「---」を付与
  • アンカー(&)やエイリアス(*)、タグ(!!)は生成しない(JSON互換の維持)

変換アルゴリズム(擬似コード)

  • function to_yaml(value, indent=0):
    • switch type(value):
      • dict:
        • for (k, v) in insertion_order(value):
          • k_out = quote_key_if_needed(k) // キーも文字列。通常はプレーンだが危険時はダブルクォート
          • print(indent_spaces(indent) + k_out + ": " + inline_or_child(v, indent))
      • list:
        • for item in value:
          • print(indent_spaces(indent) + "- " + inline_or_child(item, indent))
      • string:
        • if contains_newline(value):
          • print(block_literal("|", value, indent)) // 改行をそのまま保持
        • else if needs_double_quote(value):
          • print(double_quoted(value)) // JSONエスケープはYAMLダブルクォートへ移植
        • else:
          • print(plain(value))
      • bool:
        • print("true" if value else "false")
      • null:
        • print("null")
      • number:
        • print(number_to_string(value)) // 整数/浮動小数を文字列化(指数表記等はそのまま)
  • function needs_double_quote(s):
    • if s == "" return true
    • if leading_or_trailing_space(s) return true
    • if contains_any(s, ["#"]) return true
    • if contains_substring(s, ": ") return true
    • if starts_with_any(s, ["-", "?", "@", "`", "!", "*", "&", "%", "|", ">"]) return true
    • if is_ambiguous_token_ci(s, ["true","false","null","~","yes","no","on","off"]) return true
    • if looks_like_iso8601_timestamp(s) return true
    • if contains_control_char(s) return true
    • return false

Python実装例(ruamel.yamlを使用、YAML 1.2想定)

  • ポイント
    • ruamel.yaml.scalarstring の LiteralScalarString(|)、DoubleQuotedScalarString(")を使ってスタイルを明示。
    • 再帰的にJSON値を走査してスタイル化したオブジェクトへ変換後、YAMLへダンプ。
  • コード
    • from json import loads
    • from ruamel.yaml import YAML
    • from ruamel.yaml.scalarstring import LiteralScalarString, DoubleQuotedScalarString
    • def convert_json_to_yaml_str(json_str):
      • data = loads(json_str)
      • def style_string(s):
        • if "\n" in s:
          • return LiteralScalarString(s) # 改行保持
        • if needs_double_quote(s):
          • return DoubleQuotedScalarString(s)
        • return s # プレーン
      • def transform(node):
        • if isinstance(node, dict):
          • return { (style_string(k) if isinstance(k, str) else k): transform(v) for k, v in node.items() }
        • if isinstance(node, list):
          • return [ transform(x) for x in node ]
        • if isinstance(node, str):
          • return style_string(node)
        • return node # bool, None, number はそのまま
      • yaml = YAML()
      • yaml.version = (1, 2)
      • yaml.indent(mapping=2, sequence=2, offset=0)
      • out = transform(data)
      • import io
      • buf = io.StringIO()
      • yaml.dump(out, buf)
      • return buf.getvalue()
    • 上記の needs_double_quote(s) は前述ロジックをそのままPythonで実装。

検証・テスト観点

  • 型保持テスト
    • JSONのboolean/null/numberがYAMLで同型として再解釈されることを確認。
  • 文字列の安全性テスト
    • "yes", "on", "#comment", ": value" 等が文字列として保持される(引用される)ことを確認。
    • 改行文字列がリテラルブロックで出力され、改行・末尾改行が一致することを確認。
  • 構造・順序
    • 配列順序・オブジェクト挿入順を保持。
  • エッジケース
    • 空文字列、先頭/末尾空白、長大文字列(折り畳み不要方針なら幅制限を外す)、Unicode絵文字・サロゲートペア。

このロジックにより、JSONの意味を損なわず、YAMLのパーサ差異による型誤解釈を最小化した安全な変換が可能です。

示例详情

解决的问题

帮助团队快速将“现有数据格式”无缝转换为“目标数据格式”,在几分钟内生成可执行的转换方案与规则清单,配套质量校验与落地步骤,支持多语言输出,显著降低人为错误与返工,提升数据接入、系统迁移与报表统一的交付速度。

适用用户

数据工程师

统一不同来源的数据格式,制定字段对应关系与转换规则,产出实施步骤与校验清单,并生成多语言交付文档支持上线

商业分析师

将原始报表与日志整理为可分析结构,明确口径与清洗规则,获得可复用的转换方案,提升洞察的准确度与时效

产品经理

把分散业务数据整合为可追踪指标,形成标准化转换流程与注意事项,与技术团队快速对齐需求并加速交付

特征总结

一键生成跨格式数据转换思路与步骤,帮助你从现状到目标格式快速落地
自动分析源数据与目标需求,给出清晰字段对应与规则,避免遗漏与偏差
支持多语言输出说明文档,便于团队内外沟通、审阅与交付
提供可复制的转换清单与注意事项,让实施更顺畅、复用更简单
针对不同业务场景给出示例与变体,缩短试错时间,提升上线速度
内置质量校验与异常处理建议,降低数据错误风险,保障结果可靠
可按模板与参数定制转换逻辑,全面适配你独特流程与约束
结合摄取、转换、存储全流程视角,让管道设计更完整、更可维护
输出结构化技术写作风格说明,便于审阅、交接与合规备案
快速比较多种转换方案利弊与适用场景,帮助你做出更稳妥选择

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥15.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 245 tokens
- 3 个可调节参数
{ 当前数据格式 } { 目标数据格式 } { 输出语言 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
限时免费

不要错过!

免费获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59