以下为“日志源A”数据摄取失败的标准化错误处理流程,适用于流式或批式摄取场景(如 Filebeat/Fluent Bit → Kafka/Kinesis → 流处理/Flink/Spark → 数据湖/数仓)。
一、目标与范围
- 目标:在不影响整体吞吐的前提下,稳定识别、隔离与恢复摄取失败,保证可追踪、可重放、数据一致性与最小数据丢失。
- 范围:源端采集、传输链路、解析/转换、目标写入、状态管理(偏移/检查点)、告警与回放。
二、参考架构(抽象)
- 源端代理:Agent/Collector(如 Fluent Bit)将日志推送至消息队列。
- 消息队列:Kafka/Kinesis,承接背压与重放。
- 摄取作业:Flink/Spark/自研消费者,完成解析、校验、清洗、入湖/入仓。
- 存储:对象存储(S3/OSS/HDFS)或数据仓库(Iceberg/Hudi/Delta、ClickHouse、BigQuery 等)。
- 附属组件:Schema Registry、DLQ(死信队列)、监控告警、审计与重放工具。
三、错误类型与判定
- 源端采集失败:网络不可达、鉴权失败、证书过期、源接口限流/429、Agent 崩溃或本地缓冲写满、时间漂移。
- 传输链路失败:Broker 不可用、分区 Leader 选举中、连接超时、吞吐不足导致高滞后。
- 解析/模式失败:编码不符(UTF-8/GBK)、压缩格式不匹配、消息超限、必填字段缺失、Schema 变更不兼容。
- 转换/质量失败:业务规则校验失败、类型转换异常、PII 脱敏缺失。
- 目标写入失败:对象存储/仓库不可用、权限不足、表/分区不存在、Schema 不兼容、事务提交失败。
- 状态/检查点失败:偏移提交失败、Flink Checkpoint 失败、幂等键冲突。
判定原则:
- 瞬时错误(网络抖动、限流、短时不可用)→ 可重试。
- 永久错误(鉴权错误、格式不可解析、Schema 永久不兼容)→ 直接送入 DLQ/隔离区。
- 模糊错误初判为瞬时,但在阈值内连续失败升级为永久错误策略。
四、处理流程(文字版流程图)
- 采集器/消费者接收消息 → 基础校验(大小、编码、必填字段、Schema ID)。
- 校验失败 → 标记为永久错误 → 写入 DLQ(携带源偏移、分区、错误码、首次出现时间、尝试次数、原始载荷指针)。
- 校验通过 → 执行解析/转换。
- 转换失败:
- 可修复(缺维、外部维表短暂不可用)→ 有界重试(指数退避+抖动)→ 失败后入 DLQ。
- 不可修复(字段类型根本不兼容)→ 直接入 DLQ。
- 目标写入:
- 使用幂等/事务写入;失败则按瞬时/永久分类重试。
- 成功后提交偏移/检查点;失败则不提交偏移,避免丢数。
- 对连续失败的源分区/租户开启断路器,降速或暂停消费,防雪崩。
- 观测系统实时汇总成功率、延迟、滞后、DLQ 增长,触发告警与自动化修复。
- 故障解除后,基于偏移/检查点或 DLQ 进行选择性回放,确保整体一致性。
五、关键策略与配置
- 重试与退避
- 指数退避:base_delay × 2^attempt,叠加抖动(±20%)。
- 最大尝试次数:瞬时错误 3–7 次;分层(解析<写入<外部依赖)。
- 断路器:窗口内失败率>阈值(如 50%/5 分钟)则熔断,冷却期后半开探测。
- 限速与背压:基于消费者滞后、sink QPS、失败率动态调节并发/批量。
- DLQ/隔离区
- 形态:DLQ Topic(Kafka)或隔离存储桶(原始文件+错误元数据)。
- 记录项:source_id、topic/partition/offset、event_time、ingest_time、schema_id、error_code、error_detail_hash、retriable、attempt_count、payload_ptr(或原文)、PII 标识。
- 保留策略:与合规匹配(如 7–30 天);加密、访问控制。
- 重放工具:支持按时间/错误类型/租户范围回放到主 Topic 或旁路修复管道。
- 幂等与事务
- 事件键:event_id(源唯一键或 hash),在 sink 端做去重/幂等写(MERGE/UPSERT)。
- Kafka 侧:启用 idempotent producer、必要时使用事务性写入(producer tx + sink 事务/两阶段提交),或在对象存储采用临时区→清单文件→原子提交。
- Flink:启用 Exactly-Once(Checkpoint + TwoPhaseCommit Sink);Spark 结构化流使用幂等输出与 checkpoint 目录。
- Schema 与数据质量
- Schema Registry 强制兼容策略(如 backward/forward);不兼容变更触发阻断与告警。
- DQ 规则:必填/枚举/范围/时间戳新鲜度;失败送 DQ-DLQ 并标注规则 ID。
- 部分容忍:可配置字段降级(如缺少非关键字段时默认值+标记)。
- 观测与告警
- 指标:
- ingest_success_rate、error_rate_by_type、DLQ_rate、DLQ_backlog
- consumer_lag、end-to-end_latency、throughput、checkpoint_duration/failure
- sink_write_failures、schema_incompat_count
- SLO 示例:5 分钟窗口成功率≥99%,端到端延迟P95≤2 分钟,DLQ 比例≤0.5%。
- 告警:零摄取 X 分钟、失败率激增、DLQ 突增、滞后>阈值、连续 checkpoint 失败、Schema 不兼容事件。
- 自动化修复
- 鉴权失败:自动刷新 Token/轮转密钥,失败升级为人工。
- 限流/429:读取 Retry-After,动态降速。
- Schema 漂移:自动注册兼容字段(仅允许向后兼容),不兼容变更阻断并创建变更请求。
- 目标不可用:切换到暂存区(staging)+延迟提交。
- 时钟漂移:触发 NTP 校正;时间窗口内矫正 event_time。
- 安全与合规
- DLQ 和原始日志加密、细粒度访问控制、PII 脱敏/标记。
- 审计表记录每次丢弃/隔离/重放决策与操作者。
六、运行手册(应急步骤)
- 步骤 1:确认范围
- 查看 Dashboard:error_rate、DLQ_backlog、consumer_lag、sink 可用性。
- 定位失败集中在采集、解析、写入还是检查点。
- 步骤 2:分类
- 瞬时 vs 永久:依据错误码/重试历史/断路器状态。
- 步骤 3:处置
- 瞬时:增加退避、降低并发、确认下游可用性;必要时启用暂存区。
- 永久:保持主链路运行,将问题记录入 DLQ;修复后从 DLQ 或偏移回放。
- 步骤 4:修复
- 鉴权:更新凭据;源端连通性检查;证书/CA 更新。
- Schema:更新注册与转换逻辑;回放前在影子环境验证。
- 目标写入:恢复写入权限/配额/表结构;验证幂等键。
- 步骤 5:回放与验证
- 从 DLQ/偏移区间回放;监控重复写入与延迟;对账审计(投入/产出条数、哈希抽样)。
- 步骤 6:复盘
- 根因分析(RCA)、SLO 影响、是否需要阈值/退避/分区/限速策略调整。
七、最小实现示例(Kafka 消费者,带重试与 DLQ,伪代码)
- 假设:
- 主 Topic:logs.sourceA
- DLQ Topic:dlq.sourceA
- 永久错误:Schema 不兼容/必填缺失/无法解析
- 瞬时错误:Broker/Sink 超时、限流
示例(Python 伪代码):
def handle_message(msg):
payload = msg.value()
meta = {"topic": msg.topic(), "partition": msg.partition(), "offset": msg.offset()}
try:
record = parse_and_validate(payload) # 编码/Schema/必填校验
except PermanentError as e:
send_to_dlq(payload, meta, error_code="PARSE_PERM", detail=str(e))
return "DLQ"
except TransientError as e:
raise e
attempt = 0
while True:
try:
upsert_to_sink(record) # 幂等写,使用 event_id 去重
return "OK"
except TransientError as e:
attempt += 1
if attempt > MAX_RETRIES:
send_to_dlq(payload, meta, error_code="SINK_RETRY_EXCEEDED", detail=str(e), attempts=attempt)
return "DLQ"
sleep(backoff_with_jitter(attempt))
except PermanentError as e:
send_to_dlq(payload, meta, error_code="SINK_PERM", detail=str(e))
return "DLQ"
for msg in consumer:
result = handle_message(msg)
if result in ("OK", "DLQ"):
consumer.commit(msg) # 仅在成功写入或已入 DLQ 后提交偏移,避免丢数
- 关键点:
- 不在“写入未成功”时提交偏移。
- 区分永久/瞬时错误,DLQ 记全量上下文。
- 幂等写入保障回放安全。
- backoff_with_jitter 实现指数退避+抖动。
八、元数据与审计
- 错误审计表(示例字段):event_id、source_id、topic/partition/offset、error_type、error_code、attempt_count、first_seen_ts、last_seen_ts、operator/auto_action、replay_job_id、final_status。
- 数据血缘:记录从源到目标的 run_id、schema_id、变更版本,便于追溯与合规。
九、验收标准
- 失败被正确分类并隔离,主链路无阻塞。
- 可观测:关键 SLI 实时可见,告警不漏报。
- 可恢复:DLQ 回放闭环可验证,重复写可控(幂等)。
- 合规安全:敏感数据在 DLQ 亦受控与可审计。
以上流程可直接落地到现有 Kafka/Flink/Airflow 等栈;对其他队列或云原生服务(Kinesis、Pub/Sub、BigQuery、S3 等)仅需替换对应的客户端与幂等/事务机制实现。