Provides logic solutions for converting data from one format to another.
The following is a standardized conversion logic and reference implementation for turning NDJSON (one JSON object per line) into CSV. It covers schema inference, nested flattening, array handling, missing values, types and escaping, errors, and performance.

1) Input and output
- Input: UTF-8 encoded NDJSON file; each line is one valid JSON object.
- Output: UTF-8 encoded CSV file; the first line is the header, followed by data rows.

2) Schema and column order
- Prefer a predefined schema (recommended): declare the output columns and their path mappings up front so that a single pass is enough.
- Without a predefined schema:
  - Two-pass strategy: the first pass collects the set of flattened keys across all lines and builds the header; the second pass writes the data.
  - Column order: use a stable sort (e.g., lexicographic) or a fixed order based on business priority.
- Key normalization: if original keys contain the separator (e.g., "."), replace it with "_" to avoid clashing with the path separator.

3) Flattening nested structures (objects/dicts)
- Flattening strategy (dot path, default):
  - Nested objects produce column names along the "parent.child" path, e.g. {"a":{"b":1}} → column "a.b" = 1.
- Key collision handling:
  - If different paths produce the same column name (because of duplicate keys or the replacement rule), add a prefix or apply an enumeration policy (e.g., when "a.b" collides with "a_b", keep the original dot path and rename the substituted key to "a_b__dup").

4) Array handling
Arrays are the crux of CSV conversion and need an explicit strategy (configurable per field):
- Join (single row): concatenate the array into one string with a delimiter.
  - Arrays of atomic types (string/number/bool) can be joined directly, e.g. tags=["a","b"] → "a|b".
  - Arrays of objects: optionally serialize each element to compact JSON before joining, e.g. items=[{"id":1},{"id":2}] → {"id":1}|{"id":2}.
- Explode (multiple rows): the array produces multiple rows; if a field is configured to explode, each element yields one row.
  - When several array fields explode at once, a Cartesian product is required (row counts can balloon; use with care).
- Recommended policy:
  - Explicitly explode only the array fields whose details need row-level analysis; join everything else.
  - If there are multiple array fields, allow only one of them to explode, or cap the Cartesian product (e.g., a maximum-product threshold).

5) Data types and formats
- Strings are kept as-is and escaped when necessary.
- Numbers are written as decimal strings; avoid scientific notation (format explicitly if needed).
- Booleans become true/false.
- Timestamps: if a standard format exists, normalize to ISO 8601 (preferably in a preprocessing/mapping layer); otherwise output as-is.
- Binary or oversized fields: base64-encode or reduce to a digest (depending on the business) so they do not pollute the CSV.

6) Nulls and missing values
- JSON null → empty CSV cell.
- Missing field (a column absent from a row) → empty CSV cell.
- Distinguish clearly between the empty string "" and missing/NULL.

7) CSV writing and escaping
- Delimiter: comma, or tab (TSV) if the business requires it.
- Quoting policy: minimal quoting (wrap a field in double quotes only when it contains the delimiter, quotes, or newlines); escape embedded double quotes by doubling them.
- Newlines: a field must not span physical lines; if the data contains newlines, the field must be quoted.
- Encoding: UTF-8 without BOM; normalize to NFC if necessary.
- The header is fixed and written exactly once.

8) Errors and data quality
- Parse each line independently; on an invalid JSON line:
  - Record it in an error file (with line number and content), then skip or abort (configurable).
- Type inconsistency (the same column holds more than one type):
  - Default to string conversion; optionally log a type-conflict warning.
- Oversized fields or Cartesian blow-up:
  - Set upper bounds on row length and on the number of exploded rows; route violations to a quarantine area.

9) Performance and scalability
- Stream processing: read and write line by line to keep memory usage low.
- Compression: support .gz input; compress output as needed.
- Large files: with the two-pass approach, avoid loading the whole file into memory; cache to temporary files or collect the schema first and then re-read.
- Parallelism: process per-file shards or blocks of lines in parallel; make sure headers are consistent when merging.
- Very large data: prefer a distributed engine (Spark/Flink) and do the flattening and CSV writing through DataFrames.

10) Reference implementation (Python, streaming, with join plus an optional single explode field)
Notes:
- Objects are flattened by default and arrays are joined into strings.
- One field can be configured to explode (multi-field explode requires extending the Cartesian logic).
- The header is inferred with two passes; with a predefined schema the first pass can be skipped.

Example code:
- Only the standard library is required; for higher performance, json can be replaced with orjson.

```python
import json
import csv
import gzip
from typing import Any, Dict, List, Iterable, Optional


def open_maybe_gzip(path: str, mode: str = 'rt', encoding: str = 'utf-8'):
    if path.endswith('.gz'):
        return gzip.open(path, mode, encoding=encoding)
    return open(path, mode, encoding=encoding)


def sanitize_key(key: str) -> str:
    # Avoid clashes with the dot path separator
    return key.replace('.', '_')


def flatten(obj: Any, parent_key: str = '', sep: str = '.') -> Dict[str, Any]:
    """Flatten objects; arrays are kept as Python lists for later handling."""
    items: Dict[str, Any] = {}
    if isinstance(obj, dict):
        for k, v in obj.items():
            k2 = sanitize_key(k)
            new_key = f"{parent_key}{sep}{k2}" if parent_key else k2
            items.update(flatten(v, new_key, sep))
    elif isinstance(obj, list):
        # Keep the list for now; handled later according to the array strategy
        items[parent_key] = obj
    else:
        items[parent_key] = obj
    return items


def to_str(value: Any) -> str:
    if value is None:
        return ''
    if isinstance(value, (int, float, bool)):
        return str(value)
    if isinstance(value, str):
        return value
    # Other types (objects, lists) are serialized as compact JSON
    return json.dumps(value, ensure_ascii=False, separators=(',', ':'))


def process_arrays(row: Dict[str, Any], join_delim: str = '|',
                   explode_field: Optional[str] = None) -> Iterable[Dict[str, Any]]:
    """
    Join arrays into strings, or explode the designated field into multiple rows.
    explode_field is a flattened column name (e.g. 'items' or 'a.b.items').
    If explode_field is absent or not an array, a single joined row is produced.
    """
    # Work on a mutable copy
    base = dict(row)
    # Join every array except the explode field
    for key, val in list(base.items()):
        if isinstance(val, list):
            if explode_field and key == explode_field:
                # Handled below
                continue
            # Join atomic arrays directly; serialize each element of object arrays first
            joined = []
            for el in val:
                if isinstance(el, (str, int, float, bool)) or el is None:
                    joined.append(to_str(el))
                else:
                    joined.append(json.dumps(el, ensure_ascii=False, separators=(',', ':')))
            base[key] = join_delim.join(joined)
    # Handle the explode field
    if explode_field and isinstance(row.get(explode_field), list):
        arr = row[explode_field]
        if len(arr) == 0:
            # Empty array → one row with that column left empty
            out = dict(base)
            out[explode_field] = ''
            yield out
            return
        for el in arr:
            out = dict(base)
            # Object elements are flattened further into sub-columns; others are stringified
            if isinstance(el, dict):
                sub = flatten(el, explode_field)
                out.update({k: to_str(v) for k, v in sub.items()})
                # Optionally also keep a compact-JSON summary in the original explode column
                out[explode_field] = json.dumps(el, ensure_ascii=False, separators=(',', ':'))
            else:
                out[explode_field] = to_str(el)
            yield out
    else:
        yield base


def infer_header(ndjson_path: str, explode_field: Optional[str] = None) -> List[str]:
    cols = set()
    with open_maybe_gzip(ndjson_path, 'rt') as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception:
                # Log and skip; production code should write to an error channel
                continue
            flat = flatten(obj)
            # Run array handling first to capture sub-columns produced by exploded object arrays
            for processed in process_arrays(flat, explode_field=explode_field):
                for k in processed.keys():
                    cols.add(k)
    return sorted(cols)


def ndjson_to_csv(in_path: str, out_path: str,
                  predefined_header: Optional[List[str]] = None,
                  explode_field: Optional[str] = None,
                  join_delim: str = '|'):
    header = predefined_header or infer_header(in_path, explode_field=explode_field)
    # Write the CSV
    with open_maybe_gzip(in_path, 'rt') as fin, \
         open(out_path, 'w', encoding='utf-8', newline='') as fout:
        writer = csv.DictWriter(fout, fieldnames=header, extrasaction='ignore',
                                quoting=csv.QUOTE_MINIMAL)
        writer.writeheader()
        for lineno, line in enumerate(fin, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception:
                # Log and skip; error lines could be written to out_path + '.error'
                continue
            flat = flatten(obj)
            for processed in process_arrays(flat, join_delim=join_delim,
                                            explode_field=explode_field):
                # Fill missing columns with empty strings
                row = {k: to_str(processed.get(k)) for k in header}
                writer.writerow(row)


# Usage examples:
# ndjson_to_csv('input.ndjson', 'output.csv', explode_field='items', join_delim='|')
# With predefined columns:
# ndjson_to_csv('input.ndjson', 'output.csv', predefined_header=['id', 'a.b', 'items', 'tags'])
```

11) Spark conversion (for large-scale data)
- Read NDJSON: spark.read.json("path").
- Flattening: expand structs into columns with selectExpr; for arrays:
  - join: concat_ws('|', col('tags'))
  - explode: explode(col('items')), then expand the element struct further as needed.
- Write: df.write.option("header", True).csv("out_path").
- A minimal PySpark sketch is given at the end of this document.

12) Configuration suggestions
- Declare in a configuration file:
  - Column mapping: JSONPath or dot path → column name.
  - Array strategy per column: join/explode, and the join delimiter.
  - Key-sanitization rules and collision policy.
  - Error-line handling policy (skip/quarantine/abort).
  - Performance thresholds (maximum explode product, maximum field length).

This logic is portable and extensible in data-engineering practice: it can be dropped into a single-machine script quickly, while keeping the same semantics and quality controls in a distributed computing framework.
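To make the Spark route in section 11 concrete, here is a minimal PySpark sketch. It is illustrative only and assumes the column shapes used in the earlier examples (`id`, a struct field `a.b`, an array of strings `tags`, and an array of structs `items` whose elements have an `id`); adapt the column references to your actual schema.

```python
# Minimal PySpark sketch; the column names follow the examples in sections 3-4 and are assumptions.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, explode_outer

spark = SparkSession.builder.appName("ndjson_to_csv").getOrCreate()

df = spark.read.json("input.ndjson")  # one JSON object per line

out = (
    df
    .withColumn("tags", concat_ws("|", col("tags")))   # join strategy for `tags`
    .withColumn("item", explode_outer(col("items")))   # explode strategy for `items`
    .select(
        col("id"),
        col("a.b").alias("a.b"),            # flatten the nested struct field
        col("tags"),
        col("item.id").alias("items.id"),   # flatten the exploded element
    )
)

out.write.option("header", True).csv("out_path")
```

explode_outer is used rather than explode so that rows whose items array is empty or null are kept, mirroring the empty-array handling in the reference implementation above.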
TSV conversion logic for logs

Scope
- Input: arbitrary application logs in either text or JSON-lines form.
- Output: tab-separated values (TSV) with a defined header; one record per log event.
- Constraints: streaming-friendly; robust to missing fields, tabs/newlines in values, mixed formats, and occasional multi-line entries.

1) Target TSV schema
Use a fixed schema to maintain streaming compatibility. Recommended minimal columns:
- ts_iso: normalized timestamp in ISO 8601 (UTC if a timezone is provided; otherwise kept naive).
- level: normalized log level (TRACE/DEBUG/INFO/WARN/ERROR/FATAL).
- service: service or application name (if discoverable).
- host: hostname or source (if discoverable).
- logger: logger name/category (if present).
- thread: thread identifier (if present).
- request_id: request or correlation id (if present).
- user_id: user identifier (if present).
- client_ip: client IP (if present).
- method: HTTP method (if present).
- path: URL path (if present).
- status: HTTP status (if present).
- latency_ms: request latency in milliseconds (if parseable).
- message: human-readable message portion.
- extras_json: JSON-encoded string of any remaining key-value pairs.
- raw: original log line, for auditability.

Notes:
- If your environment requires a different set of fixed columns, adjust accordingly. Keep an extras_json catch-all for forward compatibility.

2) Detection and parsing strategy
The parser operates per line and chooses one of two branches:

A) JSON log line
- Condition: the line starts with "{" after leading whitespace.
- Action:
  - Parse the JSON into a dict; if parsing fails, treat the line as a text log (branch B).
  - Flatten to a single level: nested keys joined by dots (e.g., {"http":{"request":{"method":"GET"}}} → http.request.method), or use a predetermined mapping.
  - Map known fields into the target schema (see section 3).
  - Put unmapped fields into extras_json.

B) Text log line with optional key=value pairs
- Identify the timestamp:
  - Try common patterns via regex and datetime parsing:
    - YYYY-MM-DD HH:MM:SS,mmm
    - YYYY-MM-DD HH:MM:SS
    - YYYY-MM-DDTHH:MM:SSZ
    - YYYY-MM-DDTHH:MM:SS.sssZ
    - Syslog-like: MMM dd HH:MM:SS (year unknown; if no year, keep the original string)
- Identify the level token (case-insensitive): TRACE|DEBUG|INFO|WARN|WARNING|ERROR|CRITICAL|FATAL.
- Extract key=value pairs:
  - Regex: (\w[\w\.\-]*)=("([^"\\]|\\.)*"|\S+)
  - Supports quoted values with spaces; unescape quotes and backslashes.
  - Keys may include dots or dashes.
- Message extraction:
  - Take the substring after [timestamp][optional logger/thread][level] up to the first key=value pair; trim.
  - If there are no key=value pairs, the remainder is the message.
- Map known keys to the schema; place the remaining pairs in extras_json.

3) Field mapping and normalization
Mapping rules (examples; adapt to your environment):
- Timestamp (ts_iso):
  - For JSON logs: prefer fields such as time, timestamp, ts, @timestamp.
  - For text logs: use the first recognized timestamp token.
  - Normalization: convert to ISO 8601. If a timezone is provided, convert to UTC (e.g., 2025-09-26T12:05:34.123Z). If there is no timezone, output naive ISO 8601 without Z.
  - If parsing fails, leave ts_iso empty and retain the original under extras_json.timestamp_raw.
- Level:
  - Map aliases: WARNING→WARN, CRITICAL→ERROR (or FATAL if preferred), VERBOSE→DEBUG.
  - Output uppercase canonical values.
- Service, host, logger, thread:
  - From keys: service, app, application; host, hostname; logger, category; thread, tid. If not present, leave empty.
- HTTP-related:
  - method from method, http_method, http.request.method.
  - path from path, url, uri, http.request.path.
  - status from status, http.status, response.status.
  - client_ip from client_ip, ip, remote_addr, http.client_ip.
- Request/user identifiers:
  - request_id from request_id, req_id, correlation_id, trace_id, x_request_id.
  - user_id from user_id, uid, user.id, principal.
- Latency:
  - Accept values like "123ms", "1.5s", "800us", or numeric milliseconds.
  - Normalize:
    - ms → value
    - s → value × 1000
    - us/µs → value / 1000
    - ns → value / 1_000_000
    - If unitless integer/float, treat as milliseconds.
  - If unparsable, leave latency_ms empty and persist the original under extras_json.latency_raw.
- Message:
  - For JSON logs: use message, msg, log, event as primary; otherwise leave message empty.
  - For text logs: the extracted message substring.
- extras_json:
  - JSON-encode the remaining unmapped key-value pairs.
  - Ensure tab/newline escaping (see section 4).
- raw:
  - The original line, unmodified (except for escaping tabs/newlines when writing TSV).

4) Escaping and TSV output rules
- Separator: a single tab character between fields.
- Header: emit once at the start, in the specified column order.
- Value sanitation:
  - Replace a literal tab with \t.
  - Replace a newline with \n and a carriage return with \r.
  - Remove or escape other control characters as needed.
  - For extras_json, ensure it is a compact JSON string with the same escaping applied to embedded tabs/newlines.
- Missing values: output as the empty string.
- No quoting in TSV; rely on escaping.

5) Multi-line log handling
- If a line lacks a timestamp and level and does not look like JSON, treat it as a continuation of the previous record's message.
  - Append it to the previous message with a literal \n inserted.
- If strict one-line-per-record is required, disable the multi-line merge and emit the entire raw line as a separate row with empty fields.

6) Example

Input lines:

1) 2025-09-26 12:05:34,123 INFO request_id=abc123 user_id=42 method=GET path=/api/v1/items latency=12ms Completed request
2) {"time":"2025-09-26T12:05:35.001+00:00","level":"warn","service":"checkout","http":{"method":"POST","path":"/orders"},"latency_ms":75,"message":"Payment pending"}

Output TSV (header + rows):

ts_iso level service host logger thread request_id user_id client_ip method path status latency_ms message extras_json raw
2025-09-26T12:05:34.123 INFO abc123 42 GET /api/v1/items 12 Completed request {} 2025-09-26 12:05:34,123 INFO request_id=abc123 user_id=42 method=GET path=/api/v1/items latency=12ms Completed request
2025-09-26T12:05:35.001Z WARN checkout {"http.method":"POST","http.path":"/orders"} 75 Payment pending {"http.method":"POST","http.path":"/orders"} {"time":"2025-09-26T12:05:35.001+00:00","level":"warn","service":"checkout","http":{"method":"POST","path":"/orders"},"latency_ms":75,"message":"Payment pending"}

Note: Tabs are shown conceptually above (empty columns are omitted here for readability); actual output uses tab characters. Newlines/tabs inside fields must be escaped.

7) Reference implementation (Python, streaming)
This implementation handles both JSON and text logs, performs normalization, and writes TSV to stdout. It avoids non-standard libraries.
```python
import sys
import json
import re
from datetime import datetime, timezone

# Precompiled regexes and timestamp patterns
TS_PATTERNS = [
    ("%Y-%m-%d %H:%M:%S,%f", None),
    ("%Y-%m-%d %H:%M:%S", None),
    ("%Y-%m-%dT%H:%M:%S.%f%z", "utc"),
    ("%Y-%m-%dT%H:%M:%S%z", "utc"),
    ("%Y-%m-%dT%H:%M:%S.%fZ", "z"),
    ("%Y-%m-%dT%H:%M:%SZ", "z"),
]

LEVEL_RE = re.compile(r'\b(trace|debug|info|warn|warning|error|critical|fatal)\b', re.IGNORECASE)
KV_RE = re.compile(r'(\w[\w\.\-]*)=("([^"\\]|\\.)*"|\S+)')

HEADER = [
    "ts_iso", "level", "service", "host", "logger", "thread", "request_id", "user_id", "client_ip",
    "method", "path", "status", "latency_ms", "message", "extras_json", "raw"
]


def normalize_level(lvl):
    if not lvl:
        return ""
    l = lvl.upper()
    if l == "WARNING":
        return "WARN"
    if l == "CRITICAL":
        return "ERROR"  # adjust to FATAL if preferred
    return l


def parse_ts(s):
    s = s.strip()
    for fmt, mode in TS_PATTERNS:
        try:
            dt = datetime.strptime(s, fmt)
        except ValueError:
            continue
        if mode == "utc":
            # Offset-aware input; normalize to UTC and emit Zulu
            return dt.astimezone(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
        if mode == "z":
            # Input carries a literal 'Z'; emit Zulu, keeping milliseconds where possible
            dt = dt.replace(tzinfo=timezone.utc)
            if dt.microsecond:
                return dt.isoformat(timespec="milliseconds").replace("+00:00", "Z")
            return dt.isoformat().replace("+00:00", "Z")
        # Naive timestamp
        if "%f" in fmt:
            return dt.isoformat(timespec="milliseconds")
        return dt.isoformat()
    return ""  # parsing failed


def sanitize(v):
    if v is None:
        return ""
    s = str(v)
    return s.replace("\t", "\\t").replace("\n", "\\n").replace("\r", "\\r")


def unquote(value):
    # Remove surrounding quotes and unescape \" and \\ inside
    if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
        inner = value[1:-1]
        inner = inner.replace('\\"', '"').replace('\\\\', '\\')
        return inner
    return value


def normalize_latency(v):
    if v is None:
        return ""
    s = str(v).strip().lower()
    try:
        # Check the longer unit suffixes before the bare "s" suffix
        if s.endswith("ms"):
            return str(float(s[:-2]))
        if s.endswith("us") or s.endswith("µs"):
            return str(float(s[:-2]) / 1000.0)
        if s.endswith("ns"):
            return str(float(s[:-2]) / 1_000_000.0)
        if s.endswith("s"):
            return str(float(s[:-1]) * 1000.0)
        # Unitless: assume milliseconds
        return str(float(s))
    except ValueError:
        return ""  # the raw value can be kept in extras_json if needed


def parse_json_line(line):
    try:
        obj = json.loads(line)
    except json.JSONDecodeError:
        return None

    flat = {}

    def flatten(prefix, val):
        if isinstance(val, dict):
            for k, v in val.items():
                nk = f"{prefix}.{k}" if prefix else k
                flatten(nk, v)
        else:
            flat[prefix] = val

    flatten("", obj)

    row = {k: "" for k in HEADER}
    row["raw"] = line.rstrip("\n")

    # Timestamp candidates
    timestamp_raw = None
    for key in ("@timestamp", "time", "timestamp", "ts"):
        if key in flat:
            row["ts_iso"] = parse_ts(str(flat[key]))
            if not row["ts_iso"]:
                timestamp_raw = flat[key]  # preserved in extras_json below
            break

    row["level"] = normalize_level(flat.get("level") or flat.get("log.level") or flat.get("severity"))
    row["service"] = str(flat.get("service") or flat.get("app") or flat.get("application") or "")
    row["host"] = str(flat.get("host") or flat.get("hostname") or "")
    row["logger"] = str(flat.get("logger") or flat.get("log.logger") or "")
    row["thread"] = str(flat.get("thread") or flat.get("tid") or "")
    row["request_id"] = str(flat.get("request_id") or flat.get("req_id") or flat.get("correlation_id") or
                            flat.get("trace_id") or "")
    row["user_id"] = str(flat.get("user_id") or flat.get("user.id") or flat.get("uid") or "")
    row["client_ip"] = str(flat.get("client_ip") or flat.get("ip") or flat.get("remote_addr") or
                           flat.get("http.client_ip") or "")
flat.get("http.client_ip") or "") row["method"] = str(flat.get("method") or flat.get("http.method") or flat.get("http.request.method") or "") row["path"] = str(flat.get("path") or flat.get("url") or flat.get("uri") or flat.get("http.path") or flat.get("http.request.path") or "") row["status"] = str(flat.get("status") or flat.get("http.status") or flat.get("response.status") or "") latency = flat.get("latency_ms") or flat.get("latency") or flat.get("http.latency") or None row["latency_ms"] = normalize_latency(latency) row["message"] = str(flat.get("message") or flat.get("msg") or flat.get("log") or flat.get("event") or "") # Build extras from unmapped keys mapped = set([ "@timestamp","time","timestamp","ts","level","log.level","severity","service","app","application", "host","hostname","logger","log.logger","thread","tid","request_id","req_id","correlation_id","trace_id", "user_id","user.id","uid","client_ip","ip","remote_addr","http.client_ip", "method","http.method","http.request.method","path","url","uri","http.path","http.request.path", "status","http.status","response.status","latency","latency_ms","http.latency","message","msg","log","event" ]) extras = {k: v for k, v in flat.items() if k not in mapped} row["extras_json"] = json.dumps(extras, separators=(",",":")) if extras else "{}" return row def parse_text_line(line): row = {k: "" for k in HEADER} row["raw"] = line.rstrip("\n") s = line.strip() # Attempt to find timestamp at start substrings ts_match = None # Heuristic: check first ~40 chars for timestamp tokens separated by space or 'T' head = s[:40] # Try direct parsing by slicing common lengths for token_len in (23, 19, 20, 24): # rough candidates token = head[:token_len] ts_iso = parse_ts(token) if ts_iso: row["ts_iso"] = ts_iso ts_match = token break # Level level_m = LEVEL_RE.search(s) if level_m: row["level"] = normalize_level(level_m.group(0)) # KV pairs kvs = [] for m in KV_RE.finditer(s): key = m.group(1) val = unquote(m.group(2)) kvs.append((key, val)) # Message: substring after level up to first kv pair msg = s if level_m: start = level_m.end() msg = s[start:].strip() if kvs: first_kv_start = s.find(kvs[0][0] + "=") if first_kv_start != -1: msg = s[:first_kv_start].strip() row["message"] = msg # Map common keys def get(klist): for k in klist: for kk, vv in kvs: if kk == k: return vv return None row["service"] = get(["service","app","application"]) or "" row["host"] = get(["host","hostname"]) or "" row["logger"] = get(["logger","category"]) or "" row["thread"] = get(["thread","tid"]) or "" row["request_id"] = get(["request_id","req_id","correlation_id","trace_id","x_request_id"]) or "" row["user_id"] = get(["user_id","uid","user.id","principal"]) or "" row["client_ip"] = get(["client_ip","ip","remote_addr"]) or "" row["method"] = get(["method","http_method"]) or "" row["path"] = get(["path","url","uri"]) or "" row["status"] = get(["status","http_status"]) or "" row["latency_ms"] = normalize_latency(get(["latency","latency_ms","duration","elapsed"])) # Build extras mapped_keys = set([ "service","app","application","host","hostname","logger","category","thread","tid", "request_id","req_id","correlation_id","trace_id","x_request_id", "user_id","uid","user.id","principal","client_ip","ip","remote_addr", "method","http_method","path","url","uri","status","http_status","latency","latency_ms","duration","elapsed" ]) extras = {k: v for (k, v) in kvs if k not in mapped_keys} row["extras_json"] = json.dumps(extras, separators=(",",":")) if extras else "{}" return row def 
def emit_tsv_row(row):
    fields = [sanitize(row.get(col, "")) for col in HEADER]
    sys.stdout.write("\t".join(fields) + "\n")


def main():
    # Write the header once
    sys.stdout.write("\t".join(HEADER) + "\n")
    for line in sys.stdin:
        stripped = line.lstrip()
        parsed = None
        if stripped.startswith("{"):
            parsed = parse_json_line(line)
        if parsed is None:
            parsed = parse_text_line(line)
        emit_tsv_row(parsed)


if __name__ == "__main__":
    main()
```

8) Operational guidance
- Run as: python log_to_tsv.py < input.log > output.tsv
- For large files, consider:
  - Using buffered I/O (the Python default is sufficient for most cases).
  - Precompiling all regexes (done).
  - Avoiding per-line JSON dumps for empty extras (use the "{}" constant as shown).
  - Parallelization: shard input files and merge outputs; ensure identical headers.
- Monitoring and testing:
  - Create unit tests with representative samples for each log format you encounter (see the sketch below).
  - Validate the TSV with a reader that checks column counts and escaping.
  - Track parse failure rates (e.g., empty ts_iso or level) and revisit the patterns accordingly.

This logic establishes a deterministic, streaming-safe conversion of diverse logs to TSV suitable for ingestion into data warehouses or lakehouse tables. Adjust the field mappings to your domain, and keep extras_json for schema evolution.
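As a starting point for the unit tests suggested above, here is a minimal pytest-style sketch that exercises both parser branches with the sample lines from section 6. It assumes the reference implementation is saved as log_to_tsv.py on the import path; adjust the module name to your layout.

```python
# Minimal pytest-style sketch; assumes the reference script is importable as log_to_tsv.
import json

from log_to_tsv import HEADER, parse_json_line, parse_text_line


def test_text_line_mapping():
    line = ('2025-09-26 12:05:34,123 INFO request_id=abc123 user_id=42 '
            'method=GET path=/api/v1/items latency=12ms Completed request')
    row = parse_text_line(line)
    assert row["ts_iso"].startswith("2025-09-26T12:05:34")
    assert row["level"] == "INFO"
    assert row["request_id"] == "abc123"
    assert row["method"] == "GET"
    assert row["path"] == "/api/v1/items"


def test_json_line_mapping():
    line = json.dumps({
        "time": "2025-09-26T12:05:35.001+00:00",
        "level": "warn",
        "service": "checkout",
        "http": {"method": "POST", "path": "/orders"},
        "latency_ms": 75,
        "message": "Payment pending",
    })
    row = parse_json_line(line)
    assert row["level"] == "WARN"
    assert row["service"] == "checkout"
    assert row["method"] == "POST"
    assert row["message"] == "Payment pending"
    assert len(HEADER) == 16  # the column count stays fixed
```

Run with pytest and extend the samples as new log shapes appear in production.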
The following is a practical conversion logic for converting JSON to YAML safely and faithfully. The goal is to preserve the meaning of JSON types and values while avoiding type misinference caused by YAML parsing. It is written assuming YAML 1.2 compliance.

Assumptions
- JSON is a strict subset of YAML 1.2, so in principle the type mapping is one-to-one.
- However, to avoid ambiguous scalar interpretation by YAML parsers (especially the YAML 1.1 family), e.g. yes/no and on/off being read as booleans, quoting rules for strings are defined.
- JSON object keys are strings only; YAML mapping keys are also emitted as strings.
- Order is preserved from the input (typical JSON parsers keep insertion order).

Type mapping rules
- Object (JSON) → Mapping (YAML)
  - { "k": v } → k: v
  - An indentation of 2 spaces is recommended.
  - The document start marker (---) is optional; omit it unless multi-document output is needed.
- Array (JSON) → Sequence (YAML)
  - [a, b] →
    - a
    - b
  - Order is preserved as-is.
- String (JSON) → Scalar (YAML)
  - Strings are emitted as plain, double-quoted, or literal block (|) scalars.
  - The decision logic is described below.
- Number (JSON) → Number (YAML)
  - Integers and floats are emitted in their string representation as-is.
  - NaN/Infinity are assumed not to occur in JSON (and are never emitted).
- Boolean (JSON) → Boolean (YAML)
  - true → true, false → false (lowercase)
- Null (JSON) → Null (YAML)
  - null → null (the ~ form is not used)
- Dates/times
  - These are strings in JSON. Emit them as strings (quoted if necessary) so that the YAML side does not infer a timestamp type.

String quoting rules (for safe operation under YAML 1.2)
- Wrap the string in double quotes ("...") if any of the following applies:
  - It has leading or trailing whitespace.
  - It contains no newlines but contains or starts with a dangerous character:
    - "#" (can start a comment)
    - ": " (a colon followed by a space collides with the mapping indicator)
    - starts with "-", "?", "@", "`", "!", "*", "&", "%", "|", or ">"
  - The JSON string exactly matches one of the ambiguous tokens (case-insensitive):
    - "true", "false", "null", "~", "yes", "no", "on", "off"
    - Note: in YAML 1.2, yes/no/on/off are not booleans, but quote them anyway to stay safe against 1.1-family parsers.
  - ISO 8601-like date/time strings (e.g., 2024-01-01T12:00:00Z) should be quoted to guard against parser differences.
  - It contains control or other non-printing characters (JSON escapes are preserved inside YAML double quotes).
- If the string contains newlines, emit a literal block scalar (|) so the newlines are preserved faithfully.
  - Preserve the presence or absence of a trailing newline exactly.
- Only when none of the above applies, emit a plain (unquoted) scalar.
  - Rough guide to plain-safe characters: alphanumerics, hyphen (-), underscore (_), dot (.), slash (/).
- The empty string is emitted as "" in double quotes.

Output format rules
- Indent with 2 spaces.
- Sequences list their elements with "- ".
- Mappings use the "key: value" form.
- Add the document start marker "---" when needed.
- Do not generate anchors (&), aliases (*), or tags (!!), to stay JSON-compatible.

Conversion algorithm (pseudocode)
- function to_yaml(value, indent=0):
  - switch type(value):
    - dict:
      - for (k, v) in insertion_order(value):
        - k_out = quote_key_if_needed(k)  // keys are strings; usually plain, double-quoted when risky
        - print(indent_spaces(indent) + k_out + ": " + inline_or_child(v, indent))
    - list:
      - for item in value:
        - print(indent_spaces(indent) + "- " + inline_or_child(item, indent))
    - string:
      - if contains_newline(value):
        - print(block_literal("|", value, indent))  // preserve newlines as-is
      - else if needs_double_quote(value):
        - print(double_quoted(value))  // carry JSON escapes into YAML double quotes
      - else:
        - print(plain(value))
    - bool:
      - print("true" if value else "false")
    - null:
      - print("null")
    - number:
      - print(number_to_string(value))  // stringify integers/floats (keep exponent notation etc. as-is)
- function needs_double_quote(s):
  - if s == "" return true
  - if leading_or_trailing_space(s) return true
  - if contains_any(s, ["#"]) return true
  - if contains_substring(s, ": ") return true
  - if starts_with_any(s, ["-", "?", "@", "`", "!", "*", "&", "%", "|", ">"]) return true
  - if is_ambiguous_token_ci(s, ["true","false","null","~","yes","no","on","off"]) return true
  - if looks_like_iso8601_timestamp(s) return true
  - if contains_control_char(s) return true
  - return false

Python implementation example (using ruamel.yaml, targeting YAML 1.2)
- Key points
  - Use LiteralScalarString (|) and DoubleQuotedScalarString (") from ruamel.yaml.scalarstring to make the scalar style explicit.
  - Recursively walk the JSON value, convert it into style-annotated objects, then dump to YAML.
- Code
  - from json import loads
  - from ruamel.yaml import YAML
  - from ruamel.yaml.scalarstring import LiteralScalarString, DoubleQuotedScalarString
  - def convert_json_to_yaml_str(json_str):
    - data = loads(json_str)
    - def style_string(s):
      - if "\n" in s:
        - return LiteralScalarString(s)  # preserve newlines
      - if needs_double_quote(s):
        - return DoubleQuotedScalarString(s)
      - return s  # plain
    - def transform(node):
      - if isinstance(node, dict):
        - return { (style_string(k) if isinstance(k, str) else k): transform(v) for k, v in node.items() }
      - if isinstance(node, list):
        - return [transform(x) for x in node]
      - if isinstance(node, str):
        - return style_string(node)
      - return node  # bool, None, and numbers pass through unchanged
    - yaml = YAML()
    - yaml.version = (1, 2)
    - yaml.indent(mapping=2, sequence=2, offset=0)
    - out = transform(data)
    - import io
    - buf = io.StringIO()
    - yaml.dump(out, buf)
    - return buf.getvalue()
- The needs_double_quote(s) used above implements the quoting logic described earlier, ported to Python as-is (a runnable sketch follows after this section).

Verification and test considerations
- Type preservation tests
  - Confirm that JSON booleans/nulls/numbers are re-read as the same types from the YAML output.
- String safety tests
  - Confirm that values such as "yes", "on", "#comment", and ": value" are preserved as strings (i.e., quoted).
  - Confirm that strings containing newlines are emitted as literal blocks and that the newlines, including any trailing newline, match exactly.
- Structure and order
  - Array order and object insertion order are preserved.
- Edge cases
  - Empty strings, leading/trailing whitespace, very long strings (remove width limits if folding is not desired), Unicode emoji and surrogate pairs.

With this logic, JSON can be converted without losing its meaning while minimizing type misinterpretation caused by differences between YAML parsers.
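For completeness, below is a minimal runnable sketch of needs_double_quote in Python, following the quoting rules listed above. The ISO 8601 regex is a deliberate simplification of "ISO 8601-like", and newline handling is assumed to happen earlier (strings with newlines go to the literal-block branch before this check); adjust both to your data.

```python
import re

# Rough ISO 8601 detector (date or date-time); a simplification of the rule above.
_ISO8601_RE = re.compile(
    r'^\d{4}-\d{2}-\d{2}([Tt ]\d{2}:\d{2}(:\d{2}(\.\d+)?)?([Zz]|[+-]\d{2}:?\d{2})?)?$'
)
_AMBIGUOUS_TOKENS = {"true", "false", "null", "~", "yes", "no", "on", "off"}
_DANGEROUS_LEADING = ("-", "?", "@", "`", "!", "*", "&", "%", "|", ">")


def needs_double_quote(s: str) -> bool:
    """Return True when the string should be double-quoted in the YAML output."""
    if s == "":
        return True
    if s != s.strip():                    # leading or trailing whitespace
        return True
    if "#" in s or ": " in s:             # comment marker / mapping-like "key: value"
        return True
    if s.startswith(_DANGEROUS_LEADING):  # indicator characters at the start
        return True
    if s.lower() in _AMBIGUOUS_TOKENS:    # would be re-typed by YAML 1.1 parsers
        return True
    if _ISO8601_RE.match(s):              # avoid timestamp type inference
        return True
    if any(ord(c) < 0x20 or ord(c) == 0x7f for c in s):  # control characters
        return True
    return False


if __name__ == "__main__":
    # Quick checks mirroring the test considerations above
    assert needs_double_quote("yes") and needs_double_quote("#comment")
    assert needs_double_quote("2024-01-01T12:00:00Z") and not needs_double_quote("plain_value")
```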
Unify data formats from different sources, define field mappings and conversion rules, produce implementation steps and validation checklists, and generate multilingual delivery documents to support go-live.
Organize raw reports and logs into analyzable structures, clarify metric definitions and cleansing rules, obtain reusable conversion plans, and improve the accuracy and timeliness of insights.
Consolidate scattered business data into trackable metrics, form standardized conversion workflows and caveats, align requirements with the engineering team quickly, and accelerate delivery.
Establish unified conversion specifications for newly onboarded data, weigh the pros, cons, and risks of each option, deliver maintainable pipeline documentation and acceptance criteria, and reduce later operations costs.
Convert conversation and ticket data into easy-to-analyze formats, define de-identification and compliance handling policies, and quickly generate iteration plans to support campaign evaluation and retrospectives.
Demonstrate the conversion process with clear examples, produce standardized course materials and exercise guides, and help learners master the complete source-to-target design method.
Helps teams seamlessly convert an existing data format into a target data format, generating an executable conversion plan and rule checklist within minutes, complete with quality checks and implementation steps. It supports multilingual output, significantly reduces human error and rework, and speeds up delivery for data onboarding, system migration, and report unification.
Copy and paste the prompt generated by the template into your favorite chat app (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suitable for quick personal trials and lightweight use cases.
Turn the prompt template into an API: your program can modify the template parameters at will and call it directly through the interface, making automation and batch processing easy. Suitable for developer integration and embedding into business systems.
Configure the corresponding server address in your MCP client so that your AI application invokes the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts move seamlessly between different AI tools.