Data Transformation Logic Design

Updated Sep 26, 2025

Provides logic solutions for converting data from one format to another.

Example 1

The following is a standardized conversion logic and reference implementation for converting NDJSON (one JSON object per line) to CSV. It covers schema inference, nested flattening, array handling, missing values, types and escaping, error handling, and performance.

1. Input and output definition
- Input: UTF-8 encoded NDJSON file; one valid JSON object per line.
- Output: UTF-8 encoded CSV file; the first line is the header, followed by data rows.

2. Schema and column order
- Prefer a predefined schema (recommended): declare the output columns and their path mappings up front so the file can be converted in a single pass.
- Without a predefined schema:
  - Two-pass strategy: the first pass collects the set of keys from all flattened rows and produces the header; the second pass writes the data.
  - Column order: use a stable ordering (e.g., lexicographic), or a fixed order based on business priority.
- Key normalization: if an original key contains the separator character (e.g., "."), replace it with "_" to avoid clashing with the path separator.

3. Flattening rules for nested structures (objects/dicts)
- Flattening strategy (dot path, default; a small sketch follows this list):
  - Nested objects produce column names of the form "parent.child". For example, {"a":{"b":1}} → column "a.b" = 1.
- Key conflicts:
  - If different paths produce the same column name (because of duplicate keys or the replacement rule), add a prefix or apply an enumeration strategy (e.g., when "a.b" and "a_b" collide, keep the original dot path and rename the conflicting replaced key to "a_b__dup").
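
A minimal sketch of this flattening rule (illustrative only; the full reference implementation in section 10 also handles arrays):

```python
# Sketch: dot-path flattening, with "." in original keys replaced by "_" before joining paths.
def flatten(obj, parent=""):
    out = {}
    for k, v in obj.items():
        key = f"{parent}.{k.replace('.', '_')}" if parent else k.replace(".", "_")
        if isinstance(v, dict):
            out.update(flatten(v, key))   # recurse into nested objects
        else:
            out[key] = v                  # leaves (including lists) are kept as-is
    return out

print(flatten({"a": {"b": 1}, "a.b": 2}))
# {'a.b': 1, 'a_b': 2}  -- the sanitized top-level key no longer collides with the dot path
```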

4. Array handling strategies
Arrays are the crux of CSV conversion and need an explicit strategy (configurable per field); a worked example follows this list.
- Join (single row): concatenate the array into one string with a delimiter.
  - Arrays of atomic types (string/number/bool) can be joined directly, e.g., tags=["a","b"] → "a|b".
  - Arrays of objects: optionally serialize each element as compact JSON before joining, e.g., items=[{"id":1},{"id":2}] → {"id":1}|{"id":2}.
- Explode (multiple rows): the array produces multiple rows; if a field is configured as explode, each element generates one row.
  - Exploding several array fields at once requires a Cartesian product (row counts can blow up quickly; use with care).
- Choosing a strategy:
  - Explode only the array fields whose details you need to analyze; join the rest.
  - If there are multiple array fields, allow only one explode, or cap the Cartesian product at a small size (e.g., set a maximum product threshold).
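
To make the two strategies concrete, here is a minimal sketch; the record and the field names tags/items are made-up examples, and it mirrors the rules above without the full converter:

```python
import json

record = {"id": 1, "tags": ["a", "b"], "items": [{"sku": "x"}, {"sku": "y"}]}

# join: atomic array -> delimiter-joined string; object array -> JSON per element, then joined
joined_row = {
    "id": record["id"],
    "tags": "|".join(record["tags"]),
    "items": "|".join(json.dumps(el, separators=(",", ":")) for el in record["items"]),
}
print(joined_row)
# {'id': 1, 'tags': 'a|b', 'items': '{"sku":"x"}|{"sku":"y"}'}

# explode on items: one output row per element; the other fields are repeated
exploded_rows = [
    {"id": record["id"], "tags": "|".join(record["tags"]), "items.sku": el["sku"]}
    for el in record["items"]
]
print(exploded_rows)
# [{'id': 1, 'tags': 'a|b', 'items.sku': 'x'}, {'id': 1, 'tags': 'a|b', 'items.sku': 'y'}]
```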

5. Data types and formatting
- Strings are kept as-is and escaped when necessary.
- Numbers are written as decimal strings; avoid scientific notation (format explicitly if needed, as sketched below).
- Booleans become true/false.
- Timestamps: if a standard format exists, normalize to ISO 8601 (preferably in a preprocessing/mapping layer); otherwise keep as-is.
- Binary or very long fields: base64-encode or summarize them (depending on the business) so they do not pollute the CSV.
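
Where plain decimal output is required, one option is to route floats through decimal.Decimal; this is a minimal sketch and not part of the reference implementation below:

```python
# Sketch: render floats in plain decimal form (no scientific notation) for CSV output.
from decimal import Decimal

def number_to_plain_str(x) -> str:
    if isinstance(x, int):
        return str(x)
    # repr() keeps the shortest round-trip form; Decimal re-renders it without an exponent.
    return format(Decimal(repr(x)), "f")

print(number_to_plain_str(1.5e-07))  # "0.00000015"
print(number_to_plain_str(1.23e20))  # "123000000000000000000"
print(number_to_plain_str(3.14))     # "3.14"
```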

6. Nulls and missing values
- JSON null → empty CSV cell.
- Missing field (the column is absent from a row) → empty CSV cell.
- Explicitly distinguish the empty string "" from missing/NULL.

7. CSV writing and escaping
- Delimiter: comma, or tab (TSV) if the business requires it.
- Quoting: minimal quoting (wrap a field in double quotes only when it contains the delimiter, a quote, or a newline); double quotes inside a field are escaped by doubling them. A short demonstration follows this list.
- Newlines: fields must not span lines; if the data contains newlines, the field must be quoted.
- Encoding: UTF-8 without BOM; normalize to NFC if necessary.
- The header is fixed and written exactly once.
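
As a quick illustration of minimal quoting with Python's standard csv module (a sketch; the reference implementation in section 10 relies on the same csv.QUOTE_MINIMAL behavior):

```python
# Sketch: csv.writer with QUOTE_MINIMAL quotes only fields containing the delimiter,
# a double quote, or a newline, and doubles embedded quotes.
import csv, io, sys

buf = io.StringIO()
writer = csv.writer(buf, quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
writer.writerow(["plain", "has,comma", 'has "quote"', "has\nnewline"])
sys.stdout.write(buf.getvalue())
# plain,"has,comma","has ""quote""","has
# newline"
```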

8. Errors and data quality
- Parse each line independently; for invalid JSON lines:
  - Log them to an error file (with line number and content) and either skip or abort (configurable).
- Type inconsistencies (a column contains multiple types):
  - Default to converting to string; optionally log a type-conflict warning.
- Oversized fields or Cartesian blow-up:
  - Set upper bounds on row length and the maximum number of exploded rows; rows exceeding them go to a quarantine area.

9. Performance and scalability
- Streaming: read and write line by line to keep memory usage low.
- Compression: support .gz input; compress output as needed.
- Large files: during the two-pass scan, avoid loading the whole file into memory; cache to a temporary file, or collect the schema first and then re-read.
- Parallelism: process file shards or line blocks in parallel; ensure the headers match when merging.
- Big-data scenarios: prefer a distributed engine (Spark/Flink) and do the explode and CSV write through DataFrames.

10. Reference implementation (Python, streaming; supports join and an optional single-field explode)
Notes:
- By default, objects are flattened and arrays are joined into strings.
- One field can be configured for explode (multi-field explode requires extending the Cartesian logic).
- The header is inferred with a two-pass scan; with a predefined schema the first pass can be skipped.

Example code:
- Depends only on the standard library; for higher performance, json can be replaced with orjson.

```python
import json
import csv
import gzip
import os
from typing import Any, Dict, List, Iterable, Tuple, Optional

def open_maybe_gzip(path: str, mode: str = 'rt', encoding: str = 'utf-8'):
    if path.endswith('.gz'):
        return gzip.open(path, mode, encoding=encoding)
    return open(path, mode, encoding=encoding)

def sanitize_key(key: str) -> str:
    # Avoid clashing with the dot used as the path separator
    return key.replace('.', '_')

def flatten(obj: Any, parent_key: str = '', sep: str = '.') -> Dict[str, Any]:
    """Flatten nested objects; arrays are kept as Python lists and handled later."""
    items: Dict[str, Any] = {}
    if isinstance(obj, dict):
        for k, v in obj.items():
            k2 = sanitize_key(k)
            new_key = f"{parent_key}{sep}{k2}" if parent_key else k2
            items.update(flatten(v, new_key, sep))
    elif isinstance(obj, list):
        # keep the list for now; the array strategy is applied later
        items[parent_key] = obj
    else:
        items[parent_key] = obj
    return items

def to_str(value: Any) -> str:
    if value is None:
        return ''
    if isinstance(value, bool):
        # check bool before int/float (bool is a subclass of int); the spec wants lowercase true/false
        return 'true' if value else 'false'
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, str):
        return value
    # Other types (objects, lists) are serialized as compact JSON
    return json.dumps(value, ensure_ascii=False, separators=(',', ':'))

def process_arrays(row: Dict[str, Any],
                   join_delim: str = '|',
                   explode_field: Optional[str] = None) -> Iterable[Dict[str, Any]]:
    """
    Apply the join strategy to arrays, or explode one designated field.
    explode_field is the flattened column name (e.g. 'items' or 'a.b.items').
    If explode_field is absent or not an array, a single joined row is emitted.
    """
    # Work on a mutable copy
    base = dict(row)

    # First, join every array that is not the explode field
    for key, val in list(base.items()):
        if isinstance(val, list):
            if explode_field and key == explode_field:
                # deferred; handled in the explode pass below
                continue
            # join atomic values directly; serialize object elements to JSON first
            joined = []
            for el in val:
                if isinstance(el, (str, int, float, bool)) or el is None:
                    joined.append(to_str(el))
                else:
                    joined.append(json.dumps(el, ensure_ascii=False, separators=(',', ':')))
            base[key] = join_delim.join(joined)

    # Handle the explode field
    if explode_field and isinstance(row.get(explode_field), list):
        arr = row[explode_field]
        if len(arr) == 0:
            # empty array → emit one row with this column left empty
            out = dict(base)
            out[explode_field] = ''
            yield out
            return
        for el in arr:
            out = dict(base)
            # flatten object elements into sub-columns; otherwise stringify directly
            if isinstance(el, dict):
                sub = flatten(el, explode_field)
                out.update({k: to_str(v) for k, v in sub.items()})
                # optionally keep a JSON summary in the original explode column
                out[explode_field] = json.dumps(el, ensure_ascii=False, separators=(',', ':'))
            else:
                out[explode_field] = to_str(el)
            yield out
    else:
        yield base

def infer_header(ndjson_path: str,
                 explode_field: Optional[str] = None) -> List[str]:
    cols = set()
    with open_maybe_gzip(ndjson_path, 'rt') as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception:
                # log the error and skip the line
                # in production, write it to an error channel
                continue
            flat = flatten(obj)
            # apply array handling first (to capture sub-columns produced by exploding object arrays)
            for processed in process_arrays(flat, explode_field=explode_field):
                for k in processed.keys():
                    cols.add(k)
    return sorted(cols)

def ndjson_to_csv(in_path: str,
                  out_path: str,
                  predefined_header: Optional[List[str]] = None,
                  explode_field: Optional[str] = None,
                  join_delim: str = '|'):
    header = predefined_header or infer_header(in_path, explode_field=explode_field)

    # Write the CSV
    with open_maybe_gzip(in_path, 'rt') as fin, open(out_path, 'w', encoding='utf-8', newline='') as fout:
        writer = csv.DictWriter(fout, fieldnames=header, extrasaction='ignore', quoting=csv.QUOTE_MINIMAL)
        writer.writeheader()
        for lineno, line in enumerate(fin, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception:
                # log and skip
                # invalid lines could be written to out_path + '.error'
                continue
            flat = flatten(obj)
            for processed in process_arrays(flat, join_delim=join_delim, explode_field=explode_field):
                # fill missing columns with empty strings
                row = {k: to_str(processed.get(k)) for k in header}
                writer.writerow(row)

# Usage examples:
# ndjson_to_csv('input.ndjson', 'output.csv', explode_field='items', join_delim='|')
# With predefined columns: ndjson_to_csv('input.ndjson', 'output.csv', predefined_header=['id','a.b','items','tags'])
```

11. Spark conversion (for large-scale data)
- Read NDJSON: spark.read.json("path").
- Flattening: expand struct fields into columns with selectExpr; for arrays:
  - join: concat_ws('|', col('tags'))
  - explode: explode(col('items')), then expand the element struct again as needed.
- Write: df.write.option("header", True).csv("out_path"). A PySpark sketch follows.
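
A possible PySpark sketch of these steps is shown below; the paths and the column names tags, items, and items.id are placeholders, and it assumes tags is an array of strings and items an array of structs:

```python
# Sketch of the Spark variant; column names and paths are illustrative.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("ndjson_to_csv").getOrCreate()

df = spark.read.json("input.ndjson")          # one JSON object per line

out = (
    df
    # join: array of strings -> single delimiter-joined column
    .withColumn("tags", F.concat_ws("|", F.col("tags")))
    # explode: one output row per array element (explode_outer keeps rows with empty arrays)
    .withColumn("item", F.explode_outer(F.col("items")))
    # flatten the exploded struct into plain columns before writing CSV
    .withColumn("item_id", F.col("item.id"))
    .drop("items", "item")
)

out.write.option("header", True).mode("overwrite").csv("out_path")
```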

12. Configuration recommendations
- Declare in a configuration file (an illustrative example follows this list):
  - Column mappings: JSONPath or dot path → column name.
  - Array strategy: join or explode per column, plus the join delimiter.
  - Key-sanitization rules and conflict policy.
  - Error-line handling policy (skip/quarantine/abort).
  - Performance thresholds (maximum explode product, maximum field length).
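
A hypothetical configuration object covering these settings might look like the following; all names and thresholds are illustrative, not a fixed format:

```python
# Hypothetical configuration for the converter described above (names are illustrative).
CONVERSION_CONFIG = {
    "columns": {                      # dot path -> output column name
        "id": "id",
        "a.b": "a_b",
        "tags": "tags",
        "items": "items",
    },
    "arrays": {
        "tags":  {"strategy": "join", "delimiter": "|"},
        "items": {"strategy": "explode"},
    },
    "key_sanitization": {"replace": {".": "_"}, "conflict_suffix": "__dup"},
    "errors": {"invalid_json": "skip", "error_file": "output.errors"},
    "limits": {"max_explode_product": 10_000, "max_field_length": 65_536},
}
```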

This logic is portable and extensible in data-engineering practice: it can be implemented quickly as a single-machine script, while keeping the same semantics and quality controls in a distributed computing framework.

Example 2

TSV conversion logic for logs

Scope
- Input: arbitrary application logs in either text or JSON-lines.
- Output: tab-separated values (TSV) with a defined header. One record per log event.
- Constraints: streaming-friendly, robust to missing fields, tabs/newlines in values, mixed formats, and occasional multi-line entries.

1) Target TSV schema
Use a fixed schema to maintain streaming compatibility. Recommended minimal columns:
- ts_iso: normalized timestamp in ISO 8601 (UTC if tz provided; otherwise keep naive).
- level: normalized log level (TRACE/DEBUG/INFO/WARN/ERROR/FATAL).
- service: service or application name (if discoverable).
- host: hostname or source (if discoverable).
- logger: logger name/category (if present).
- thread: thread identifier (if present).
- request_id: request or correlation id (if present).
- user_id: user identifier (if present).
- client_ip: client IP (if present).
- method: HTTP method (if present).
- path: URL path (if present).
- status: HTTP status (if present).
- latency_ms: request latency in milliseconds (if parseable).
- message: human-readable message portion.
- extras_json: JSON-encoded string of any remaining key-value pairs.
- raw: original log line for auditability.

Notes:
- If your environment requires a different set of fixed columns, adjust accordingly. Keep an extras_json catch-all for forward compatibility.

2) Detection and parsing strategy
The parser operates per line and chooses one of two branches:

A) JSON log line
- Condition: line starts with “{” after leading whitespace.
- Action:
  - Parse JSON into a dict; if parsing fails, treat as text log (branch B).
  - Flatten to a single level: nested keys joined by dots (e.g., {"http":{"request":{"method":"GET"}}} → http.request.method), or use a predetermined mapping.
  - Map known fields into the target schema (see section 3).
  - Put unmapped fields into extras_json.

B) Text log line with optional key=value pairs
- Identify timestamp:
  - Try common patterns via regex and datetime parsing:
    - YYYY-MM-DD HH:MM:SS,mmm
    - YYYY-MM-DD HH:MM:SS
    - YYYY-MM-DDTHH:MM:SSZ
    - YYYY-MM-DDTHH:MM:SS.sssZ
    - Syslog-like: MMM dd HH:MM:SS (year unknown; if no year, keep as original string)
- Identify level token (case-insensitive): TRACE|DEBUG|INFO|WARN|WARNING|ERROR|CRITICAL|FATAL.
- Extract key=value pairs:
  - Regex: (\w[\w\.\-]*)=("([^"\\]|\\.)*"|\S+)
    - Supports quoted values with spaces; unescape quotes and backslashes.
    - Key may include dot or dash.
- Message extraction:
  - Take the text after [timestamp][optional logger/thread][level], remove any recognized key=value pairs, and trim what remains; that is the message.
  - If there are no key=value pairs, the whole remainder after the level token is the message.
- Map known keys to schema; place remaining pairs in extras_json.

3) Field mapping and normalization
Mapping rules (examples; adapt to your environment):

- Timestamp (ts_iso):
  - For JSON logs: prefer fields like time, timestamp, ts, @timestamp.
  - For text logs: use the first recognized timestamp token.
  - Normalization: convert to ISO 8601. If timezone provided, convert to UTC (e.g., 2025-09-26T12:05:34.123Z). If no timezone, output naive ISO 8601 without Z.
  - If parsing fails, leave ts_iso empty and retain the original in extras_json.timestamp_raw.

- Level:
  - Map aliases: WARNING→WARN, CRITICAL→ERROR (or FATAL if preferred), VERBOSE→DEBUG.
  - Output uppercase canonical values.

- Service, host, logger, thread:
  - From keys: service, app, application; host, hostname; logger, category; thread, tid. If not present, leave empty.

- HTTP-related:
  - method from method, http_method, http.request.method.
  - path from path, url, uri, http.request.path.
  - status from status, http.status, response.status.
  - client_ip from client_ip, ip, remote_addr, http.client_ip.

- Request/user identifiers:
  - request_id from request_id, req_id, correlation_id, trace_id, x_request_id.
  - user_id from user_id, uid, user.id, principal.

- Latency:
  - Accept values like “123ms”, “1.5s”, “800us”, numeric milliseconds.
  - Normalize:
    - ms → value
    - s → value × 1000
    - us/µs → value / 1000
    - ns → value / 1_000_000
  - If unitless integer/float, treat as milliseconds.
  - If unparsable, leave latency_ms empty and persist original under extras_json.latency_raw.

- Message:
  - For JSON logs: use message, msg, log, or event as the primary source; if none is present, leave message empty.
  - For text logs: the extracted message substring.

- extras_json:
  - JSON-encode remaining unmapped key-value pairs.
  - Ensure tab/newline escaping (see section 4).

- raw:
  - The original line, unmodified (except escaping tabs/newlines when outputting TSV).

4) Escaping and TSV output rules
- Separator: single tab character between fields.
- Header: emit once at the start in the specified column order.
- Value sanitation:
  - Replace literal tab with \t.
  - Replace newline with \n and carriage return with \r.
  - Remove or escape other control characters as needed.
  - For extras_json, ensure it is a compact JSON string with the same escaping rules applied to embedded tabs/newlines.
- Missing values: output as empty string.
- No quoting in TSV; rely on escaping.

5) Multi-line log handling
- If a line lacks a timestamp and level and does not look like JSON, treat it as a continuation of the previous record’s message.
- Append to the previous message with a literal \n inserted (a merge sketch follows this list).
- If strict one-line-per-record is required, disable multi-line merge and include the entire raw line as a separate row with empty fields.
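
A minimal sketch of the merge step, assuming a simplified new-record heuristic (a line starts a new record if it begins with "{" or an ISO-style timestamp); other timestamp formats, such as syslog, would need to be added to the regex:

```python
# Sketch: merge continuation lines into the previous record before per-line parsing.
import re
import sys

NEW_RECORD_RE = re.compile(
    r'^\s*(\{|\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2})'   # JSON object or leading timestamp
)

def merge_multiline(lines):
    """Yield logical records: lines that do not look like the start of a new record
    are appended to the previous one with a literal \\n sequence."""
    current = None
    for line in lines:
        line = line.rstrip("\n")
        if NEW_RECORD_RE.match(line) or current is None:
            if current is not None:
                yield current
            current = line
        else:
            current += "\\n" + line      # keep the continuation, already escaped for TSV
    if current is not None:
        yield current

# Usage sketch: for record in merge_multiline(sys.stdin): parse and emit the record
```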

6) Example

Input lines:
1) 2025-09-26 12:05:34,123 INFO request_id=abc123 user_id=42 method=GET path=/api/v1/items latency=12ms Completed request
2) {"time":"2025-09-26T12:05:35.001+00:00","level":"warn","service":"checkout","http":{"method":"POST","path":"/orders"},"latency_ms":75,"message":"Payment pending"}

Output TSV (header plus two data rows; empty columns are omitted from the row descriptions below):
ts_iso    level    service    host    logger    thread    request_id    user_id    client_ip    method    path    status    latency_ms    message    extras_json    raw
- Row 1: ts_iso=2025-09-26T12:05:34.123, level=INFO, request_id=abc123, user_id=42, method=GET, path=/api/v1/items, latency_ms=12, message=Completed request, extras_json={}, raw=the original input line 1; all other columns are empty.
- Row 2: ts_iso=2025-09-26T12:05:35.001Z, level=WARN, service=checkout, method=POST, path=/orders, latency_ms=75, message=Payment pending, extras_json={}, raw=the original input line 2; all other columns are empty.

Note: actual output uses literal tab characters between all 16 columns; newlines/tabs inside fields must be escaped.

7) Reference implementation (Python, streaming)

This implementation handles both JSON and text logs, performs normalization, and writes TSV to stdout. It avoids non-standard libraries.

```python
import sys
import json
import re
from datetime import datetime, timezone

# Precompiled regexes
TS_PATTERNS = [
    ("%Y-%m-%d %H:%M:%S,%f", None),
    ("%Y-%m-%d %H:%M:%S", None),
    ("%Y-%m-%dT%H:%M:%S.%f%z", "utc"),
    ("%Y-%m-%dT%H:%M:%S%z", "utc"),
    ("%Y-%m-%dT%H:%M:%S.%fZ", "z"),
    ("%Y-%m-%dT%H:%M:%SZ", "z"),
]
LEVEL_RE = re.compile(r'\b(trace|debug|info|warn|warning|error|critical|fatal)\b', re.IGNORECASE)
KV_RE = re.compile(r'(\w[\w\.\-]*)=("([^"\\]|\\.)*"|\S+)')

HEADER = [
    "ts_iso","level","service","host","logger","thread","request_id","user_id","client_ip",
    "method","path","status","latency_ms","message","extras_json","raw"
]

def normalize_level(lvl):
    if not lvl: return ""
    l = lvl.upper()
    if l == "WARNING": return "WARN"
    if l == "CRITICAL": return "ERROR"  # adjust to FATAL if preferred
    return l

def parse_ts(s):
    s = s.strip()
    for fmt, mode in TS_PATTERNS:
        try:
            dt = datetime.strptime(s, fmt)
        except ValueError:
            continue
        if mode in ("utc", "z"):
            # Pattern carries timezone info (or a literal Z): normalize to UTC with a Z suffix.
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt.astimezone(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
        # Naive timestamp: keep it naive (no Z suffix).
        if "%f" in fmt:
            return dt.isoformat(timespec="milliseconds")
        return dt.isoformat()
    return ""  # no known pattern matched

def sanitize(v):
    if v is None: return ""
    s = str(v)
    return s.replace("\t","\\t").replace("\n","\\n").replace("\r","\\r")

def unquote(value):
    # Remove surrounding quotes and unescape \" and \\ inside
    if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
        inner = value[1:-1]
        inner = inner.replace('\\"', '"').replace('\\\\', '\\')
        return inner
    return value

def normalize_latency(v):
    if v is None: return ""
    s = str(v).strip().lower()
    try:
        # Check multi-character suffixes before the bare "s" suffix,
        # otherwise "ms"/"us"/"ns" would be swallowed by the "s" branch.
        if s.endswith("ms"):
            return str(float(s[:-2]))
        if s.endswith("us") or s.endswith("µs"):
            return str(float(s[:-2]) / 1000.0)
        if s.endswith("ns"):
            return str(float(s[:-2]) / 1_000_000.0)
        if s.endswith("s"):
            return str(float(s[:-1]) * 1000.0)
        # unitless: assume milliseconds
        return str(float(s))
    except ValueError:
        return ""  # caller may keep the raw value in extras_json

def parse_json_line(line):
    try:
        obj = json.loads(line)
    except json.JSONDecodeError:
        return None
    flat = {}

    def flatten(prefix, val):
        if isinstance(val, dict):
            for k, v in val.items():
                nk = f"{prefix}.{k}" if prefix else k
                flatten(nk, v)
        else:
            flat[prefix] = val

    flatten("", obj)

    row = {k: "" for k in HEADER}
    row["raw"] = line.rstrip("\n")

    # Timestamp candidates
    ts_raw = None
    for key in ("@timestamp","time","timestamp","ts"):
        if key in flat:
            row["ts_iso"] = parse_ts(str(flat[key]))
            if not row["ts_iso"]:
                ts_raw = flat[key]  # preserved in extras_json below
            break

    row["level"]   = normalize_level(flat.get("level") or flat.get("log.level") or flat.get("severity"))
    row["service"] = str(flat.get("service") or flat.get("app") or flat.get("application") or "")

    row["host"]    = str(flat.get("host") or flat.get("hostname") or "")
    row["logger"]  = str(flat.get("logger") or flat.get("log.logger") or "")
    row["thread"]  = str(flat.get("thread") or flat.get("tid") or "")
    row["request_id"] = str(flat.get("request_id") or flat.get("req_id") or flat.get("correlation_id") or flat.get("trace_id") or "")
    row["user_id"] = str(flat.get("user_id") or flat.get("user.id") or flat.get("uid") or "")
    row["client_ip"] = str(flat.get("client_ip") or flat.get("ip") or flat.get("remote_addr") or flat.get("http.client_ip") or "")

    row["method"]  = str(flat.get("method") or flat.get("http.method") or flat.get("http.request.method") or "")
    row["path"]    = str(flat.get("path") or flat.get("url") or flat.get("uri") or flat.get("http.path") or flat.get("http.request.path") or "")
    row["status"]  = str(flat.get("status") or flat.get("http.status") or flat.get("response.status") or "")

    latency = flat.get("latency_ms") or flat.get("latency") or flat.get("http.latency") or None
    row["latency_ms"] = normalize_latency(latency)

    row["message"] = str(flat.get("message") or flat.get("msg") or flat.get("log") or flat.get("event") or "")

    # Build extras from unmapped keys
    mapped = set([
        "@timestamp","time","timestamp","ts","level","log.level","severity","service","app","application",
        "host","hostname","logger","log.logger","thread","tid","request_id","req_id","correlation_id","trace_id",
        "user_id","user.id","uid","client_ip","ip","remote_addr","http.client_ip",
        "method","http.method","http.request.method","path","url","uri","http.path","http.request.path",
        "status","http.status","response.status","latency","latency_ms","http.latency","message","msg","log","event"
    ])
    extras = {k: v for k, v in flat.items() if k not in mapped}
    if ts_raw is not None:
        extras["timestamp_raw"] = ts_raw
    row["extras_json"] = json.dumps(extras, separators=(",",":")) if extras else "{}"

    return row

def parse_text_line(line):
    row = {k: "" for k in HEADER}
    row["raw"] = line.rstrip("\n")

    s = line.strip()

    # Attempt to find a timestamp at the start of the line.
    # Heuristic: try common token lengths against the first ~40 characters.
    head = s[:40]
    for token_len in (23, 19, 20, 24):  # lengths of the supported formats
        ts_iso = parse_ts(head[:token_len])
        if ts_iso:
            row["ts_iso"] = ts_iso
            break

    # Level
    level_m = LEVEL_RE.search(s)
    if level_m:
        row["level"] = normalize_level(level_m.group(0))

    # KV pairs
    kvs = []
    for m in KV_RE.finditer(s):
        key = m.group(1)
        val = unquote(m.group(2))
        kvs.append((key, val))

    # Message: text after the level token, with recognized key=value pairs removed
    msg_start = level_m.end() if level_m else 0
    msg = KV_RE.sub("", s[msg_start:])
    msg = re.sub(r"\s+", " ", msg).strip()
    row["message"] = msg

    # Map common keys
    def get(klist):
        for k in klist:
            for kk, vv in kvs:
                if kk == k:
                    return vv
        return None

    row["service"]    = get(["service","app","application"]) or ""
    row["host"]       = get(["host","hostname"]) or ""
    row["logger"]     = get(["logger","category"]) or ""
    row["thread"]     = get(["thread","tid"]) or ""
    row["request_id"] = get(["request_id","req_id","correlation_id","trace_id","x_request_id"]) or ""
    row["user_id"]    = get(["user_id","uid","user.id","principal"]) or ""
    row["client_ip"]  = get(["client_ip","ip","remote_addr"]) or ""
    row["method"]     = get(["method","http_method"]) or ""
    row["path"]       = get(["path","url","uri"]) or ""
    row["status"]     = get(["status","http_status"]) or ""
    row["latency_ms"] = normalize_latency(get(["latency","latency_ms","duration","elapsed"]))

    # Build extras
    mapped_keys = set([
        "service","app","application","host","hostname","logger","category","thread","tid",
        "request_id","req_id","correlation_id","trace_id","x_request_id",
        "user_id","uid","user.id","principal","client_ip","ip","remote_addr",
        "method","http_method","path","url","uri","status","http_status","latency","latency_ms","duration","elapsed"
    ])
    extras = {k: v for (k, v) in kvs if k not in mapped_keys}
    row["extras_json"] = json.dumps(extras, separators=(",",":")) if extras else "{}"

    return row

def emit_tsv_row(row):
    fields = [sanitize(row.get(col, "")) for col in HEADER]
    sys.stdout.write("\t".join(fields) + "\n")

def main():
    # Write header
    sys.stdout.write("\t".join(HEADER) + "\n")
    for line in sys.stdin:
        stripped = line.lstrip()
        parsed = None
        if stripped.startswith("{"):
            parsed = parse_json_line(line)
        if parsed is None:
            parsed = parse_text_line(line)
        emit_tsv_row(parsed)

if __name__ == "__main__":
    main()
```

8) Operational guidance
- Run as: python log_to_tsv.py < input.log > output.tsv
- For large files, consider:
  - Using buffered I/O (default in Python is sufficient for most cases).
  - Precompiling all regexes (done).
  - Avoiding per-line JSON dumps for empty extras (use "{}" constant as shown).
  - Parallelization: sharding input files and merging outputs; ensure identical headers.
- Monitoring and testing:
  - Create unit tests with representative samples for each log format you encounter.
  - Validate TSV with a reader that checks column counts and escaping.
  - Track parse failure rates (e.g., empty ts_iso or level) and revisit patterns accordingly.

This logic establishes a deterministic, streaming-safe conversion of diverse logs to TSV suitable for ingestion into data warehouses or lakehouse tables. Adjust field mappings to your domain, and keep extras_json for schema evolution.

Example 3

The following is a practical conversion logic for turning JSON into YAML safely and faithfully. The goal is to preserve JSON types and value semantics while avoiding type misinference by YAML parsers. The description assumes YAML 1.2.

Assumptions
- JSON is a strict subset of YAML 1.2, so type mapping is in principle one-to-one.
- However, to avoid ambiguous scalar interpretation by YAML parsers (especially YAML 1.1 implementations, e.g., yes/no and on/off read as booleans), string quoting rules are defined.
- JSON object keys are always strings; YAML mapping keys are emitted as strings as well.
- Ordering follows the input order (typical JSON parsers preserve insertion order).

Type mapping rules
- Object (JSON) → Mapping (YAML)
  - { "k": v } → k: v
  - Indent with 2 spaces (recommended).
  - The document start marker (---) is optional; omit it if multi-document handling is not needed.
- Array (JSON) → Sequence (YAML)
  - [a, b] →
    - a
    - b
  - Order is preserved as-is.
- String (JSON) → Scalar (YAML)
  - Strings are emitted as plain scalars, double-quoted scalars, or literal blocks (|).
  - The decision logic is described below.
- Number (JSON) → Number (YAML)
  - Integers and floats are written out in their string representation as-is.
  - NaN/Infinity do not exist in JSON and are never emitted.
- Boolean (JSON) → Boolean (YAML)
  - true → true, false → false (lowercase)
- Null (JSON) → Null (YAML)
  - null → null (the ~ form is not used)
- Dates/times
  - These are strings in JSON; emit them as strings (quoted if necessary) so the YAML side does not infer a timestamp type.

String quoting decision logic (safe operation under YAML 1.2)
- Double-quote the string ("...") if any of the following applies:
  - It has leading or trailing whitespace.
  - It contains no newline but contains, or starts with, a dangerous character:
    - "#" (can start a comment)
    - ": " (a colon followed by a space collides with the mapping indicator)
    - Starts with "-", "?", "@", "`", "!", "*", "&", "%", "|", ">"
  - The JSON string value exactly matches one of the ambiguous tokens (case-insensitive):
    - "true", "false", "null", "~", "yes", "no", "on", "off"
    - Note: in YAML 1.2, yes/no/on/off are not booleans, but quote them anyway to be safe against 1.1-style parsers.
  - ISO 8601-like date/time strings (e.g., 2024-01-01T12:00:00Z) should be quoted to guard against parser differences.
  - It contains control or non-printable characters (keep the JSON escapes inside the YAML double quotes).
- If the string contains newlines, emit it as a literal block (|) so the newlines are preserved faithfully.
  - The presence or absence of a trailing newline must also be preserved exactly.
- Only when none of the above applies, emit a plain scalar (unquoted).
  - Rough guide for plain-safe characters: alphanumerics, hyphen (-), underscore (_), dot (.), slash (/).
  - The empty string is emitted as double-quoted "".

Output formatting rules
- Indent with 2 spaces.
- Sequence items are listed with "- ".
- Mappings use the "key: value" form.
- Add the document start marker "---" if needed.
- Do not generate anchors (&), aliases (*), or tags (!!), to stay JSON-compatible.

Conversion algorithm (pseudocode)
- function to_yaml(value, indent=0):
  - switch type(value):
    - dict:
      - for (k, v) in insertion_order(value):
        - k_out = quote_key_if_needed(k)  // keys are strings; usually plain, double-quoted when risky
        - print(indent_spaces(indent) + k_out + ": " + inline_or_child(v, indent))
    - list:
      - for item in value:
        - print(indent_spaces(indent) + "- " + inline_or_child(item, indent))
    - string:
      - if contains_newline(value):
        - print(block_literal("|", value, indent))  // preserve newlines exactly
      - else if needs_double_quote(value):
        - print(double_quoted(value))  // carry the JSON escapes into YAML double quotes
      - else:
        - print(plain(value))
    - bool:
      - print("true" if value else "false")
    - null:
      - print("null")
    - number:
      - print(number_to_string(value))  // stringify integers/floats (exponent notation kept as-is)
- function needs_double_quote(s):
  - if s == "" return true
  - if leading_or_trailing_space(s) return true
  - if contains_any(s, ["#"]) return true
  - if contains_substring(s, ": ") return true
  - if starts_with_any(s, ["-", "?", "@", "`", "!", "*", "&", "%", "|", ">"]) return true
  - if is_ambiguous_token_ci(s, ["true","false","null","~","yes","no","on","off"]) return true
  - if looks_like_iso8601_timestamp(s) return true
  - if contains_control_char(s) return true
  - return false

Python implementation example (using ruamel.yaml, targeting YAML 1.2)
- Key points
  - Use LiteralScalarString (|) and DoubleQuotedScalarString (") from ruamel.yaml.scalarstring to make the scalar style explicit.
  - Walk the JSON value recursively, convert it into style-annotated objects, then dump to YAML.
- Code (a consolidated, runnable version follows this list)
  - from json import loads
  - from ruamel.yaml import YAML
  - from ruamel.yaml.scalarstring import LiteralScalarString, DoubleQuotedScalarString
  - def convert_json_to_yaml_str(json_str):
      - data = loads(json_str)
      - def style_string(s):
        - if "\n" in s:
          - return LiteralScalarString(s)  # preserve newlines
        - if needs_double_quote(s):
          - return DoubleQuotedScalarString(s)
        - return s  # plain
      - def transform(node):
        - if isinstance(node, dict):
          - return { (style_string(k) if isinstance(k, str) else k): transform(v) for k, v in node.items() }
        - if isinstance(node, list):
          - return [ transform(x) for x in node ]
        - if isinstance(node, str):
          - return style_string(node)
        - return node  # bool, None, numbers pass through
      - yaml = YAML()
      - yaml.version = (1, 2)
      - yaml.indent(mapping=2, sequence=2, offset=0)
      - out = transform(data)
      - import io
      - buf = io.StringIO()
      - yaml.dump(out, buf)
      - return buf.getvalue()
  - needs_double_quote(s) above implements the quoting logic described earlier, in Python.
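
Consolidating the bulleted code above into a single runnable sketch (it assumes ruamel.yaml is installed; needs_double_quote and the ISO 8601 regex below implement the quoting rules from the earlier section in a simplified, approximate form):

```python
import io
import json
import re

from ruamel.yaml import YAML
from ruamel.yaml.scalarstring import LiteralScalarString, DoubleQuotedScalarString

AMBIGUOUS = {"true", "false", "null", "~", "yes", "no", "on", "off"}
# Rough ISO 8601 matcher (date, optional time, optional timezone) -- an approximation.
ISO8601_RE = re.compile(r"^\d{4}-\d{2}-\d{2}([ T]\d{2}:\d{2}(:\d{2}(\.\d+)?)?(Z|[+-]\d{2}:?\d{2})?)?$")

def needs_double_quote(s: str) -> bool:
    if s == "":
        return True
    if s != s.strip():                   # leading or trailing whitespace
        return True
    if "#" in s or ": " in s:            # comment marker / mapping indicator
        return True
    if s[0] in "-?@`!*&%|>":
        return True
    if s.lower() in AMBIGUOUS:
        return True
    if ISO8601_RE.match(s):              # avoid timestamp type inference
        return True
    if any(ord(c) < 0x20 for c in s):    # control characters
        return True
    return False

def style_string(s: str):
    if "\n" in s:
        return LiteralScalarString(s)    # block literal (|) keeps newlines
    if needs_double_quote(s):
        return DoubleQuotedScalarString(s)
    return s                             # plain scalar

def transform(node):
    if isinstance(node, dict):
        return {(style_string(k) if isinstance(k, str) else k): transform(v)
                for k, v in node.items()}
    if isinstance(node, list):
        return [transform(x) for x in node]
    if isinstance(node, str):
        return style_string(node)
    return node                          # bool, None, numbers pass through

def convert_json_to_yaml_str(json_str: str) -> str:
    data = json.loads(json_str)
    yaml = YAML()
    yaml.version = (1, 2)
    yaml.indent(mapping=2, sequence=2, offset=0)
    buf = io.StringIO()
    yaml.dump(transform(data), buf)
    return buf.getvalue()

# print(convert_json_to_yaml_str('{"flag": "yes", "note": "line1\\nline2", "n": 3}'))
```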

Verification and test considerations
- Type preservation tests
  - Confirm that JSON booleans/nulls/numbers are re-parsed from the YAML as the same types.
- String safety tests
  - Confirm that values such as "yes", "on", "#comment", ": value" are preserved (quoted) as strings.
  - Confirm that strings containing newlines are emitted as literal blocks and that newlines, including trailing newlines, match exactly.
- Structure and order
  - Preserve array order and object insertion order.
- Edge cases
  - Empty strings, leading/trailing whitespace, very long strings (disable line-width limits if no folding is desired), Unicode emoji and surrogate pairs.

With this logic, JSON can be converted safely without losing its meaning, while minimizing type misinterpretation caused by YAML parser differences.

Target users

Data engineers

Unify data formats from different sources, define field mappings and conversion rules, produce implementation steps and validation checklists, and generate multilingual delivery documents to support launch.

Business analysts

Turn raw reports and logs into analyzable structures, clarify metric definitions and cleansing rules, obtain reusable conversion plans, and improve the accuracy and timeliness of insights.

Product managers

Consolidate scattered business data into trackable metrics, establish standardized conversion workflows and caveats, align requirements with engineering quickly, and accelerate delivery.

Operations and data platform leads

Define unified conversion standards for newly onboarded data, weigh the pros, cons, and risks of each option, land maintainable pipeline documentation and acceptance criteria, and reduce long-term operating costs.

Customer-service operations

Convert conversation and ticket data into analyzable formats, define masking and compliance policies, and quickly produce iteration plans to support campaign evaluation and retrospectives.

Trainers and instructors

Demonstrate the conversion workflow with clear cases, produce standardized handouts and exercise guides, and help learners master the full source-to-target design method.

Problems it solves

Helps teams convert an existing data format to a target data format seamlessly, generating an executable conversion plan and rule checklist within minutes, along with quality validation and implementation steps. It supports multilingual output, significantly reduces manual errors and rework, and speeds up delivery for data onboarding, system migration, and report unification.

Feature summary

Generates cross-format conversion approaches and steps in one click, helping you get from the current format to the target format quickly
Automatically analyzes source data and target requirements, providing clear field mappings and rules to avoid omissions and deviations
Supports multilingual documentation output for easier communication, review, and delivery inside and outside the team
Provides reusable conversion checklists and caveats, making implementation smoother and reuse simpler
Offers examples and variants for different business scenarios, shortening trial-and-error and accelerating launch
Includes built-in quality-validation and exception-handling recommendations to reduce data-error risk and keep results reliable
Lets you customize the conversion logic via templates and parameters to fit your specific processes and constraints
Takes an end-to-end view across ingestion, transformation, and storage, making pipeline designs more complete and maintainable
Outputs structured, technical-writing-style documentation for easier review, handover, and compliance filing
Quickly compares the pros, cons, and applicable scenarios of multiple conversion options to help you make a sounder choice

How to use a purchased prompt template

1. Use it directly in an external chat application

Copy the prompt generated from the template into your preferred chat application (such as ChatGPT or Claude) and start the conversation directly, with no extra development. Suitable for quick personal trials and lightweight use.

2. Publish it as an API endpoint

Turn the prompt template into an API: your program can modify the template parameters freely and call it through the interface, enabling automation and batch processing. Suitable for developer integration and embedding into business systems.

3. Configure it in an MCP client

Configure the corresponding server address in an MCP client so your AI application can invoke the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts work seamlessly across different AI tools.


What you get after purchase

The complete prompt template
- 245 tokens in total
- 3 adjustable parameters
{ 当前数据格式 } { 目标数据格式 } { 输出语言 } (current data format, target data format, output language)
Automatically added to "My prompt library"
- Prompt optimizer support
- Version management support
Community-shared application examples