Kafka Production Deep Dive

This example models the internals of a production Kafka cluster: three brokers with partition leadership, a KRaft controller (the role ZooKeeper plays in older clusters), producer acknowledgment modes, and consumer group rebalancing.


System overview

The diagram covers two levels simultaneously:

Cluster level: Three Kafka brokers, a KRaft controller, Schema Registry, and Kafka Connect. Producers and consumers connect to the broker that is the leader for their target partition.

Application level: Three producer services with different acks configurations, and two consumer groups. One consumer group is a Flink streaming job; the other is a batch Spark job.


Key Kafka topology rules

⚠️

Producers and consumers always dial the broker — the broker never dials them.

  • Producers: connect('producer', 'broker') — producer opens the TCP connection to send records
  • Consumers: connect('consumer', 'broker') — consumer opens the connection to fetch records
  • Replication: connect('follower', 'leader'), because the follower opens the connection and sends FetchRequest messages to the partition leader

Replication direction: Follower replicas dial the partition leader to fetch new records. Use connect('follower', 'leader').

Replication edge direction

This is a common source of confusion. In Kafka, follower replicas initiate the fetch — they open a connection to the leader and issue FetchRequest messages. The leader does not push to followers.

// Correct: follower dials leader
topology.connect('broker-2', 'broker-1', { protocol: 'tcp', label: 'replicate p0' });
topology.connect('broker-3', 'broker-1', { protocol: 'tcp', label: 'replicate p0' });
 
// Wrong: leader does not initiate connections to followers
// topology.connect('broker-1', 'broker-2', ...);  // Don't do this for replication
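
The direction rule above can be captured in a small helper so that replication edges are never written leader-first by accident. This is a minimal sketch: `replicationEdges` is a hypothetical function, not part of the library, and it only produces the `[from, to]` pairs.

```typescript
// Hypothetical helper (not part of the library): builds [from, to] pairs
// for replication edges so that the follower is always the dialing side.
type Edge = [string, string];

function replicationEdges(leader: string, replicas: string[]): Edge[] {
  return replicas
    .filter((id) => id !== leader)                // the leader does not fetch from itself
    .map((follower): Edge => [follower, leader]); // follower dials leader
}

// Partition 0: led by broker-1, replicated on all three brokers.
const p0Edges = replicationEdges('broker-1', ['broker-1', 'broker-2', 'broker-3']);
// p0Edges is [['broker-2', 'broker-1'], ['broker-3', 'broker-1']]
```

Each pair can then be passed as the first two arguments of topology.connect, as in the snippet above.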

Topology sketch

kafka-deep-dive.ts
const topology = new TopologyBuilder(true);
 
// --- KRaft Controller ---
const controllerGroup = topology.createGroup('controller', { label: 'KRaft Controller' });
controllerGroup
  .addProcess({ id: 'controller', label: 'KRaft Controller', state: 'running' })
  .autoResize();
 
// --- Kafka Brokers ---
const brokerGroup = topology.createGroup('brokers', { label: 'Kafka Cluster (3 Brokers)' });
brokerGroup
  .addProcess({ id: 'broker-1', label: 'Broker 1 (Leader p0,p1)', state: 'running', shardId: 'p0,p1', shardRole: 'primary' })
  .addProcess({ id: 'broker-2', label: 'Broker 2 (Leader p2)',    state: 'running', shardId: 'p2',    shardRole: 'primary' })
  .addProcess({ id: 'broker-3', label: 'Broker 3 (Follower)',     state: 'running', shardRole: 'replica' })
  .autoResize();
 
// --- Schema Registry ---
const schemaGroup = topology.createGroup('schema', { label: 'Schema Registry' });
schemaGroup
  .addProcess({ id: 'schema-registry', label: 'Schema Registry', state: 'running' })
  .autoResize();
 
// --- Producers ---
const producerGroup = topology.createGroup('producers', { label: 'Producers' });
producerGroup
  .addProcess({ id: 'order-producer',  label: 'Order Service',   state: 'running' })
  .addProcess({ id: 'event-producer',  label: 'Analytics SDK',   state: 'running' })
  .addProcess({ id: 'cdc-producer',    label: 'Debezium (CDC)',   state: 'running' })
  .autoResize();
 
// --- Consumer Groups ---
const flink = topology.createGroup('flink-cg', { label: 'Consumer Group: flink-stream' });
flink
  .addProcess({ id: 'flink-1', label: 'Flink TaskManager 1', state: 'running' })
  .addProcess({ id: 'flink-2', label: 'Flink TaskManager 2', state: 'running' })
  .autoResize();
 
const spark = topology.createGroup('spark-cg', { label: 'Consumer Group: spark-batch' });
spark
  .addProcess({ id: 'spark', label: 'Spark Executor', state: 'running' })
  .autoResize();
 
// --- Connections ---
 
// Brokers register with the KRaft controller and fetch metadata from it (broker dials controller)
topology.connect('broker-1', 'controller', { protocol: 'tcp', label: 'metadata' });
topology.connect('broker-2', 'controller', { protocol: 'tcp', label: 'metadata' });
topology.connect('broker-3', 'controller', { protocol: 'tcp', label: 'metadata' });
 
// Follower replication (follower dials leader)
topology.connect('broker-2', 'broker-1', { protocol: 'tcp', label: 'replicate', type: 'pulse' });
topology.connect('broker-3', 'broker-1', { protocol: 'tcp', label: 'replicate', type: 'pulse' });
 
// Producers connect to brokers
topology.connect('order-producer', 'broker-1', { protocol: 'kafka', label: 'produce acks=all', type: 'pulse' });
topology.connect('event-producer', 'broker-2', { protocol: 'kafka', label: 'produce acks=1',   type: 'pulse' });
topology.connect('cdc-producer',   'broker-1', { protocol: 'kafka', label: 'produce acks=all', type: 'pulse' });
 
// Schema registry validation
topology.connect('order-producer', 'schema-registry', { protocol: 'http', label: 'validate schema' });
 
// Consumers connect to brokers (consumers dial the broker)
topology.connect('flink-1', 'broker-1', { protocol: 'kafka', label: 'fetch p0', type: 'pulse' });
topology.connect('flink-2', 'broker-2', { protocol: 'kafka', label: 'fetch p2', type: 'pulse' });
topology.connect('spark',   'broker-1', { protocol: 'kafka', label: 'batch fetch', type: 'pulse' });
 
await topology.apply();

Animation scenarios

Producer acks=all — Full Acknowledgment Chain

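Shows a producer with acks=all waiting until every in-sync replica (ISR) has the record before the leader acknowledges it. The sequence is: the producer sends the record to the leader, each follower fetches it from the leader (in parallel, not as a chain), the leader advances the high watermark once all ISR members have caught up, and the leader then acknowledges the producer.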
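
The acknowledgment condition can be sketched in a few lines. This is a simplified model, not broker code: `highWatermark` and `canAck` are hypothetical names, and each replica's log end offset (the next offset it would write) is assumed to be known.

```typescript
// Log end offset (next offset to be written) per replica, keyed by broker id.
type ReplicaOffsets = Record<string, number>;

// High watermark: the smallest log end offset among the in-sync replicas.
function highWatermark(isr: string[], logEndOffsets: ReplicaOffsets): number {
  if (isr.length === 0) throw new Error('ISR must contain at least the leader');
  // A replica with no recorded offset is treated as having fetched nothing.
  return Math.min(...isr.map((id) => logEndOffsets[id] ?? 0));
}

// With acks=all, the record at `offset` is acknowledged only once every
// ISR member has it, i.e. once the high watermark has moved past it.
function canAck(offset: number, isr: string[], logEndOffsets: ReplicaOffsets): boolean {
  return highWatermark(isr, logEndOffsets) > offset;
}

const isr = ['broker-1', 'broker-2', 'broker-3'];
// broker-3 has not yet fetched the record at offset 5.
const ackBeforeFetch = canAck(5, isr, { 'broker-1': 6, 'broker-2': 6, 'broker-3': 5 });
// broker-3 has fetched it; all ISR members now hold offset 5.
const ackAfterFetch = canAck(5, isr, { 'broker-1': 6, 'broker-2': 6, 'broker-3': 6 });
```

Here `ackBeforeFetch` is false and `ackAfterFetch` is true, which is the moment the animation sends the acknowledgment back to the producer.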

Consumer Group Rebalancing

Illustrates a consumer group rebalance triggered by a new Flink TaskManager joining: the new member sends a JoinGroup request to the Group Coordinator on the broker, the coordinator has the existing members rejoin, the new assignment is distributed through SyncGroup, and consumers resume fetching from their newly assigned partitions.
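
To make the reassignment step concrete, the sketch below redistributes three partitions when a third member joins. It assumes a simple round-robin strategy; real consumers use a configurable assignor, and `assignRoundRobin` and the member id `flink-3` are hypothetical names used only for illustration.

```typescript
// Simplified round-robin assignment: partitions are dealt out in order
// to the members sorted by id. Real assignors are configurable and may
// also try to keep existing assignments stable.
function assignRoundRobin(partitions: number[], members: string[]): Record<string, number[]> {
  const sorted = [...members].sort();
  const assignment: Record<string, number[]> = {};
  for (const member of sorted) assignment[member] = [];
  partitions.forEach((partition, i) => {
    const owner = sorted[i % sorted.length];
    if (owner !== undefined) assignment[owner]?.push(partition);
  });
  return assignment;
}

// Before the rebalance: two TaskManagers share three partitions.
const beforeJoin = assignRoundRobin([0, 1, 2], ['flink-1', 'flink-2']);
// After a third (hypothetical) TaskManager joins, each member owns one partition.
const afterJoin = assignRoundRobin([0, 1, 2], ['flink-1', 'flink-2', 'flink-3']);
```

In this model `flink-1` gives up partition 2 to the new member, which is the kind of handover the animation shows.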

Schema Registry — Evolution Check

A producer registers a new Avro schema version with Schema Registry before publishing. Schema Registry checks compatibility (BACKWARD mode). On success, the producer receives the schema ID and, in the default Confluent wire format, prefixes it to the serialized record value before sending to the broker.
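
One common way for the schema ID to travel with a record is the Confluent wire format, which prefixes the serialized value with a magic byte (0) and a 4-byte big-endian schema ID. The sketch below shows only that framing; `frameWithSchemaId` and `readSchemaId` are hypothetical helpers, and the payload bytes stand in for an Avro-encoded record.

```typescript
// Confluent wire format: [magic byte 0][4-byte big-endian schema ID][payload].
function frameWithSchemaId(schemaId: number, payload: Uint8Array): Uint8Array {
  const framed = new Uint8Array(5 + payload.length);
  const view = new DataView(framed.buffer);
  view.setUint8(0, 0);               // magic byte
  view.setInt32(1, schemaId, false); // schema ID, big-endian
  framed.set(payload, 5);            // serialized record follows the 5-byte prefix
  return framed;
}

// Reads the schema ID back, as a deserializer would before looking up the schema.
function readSchemaId(framed: Uint8Array): number {
  if (framed.length < 5 || framed[0] !== 0) {
    throw new Error('not in Confluent wire format');
  }
  const view = new DataView(framed.buffer, framed.byteOffset, framed.byteLength);
  return view.getInt32(1, false);
}

// Placeholder bytes standing in for an Avro-encoded value.
const framedRecord = frameWithSchemaId(42, new Uint8Array([1, 2, 3]));
const recoveredId = readSchemaId(framedRecord);
```

Consumers read the same prefix to fetch the matching schema from Schema Registry before decoding the value.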

Broker Failure — Leader Election

Broker 1 (partition 0 leader) crashes. The KRaft controller detects the absence of heartbeats, promotes Broker 2, an in-sync replica, to leader for partition 0, and updates cluster metadata. Producers and consumers reconnect to the new leader after a metadata refresh.
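
The election step can be modeled as picking the first replica that is both alive and in the ISR. This is a simplified sketch of that rule, not controller code: `PartitionState` and `electLeader` are hypothetical names, and unclean leader election is ignored.

```typescript
interface PartitionState {
  replicas: string[];   // assigned replicas, in preference order
  isr: string[];        // in-sync replicas
  leader: string | null;
}

// Shrinks the ISR to live brokers, then picks the first assigned replica
// that is still in the ISR. Returns leader = null if none qualifies.
function electLeader(state: PartitionState, liveBrokers: string[]): PartitionState {
  const isr = state.isr.filter((id) => liveBrokers.indexOf(id) >= 0);
  const candidates = state.replicas.filter((id) => isr.indexOf(id) >= 0);
  const leader = candidates[0] ?? null;
  return { replicas: state.replicas, isr, leader };
}

const p0: PartitionState = {
  replicas: ['broker-1', 'broker-2', 'broker-3'],
  isr: ['broker-1', 'broker-2', 'broker-3'],
  leader: 'broker-1',
};

// Broker 1 stops sending heartbeats; only brokers 2 and 3 remain live.
const p0AfterCrash = electLeader(p0, ['broker-2', 'broker-3']);
```

With this input `p0AfterCrash.leader` is 'broker-2' and the ISR shrinks to brokers 2 and 3, matching the scenario above.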


Protocol notes

Kafka uses a custom binary protocol over TCP. In cloud-arch, use protocol: 'kafka' and type: 'pulse' for producer and consumer connections to give them the characteristic pulsing visual. Use protocol: 'tcp' for internal broker-to-broker replication and for metadata connections between brokers and the controller.
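
These conventions can be kept in one place with a small lookup table. This is a sketch only: `EdgeKind`, `EDGE_STYLES`, and `edgeOptions` are hypothetical names, and the option values are limited to those used in the topology sketch on this page.

```typescript
type EdgeKind = 'client' | 'replication' | 'metadata';

interface EdgeStyle {
  protocol: 'kafka' | 'tcp';
  type?: 'pulse';
}

// One entry per kind of Kafka connection used in this example.
const EDGE_STYLES: Record<EdgeKind, EdgeStyle> = {
  client: { protocol: 'kafka', type: 'pulse' },    // producers and consumers
  replication: { protocol: 'tcp', type: 'pulse' }, // follower fetching from leader
  metadata: { protocol: 'tcp' },                   // broker and controller traffic
};

// Builds the options object for the third argument of connect().
function edgeOptions(kind: EdgeKind, label: string) {
  return { ...EDGE_STYLES[kind], label };
}

const fetchOptions = edgeOptions('client', 'fetch p0');
// fetchOptions is { protocol: 'kafka', type: 'pulse', label: 'fetch p0' }
```

A call such as topology.connect('flink-1', 'broker-1', edgeOptions('client', 'fetch p0')) would then produce the same edge as the hand-written version.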