Kafka Internals
🚧 This post is a work in progress, but feel free to explore what’s here so far. Stay tuned for more!
14 years of Apache Kafka! Kafka is the de facto standard for event streaming, just like AWS S3 is for object storage and PostgreSQL is for RDBMS. While nearly every software engineer has likely used Kafka, managing a Kafka cluster is a whole other game. The long list of high-importance configurations is a testament to this. In this blog post, the goal is to understand Kafka's internals enough to make sense of its many configurations and highlight best practices.
On a completely different note, the cost and operational complexity of Kafka have led to the emergence of alternatives, making the Kafka API the de facto standard for event streaming, similar to the S3 API and the PostgreSQL wire protocol. Some examples include: Confluent Kafka, Redpanda, WarpStream, AutoMQ, AWS MSK, Pulsar, and many more!
1. Event Stream
The core concept of Kafka revolves around streaming events. An event can be anything, typically representing an action or a piece of information about something that happened, such as a button click or a temperature reading.
Each event is modeled in Kafka as a record with a timestamp, key, value, and optional headers.
The payload or event data goes in the value, and the key is used for:
- imposing the ordering of events/messages,
- co-locating events that have the same key property,
- and key-based storage, retention, or compaction.
In Kafka, the key and value are stored as byte arrays, giving you the flexibility to encode the data however you like (via serializers). A common practice is to use a Schema Registry in combination with the Avro serializer.
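To make this concrete, below is a minimal producer sketch, assuming the confluent-kafka Python client, a broker on localhost:9092, and a hypothetical sensor-readings topic; the JSON serialization here is just one option (Avro + Schema Registry being the common alternative):

```python
# A minimal sketch (not from the post): producing an event as a keyed record
# with headers, assuming the confluent-kafka client and a local broker.
import json
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

# Key and value travel as byte arrays; here the value is JSON-encoded.
event = {"sensor_id": "t-42", "temperature_c": 21.7}
producer.produce(
    "sensor-readings",                       # hypothetical topic name
    key="t-42",                              # same key -> same partition
    value=json.dumps(event).encode("utf-8"),
    headers=[("source", b"demo")],           # optional headers
)
producer.flush()  # block until the broker acknowledges delivery
```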
2. Kafka Topics
By way of comparison, topics are like tables in a database. In the context of Kafka, they are used to organize events of the same type, and hence the same schema, together. The producer specifies which topic to publish to, and the subscriber or consumer specifies which topic(s) to read from. Note: the stream is immutable and append-only.
The immediate question is: how do we distribute the data of a topic across the different nodes of the Kafka cluster? That's where partitions come into play.
2.1. Kafka Topic Partitions
A Kafka topic can have one or more partitions, and a partition can be regarded as the unit of data distribution and also the unit of parallelism. Partitions of a topic can reside on different nodes of the Kafka cluster, and each partition can be read and written independently. Within a consumer group, a partition is consumed by at most one consumer, so the partition count caps how many consumers can work in parallel (strongly dictating the horizontal scalability of consumers).
Furthermore, each event/record within a partition has a unique ID called the offset: a monotonically increasing number that, once assigned, is never reused. The events in a partition are delivered to the consumer in offset order.
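For illustration, a topic and its partition count can be set when the topic is created; a sketch assuming the confluent-kafka AdminClient (topic name and counts are arbitrary examples):

```python
# Sketch: creating a topic with an explicit partition count, assuming the
# confluent-kafka AdminClient and a broker on localhost:9092.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# 6 partitions -> up to 6 consumers in one consumer group can read in parallel.
futures = admin.create_topics([NewTopic("sensor-readings", num_partitions=6, replication_factor=3)])
for name, future in futures.items():
    future.result()  # raises if creation failed
    print(f"created topic {name}")
```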
2.2. Choosing Number of Partitions
The number of partitions dictates parallelism and hence the throughput.
The more the partitions:
- The higher the throughput: both the producer and the broker can process different partitions independently and in parallel, leading to better utilization of resources for expensive operations such as compression.
- More partitions also allow more consumers in a consumer group, leading to higher consumption throughput. Each consumer can consume messages from multiple partitions, but one partition cannot be shared across consumers in the same consumer group.
However, it's important to strike a balance when choosing the number of partitions. More partitions may increase unavailability/downtime periods.
- Quick pre-context (from Section 3.3): a partition has multiple replicas, each stored on a different broker, and one replica is assigned as the leader while the rest are followers. Producer and consumer requests are typically served by the leader broker of that partition.
- When a Kafka broker goes down, leadership for the partitions it was leading is moved to other available replicas so client requests can continue to be served. When the number of partitions is high, the latency of electing all the new leaders adds up.
More partitions mean more RAM is consumed by the clients (especially the producer):
- the producer client creates a buffer per partition (Section 3.1: accumulated by byte size or time). With more partitions, the memory consumption adds up.
- Similarly, the consumer client fetches a batch of records per partition, hence increasing the memory needs (crucial for real-time low-latency consumers).
The idea behind choosing the number of partitions is to measure the maximum throughput that can be achieved on a single partition (for both production and consumption) and pick the number of partitions needed to reach the target throughput.
The reason for running these benchmarks is that single-partition throughput depends on several factors, such as batch size, compression codec, type of acknowledgment, replication factor, etc. If P is the partition count derived from the benchmarks, choose 1.2 * P or higher as a buffer; it's a common practice to over-partition by a bit.
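As a back-of-the-envelope sketch of this sizing rule (the throughput numbers below are made-up benchmark results, purely illustrative):

```python
# Partition sizing from hypothetical single-partition benchmark results.
import math

target_mb_per_s = 300     # target topic throughput
producer_mb_per_s = 40    # measured max produce throughput on one partition
consumer_mb_per_s = 60    # measured max consume throughput on one partition

# P = max(target / per-partition produce rate, target / per-partition consume rate)
p = max(
    math.ceil(target_mb_per_s / producer_mb_per_s),
    math.ceil(target_mb_per_s / consumer_mb_per_s),
)
partitions = math.ceil(1.2 * p)  # over-partition by ~20% as a buffer
print(p, partitions)             # -> 8 10
```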
The Kafka cluster has a control plane and a data plane: the control plane is responsible for handling all the metadata, and the data plane handles the actual data/events.
3. Kafka Broker (Data Plane)
Diving into the workings of the data plane, there are two types of requests the Kafka broker handles: produce requests from producers and fetch requests from consumers.
3.1. Producer
A produce request starts with the producer application sending a record with a key and a value. The Kafka producer library determines which partition the message should go to by hashing the supplied partition key; hence, records with the same key always go to the same partition. When a key is not supplied, the producer spreads records across partitions (classically via round-robin; newer producer versions default to a sticky partitioner that fills a batch for one partition before moving to the next).
Sending each record to the broker individually is not very efficient, so the producer library also buffers data per partition in an in-memory data structure (record batches). Data in the buffer is accumulated up to a limit based on either the total size of the records or elapsed time (size and time thresholds, e.g., the batch.size and linger.ms configurations). That is, once enough time has passed or enough data has accumulated, the records are flushed to the corresponding broker.
Lastly, batching allows records to be compressed, as it is better to compress a batch of records than a single record.
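To make the batching, lingering, and compression knobs concrete, here is a hedged producer-configuration sketch (confluent-kafka client assumed; the values are illustrative, not tuned recommendations):

```python
# Illustrative producer settings for batching, lingering, and compression.
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "batch.size": 64_000,       # flush a partition's buffer once it reaches ~64 KB ...
    "linger.ms": 10,            # ... or after 10 ms, whichever comes first
    "compression.type": "lz4",  # whole batches compress better than single records
    "acks": "all",              # broker acknowledges only after replication
})

for i in range(1000):
    producer.produce("sensor-readings", key=f"t-{i % 10}", value=b"payload")
producer.flush()
```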
3.1.1. Socket Receive Buffer & Network Threads
Network threads in a Kafka broker are like workers that handle the high-level communication between the Kafka server (broker) and the outside world (clients); i.e., they receive requests coming into the server (data sent by producers) and also send responses back to clients (consumers fetching data).
To avoid the network threads being overwhelmed by incoming data, a socket receive buffer sits in front of them and buffers incoming requests.
A network thread handles each producer/client request throughout the rest of its lifecycle: the same network thread keeps track of the request through the entire process, until the request is fully handled and the response is sent. For example, if a producer sends messages to a Kafka topic:
- the network thread receives the request from the producer,
- the request is processed (the messages are written to the Kafka commit log and replication is awaited),
- once processing is done, the network thread sends a response back (an acknowledgment that the messages were successfully received).
3.1.2. Request Queue & I/O Threads
Each network thread handles multiple requests from different clients (multiplexing) and is meant to be lightweight: it receives the bytes, forms a produce request, publishes it to a shared request queue, and immediately moves on to the next request.
Note: in order to guarantee the ordering of requests from a client, a network thread handles one request per client at a time; i.e., only after completing a request (and sending its response) does the network thread take another request from the same client.
The second main thread pool in Kafka, the I/O threads, picks requests off the shared request queue. Unlike the network threads, the I/O threads handle requests from any client.
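The split between lightweight network threads and a shared queue drained by I/O threads is a classic pattern; the toy sketch below illustrates the idea in Python and is emphatically not Kafka's actual implementation (the broker's pool sizes are governed by configs such as num.network.threads and num.io.threads):

```python
# Toy illustration of the network-thread / I/O-thread split, not Kafka's code.
import queue
import threading

request_queue: queue.Queue = queue.Queue(maxsize=500)  # shared request queue

def network_thread(client_id: str, raw_requests: list) -> None:
    # Lightweight: read bytes, form a request, enqueue it, move on immediately.
    for raw in raw_requests:
        request_queue.put((client_id, raw))

def io_thread() -> None:
    # Heavier work: validate and append to the (simulated) commit log.
    while True:
        client_id, raw = request_queue.get()
        print(f"append to log for {client_id}: {raw!r}")
        request_queue.task_done()

threading.Thread(target=io_thread, daemon=True).start()  # one I/O worker
network_thread("producer-1", [b"batch-1", b"batch-2"])
request_queue.join()  # wait until all queued requests are handled
```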
3.1.3. Commit Log
The I/O thread first validates the data (CRC check) and appends it to a data structure called the commit log (one per partition).
00000000000000000000.log
00000000000000000000.index
00000000000000000025.log
00000000000000000025.index
...
00000000000000004580.log
00000000000000004580.index
...
The number in each segment's file name (0, 25, and 4580 above) is the segment's base offset, i.e., the offset of the first message in that segment.
The commit log (per partition) is organized on disk as segments. Each segment has two main parts: the actual data and the index (the .log and .index files); the index maps offsets to byte positions inside the log file. By default, the broker acknowledges the produce request only after the data has been replicated to other brokers (based on the replication factor), since Kafka offers high durability via replication.
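To illustrate how base-offset-named segments make lookups cheap, here is a small sketch (the offsets mirror the hypothetical listing above; this is not Kafka's code):

```python
# Sketch: find the segment containing a given offset by binary-searching the
# sorted base offsets parsed from the segment file names above.
import bisect

segment_base_offsets = [0, 25, 4580]  # from 00000000000000000000.log, ...

def find_segment(target_offset: int) -> str:
    # The containing segment is the one with the largest base offset <= target.
    idx = bisect.bisect_right(segment_base_offsets, target_offset) - 1
    if idx < 0:
        raise ValueError("offset precedes the earliest retained segment")
    return f"{segment_base_offsets[idx]:020d}.log"

print(find_segment(30))  # -> 00000000000000000025.log
```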
Note: a new batch of records (produce request) is first written into the OS's page cache and flushed to disk asynchronously. If the Kafka JVM crashes for any reason, recent messages are still in the page cache and survive; if the machine itself crashes, they may be lost. Topic replication addresses this, meaning data loss is possible only if multiple brokers crash simultaneously.
3.1.4. Purgatory & Response Queue
While waiting for full replication, the I/O thread is not blocked. Instead, the pending produce requests are stashed in the purgatory, and the I/O thread is freed up to process the next set of requests. Once the data of a pending produce request is fully replicated, the request is moved out of the purgatory.
3.1.5. Network Thread & Socket Send Buffer
The response is then placed on the shared response queue, picked up by a network thread, and sent back to the client through the socket send buffer.
3.2. Consumer
- The consumer client sends a fetch request, specifying the topic, the partition, and the start offset.
- Similar to a produce request, the fetch request goes through the socket receive buffer > network threads > shared request queue. The I/O threads then consult the offset index to find the corresponding byte range in the log file.
- To prevent frequent empty responses when no new data has been ingested, the consumer typically specifies a minimum number of bytes and a maximum wait time for the response (see the consumer sketch after this list).
- The fetch request is pushed into the purgatory until either of those conditions is met.
- When either the time or the byte threshold is reached, the request is taken out of purgatory and placed in the response queue for the network thread, which sends the actual data as a response to the consumer/client.
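A minimal consumer sketch for the fetch path above, assuming the confluent-kafka Python client (which names these thresholds fetch.min.bytes and fetch.wait.max.ms); the group id, topic, and values are placeholders:

```python
# Sketch: the broker holds the fetch response until fetch.min.bytes have
# accumulated or fetch.wait.max.ms elapses, whichever comes first.
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "demo-group",
    "auto.offset.reset": "earliest",  # where to start when no offset is committed
    "fetch.min.bytes": 1_000,         # don't respond until ~1 KB is available ...
    "fetch.wait.max.ms": 500,         # ... or 500 ms have passed
})
consumer.subscribe(["sensor-readings"])

try:
    while True:
        msg = consumer.poll(1.0)      # None if nothing arrived within the timeout
        if msg is None or msg.error():
            continue
        print(msg.topic(), msg.partition(), msg.offset(), msg.key(), msg.value())
finally:
    consumer.close()
```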
Kafka uses zero-copy transfers on the network path, meaning there are no intermediate memory copies: data is transferred directly from disk buffers to the remote socket (via the operating system's sendfile mechanism), making it memory efficient.
However, reading older data involves accessing the disk, which can block the network thread. This isn't ideal, as the network threads are shared by several clients, so processing for other clients gets delayed. Tiered Storage fetch solves this very problem.
3.2.1. Tiered Storage
Tiered storage in Kafka was introduced as an early access feature in 3.6.0 (October 10, 2023).
Tiered storage is a common storage architecture that uses different classes/layers/tiers of storage to efficiently store and manage data based on access patterns, performance needs, and cost. A typical tiering model keeps frequently accessed ("hot") data on fast storage, while less frequently accessed data is moved (not copied) to lower-cost, lower-performance storage ("warm"). Outside of these tiers, "cold" storage is commonly used for backups.
Kafka is designed to ingest large volumes of data. Without tiered storage, a single broker is responsible for hosting an entire replica of a topic partition, adding a limit to how much data can be stored. This isn't much of a concern in real-time applications where older data is not relevant.
But in cases where historical data is necessary, tiered storage allows storing less frequently accessed data in remote storage (not present locally in the broker).
Tiered storage offers several advantages:
- Cost: it's cost-effective, as inactive segments on expensive fast local disks (such as SSDs) can be moved to remote storage (object stores such as S3), making storage cheaper and virtually unlimited.
- Elasticity: storage and compute of brokers are now separated and can be scaled independently, and less local data also makes cluster operations faster. Without tiered storage, needing more storage essentially meant adding brokers (which also adds compute).
- Isolation: it provides better isolation between real-time consumers and historical-data consumers.
Coming back to the fetch request (from the consumer) with tiered storage enabled: if the consumer requests an offset that is still stored locally, the data is served the same way as before, from the page cache. The chances of most local data being in the page cache are also higher (since there is less local data). However, if the data is not present locally and lives in the remote store, the broker streams it from the object store into an in-memory buffer via the tiered fetch threads, and on to the socket send buffer handled by the network thread.
Hence, the network thread is no longer blocked even when a consumer accesses historical data; real-time and historical data access don't impact each other.
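As a hedged illustration of enabling tiering for a single topic (this assumes the cluster already has remote log storage configured per KIP-405; the topic name and retention values are placeholders):

```python
# Sketch: creating a topic with tiered storage enabled, assuming remote log
# storage is already configured on the brokers (KIP-405). Values are examples.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
topic = NewTopic(
    "clickstream",                        # hypothetical topic name
    num_partitions=6,
    replication_factor=3,
    config={
        "remote.storage.enable": "true",  # tier this topic's inactive segments
        "local.retention.ms": "3600000",  # keep ~1 hour on local disks
        "retention.ms": "2592000000",     # ~30 days total, mostly remote
    },
)
for future in admin.create_topics([topic]).values():
    future.result()
```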
3.3. Data Replication
Replication in the data plane is a critical feature of Kafka that provides durability and high availability. Replication is typically enabled and configured at the time the topic is created. Each partition of the topic is then replicated across multiple brokers, the number of copies being the replication factor.
One of the replicas is assigned to be the leader of that partition, and the rest are called followers. The producer sends data to the leader, and the followers fetch the data from the leader for replication. In a similar fashion, the consumer reads from the leader; however, consumers can also be configured to read from followers.
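To tie replication back to producer durability, a hedged sketch of the related settings (confluent-kafka client assumed; topic name and values are illustrative):

```python
# Sketch: a topic replicated 3 ways with min.insync.replicas=2, and a producer
# that waits for all in-sync replicas before considering a write successful.
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
topic = NewTopic(
    "payments",                              # hypothetical topic name
    num_partitions=3,
    replication_factor=3,                    # one leader + two followers per partition
    config={"min.insync.replicas": "2"},     # acks=all requires >= 2 in-sync replicas
)
for future in admin.create_topics([topic]).values():
    future.result()

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "acks": "all",                # leader acks only after in-sync followers replicate
    "enable.idempotence": True,   # avoid duplicates on retries
})
producer.produce("payments", key="order-1", value=b"captured")
producer.flush()
```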
6. References
[1] "Apache Kafka Streams Architecture," Apache Kafka, [Online]. Available: https://kafka.apache.org/39/documentation/streams/architecture.
[2] "Apache Kafka Documentation: Configuration," Apache Kafka, [Online]. Available: https://kafka.apache.org/documentation/#configuration.
[3] J. Rao, "Apache Kafka Architecture and Internals," Confluent, [Online]. Available: https://www.confluent.io/blog/apache-kafka-architecture-and-internals-by-jun-rao/.
Cite this article as: Adesh Nalpet Adimurthy. (Feb 9, 2025). Kafka Internals. PyBlog. https://www.pyblog.xyz/kafka-internals