Kafka-to-HDFS Data Pipeline: Technical Overview
Purpose
Move streaming data from Apache Kafka topics into HDFS for durable storage and downstream batch/interactive analytics. The pipeline ensures reliable ingestion, schema-aware transformation, efficient storage, and query-ready layout.
Architecture
- Producers: Publish events to Kafka topics (partitioned and replicated).
- Stream Processor: Consumes Kafka, validates/transforms records, and writes to HDFS.
  - Option A: Spark Structured Streaming (recommended for flexible transformation and strong fault tolerance).
  - Option B: Kafka Connect HDFS Sink (recommended for minimal transformation pipelines).
- Storage: HDFS (HA NameNode), optionally integrated with Hive/Impala/Presto via external tables on the HDFS paths.
- Observability: Metrics and logs from Kafka, stream processor, and HDFS; consumer lag monitoring.
Data Model and Schema
- Message format: Avro or JSON. Prefer Avro/Protobuf with a schema registry to enforce compatibility.
- Required fields:
  - event_id (string or long) for deduplication.
  - event_time (event-time timestamp).
  - topic, partition, offset (from Kafka) for lineage.
- Versioning: Use backward-compatible schema evolution. Validate all records; route invalid records to a dead-letter queue (DLQ) topic or quarantine HDFS path.
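The validate-and-route rule above can be sketched in plain Python. This is a minimal illustration, not the pipeline's actual validation code; the `route` helper and the field list are assumptions based on the required fields listed above.

```python
from typing import Any

# Required fields per the schema contract above (illustrative subset).
REQUIRED_FIELDS = ("event_id", "event_time")

def route(record: dict[str, Any]) -> str:
    """Return 'main' for valid records, 'dlq' for records missing required fields."""
    if all(record.get(f) is not None for f in REQUIRED_FIELDS):
        return "main"
    return "dlq"

records = [
    {"event_id": "a1", "event_time": "2024-01-01T00:00:00Z", "payload": "ok"},
    {"event_time": "2024-01-01T00:00:01Z", "payload": "missing id"},
]
# Records routed to 'dlq' would land on the DLQ topic or quarantine HDFS path.
routed = {r.get("event_id"): route(r) for r in records}
```

In the real pipeline this decision runs inside the stream processor (Spark) or via `errors.tolerance`/DLQ settings (Kafka Connect).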
HDFS Storage Design
- File format: Parquet (columnar, splittable, efficient compression; e.g., Snappy).
- Directory layout: Hive-style partitions by topic, date, and hour, e.g. hdfs:///data/lake/raw/topic=&lt;topic&gt;/dt=&lt;YYYY-MM-DD&gt;/hour=&lt;HH&gt;.
- Target file size: 128–256 MB to reduce small files. Control via micro-batch size and repartitioning (Spark) or flush/rotate settings (Kafka Connect).
- Replication factor: 3 (adjust per cluster SLA).
- Hive/Metastore: Create external tables partitioned by dt and hour; enable automatic partition discovery.
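The partition layout can be expressed as a small path-building function. This is an illustrative sketch; the base path and helper name are assumptions, and in practice Spark's `partitionBy` or the Connect partitioner builds these paths.

```python
from datetime import datetime, timezone

BASE = "hdfs:///data/lake/raw"  # illustrative base path

def partition_path(topic: str, event_time: datetime) -> str:
    """Build the Hive-style partition directory for one record."""
    dt = event_time.strftime("%Y-%m-%d")
    hour = event_time.strftime("%H")
    return f"{BASE}/topic={topic}/dt={dt}/hour={hour}"

ts = datetime(2024, 5, 1, 13, 7, tzinfo=timezone.utc)
# partition_path("topicA", ts)
#   -> "hdfs:///data/lake/raw/topic=topicA/dt=2024-05-01/hour=13"
```

Because `dt` and `hour` appear as `key=value` directories, Hive/Impala/Presto can discover them as partition columns on the external table.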
Reliability and Consistency
- Delivery semantics:
  - Spark Structured Streaming: Effectively exactly-once file writes via checkpointing and the file sink's metadata log under normal operations. Use deterministic transformations; avoid side effects. Optional deduplication using event_id for at-least-once sources.
  - Kafka Connect HDFS Sink: At-least-once delivery. The connector may produce duplicates on failures; mitigate with downstream deduplication keyed by event_id.
- Fault tolerance:
  - Checkpointing: Store offsets and state in an HDFS checkpointLocation (Spark).
  - Recovery: The stream processor restarts from the last committed offsets or checkpoints.
- Backpressure and rate control:
  - Limit per-trigger offsets (Spark: maxOffsetsPerTrigger).
  - Kafka consumer tuning: fetch sizes and max.poll.interval.ms sized for throughput.
Security
- Kafka: TLS for encryption in transit; SASL (GSSAPI/Kerberos, SCRAM) for authentication; ACLs for authorization.
- HDFS: Kerberos-secured cluster; service principals for stream processor; ACLs on output paths.
- Secrets: Centralized secrets management for credentials; avoid embedding in code/config.
Operations and Monitoring
- Kafka consumer lag: Monitor via group coordinator or tools (e.g., Burrow).
- Stream processor metrics:
  - Spark: Streaming query progress, state/operator metrics, batch durations, input rows per second.
  - Kafka Connect: Task status, sink record rate, failure counts.
- HDFS health: NameNode RPC latency, block reports, DataNode capacity, under-replicated blocks.
- Alerting: Thresholds for lag growth, failed batches/tasks, small-file proliferation, HDFS space.
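Consumer lag, the first metric above, reduces to simple per-partition arithmetic: log-end offset minus last committed offset. A minimal sketch, assuming offsets have already been fetched (e.g. via the admin client or Burrow):

```python
def consumer_lag(end_offsets: dict[int, int],
                 committed: dict[int, int]) -> dict[int, int]:
    """Per-partition lag = log-end offset minus last committed offset."""
    return {p: end_offsets[p] - committed.get(p, 0) for p in end_offsets}

end = {0: 1_500, 1: 2_200}    # broker log-end offsets per partition
done = {0: 1_400, 1: 2_200}   # consumer group's committed offsets
# consumer_lag(end, done) -> partition 0 lags by 100, partition 1 is caught up
```

Alert when total lag grows monotonically across several scrape intervals rather than on any single spike.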
Retention and Lifecycle
- Kafka topics: Retention sized to accommodate downstream outage windows and replay needs.
- HDFS: Tiered retention; raw zone immutable; curated zone may be compacted or aggregated. Periodic partition compaction to mitigate small files.
Example Implementation A: Spark Structured Streaming (PySpark)
- Characteristics: Flexible transformations, exactly-once file sink with checkpointing, Hive-compatible layout.
```python
from pyspark.sql import SparkSession, functions as F, types as T

spark = (
    SparkSession.builder
    .appName("KafkaToHDFS")
    .getOrCreate()
)

# Schema for the Kafka message value (Avro/JSON decoded to JSON here for simplicity)
value_schema = T.StructType([
    T.StructField("event_id", T.StringType(), False),
    T.StructField("event_time", T.TimestampType(), True),
    T.StructField("payload", T.StringType(), True),
])

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "topicA,topicB")
    .option("startingOffsets", "latest")  # use "earliest" to backfill on first run
    .load()
)

parsed = (
    df.select(
        F.col("topic").cast("string").alias("topic"),
        F.col("timestamp").alias("kafka_ts"),
        F.col("value").cast("string").alias("json"),
    )
    .withColumn("record", F.from_json(F.col("json"), value_schema))
    .select("topic", "kafka_ts", "record.*")
    .withColumn("event_time", F.coalesce(F.col("event_time"), F.col("kafka_ts")))
    .withColumn("dt", F.to_date(F.col("event_time")))
    .withColumn("hour", F.date_format(F.col("event_time"), "HH"))
    .withWatermark("event_time", "24 hours")  # adjust to lateness tolerance
    # Optional dedup to mitigate at-least-once behavior upstream; include the
    # watermark column so Spark can evict old dedup state.
    .dropDuplicates(["event_id", "event_time"])
)

query = (
    parsed.writeStream
    .format("parquet")
    .option("path", "hdfs:///data/lake/raw")
    .option("checkpointLocation", "hdfs:///chk/kafka_to_hdfs")
    .partitionBy("topic", "dt", "hour")
    .option("compression", "snappy")
    .option("maxRecordsPerFile", "500000")  # tune to approach the target file size
    .outputMode("append")
    .trigger(processingTime="1 minute")
    .start()
)

query.awaitTermination()
```
Notes:
- For Avro values, decode via spark-avro or use Confluent’s Avro to JSON conversion before parsing.
- Repartition by dt/hour to control file sizes: parsed.repartition(F.col("topic"), F.col("dt"), F.col("hour")).
- Register external Hive tables on hdfs:///data/lake/raw using partition columns.
Example Implementation B: Kafka Connect HDFS Sink (Connector config)
- Characteristics: Simple deployment, low transformation capability, at-least-once delivery.
```json
{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "4",
    "topics": "topicA,topicB",
    "hdfs.url": "hdfs://namenode:8020",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'dt'=YYYY-MM-dd/'hour'=HH",
    "partition.duration.ms": "3600000",
    "locale": "en",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "event_time",
    "flush.size": "100000",
    "rotate.interval.ms": "600000",
    "hadoop.conf.dir": "/etc/hadoop/conf",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq.hdfs"
  }
}
```
The TimeBasedPartitioner interprets path.format as a date-time pattern, so literal text such as dt= must be single-quoted, and the connector prepends the topic directory itself under topics.dir.
Notes:
- Configure key/value converters (e.g., AvroConverter with Schema Registry) at the Connect worker level or in the connector config if needed.
- The connector writes temporary files and renames on rotation; duplicates can occur on failures. Plan deduplication downstream if required.
- Use Kerberos-enabled Hadoop configs in hadoop.conf.dir for secure clusters.
Testing and Validation
- Functional: Publish test messages; verify HDFS files, partition layout, and schema consistency.
- Fault tolerance: Kill/restart stream processor; validate no missing data and acceptable duplicate behavior.
- Performance: Load test for throughput and end-to-end latency; tune batch duration, partitions, and file sizing.
- Data quality: Validate schema evolution, null handling, and DLQ routing.
Change Management
- Schema changes: Validate in lower environments; enforce compatibility in the registry.
- Topic changes: Update subscriptions and downstream table definitions; maintain backward-compatible paths.
This design provides a robust, secure, and scalable path from Kafka to HDFS with clear trade-offs between Spark and Kafka Connect. Choose the implementation aligned with transformation needs, operational model, and delivery semantics requirements.