Popular roles are more than a source of inspiration; they are productivity assistants. With carefully curated role prompts you can generate high-quality content quickly, spark new ideas, and find the solution that best fits your needs, making creation easier and its value more immediate.
We keep the role library updated for different user needs, so you can always find a suitable starting point.
Provides logical solutions for converting data from one format to another.
The following is a standardized conversion logic and reference implementation for converting NDJSON (one JSON object per line) to CSV. It covers schema inference, flattening of nested structures, array handling, missing values, types and escaping, error handling, and performance.
1. Input and output definitions
2. Schema and column order
3. Flattening rules for nested structures (objects/dicts)
4. Array handling strategy. Arrays are the crux of CSV conversion and need an explicit strategy (configurable per field), as illustrated below:
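For illustration, consider a hypothetical input record (the values are invented; the reference implementation below sorts the inferred columns and applies standard CSV quoting):

{"id": 1, "tags": ["a", "b"], "items": [{"sku": "X", "qty": 2}, {"sku": "Y", "qty": 1}]}

join strategy (default, delimiter '|'): one output row
id=1, tags=a|b, items={"sku":"X","qty":2}|{"sku":"Y","qty":1}

explode strategy on items: one output row per element; each row keeps the joined tags and gains flattened sub-columns
id=1, tags=a|b, items.sku=X, items.qty=2, items={"sku":"X","qty":2}
id=1, tags=a|b, items.sku=Y, items.qty=1, items={"sku":"Y","qty":1}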
5. Data types and formats
6. Nulls and missing values
7. CSV output and escaping
8. Errors and data quality
9. Performance and scalability
10. Reference implementation (Python, streaming; supports join and an optional single-field explode). Notes:
Sample code:
import json
import csv
import gzip
from typing import Any, Dict, Iterable, List, Optional
def open_maybe_gzip(path: str, mode: str = 'rt', encoding: str = 'utf-8'):
if path.endswith('.gz'):
return gzip.open(path, mode, encoding=encoding)
return open(path, mode, encoding=encoding)
def sanitize_key(key: str) -> str:
    # Avoid '.' in keys clashing with the separator used for flattened column names
return key.replace('.', '_')
def flatten(obj: Any, parent_key: str = '', sep: str = '.') -> Dict[str, Any]:
"""对象扁平化;数组按原样保留为 Python list(后续统一处理)。"""
items: Dict[str, Any] = {}
if isinstance(obj, dict):
for k, v in obj.items():
k2 = sanitize_key(k)
new_key = f"{parent_key}{sep}{k2}" if parent_key else k2
items.update(flatten(v, new_key, sep))
elif isinstance(obj, list):
        # Keep the list as-is; the array strategy is applied later
items[parent_key] = obj
else:
items[parent_key] = obj
return items
def to_str(value: Any) -> str:
if value is None:
return ''
if isinstance(value, (int, float, bool)):
return str(value)
if isinstance(value, str):
return value
    # Other types (objects, lists) are serialized as compact JSON
return json.dumps(value, ensure_ascii=False, separators=(',', ':'))
def process_arrays(row: Dict[str, Any],
join_delim: str = '|',
explode_field: Optional[str] = None) -> Iterable[Dict[str, Any]]:
"""
将数组处理为 join 或对指定字段进行 explode。
explode_field 为扁平化后的列名(例如 'items' 或 'a.b.items')。
如果 explode_field 不存在或不是数组,则按 join 输出单行。
"""
    # Work on a mutable copy
    base = dict(row)
    # Join every array except the field designated for explode
for key, val in list(base.items()):
if isinstance(val, list):
if explode_field and key == explode_field:
                # Handled separately below
                continue
            # Join scalar arrays directly; serialize object elements to compact JSON first
joined = []
for el in val:
if isinstance(el, (str, int, float, bool)) or el is None:
joined.append(to_str(el))
else:
joined.append(json.dumps(el, ensure_ascii=False, separators=(',', ':')))
base[key] = join_delim.join(joined)
    # Handle the explode field
if explode_field and isinstance(row.get(explode_field), list):
arr = row[explode_field]
if len(arr) == 0:
            # Empty array: emit one row with this column left blank
out = dict(base)
out[explode_field] = ''
yield out
return
for el in arr:
out = dict(base)
            # Flatten object elements into sub-columns; stringify everything else
if isinstance(el, dict):
sub = flatten(el, explode_field)
out.update({k: to_str(v) for k, v in sub.items()})
                # Also keep a compact JSON summary in the original explode column (optional)
out[explode_field] = json.dumps(el, ensure_ascii=False, separators=(',', ':'))
else:
out[explode_field] = to_str(el)
yield out
else:
yield base
def infer_header(ndjson_path: str,
explode_field: Optional[str] = None) -> List[str]:
cols = set()
with open_maybe_gzip(ndjson_path, 'rt') as f:
for lineno, line in enumerate(f, start=1):
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except Exception:
                # Log the error and skip this line
                # In production, write it to an error channel instead
continue
flat = flatten(obj)
            # Apply the array strategy first so sub-columns produced by exploding object arrays are captured
for processed in process_arrays(flat, explode_field=explode_field):
for k in processed.keys():
cols.add(k)
return sorted(cols)
def ndjson_to_csv(in_path: str,
out_path: str,
predefined_header: Optional[List[str]] = None,
explode_field: Optional[str] = None,
join_delim: str = '|'):
header = predefined_header or infer_header(in_path, explode_field=explode_field)
    # Write the CSV output
with open_maybe_gzip(in_path, 'rt') as fin, open(out_path, 'w', encoding='utf-8', newline='') as fout:
writer = csv.DictWriter(fout, fieldnames=header, extrasaction='ignore', quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
for lineno, line in enumerate(fin, start=1):
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except Exception:
                # Log and skip the bad line
                # Bad lines could be written to out_path + '.error'
continue
flat = flatten(obj)
for processed in process_arrays(flat, join_delim=join_delim, explode_field=explode_field):
                # Fill missing columns with empty strings
row = {k: to_str(processed.get(k)) for k in header}
writer.writerow(row)
# Usage examples:
# ndjson_to_csv('input.ndjson', 'output.csv', explode_field='items', join_delim='|')
# With a predefined header: ndjson_to_csv('input.ndjson', 'output.csv', predefined_header=['id','a.b','items','tags'])
11. Spark conversion (for large-scale data)
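The sketch below shows how the same join/explode semantics might look in PySpark. It is a minimal illustration rather than a drop-in job: it assumes a Spark 3.x environment, an array column named "items", and it only flattens one level of nested structs.

from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.appName("ndjson_to_csv").getOrCreate()

# Spark reads NDJSON (JSON Lines) natively and infers the schema by sampling,
# which plays the same role as infer_header() in the single-machine script
df = spark.read.json("input.ndjson")

# Illustrative explode of an array column named "items" (assumed to exist)
if "items" in df.columns:
    df = df.withColumn("items", F.explode_outer("items"))

# CSV cannot hold arrays or structs: flatten one level of structs into dotted
# column names and JSON-encode any remaining complex columns
cols = []
for field in df.schema.fields:
    if isinstance(field.dataType, T.StructType):
        for sub in field.dataType.fields:
            cols.append(F.col(field.name + "." + sub.name).alias(field.name + "." + sub.name))
    elif isinstance(field.dataType, (T.ArrayType, T.MapType)):
        cols.append(F.to_json(F.col(field.name)).alias(field.name))
    else:
        cols.append(F.col(field.name))

df.select(cols).write.mode("overwrite").option("header", True).csv("output_csv/")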
12. Configuration recommendations
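One way to group the knobs of the reference implementation in a single place is sketched below (the parameter names follow the function signature above; the values are illustrative):

CONFIG = {
    "input_path": "input.ndjson",       # .gz input is also handled by open_maybe_gzip
    "output_path": "output.csv",
    "predefined_header": None,          # or a fixed column list for a stable downstream schema
    "explode_field": "items",           # flattened column name to explode, or None for join-only
    "join_delim": "|",                  # delimiter used when joining scalar arrays
}

ndjson_to_csv(CONFIG["input_path"], CONFIG["output_path"],
              predefined_header=CONFIG["predefined_header"],
              explode_field=CONFIG["explode_field"],
              join_delim=CONFIG["join_delim"])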
In data-engineering practice this logic is portable and extensible: it can be put into production quickly as a single-machine script, and it keeps the same semantics and quality controls when moved to a distributed computing framework.
TSV conversion logic for logs
Scope
Notes:
A) JSON log line
B) Text log line with optional key=value pairs
Timestamp (ts_iso):
Level:
Service, host, logger, thread:
HTTP-related:
Request/user identifiers:
Latency:
Message:
extras_json:
raw:
Input lines:
2025-09-26 12:05:34,123 INFO request_id=abc123 user_id=42 method=GET path=/api/v1/items latency=12ms Completed request
{"time":"2025-09-26T12:05:35.001+00:00","level":"warn","service":"checkout","http":{"method":"POST","path":"/orders"},"latency_ms":75,"message":"Payment pending"}
Output TSV (header + rows; empty columns stay blank between tabs):
ts_iso level service host logger thread request_id user_id client_ip method path status latency_ms message extras_json raw
Row 1 (from the text line): ts_iso=2025-09-26T12:05:34.123, level=INFO, request_id=abc123, user_id=42, method=GET, path=/api/v1/items, latency_ms=12, message=Completed request, extras_json={}, raw=the original text line; all other columns empty.
Row 2 (from the JSON line): ts_iso=2025-09-26T12:05:35.001Z, level=WARN, service=checkout, latency_ms=75, message=Payment pending, extras_json={"http.method":"POST","http.path":"/orders"}, raw=the original JSON line; all other columns empty.
Note: Tabs are shown conceptually above; actual output will use tab characters. Newlines/tabs inside fields must be escaped.
This implementation handles both JSON and text logs, performs normalization, and writes TSV to stdout. It avoids non-standard libraries.
import sys
import json
import re
from datetime import datetime, timezone
# Precompiled regexes
TS_PATTERNS = [
("%Y-%m-%d %H:%M:%S,%f", None),
("%Y-%m-%d %H:%M:%S", None),
("%Y-%m-%dT%H:%M:%S.%f%z", "utc"),
("%Y-%m-%dT%H:%M:%S%z", "utc"),
("%Y-%m-%dT%H:%M:%S.%fZ", "z"),
("%Y-%m-%dT%H:%M:%SZ", "z"),
]
LEVEL_RE = re.compile(r'\b(trace|debug|info|warn|warning|error|critical|fatal)\b', re.IGNORECASE)
KV_RE = re.compile(r'(\w[\w\.\-]*)=("([^"\\]|\\.)*"|\S+)')
HEADER = [
"ts_iso","level","service","host","logger","thread","request_id","user_id","client_ip",
"method","path","status","latency_ms","message","extras_json","raw"
]
def normalize_level(lvl):
if not lvl: return ""
l = lvl.upper()
if l == "WARNING": return "WARN"
if l == "CRITICAL": return "ERROR" # adjust to FATAL if preferred
return l
def parse_ts(s):
s = s.strip()
for fmt, mode in TS_PATTERNS:
try:
if "%z" in fmt:
dt = datetime.strptime(s, fmt)
if mode == "utc":
return dt.astimezone(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
elif mode == "z":
# Already Zulu; ensure milliseconds where possible
if dt.microsecond:
return dt.isoformat(timespec="milliseconds").replace("+00:00", "Z")
return dt.replace(tzinfo=timezone.utc).isoformat().replace("+00:00","Z")
else:
dt = datetime.strptime(s, fmt)
# naive timestamp
if "%f" in fmt:
return dt.isoformat(timespec="milliseconds")
else:
return dt.isoformat()
except ValueError:
continue
return "" # failed
def sanitize(v):
if v is None: return ""
s = str(v)
return s.replace("\t","\\t").replace("\n","\\n").replace("\r","\\r")
def unquote(value):
# Remove surrounding quotes and unescape \" and \\ inside
if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
inner = value[1:-1]
inner = inner.replace('\\"', '"').replace('\\\\', '\\')
return inner
return value
def normalize_latency(v):
if v is None: return ""
s = str(v).strip().lower()
try:
if s.endswith("ms"):
return str(float(s[:-2]))
if s.endswith("s"):
return str(float(s[:-1]) * 1000.0)
if s.endswith("us") or s.endswith("µs"):
return str(float(s[:-2]) / 1000.0)
if s.endswith("ns"):
return str(float(s[:-2]) / 1_000_000.0)
# unitless: assume milliseconds
return str(float(s))
except ValueError:
return "" # leave raw in extras_json if needed
def parse_json_line(line):
try:
obj = json.loads(line)
except json.JSONDecodeError:
return None
flat = {}
def flatten(prefix, val):
if isinstance(val, dict):
for k, v in val.items():
nk = f"{prefix}.{k}" if prefix else k
flatten(nk, v)
else:
flat[prefix] = val
flatten("", obj)
row = {k: "" for k in HEADER}
row["raw"] = line.rstrip("\n")
    # Timestamp candidates; keep an unparseable raw value so it is not lost
    pending_extras = {}
    for key in ("@timestamp","time","timestamp","ts"):
        if key in flat:
            row["ts_iso"] = parse_ts(str(flat[key]))
            if not row["ts_iso"]:
                pending_extras["timestamp_raw"] = flat[key]
            break
row["level"] = normalize_level(flat.get("level") or flat.get("log.level") or flat.get("severity"))
row["service"] = str(flat.get("service") or flat.get("app") or flat.get("application") or "")
row["host"] = str(flat.get("host") or flat.get("hostname") or "")
row["logger"] = str(flat.get("logger") or flat.get("log.logger") or "")
row["thread"] = str(flat.get("thread") or flat.get("tid") or "")
row["request_id"] = str(flat.get("request_id") or flat.get("req_id") or flat.get("correlation_id") or flat.get("trace_id") or "")
row["user_id"] = str(flat.get("user_id") or flat.get("user.id") or flat.get("uid") or "")
row["client_ip"] = str(flat.get("client_ip") or flat.get("ip") or flat.get("remote_addr") or flat.get("http.client_ip") or "")
row["method"] = str(flat.get("method") or flat.get("http.method") or flat.get("http.request.method") or "")
row["path"] = str(flat.get("path") or flat.get("url") or flat.get("uri") or flat.get("http.path") or flat.get("http.request.path") or "")
row["status"] = str(flat.get("status") or flat.get("http.status") or flat.get("response.status") or "")
latency = flat.get("latency_ms") or flat.get("latency") or flat.get("http.latency") or None
row["latency_ms"] = normalize_latency(latency)
row["message"] = str(flat.get("message") or flat.get("msg") or flat.get("log") or flat.get("event") or "")
# Build extras from unmapped keys
mapped = set([
"@timestamp","time","timestamp","ts","level","log.level","severity","service","app","application",
"host","hostname","logger","log.logger","thread","tid","request_id","req_id","correlation_id","trace_id",
"user_id","user.id","uid","client_ip","ip","remote_addr","http.client_ip",
"method","http.method","http.request.method","path","url","uri","http.path","http.request.path",
"status","http.status","response.status","latency","latency_ms","http.latency","message","msg","log","event"
])
    extras = {k: v for k, v in flat.items() if k not in mapped}
    extras.update(pending_extras)
    row["extras_json"] = json.dumps(extras, separators=(",",":")) if extras else "{}"
return row
def parse_text_line(line):
row = {k: "" for k in HEADER}
row["raw"] = line.rstrip("\n")
s = line.strip()
    # Attempt to find a timestamp at the start of the line
ts_match = None
# Heuristic: check first ~40 chars for timestamp tokens separated by space or 'T'
head = s[:40]
# Try direct parsing by slicing common lengths
for token_len in (23, 19, 20, 24): # rough candidates
token = head[:token_len]
ts_iso = parse_ts(token)
if ts_iso:
row["ts_iso"] = ts_iso
ts_match = token
break
# Level
level_m = LEVEL_RE.search(s)
if level_m:
row["level"] = normalize_level(level_m.group(0))
# KV pairs
kvs = []
for m in KV_RE.finditer(s):
key = m.group(1)
val = unquote(m.group(2))
kvs.append((key, val))
    # Message: text after the level token (or timestamp), with key=value pairs removed
    msg = s
    if level_m:
        msg = s[level_m.end():]
    elif ts_match:
        msg = s[len(ts_match):]
    msg = KV_RE.sub("", msg)
    row["message"] = " ".join(msg.split())
# Map common keys
def get(klist):
for k in klist:
for kk, vv in kvs:
if kk == k:
return vv
return None
row["service"] = get(["service","app","application"]) or ""
row["host"] = get(["host","hostname"]) or ""
row["logger"] = get(["logger","category"]) or ""
row["thread"] = get(["thread","tid"]) or ""
row["request_id"] = get(["request_id","req_id","correlation_id","trace_id","x_request_id"]) or ""
row["user_id"] = get(["user_id","uid","user.id","principal"]) or ""
row["client_ip"] = get(["client_ip","ip","remote_addr"]) or ""
row["method"] = get(["method","http_method"]) or ""
row["path"] = get(["path","url","uri"]) or ""
row["status"] = get(["status","http_status"]) or ""
row["latency_ms"] = normalize_latency(get(["latency","latency_ms","duration","elapsed"]))
# Build extras
mapped_keys = set([
"service","app","application","host","hostname","logger","category","thread","tid",
"request_id","req_id","correlation_id","trace_id","x_request_id",
"user_id","uid","user.id","principal","client_ip","ip","remote_addr",
"method","http_method","path","url","uri","status","http_status","latency","latency_ms","duration","elapsed"
])
extras = {k: v for (k, v) in kvs if k not in mapped_keys}
row["extras_json"] = json.dumps(extras, separators=(",",":")) if extras else "{}"
return row
def emit_tsv_row(row):
fields = [sanitize(row.get(col, "")) for col in HEADER]
sys.stdout.write("\t".join(fields) + "\n")
def main():
# Write header
sys.stdout.write("\t".join(HEADER) + "\n")
for line in sys.stdin:
stripped = line.lstrip()
parsed = None
if stripped.startswith("{"):
parsed = parse_json_line(line)
if parsed is None:
parsed = parse_text_line(line)
emit_tsv_row(parsed)
if __name__ == "__main__":
main()
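Assuming the script above is saved as log_to_tsv.py (the file name is only an example), it works as a streaming filter over stdin/stdout:

python log_to_tsv.py < app.log > app.tsv
tail -f app.log | python log_to_tsv.py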
This logic establishes a deterministic, streaming-safe conversion of diverse logs to TSV suitable for ingestion into data warehouses or lakehouse tables. Adjust field mappings to your domain, and keep extras_json for schema evolution.
The following is a practical conversion logic for converting JSON to YAML safely and faithfully. The goal is to preserve the types and meaning of JSON values while avoiding type mis-inference caused by YAML interpretation. It is written assuming YAML 1.2.
Prerequisites
Type mapping rules
String quoting decision logic (safe practice assuming YAML 1.2)
Output formatting rules
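As a small illustration (the record is hypothetical), string values that a YAML parser could re-type must stay quoted, while genuinely numeric values stay plain:

JSON input:
{"enabled": "yes", "version": "1.10", "count": 3}

YAML output:
enabled: "yes"
version: "1.10"
count: 3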
Conversion algorithm (pseudocode)
Python implementation example (using ruamel.yaml, assuming YAML 1.2)
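A minimal sketch of such an implementation is shown below. It assumes ruamel.yaml is installed and an input file named input.json; the quoting heuristic (which strings are forced into double quotes) is deliberately simple and should be extended to match the quoting rules above.

import json
import re
import sys
from ruamel.yaml import YAML
from ruamel.yaml.scalarstring import DoubleQuotedScalarString

# Strings that some YAML parsers may re-type as booleans/null, plus number-like strings
AMBIGUOUS = {"true", "false", "null", "~", "yes", "no", "on", "off", ""}
NUMBER_LIKE = re.compile(r"^[+-]?(\d+(\.\d*)?|\.\d+)([eE][+-]?\d+)?$")

def protect(value):
    """Recursively wrap ambiguous strings so they are emitted with double quotes."""
    if isinstance(value, str):
        if value.strip().lower() in AMBIGUOUS or NUMBER_LIKE.match(value.strip()):
            return DoubleQuotedScalarString(value)
        return value
    if isinstance(value, dict):
        return {protect(k): protect(v) for k, v in value.items()}
    if isinstance(value, list):
        return [protect(v) for v in value]
    return value

yaml = YAML()                     # round-trip dumper with YAML 1.2 semantics
yaml.allow_unicode = True         # keep non-ASCII characters as-is
yaml.default_flow_style = False   # block-style output

with open("input.json", encoding="utf-8") as f:
    data = json.load(f)

yaml.dump(protect(data), sys.stdout)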
Verification and testing considerations
With this logic, the conversion preserves the meaning of the JSON while minimizing type misinterpretation caused by differences among YAML parsers.
Helps teams convert an existing data format into a target format seamlessly: it produces an executable conversion plan and rule checklist within minutes, together with quality checks and rollout steps; it supports multilingual output, markedly reduces human error and rework, and speeds up data onboarding, system migration, and report standardization.
Unify data formats from different sources, define field mappings and conversion rules, produce implementation steps and a validation checklist, and generate multilingual delivery documents to support go-live.
Turn raw reports and logs into structures ready for analysis, make metric definitions and cleansing rules explicit, obtain a reusable conversion plan, and improve the accuracy and timeliness of insights.
Consolidate scattered business data into traceable metrics, form a standardized conversion process with its caveats, align quickly with the engineering team on requirements, and accelerate delivery.
Copy the prompt generated from the template into the chat application you normally use (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suited to quick personal trials and lightweight scenarios.
Turn the prompt template into an API: your program can modify the template parameters freely and call it directly through the interface, making automation and batch processing straightforward. Suited to developer integration and embedding into business systems.
Configure the corresponding server address in your MCP client so that your AI application can invoke the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly between AI tools.