Apache Flink Internals
Apache Flink's official documentation and blog posts are comprehensive, but tend to be fragmented across different examples and cover the breadth of different components in isolation. The approach taken here is to follow a single reference Flink job end-to-end, through every component and moving part it touches, keeping the discussion grounded in the example, rather than attempting broad coverage of Flink's full capabilities. The tradeoff is intentional: depth over breadth.
1. Components
A running Flink system has two sides: the user-facing side and the system side.
The user-facing side is the Client, where the application code lives. This includes the DataStream API calls, job configuration, and JAR packaging. The Client's job is to compile that code into a graph representation and submit it to the cluster.
The system side consists of the JobManager and TaskManagers. The JobManager receives the submitted job, plans its execution, and coordinates the entire lifecycle: scheduling, checkpointing, failure recovery. TaskManagers are the workers that receive individual tasks from the JobManager and run the actual data processing.
The journey from user code to running tasks involves a series of graph transformations, each adding the detail the runtime needs to distribute and execute the job across the cluster.
2. Code to Execution
Consider a simple streaming job: read from a source, apply a map transformation, group by key, aggregate in a window, and write to a sink.
Nothing executes until env.execute() is called. Between that call and actual task execution, Flink builds a series of progressively more detailed graphs.
2.1. Transformations
Each API call (fromSource, map, keyBy, window, apply, sinkTo) creates a Transformation object and appends it to a list inside the StreamExecutionEnvironment. Each Transformation holds a reference to its input, its output type, its parallelism, and the operator logic.
Because each one points back to its input(s), they implicitly form a DAG.
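The back-references can be illustrated with a small stand-alone model. This is only a sketch: SketchTransformation and TransformationDagSketch are hypothetical names, not Flink classes, and the parallelism values are the ones used for the example job later in the text.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a Flink Transformation: a name, a parallelism, and its inputs. */
final class SketchTransformation {
    final String name;
    final int parallelism;
    final List<SketchTransformation> inputs;

    SketchTransformation(String name, int parallelism, SketchTransformation... inputs) {
        this.name = name;
        this.parallelism = parallelism;
        this.inputs = Arrays.asList(inputs);
    }
}

final class TransformationDagSketch {
    /** Builds the example pipeline and returns its last transformation (the sink). */
    static SketchTransformation exampleJob() {
        SketchTransformation source = new SketchTransformation("source", 2);
        SketchTransformation map = new SketchTransformation("map", 2, source);
        SketchTransformation keyBy = new SketchTransformation("keyBy", 2, map);
        SketchTransformation window = new SketchTransformation("window", 2, keyBy);
        return new SketchTransformation("sink", 1, window);
    }

    /** Follows input references depth-first so that every node is listed after its inputs. */
    static List<String> topologicalOrder(SketchTransformation last) {
        Set<SketchTransformation> visited = new LinkedHashSet<>();
        visit(last, visited);
        List<String> names = new ArrayList<>();
        for (SketchTransformation t : visited) {
            names.add(t.name);
        }
        return names;
    }

    private static void visit(SketchTransformation t, Set<SketchTransformation> visited) {
        if (visited.contains(t)) {
            return;
        }
        for (SketchTransformation input : t.inputs) {
            visit(input, visited);
        }
        visited.add(t);
    }

    public static void main(String[] args) {
        System.out.println(topologicalOrder(exampleJob()));
    }
}
```

Starting from the sink alone is enough to recover the whole pipeline, which is the sense in which the list of Transformations already is a DAG.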
Relevant Packages and Classes
In streaming/api/transformations/
Transformation,
OneInputTransformation,
SourceTransformation,
PartitionTransformation,
SinkTransformation
2.2. Logical Topology
When env.execute() fires, StreamGraphGenerator walks the Transformation list and produces a StreamGraph, a DAG of StreamNode(s) connected by StreamEdge(s).
Each physical Transformation (Source, Map, Window/Apply, Sink) becomes a StreamNode. Each StreamNode holds its operator factory, parallelism, and serializers. Connections between nodes become StreamEdges, each carrying a StreamPartitioner that defines how data flows between operators.
Non-physical Transformations like PartitionTransformation (created by keyBy) don't produce their own node. Instead, they attach partitioning information to the downstream edge. These are handled as virtual nodes during generation.
The resulting StreamGraph is a direct representation of the job logic. No optimization has happened yet.
Relevant Packages and Classes
In streaming/api/graph/
StreamGraphGenerator,
StreamGraph,
StreamNode,
StreamEdge
In streaming/runtime/partitioner/
StreamPartitioner,
ForwardPartitioner,
KeyGroupStreamPartitioner,
etc.
2.3. Operator Chaining
The StreamGraph is compiled into a JobGraph by StreamingJobGraphGenerator. The key optimization here is operator chaining: operators that meet certain conditions are fused into a single JobVertex.
Source and Map chain together (same parallelism, forward edge). The keyBy between Map and Window introduces a hash partitioner, a shuffle boundary, so those two cannot chain. Window and Sink also cannot chain because their parallelism differs (2 vs 1). That gives three JobVertices.
4 operators → 3 JobVertices. Chaining reduces the number of network exchanges and avoids unnecessary serialization within a chain.
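A rough sketch of the chaining decision for this linear pipeline follows. It checks only the two conditions named above (forward edge, equal parallelism); the real generator applies further conditions that are not modelled here. ChainingSketch, Op, and Exchange are hypothetical names, and labelling the Window to Sink edge as a rebalance is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChainingSketch {
    enum Exchange { FORWARD, HASH, REBALANCE }

    /** One operator plus the way records reach it from the previous operator. */
    static final class Op {
        final String name;
        final int parallelism;
        final Exchange input; // null for the source, which has no upstream

        Op(String name, int parallelism, Exchange input) {
            this.name = name;
            this.parallelism = parallelism;
            this.input = input;
        }
    }

    /** Simplified rule: chain only across a forward edge between equal parallelisms. */
    static boolean canChain(Op upstream, Op downstream) {
        return downstream.input == Exchange.FORWARD
                && upstream.parallelism == downstream.parallelism;
    }

    /** Groups a linear pipeline into chains; each inner list stands for one JobVertex. */
    static List<List<String>> chain(List<Op> pipeline) {
        List<List<String>> vertices = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (int i = 0; i < pipeline.size(); i++) {
            Op op = pipeline.get(i);
            if (i > 0 && !canChain(pipeline.get(i - 1), op)) {
                vertices.add(current);
                current = new ArrayList<>();
            }
            current.add(op.name);
        }
        if (!current.isEmpty()) {
            vertices.add(current);
        }
        return vertices;
    }

    static List<Op> exampleJob() {
        return Arrays.asList(
                new Op("Source", 2, null),
                new Op("Map", 2, Exchange.FORWARD),
                new Op("Window", 2, Exchange.HASH),       // keyBy: shuffle boundary
                new Op("Sink", 1, Exchange.REBALANCE));   // assumed: parallelism 2 -> 1
    }

    public static void main(String[] args) {
        System.out.println(chain(exampleJob()));
    }
}
```

For the example job this yields three groups: Source and Map together, then Window, then Sink.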
Relevant Packages and Classes
In streaming/api/graph/
StreamingJobGraphGenerator
In runtime/jobgraph/
JobGraph,
JobVertex,
JobEdge
2.4. Physical Topology
The physical topology describes how the job actually runs: in parallel, distributed across machines.
Each operator runs at some parallelism, the number of parallel instances (subtasks) that execute it. At parallelism N, the operator's data stream is divided into N stream partitions.
Using the same example:
Each subtask produces a stream partition, an independent slice of the data. Between operators, data either flows forward or gets redistributed:
Forward: data stays local, 1:1 from upstream partition to downstream partition. No serialization, no network. [Source → Map] uses this because both run at the same parallelism and no repartitioning is needed.
Redistribution (shuffle): data crosses the network. Every upstream partition can send to every downstream partition. Records get serialized, sent over TCP, deserialized.
keyBy triggers this: records are hashed by key so that all records for a given key land on the same downstream subtask. [Map → Window] in the diagram above is a hash shuffle.
Where these shuffle boundaries land is one of the most important performance factors in a Flink job. Forward connections are cheap. Shuffles are expensive.
Relevant Packages and Classes
In streaming/runtime/partitioner/
StreamPartitioner,
ForwardPartitioner,
KeyGroupStreamPartitioner,
RebalancePartitioner,
BroadcastPartitioner
In streaming/api/graph/
StreamEdge
2.5. Execution Plan
The JobGraph is submitted to the JobManager. The JobMaster takes each JobVertex and expands it by parallelism to produce the ExecutionGraph.
Each JobVertex becomes an ExecutionJobVertex. Each parallel instance becomes an ExecutionVertex. Each ExecutionVertex tracks its current Execution attempt. If a subtask fails and needs to restart, a new Execution is created for the same ExecutionVertex.
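The expansion by parallelism can be sketched with a toy model. ExecutionGraphSketch and Subtask are hypothetical names standing in for the real classes, and the attempt counter is a simplification of how Execution attempts are tracked.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ExecutionGraphSketch {
    /** Hypothetical stand-in for an ExecutionVertex: one parallel subtask of a JobVertex. */
    static final class Subtask {
        final String jobVertex;
        final int index;
        int attempt = 0; // number of the current execution attempt

        Subtask(String jobVertex, int index) {
            this.jobVertex = jobVertex;
            this.index = index;
        }

        /** A failure does not replace the subtask; it starts a new attempt for it. */
        void restart() {
            attempt++;
        }
    }

    /** Expands each vertex into as many subtasks as its parallelism. */
    static List<Subtask> expand(Map<String, Integer> jobVertices) {
        List<Subtask> subtasks = new ArrayList<>();
        for (Map.Entry<String, Integer> vertex : jobVertices.entrySet()) {
            for (int i = 0; i < vertex.getValue(); i++) {
                subtasks.add(new Subtask(vertex.getKey(), i));
            }
        }
        return subtasks;
    }

    /** The three JobVertices of the example job with their parallelism. */
    static Map<String, Integer> exampleJobGraph() {
        Map<String, Integer> vertices = new LinkedHashMap<>();
        vertices.put("Source+Map", 2);
        vertices.put("Window", 2);
        vertices.put("Sink", 1);
        return vertices;
    }

    public static void main(String[] args) {
        for (Subtask s : expand(exampleJobGraph())) {
            System.out.println(s.jobVertex + " subtask " + s.index + " attempt " + s.attempt);
        }
    }
}
```

Three JobVertices with parallelism 2, 2, and 1 expand into five subtasks in this model.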
The ExecutionGraph is the structure the JobMaster uses for scheduling, tracking task state, coordinating checkpoints, and handling failures.
Each ExecutionVertex is deployed to a TaskManager as a Task. A Task is the actual runtime entity: a dedicated thread that runs the OperatorChain, reads from InputGates, processes records through the chained operators, and writes to ResultPartition(s).
Relevant Packages and Classes
In runtime/executiongraph/
DefaultExecutionGraph,
ExecutionJobVertex,
ExecutionVertex,
Execution.
3. State
Operators can be stateless or stateful. In the example above, the map only transforms each record and keeps no state. The window operation, on the other hand, collects records until a trigger fires, so it needs state.
Flink state is fault tolerant (through checkpoints) and rescalable (by redistributing it when parallelism changes). Without these guarantees, every operator would have to manage its own storage and recovery.
3.1. State Backend
Going back to the example, keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(10))), the window operator collecting events for 10 seconds needs to store that data somewhere until the window fires. Each parallel subtask of a stateful operator maintains its own local state storage. This storage is embedded within the TaskManager process, so state access is fast and does not require any network calls.
The storage engine behind this is called the State Backend. Flink provides two production-ready options:
HashMapStateBackend: State lives as Java objects on the JVM heap. Fast access since there is no serialization overhead, but limited by available memory.
EmbeddedRocksDBStateBackend: State is serialized and stored in an embedded RocksDB instance on local disk. Slower per access (every read/write goes through serialization), but can hold state much larger than memory, bounded only by disk space.
The tradeoff is speed vs. capacity. For small to moderate state, heap is faster. For large state (GBs to TBs) that does not fit in memory, RocksDB is the viable option of the two.
Because each subtask has its own local state backend instance, state scales naturally with parallelism. Two parallel subtasks of the window operator means two independent state stores, each holding only the data for its own subset of keys.
There is also a third option that is gaining popularity: ForStStateBackend, built on ForSt (a fork of RocksDB). It stores SST files on remote storage (S3, HDFS) rather than on local disk, which is used only as a cache, so state can exceed local disk capacity entirely. It is designed for disaggregated, cloud native setups and supports asynchronous state access.
Note: ForStStateBackend does not support canonical savepoints, full snapshots, changelog, or file-merging checkpoints.
Relevant Packages and Classes
In flink-runtime/, flink-statebackend-rocksdb/, flink-statebackend-forst/
StateBackend,
HashMapStateBackend,
EmbeddedRocksDBStateBackend,
ForStStateBackend
3.2. State Primitives
The state backends described above are the storage engines. What gets stored in them broadly falls into two categories.
3.2.1. Keyed State
Keyed State is partitioned by key. In the example job, the keyBy(...) before the window means each window subtask only processes events for its assigned keys. The window operator internally uses keyed state to buffer incoming events until the window fires. In MyJob, that buffer is a ListState scoped to each key, stored in whichever state backend is configured.
Beyond the internal use by windows, Flink exposes keyed state primitives for custom operators:
ValueState<T>: A single value per key.
ListState<T>: A list of values per key.
MapState<K, V>: A key-value map per key.
ReducingState<T>/AggregatingState<IN, OUT>: Applies a reduce or aggregate on each addition, storing only the accumulated result.
3.2.2. Operator State
Operator State is per subtask, not tied to keys. Each parallel instance holds its own independent state. The typical use case is a source connector tracking partition assignments and offsets.
Both categories are managed by Flink: included in checkpoints, restored on failure, redistributed on rescale. Keyed state is redistributed through Key Groups, the atomic unit of state redistribution. The total number of Key Groups is fixed at the configured maximum parallelism. Each subtask is assigned a range of Key Groups, and when parallelism changes, those ranges are simply reassigned across the new set of subtasks.
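The Key Group arithmetic can be sketched as follows. The key-to-group step is deliberately simplified (Flink additionally scrambles the hash code, which is omitted here), and the group-to-subtask step uses the integer formula that Flink's key group assignment is understood to use. KeyGroupSketch and the maximum parallelism of 128 are assumptions made for illustration.

```java
final class KeyGroupSketch {
    /** Simplified: maps a key to a Key Group using only hashCode(), with no extra scrambling. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a Key Group to the subtask that owns it at the given parallelism. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Returns {first, last} Key Group owned by a subtask, found by scanning all groups. */
    static int[] keyGroupRange(int operatorIndex, int maxParallelism, int parallelism) {
        int first = -1;
        int last = -1;
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            if (operatorIndexFor(keyGroup, maxParallelism, parallelism) == operatorIndex) {
                if (first < 0) {
                    first = keyGroup;
                }
                last = keyGroup;
            }
        }
        return new int[] {first, last};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for the sketch
        for (int parallelism : new int[] {2, 4}) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                int[] range = keyGroupRange(subtask, maxParallelism, parallelism);
                System.out.println("parallelism " + parallelism + ", subtask " + subtask
                        + " owns key groups " + range[0] + ".." + range[1]);
            }
        }
    }
}
```

A key's Key Group never changes on rescale; only the mapping from Key Group to subtask does. In this model, Key Group 70 belongs to subtask 1 at parallelism 2 and to subtask 2 at parallelism 4.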
3.3. Snapshots and Checkpointing
State stored locally in each subtask solves the access problem, but not the durability problem. If a TaskManager crashes, that local state is gone. Flink needs a way to periodically capture a consistent snapshot of the entire job's state so it can recover from failures.
This mechanism is called checkpointing, and it is based on the Chandy-Lamport algorithm for distributed snapshots, adapted for Flink's dataflow model.
3.3.1. Checkpoint Barriers
The process works as follows:
The CheckpointCoordinator (running inside the JobManager) periodically initiates a checkpoint by sending a trigger to all source operators.
Each source records its current position (e.g., Kafka partition offsets) and injects a special marker called a checkpoint barrier into the data stream. The barrier is not a separate signal; it flows with the records, in order, through the DAG.
When an operator receives a barrier, it snapshots its local state and forwards the barrier downstream. The state snapshot is written to durable storage (typically a distributed file system or object store such as HDFS or S3).
When the barrier has reached the sinks and every task has acknowledged its snapshot back to the CheckpointCoordinator, the checkpoint is considered complete.
The result is a consistent global snapshot: source offsets plus the state of every operator, all corresponding to the same logical point in the stream. No records are lost, no records are counted twice.
A key detail: barriers never overtake records. They flow strictly in line. This is what ensures the snapshot captures exactly the state that results from processing all records before the barrier and none of the records after it.
Relevant Packages and Classes
In runtime/checkpoint/, runtime/io/network/api/
CheckpointCoordinator,
CheckpointBarrier
In streaming/api/checkpoint/, streaming/runtime/tasks/
CheckpointedFunction,
SubtaskCheckpointCoordinator
3.3.2. Aligned Checkpoint
For operators with multiple inputs (like after a shuffle), the barrier must arrive from all input channels before the snapshot is taken. This is called barrier alignment, and it ensures that no pre-checkpoint and post-checkpoint data gets mixed. This alignment can briefly pause processing on the faster channels, which is a tradeoff explored further in unaligned checkpoints.
Aligned checkpoints (the default) guarantee a clean cut: the snapshot contains exactly the state that results from all records before the barrier and none after. The cost is that the pausing can cause backpressure. If one channel is significantly faster than another, the fast channel's data backs up, stalling upstream operators.
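The alignment rule can be sketched as a toy operator with several input channels. AlignedBarrierSketch is a hypothetical name, and one simplification should be noted: the sketch parks post-barrier records in a list, whereas the text describes the blocked channel as simply not being read while alignment is in progress.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AlignedBarrierSketch {
    private final boolean[] barrierSeen;
    private final List<String> held = new ArrayList<>();
    private int barriersReceived = 0;

    /** Records applied to operator state so far (the "state" of this toy operator). */
    final List<String> processed = new ArrayList<>();
    /** Copy of the state taken when alignment last completed, or null if none yet. */
    List<String> snapshot = null;

    AlignedBarrierSketch(int numChannels) {
        this.barrierSeen = new boolean[numChannels];
    }

    /** Records on a channel that already delivered its barrier wait until alignment ends. */
    void onRecord(int channel, String record) {
        if (barrierSeen[channel]) {
            held.add(record);
        } else {
            processed.add(record);
        }
    }

    /** Snapshots only once the barrier has arrived on every input channel. */
    void onBarrier(int channel) {
        if (barrierSeen[channel]) {
            return;
        }
        barrierSeen[channel] = true;
        barriersReceived++;
        if (barriersReceived == barrierSeen.length) {
            snapshot = new ArrayList<>(processed); // pre-barrier records only
            processed.addAll(held);                // resume with the parked records
            held.clear();
            Arrays.fill(barrierSeen, false);
            barriersReceived = 0;
        }
    }

    public static void main(String[] args) {
        AlignedBarrierSketch op = new AlignedBarrierSketch(2);
        op.onRecord(0, "a");
        op.onBarrier(0);
        op.onRecord(0, "c"); // arrives after channel 0's barrier: held back
        op.onRecord(1, "b"); // channel 1 has not delivered its barrier yet: processed
        op.onBarrier(1);
        System.out.println("snapshot=" + op.snapshot + " processed=" + op.processed);
    }
}
```

In this run the snapshot contains a and b but not c, even though c arrived before b. The time c spends waiting is the alignment cost.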
Relevant Packages and Classes
In streaming/runtime/io/checkpointing/
SingleCheckpointBarrierHandler,
AbstractAlignedBarrierHandlerState,
AlternatingCollectingBarriersUnaligned
3.3.3. Unaligned Checkpoint
Instead of pausing, the operator reacts to the first barrier it sees from any channel. It immediately forwards the barrier downstream and continues processing all channels. The records that are already in the input/output buffers (in-flight data between the two barriers) are stored as part of the checkpoint state.
The result: checkpoint duration becomes independent of throughput and alignment time. Barriers travel through the DAG as fast as possible. The tradeoff is larger checkpoint sizes (in-flight data is included) and more I/O.
Note that unaligned checkpoints:
require exactly-once mode and allow only one concurrent checkpoint, so they will take slightly longer.
break with an implicit guarantee with respect to watermarks during recovery. On recovery, Flink generates watermarks after it restores in-flight data, which means pipelines that apply the latest watermark on each record may produce different results than with aligned checkpoints.
Flink also supports a hybrid approach. Checkpoints start aligned, but if alignment takes longer than a configured timeout (execution.checkpointing.aligned-checkpoint-timeout), the operator switches to unaligned mid-checkpoint. This gets the benefits of aligned checkpoints under normal conditions while avoiding the stalling problem under backpressure.
3.3.4. Incremental Checkpoints
Full checkpoints upload the entire state every time. For an operator holding 10 GB of state where only 200 MB changed, uploading the full 10 GB is wasteful.
Incremental checkpoints exploit how RocksDB stores data. Writes go into an in-memory MemTable. When full, it flushes to disk as an immutable SST file (Sorted String Table). A background compaction process merges smaller SST files into larger ones, discarding duplicates. The key property: SST files are never modified after creation, only created (by flush) or deleted (by compaction).
Going back to the example job, the Window operator [2] buffers events in RocksDB until the 10 second window fires. With incremental checkpoints enabled and 2 retained checkpoints:
Flink tracks which SST files are new or deleted between checkpoints and only uploads the delta.
The shared state registry tracks how many active checkpoints reference each file. When a checkpoint is pruned (retained count exceeded), Flink decrements the reference counts. Files that drop to 0 are deleted from storage.
The result: instead of uploading the full state each time, only new SST files are uploaded. The tradeoff is that recovery may need to reconstruct state from multiple incremental deltas, potentially making restores slower than with full checkpoints.
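The reference counting can be sketched with a toy registry. SharedStateRegistrySketch is a hypothetical name, the file names are made up, and the sequence of flushes and compactions is an invented scenario, not measured behavior.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class SharedStateRegistrySketch {
    private final int maxRetained;
    private final Map<String, Integer> refCounts = new HashMap<>();
    private final Deque<Set<String>> retained = new ArrayDeque<>();

    /** Files removed from checkpoint storage so far. */
    final Set<String> deleted = new TreeSet<>();

    SharedStateRegistrySketch(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    /**
     * Registers a checkpoint that references the given live SST files.
     * Returns the files that had to be uploaded, i.e. those not already in storage.
     */
    Set<String> completeCheckpoint(String... liveSstFiles) {
        Set<String> live = new TreeSet<>(Arrays.asList(liveSstFiles));
        Set<String> uploaded = new TreeSet<>();
        for (String file : live) {
            if (!refCounts.containsKey(file)) {
                uploaded.add(file);
            }
            refCounts.merge(file, 1, Integer::sum);
        }
        retained.addLast(live);
        // Prune the oldest checkpoints beyond the retention limit.
        while (retained.size() > maxRetained) {
            for (String file : retained.removeFirst()) {
                if (refCounts.merge(file, -1, Integer::sum) == 0) {
                    refCounts.remove(file);
                    deleted.add(file);
                }
            }
        }
        return uploaded;
    }

    public static void main(String[] args) {
        SharedStateRegistrySketch registry = new SharedStateRegistrySketch(2);
        System.out.println(registry.completeCheckpoint("sst1", "sst2"));         // initial flushes
        System.out.println(registry.completeCheckpoint("sst1", "sst2", "sst3")); // one new flush
        System.out.println(registry.completeCheckpoint("sst3", "sst4"));         // compaction: sst1+sst2 -> sst4
        System.out.println(registry.completeCheckpoint("sst4", "sst5"));
        System.out.println("deleted: " + registry.deleted);
    }
}
```

In this scenario sst1 and sst2 stop being live at the third checkpoint, but they are deleted only at the fourth, once the last retained checkpoint that references them has been pruned.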
3.3.5. Savepoint
Savepoints use the same mechanism as checkpoints (barriers, state snapshots, source offsets) but are triggered manually by the user, not by the periodic scheduler.
The key differences:
Always aligned: unaligned mode does not apply to savepoints.
Do not expire: checkpoints are automatically cleaned up when newer ones complete. Savepoints persist until explicitly deleted.
Triggered on demand: via CLI (flink savepoint jobID) or REST API, not on a timer.
Portable format: savepoints can be created in canonical format, a standardized representation that is compatible across state backends. A job checkpointed with HashMapStateBackend can be restored on EmbeddedRocksDBStateBackend from a canonical savepoint. Native format (which must be requested explicitly; canonical is the default) is faster to create and restore but is tied to the specific state backend and does not support cross-backend restoration.
Savepoints are used for planned operations: upgrading application code, changing parallelism, migrating to a different cluster, or switching state backends. The workflow is: take a savepoint, stop the job, make changes, restart from the savepoint.
In the example job, if the parallelism of the Window operator needs to change from 2 to 4, a savepoint captures the current state (including Key Group assignments). On restart with the new parallelism, Flink redistributes the Key Groups across the 4 new subtasks and restores the state accordingly.
3.4. Recovery
When a failure occurs (TaskManager crash, network fault, user code exception, etc.), Flink stops the affected pipeline region (which for a single-region streaming job like this example, means the entire job) and rolls back to the latest completed checkpoint.
The recovery process:
The JobManager selects the most recent successfully completed checkpoint (all tasks acknowledged, all state stored durably).
All operators are redeployed across available TaskManagers.
Each operator's state is restored from the checkpoint storage (a remote file system or object store such as HDFS or S3). The window operator gets back its buffered events, aggregation operators get back their partial results.
Source operators rewind to the offsets recorded in the checkpoint. For Kafka, this means resetting the consumer to the checkpointed partition offsets.
Processing resumes from that point. Every record after the checkpoint offset is reprocessed, but since the state has been rolled back to match, the end result is as if the failure never happened.
This is what gives Flink exactly-once processing semantics. Records between the checkpoint and the failure are reprocessed, but the state they are applied to has been rolled back to before those records were processed the first time. No double counting.
However, the source must support replay (rewinding to a previous position). Kafka, Kinesis, and filesystem sources, among others, support replay. If a source cannot rewind, exactly-once guarantees cannot be met.
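The rollback-and-replay argument can be checked with a toy simulation: a replayable log, a per-key counter as operator state, one checkpoint, and one crash. ReplayRecoverySketch and its parameters are hypothetical; the sketch models only state and source offset, not sinks or external side effects.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ReplayRecoverySketch {
    /**
     * Counts records per key while reading from a replayable log.
     * A checkpoint is taken when the source reaches checkpointAt; a single
     * failure is injected when it reaches crashAt (use -1 for no failure).
     */
    static Map<String, Integer> run(List<String> log, int checkpointAt, int crashAt) {
        Map<String, Integer> state = new TreeMap<>();
        int checkpointOffset = 0;                                // before any checkpoint: start of log
        Map<String, Integer> checkpointState = new TreeMap<>();  // and empty state
        boolean crashPending = crashAt >= 0;
        int offset = 0;
        while (offset < log.size()) {
            if (offset == checkpointAt) {
                // Snapshot: source position plus a copy of the operator state.
                checkpointOffset = offset;
                checkpointState = new TreeMap<>(state);
            }
            if (crashPending && offset == crashAt) {
                // Failure: local state is lost, so restore the snapshot and rewind the source.
                state = new TreeMap<>(checkpointState);
                offset = checkpointOffset;
                crashPending = false;
                continue;
            }
            state.merge(log.get(offset), 1, Integer::sum);
            offset++;
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "a", "c", "a", "b");
        System.out.println("no failure:   " + run(log, 2, -1));
        System.out.println("with failure: " + run(log, 2, 5));
    }
}
```

Both runs end with the same counts. The records between offset 2 and the crash are processed twice, but the first pass is discarded together with the state it produced.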
4. Time
There are three notions of time:
Event Time (most common): The timestamp embedded in the event itself, representing when the event actually occurred. A sensor reading generated at 14:00:03 carries that timestamp regardless of when Flink processes it.
Processing Time: The wall clock of the machine running the operator at the moment it processes the event. Simple and fast, but non-deterministic. The same data replayed at a different speed produces different results.
Ingestion Time (least common/discouraged): The timestamp assigned when the event enters Flink. More stable than processing time, but still does not reflect actual event occurrence.
In the example job, TumblingEventTimeWindows.of(Time.seconds(10)) uses event time. The window boundaries are determined by the timestamps in the data, not by when the records happen to arrive. This makes the results deterministic and reproducible.
4.1. Disorder Problem
Processing time is always monotonically increasing: the wall clock only moves forward. Event time has no such guarantee. In distributed systems, events produced in order can arrive at Flink out of order due to network delays, partitioning, or upstream buffering.
A window covering t=1 to t=5 cannot simply close when it sees t=6, because t=4 or t=5 might still be in transit. The system needs a way to know when it is safe to fire the window.
4.2. Watermarks
Watermarks are Flink's solution to the disorder problem. A watermark is a special marker that flows through the data stream carrying a timestamp t. It declares: "no more events with a timestamp ≤ t will arrive."
When the window operator receives a watermark that passes the window's end time, it knows the window is complete and fires it. Until that watermark arrives, the window holds its state.
Watermarks flow inline with the data, just like checkpoint barriers. At operators with multiple inputs (after a shuffle), the effective watermark is the minimum across all input channels. The stream can only be as far along in event time as its slowest input.
The gap between the highest event time seen so far and the watermark is called the bounded out-of-orderness. A larger gap tolerates more disorder but increases latency (windows fire later) and state lifetime (buffered data is held longer).
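The watermark arithmetic can be sketched in a few lines. The generator below follows what Flink's bounded out-of-orderness strategy is understood to compute (highest timestamp seen, minus the allowed delay, minus one millisecond). WatermarkSketch and its method names are hypothetical, and the 2 second delay is an assumed value.

```java
final class WatermarkSketch {
    /** Tracks the highest event timestamp and lags the watermark behind it by a fixed bound. */
    static final class BoundedOutOfOrderness {
        private final long maxOutOfOrdernessMs;
        private long maxTimestampMs;

        BoundedOutOfOrderness(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
            // Start low enough that the first watermark is Long.MIN_VALUE without overflow.
            this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
        }

        void onEvent(long timestampMs) {
            maxTimestampMs = Math.max(maxTimestampMs, timestampMs);
        }

        long currentWatermark() {
            return maxTimestampMs - maxOutOfOrdernessMs - 1;
        }
    }

    /** An operator with several inputs is only as far along as its slowest input. */
    static long combine(long... channelWatermarks) {
        long min = Long.MAX_VALUE;
        for (long watermark : channelWatermarks) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    /** A window [start, end) is complete once the watermark reaches its last timestamp, end - 1. */
    static boolean windowCanFire(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrderness generator = new BoundedOutOfOrderness(2_000L); // assumed 2 s bound
        generator.onEvent(10_000L);
        generator.onEvent(8_500L);  // out of order: does not move the watermark
        generator.onEvent(12_000L);
        long fast = generator.currentWatermark();
        long effective = combine(fast, 7_000L); // second channel lags behind
        System.out.println("fast=" + fast + " effective=" + effective
                + " fire[0,10000)=" + windowCanFire(10_000L, effective));
    }
}
```

With a 2 second bound and a highest timestamp of 12000, the channel's watermark is 9999, which would be enough to fire the window [0, 10000). The second channel at 7000 holds the effective watermark back, so the window keeps waiting.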
4.3. Timers
Operators can register timers for a future point in event time or processing time. When the watermark (for event time) or the wall clock (for processing time) reaches the registered timestamp, the timer fires and triggers a callback.
Windows use timers internally. When a new window is created, the window operator registers an event time timer for the window's end time. When the watermark passes that time, the timer fires and the window emits its result.
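The interplay between windows, timers, and watermarks can be sketched with a sorted set of timer timestamps. EventTimeTimerSketch is a hypothetical name; the window-start formula is the simple form for non-negative timestamps and no offset, and the timer is registered at the window's last timestamp (end minus one millisecond), which is an assumption about how the end time is represented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class EventTimeTimerSketch {
    private final long windowSizeMs;
    private final TreeSet<Long> timers = new TreeSet<>();

    /** Timer timestamps that have fired, in firing order. */
    final List<Long> fired = new ArrayList<>();

    EventTimeTimerSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Start of the tumbling window containing the timestamp (non-negative timestamps only). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Assigns the event to its window and registers one timer per window. */
    void onEvent(long timestampMs) {
        long windowEnd = windowStart(timestampMs, windowSizeMs) + windowSizeMs;
        timers.add(windowEnd - 1); // the set ignores duplicates for the same window
    }

    /** Fires every timer whose timestamp the watermark has reached. */
    void advanceWatermark(long watermarkMs) {
        while (!timers.isEmpty() && timers.first() <= watermarkMs) {
            fired.add(timers.pollFirst());
        }
    }

    public static void main(String[] args) {
        EventTimeTimerSketch operator = new EventTimeTimerSketch(10_000L);
        operator.onEvent(3_000L);
        operator.onEvent(7_000L);  // same window as 3000: no second timer
        operator.onEvent(12_000L);
        operator.advanceWatermark(9_998L);
        System.out.println("after 9998: " + operator.fired);
        operator.advanceWatermark(9_999L);
        System.out.println("after 9999: " + operator.fired);
    }
}
```

Events at 3000 and 7000 share the window [0, 10000), so only one timer exists for them. It fires when the watermark reaches 9999, not earlier.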
Custom operators using ProcessFunction can register their own timers for use cases like session timeouts, delayed cleanup of expired state, or triggering periodic aggregations.
Relevant Packages and Classes
In flink-core/api/common/eventtime/
WatermarkStrategy,
WatermarkGenerator
In streaming/runtime/operators/, streaming/api/operators/
TimestampsAndWatermarksOperator,
InternalTimerService
5. Runtime
A running Flink cluster consists of two types of JVM processes: one JobManager and one or more TaskManagers.
5.1. Job Manager
The JobManager is the control plane. It contains three RPC endpoints running in the same JVM: the Dispatcher, the ResourceManager, and the JobMaster. TaskManagers are the data plane: worker processes that execute tasks. Communication between them splits into two layers: Pekko (formerly Akka) for control messages (scheduling, heartbeats, checkpoint triggers) and Netty for actual data exchange between tasks.
5.1.1. Dispatcher
The Dispatcher is the entry point for the cluster. It exposes the REST API, receives job submissions, and serves the Flink Web UI.
When a job arrives, the Dispatcher persists it durably via the ExecutionPlanWriter, then creates a JobManagerRunner which starts a JobMaster for that job. This persist-before-run design is what makes HA recovery possible: if the JobManager crashes and a new leader takes over, the new Dispatcher recovers persisted jobs from storage and re-creates their JobMasters.
In a session cluster, the Dispatcher lives for the lifetime of the cluster and handles multiple jobs. In application mode, it is scoped to a single application.
The Dispatcher also participates in leader election. A DispatcherLeaderProcess monitors whether this JobManager is the current leader. On gaining leadership, it reads recovered jobs from the ExecutionPlanStore and recovered dirty job results from the JobResultStore, then creates the actual Dispatcher instance with that recovery state.
5.1.2. Resource Manager
The ResourceManager owns the cluster's slot inventory. It maintains a registry of all TaskManagers and their slots, and a SlotManager that matches slot requests from JobMasters against available slots.
The flow:
TaskManagers start up and register with the ResourceManager via RPC, reporting how many slots they offer and each slot's ResourceProfile (CPU, memory).
When a JobMaster needs slots, it declares resource requirements to the ResourceManager.
The SlotManager checks if existing free slots can satisfy the request. If yes, it sends a requestSlot RPC to the TaskManager, telling it to allocate that slot for the specific job.
If not enough free slots exist and the ResourceManager is backed by an active resource provider (Kubernetes, YARN), it requests new TaskManagers from the provider. In standalone mode, it can only wait for TaskManagers to register on their own.
The ResourceManager also monitors TaskManager health through heartbeats. If a TaskManager misses heartbeats, the ResourceManager declares it dead, removes its slots from the inventory, and notifies affected JobMasters.
Importantly, the ResourceManager knows nothing about job logic. It deals purely in slots: who has them, who needs them, and how to provision more.
Slot Allocation Flow:
5.1.3. Job Master
One JobMaster per running job. This is where the actual job execution is managed. Internally it contains two critical components:
5.1.3a. Scheduler
Scheduler decides when and where to deploy tasks. There are multiple scheduler implementations, such as:
DefaultScheduler with PipelinedRegionSchedulingStrategy for streaming
AdaptiveBatchScheduler for batch workloads
AdaptiveScheduler for reactive scaling (adjusts parallelism based on available slots)
The scheduler works with the SlotPool, which is the JobMaster's local view of allocated slots. The SlotPool uses a declarative resource model: it declares how many slots of what profile it needs, the ResourceManager fulfills them, and TaskManagers offer the allocated slots back to the JobMaster. Once slots are available, the scheduler assigns ExecutionVertex instances to them and triggers deployment.
For a pure streaming job like MyJob, the entire job is one pipelined region. When scheduling starts, the scheduling strategy finds all source regions and schedules them. Since everything is one region, all tasks launch at once.
For batch jobs with blocking shuffle boundaries, each stage is a separate region. Source regions are scheduled first. Downstream regions are scheduled only when their upstream blocking partitions become consumable. This saves resources by not starting downstream tasks that have nothing to consume yet.
5.1.3b. Checkpoint Coordinator
CheckpointCoordinator: triggers checkpoint barriers, tracks acknowledgements from all tasks, manages completed checkpoint metadata, and decides when to discard old checkpoints. This is the component that drives the entire checkpointing flow described in the earlier State section.
The JobMaster also handles failure recovery. When a task fails, it consults a FailoverStrategy (typically RestartPipelinedRegionFailoverStrategy) to determine which tasks need to be restarted, cancels them, and redeploys from the last checkpoint.
5.1.4. Job Lifecycle
A job, once accepted by the Dispatcher, moves through a state machine managed by the JobStatus. The typical happy path is straightforward: INITIALIZING → CREATED → RUNNING → FINISHED
INITIALIZING: The Dispatcher has received the job, but the JobMaster has not yet gained leadership or been fully created.
CREATED: The JobMaster is ready. No tasks have been scheduled yet.
RUNNING: At least some tasks are scheduled or executing. The job stays in this state until all tasks finish.
FINISHED: All tasks completed successfully.
When a task fails during execution, the Scheduler evaluates whether the error is recoverable. If it is, the affected tasks are restarted. The job itself stays in RUNNING while individual tasks are restarted at the region level.
If the failure is unrecoverable (or restart attempts are exhausted), the job transitions through: RUNNING → FAILING → FAILED. FAILING cancels all remaining tasks. Once every task reaches a terminal state, the job moves to FAILED and exits.
When a user manually cancels a job (via the Web UI or CLI): RUNNING → CANCELLING → CANCELED. CANCELLING cancels all tasks. Once all tasks are in a terminal state, the job enters CANCELED.
Suspension (HA only): RUNNING → SUSPENDED. SUSPENDED only occurs when high availability is configured and the JobMaster loses leadership. The job is not removed from the HA store, it just means this particular JobMaster has stopped managing it. Another JobMaster (or the same one after regaining leadership) will pick the job back up and restart it.
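The transitions described in this section can be collected into a small table. The sketch covers only the paths named in the text; Flink's actual JobStatus has more states and transitions than shown. SketchJobStatus and JobLifecycleSketch are hypothetical names.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

enum SketchJobStatus {
    INITIALIZING, CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED, SUSPENDED
}

final class JobLifecycleSketch {
    private static final Map<SketchJobStatus, EnumSet<SketchJobStatus>> NEXT =
            new EnumMap<>(SketchJobStatus.class);

    static {
        // Happy path.
        NEXT.put(SketchJobStatus.INITIALIZING, EnumSet.of(SketchJobStatus.CREATED));
        NEXT.put(SketchJobStatus.CREATED, EnumSet.of(SketchJobStatus.RUNNING));
        // From RUNNING: finish, unrecoverable failure, user cancel, or loss of leadership.
        NEXT.put(SketchJobStatus.RUNNING, EnumSet.of(
                SketchJobStatus.FINISHED,
                SketchJobStatus.FAILING,
                SketchJobStatus.CANCELLING,
                SketchJobStatus.SUSPENDED));
        // Intermediate states wait for all tasks to reach a terminal state.
        NEXT.put(SketchJobStatus.FAILING, EnumSet.of(SketchJobStatus.FAILED));
        NEXT.put(SketchJobStatus.CANCELLING, EnumSet.of(SketchJobStatus.CANCELED));
    }

    /** True if the text above describes a direct transition between the two states. */
    static boolean canTransition(SketchJobStatus from, SketchJobStatus to) {
        return NEXT.getOrDefault(from, EnumSet.noneOf(SketchJobStatus.class)).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(SketchJobStatus.RUNNING, SketchJobStatus.FAILING));
        System.out.println(canTransition(SketchJobStatus.RUNNING, SketchJobStatus.FAILED));
    }
}
```

RUNNING to FAILED is rejected in this table because the job passes through FAILING first, which is where the remaining tasks get cancelled.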
5.2. Task Manager
The TaskManager is a JVM process that does the actual data processing. In Flink, this process is called TaskExecutor. Each cluster has one or more TaskExecutors, and each one registers with the ResourceManager on startup by sending a SlotReport listing all available task slots.
5.2.1. Task Slots
A TaskExecutor divides its resources into a fixed number of task slots. Each slot is a resource container with its own MemoryManager and a defined ResourceProfile (CPU, memory). The number of slots is configured via taskmanager.numberOfTaskSlots.
A slot has three states: ALLOCATED (assigned to a job by the ResourceManager, not yet in use by the JobMaster), ACTIVE (in use, tasks can be added), and RELEASING (tasks have failed, waiting to be fully emptied before the slot is freed).
The important detail: a slot can hold multiple tasks. The tasks map inside TaskSlot is keyed by ExecutionAttemptID, meaning multiple operator subtasks can share a single slot. This is where slot sharing comes in.
5.2.2. Task Slot Sharing
By default, Flink places all operators of a job into the same SlotSharingGroup. This means one subtask from each operator in the pipeline can be co-located in a single slot. For the running MyJob example:
The design motivation is twofold. First, it means a job with N pipeline stages does not need N × parallelism slots. The number of slots needed equals the maximum parallelism across all operators (here: 2). Second, co-locating a full pipeline slice in one slot enables forward connections to stay local (in-memory data exchange, no network serialization).
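The slot arithmetic is small enough to write down directly. This is a sketch under the assumption that all operators sit in one slot sharing group, as in the default case described above; SlotSharingSketch is a hypothetical name.

```java
final class SlotSharingSketch {
    /** Without sharing, every subtask of every JobVertex needs its own slot. */
    static int slotsWithoutSharing(int... vertexParallelism) {
        int sum = 0;
        for (int parallelism : vertexParallelism) {
            sum += parallelism;
        }
        return sum;
    }

    /** With one shared group, a slot holds one subtask of each vertex, so the maximum wins. */
    static int slotsWithSharing(int... vertexParallelism) {
        int max = 0;
        for (int parallelism : vertexParallelism) {
            max = Math.max(max, parallelism);
        }
        return max;
    }

    public static void main(String[] args) {
        // MyJob: Source+Map (2), Window (2), Sink (1).
        System.out.println("without sharing: " + slotsWithoutSharing(2, 2, 1));
        System.out.println("with sharing:    " + slotsWithSharing(2, 2, 1));
    }
}
```

For MyJob this is five slots without sharing versus two with it.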
5.2.3. Task Execution Model
Each task runs in a dedicated thread and typically follows a simple internal pipeline: InputGate(s) → OperatorChain → ResultPartition(s)
The task reads records from its InputGate, passes them through the OperatorChain (the chained operators from the JobGraph), and writes output to its ResultPartition. Source tasks are the exception: they generate data directly, with no InputGate.
A ResultPartition is divided into SubPartitions, one per downstream consumer subtask. An InputGate is composed of InputChannels, one per upstream producer subtask.
The data exchange between ResultPartitions and InputGates goes through the ShuffleEnvironment. The default implementation is NettyShuffleEnvironment. If the producer and consumer are in the same TaskManager, data can be exchanged locally without going over the network.
For MyJob (Source+Map chained, parallelism 2 → Window parallelism 2 → Sink parallelism 1):
Source and Map are chained, so they share a thread with no serialization between them. The keyBy triggers an all-to-all shuffle: each SourceMap subtask's ResultPartition has 2 SubPartitions (one per Window subtask), and each Window subtask's InputGate has 2 InputChannels (one per SourceMap subtask).
Records are hashed by key and routed to the SubPartition responsible for that key group. Window to Sink has a parallelism change (2 → 1), so each Window subtask's ResultPartition has only 1 SubPartition (the single Sink), and the Sink's InputGate has 2 InputChannels (one per Window subtask).
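The wiring described above can be enumerated mechanically for an all-to-all exchange. ShuffleWiringSketch and the label format are hypothetical; the sketch lists only which SubPartition feeds which InputChannel and says nothing about buffers or transport.

```java
import java.util.ArrayList;
import java.util.List;

final class ShuffleWiringSketch {
    /**
     * Enumerates the connections of an all-to-all exchange: producer subtask i writes
     * SubPartition j, which is read by consumer subtask j on its InputChannel i.
     */
    static List<String> connections(String producer, int producerParallelism,
                                    String consumer, int consumerParallelism) {
        List<String> wiring = new ArrayList<>();
        for (int i = 0; i < producerParallelism; i++) {
            for (int j = 0; j < consumerParallelism; j++) {
                wiring.add(producer + "[" + i + "].sub[" + j + "] -> "
                        + consumer + "[" + j + "].ch[" + i + "]");
            }
        }
        return wiring;
    }

    public static void main(String[] args) {
        // keyBy shuffle: 2 producers x 2 consumers.
        for (String connection : connections("SourceMap", 2, "Window", 2)) {
            System.out.println(connection);
        }
        // Parallelism change 2 -> 1: one SubPartition per producer, two channels at the sink.
        for (String connection : connections("Window", 2, "Sink", 1)) {
            System.out.println(connection);
        }
    }
}
```

The first exchange produces four connections and the second produces two, matching the SubPartition and InputChannel counts given in the text.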
5.2.4. Task Manager Services
In this post, TaskManager and TaskExecutor may have been used interchangeably. To clarify, TaskManager is the process (the JVM). TaskExecutor is the main class running inside that process. In practice they refer to the same thing, but at different levels of abstraction.
When a TaskManager process starts, it initializes a set of shared services before any task is deployed. These services live for the lifetime of the process and are shared across all tasks running in it. They fall into a few categories.
Slot Management is central. The TaskSlotTable tracks which slots exist, which are free, and which tasks are running in each slot. The JobTable maps each active JobID to its JobMaster connection, so the TaskManager knows which JobMaster to report to for each task. The JobLeaderService monitors leadership changes for each job, so if a JobMaster fails over, the TaskManager reconnects to the new leader.
Network and Shuffle handles all data exchange. The ShuffleEnvironment (default: Netty) owns the buffer pools, creates ResultPartitions for task output and InputGates for task input. This is where credit-based flow control and backpressure happen. The TaskExecutorPartitionTracker keeps track of which result partitions this TaskManager has produced, so they can be released when no longer needed.
Memory is handled by the per-slot MemoryManager (managed off-heap memory) and the IOManager (disk spill). Within managed memory, SharedResources enables reference-counted sharing of resources like RocksDB caches across operators in the same slot. State backends like RocksDB/ForSt and operators that sort or hash data use managed memory. The IOManager provides temporary file channels for spilling when memory is exhausted.
State and Checkpointing services support fault tolerance. The LocalStateStoresManager maintains local copies of state on disk for faster recovery (instead of always fetching from the distributed checkpoint store). The FileMergingManager is a newer optimization that merges many small checkpoint files into fewer larger ones to reduce file system pressure. The ChangelogStoragesManager supports the changelog state backend. The ChannelStateExecutorFactory handles snapshotting in-flight network buffers for unaligned checkpoints.
Classloading and Artifacts manages user code isolation. The LibraryCacheManager maintains per-job classloaders so that different jobs running on the same TaskManager do not interfere with each other. The PermanentBlobService downloads JAR files from the central BlobServer on the JobManager side. The FileCache handles files registered through the distributed cache API.
Connectivity keeps the TaskManager linked to the cluster. Two heartbeat managers run continuously: one toward the ResourceManager (reporting slot availability and resource usage) and one toward each JobMaster (reporting task status and metrics). If heartbeats stop, the other side assumes the TaskManager is dead and triggers failover. HAServices handles leader discovery so the TaskManager always knows who the current ResourceManager leader is.
When a task gets deployed into a slot, it receives references to these shared services. It does not create its own network stack. The NetworkBufferPool is shared across all tasks in the TaskManager, though each task gets its own LocalBufferPool drawn from it. Managed memory is scoped per slot: all tasks sharing a slot through slot sharing share the same MemoryManager, but tasks in different slots have independent memory budgets. Heartbeat connections are shared across the entire TaskManager process.
5.2.5. Task Manager Memory
A TaskManager is a single JVM process. Its total memory is carved into strictly defined regions at startup, each serving a different purpose. Unlike a typical Java application where the JVM manages one undifferentiated heap, Flink explicitly budgets every byte.
The first distinction is between what Flink controls (Total Flink Memory) and what the JVM needs for itself (Metaspace and Overhead). Together they form Total Process Memory, which is the container or process limit. When deploying on YARN or Kubernetes, Flink uses Total Process Memory to calculate the container request size.
Within Total Flink Memory, the heap is split into Framework and Task. Both live in the same JVM heap at runtime; Flink does not enforce isolation between them. The separation exists for budgeting: it ensures the framework always has enough headroom for coordination even when user code is memory intensive. Task Heap has no fixed default because it is the remainder after every other component is subtracted from Total Flink Memory.
The off-heap region covers Framework Off-Heap, Task Off-Heap, and Network Memory. All three are counted toward -XX:MaxDirectMemorySize. Network Memory is allocated as JVM direct memory (ByteBuffer.allocateDirect()), used exclusively for the network buffer pool that moves data between tasks. Framework and Task Off-Heap budget for both JVM direct memory and native memory; Flink counts their full configured amount toward the JVM direct memory limit as a conservative measure.
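The budgeting rules so far can be checked with simple arithmetic. The sketch below is not Flink's configuration code: the function names are hypothetical and the megabyte values are illustrative assumptions, so the actual defaults should be checked against the Flink version in use. It shows Task Heap falling out as the remainder and the direct memory limit as the sum of the three off-heap components.

```python
def total_flink_memory_mb(total_process, jvm_metaspace, jvm_overhead):
    """Total Flink Memory is what remains once the JVM's own needs are set aside."""
    return total_process - jvm_metaspace - jvm_overhead


def task_heap_mb(total_flink, framework_heap, framework_off_heap,
                 task_off_heap, network, managed):
    """Task Heap is the remainder after every other component is subtracted."""
    return total_flink - (framework_heap + framework_off_heap
                          + task_off_heap + network + managed)


def max_direct_memory_mb(framework_off_heap, task_off_heap, network):
    """The three off-heap components counted toward -XX:MaxDirectMemorySize."""
    return framework_off_heap + task_off_heap + network


# Illustrative numbers (assumptions of this sketch), all in MB.
budget = dict(framework_heap=128, framework_off_heap=128,
              task_off_heap=0, network=128, managed=512)

total_flink = total_flink_memory_mb(total_process=1728,
                                    jvm_metaspace=256,
                                    jvm_overhead=192)
task_heap = task_heap_mb(total_flink, **budget)
direct_limit = max_direct_memory_mb(budget["framework_off_heap"],
                                    budget["task_off_heap"],
                                    budget["network"])

print(f"Total Flink Memory: {total_flink} MB")
print(f"Task Heap (remainder): {task_heap} MB")
print(f"MaxDirectMemorySize budget: {direct_limit} MB")
```

Because Task Heap is a remainder, increasing any other component without raising Total Process Memory shrinks the heap available to user code.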
Managed Memory is different: it lives outside JVM direct memory entirely. For stateful operators using RocksDB, Flink reserves a budget and RocksDB allocates its own native memory through JNI (invisible to -XX:MaxDirectMemorySize). This means Managed Memory and Network Memory never compete for the same JVM budget, and the state backend (RocksDB/ForSt) cannot accidentally starve the network layer.
The tradeoff is that if Managed Memory is misconfigured and the process exceeds its container limit, the OS kills the process rather than the JVM throwing a catchable exception.
Managed Memory in practice is scoped per slot, not per task. Each slot gets its own MemoryManager with a budget of total managed memory divided by the number of slots. All tasks sharing a slot (through slot sharing) share this budget. For the MyJob example, the subtasks that slot sharing places in the same slot all draw from that slot's single budget.
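The per-slot scoping of Managed Memory can be illustrated with a small calculation. This is a sketch under stated assumptions: the 512 MB total, the two-slot layout, and the placement of the Sink in slot 0 are illustrative choices, and `managed_memory_per_slot_mb` is a hypothetical helper rather than a Flink API.

```python
def managed_memory_per_slot_mb(total_managed_mb, num_slots):
    """Each slot's MemoryManager gets an equal share of the total managed memory."""
    if num_slots <= 0:
        raise ValueError("a TaskManager needs at least one slot")
    return total_managed_mb / num_slots


# Assumed MyJob layout: slot sharing puts one pipeline slice in each slot.
SLOTS = {
    0: ["SourceMap[0]", "Window[0]", "Sink[0]"],
    1: ["SourceMap[1]", "Window[1]"],
}

per_slot = managed_memory_per_slot_mb(512, len(SLOTS))

for slot, tasks in SLOTS.items():
    # The budget belongs to the slot; the tasks inside it share it.
    print(f"slot {slot}: {per_slot:.0f} MB shared by {', '.join(tasks)}")
```

Adding slots to a TaskManager without raising its managed memory therefore shrinks the budget available to every stateful operator on it.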
5.3. Network
Flink's network stack sits inside flink-runtime and connects all subtasks across TaskManagers. It is the layer through which all shuffled data flows, making it a primary factor in both throughput and latency. Coordination between TaskManagers and the JobManager uses RPC (Pekko). Data transport between subtasks uses a lower-level API built on Netty.
5.3.1. Physical Transport
In the example job, keyBy() introduces a network shuffle between SourceMap and Window. Records can no longer stay local to the subtask that produced them. Each record is hashed by its key and routed to whichever Window subtask is responsible for that key group. This is a full all-to-all connection: every SourceMap subtask must be able to send to every Window subtask.
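The routing step can be sketched in two stages: key to key group, then key group to subtask. The code below is an approximation, not Flink's implementation. The hash is a stand-in (CRC32 instead of the hash Flink applies to the key), the max parallelism of 128 is an assumed value, and the key-group-to-subtask formula reflects my understanding of Flink's range assignment and should be treated as an assumption.

```python
import zlib

MAX_PARALLELISM = 128  # assumed number of key groups for this sketch


def stand_in_hash(key: str) -> int:
    """Deterministic stand-in for the hash Flink computes on the key."""
    return zlib.crc32(key.encode("utf-8"))


def key_group_for(key: str, max_parallelism: int = MAX_PARALLELISM) -> int:
    """Map a key to one of max_parallelism key groups."""
    return stand_in_hash(key) % max_parallelism


def subtask_for_key_group(key_group: int, parallelism: int,
                          max_parallelism: int = MAX_PARALLELISM) -> int:
    """Map a key group to the subtask that owns its contiguous range."""
    return key_group * parallelism // max_parallelism


def route(key: str, parallelism: int,
          max_parallelism: int = MAX_PARALLELISM) -> int:
    """Index of the downstream subtask (and subpartition) a record is sent to."""
    return subtask_for_key_group(key_group_for(key, max_parallelism),
                                 parallelism, max_parallelism)


for key in ["user-1", "user-2", "user-3"]:
    print(f"{key} -> key group {key_group_for(key)} -> Window[{route(key, 2)}]")
```

With parallelism 2, key groups 0 to 63 land on Window[0] and 64 to 127 on Window[1]. Since any SourceMap subtask may see any key, each one needs a path to both Window subtasks.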
As covered in the Task Execution Model, slot sharing places each pipeline slice into a single slot. With a small twist for this section, the two slots sit on two different TaskManagers. This means some connections are local (same TM) and some are remote (cross-TM, over TCP via Netty).
Whether a connection is local or remote depends entirely on where the subtasks land:
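As a rough illustration, the connections of the parallelism-2 job can be enumerated and classified. The placement below is an assumption of this sketch (one slot per TaskManager, with the single Sink subtask on TM1), and the helper names are hypothetical.

```python
from itertools import product

# Assumed placement: each pipeline slice sits in one slot, one slot per TaskManager.
PLACEMENT = {
    ("SourceMap", 0): "TM1", ("Window", 0): "TM1", ("Sink", 0): "TM1",
    ("SourceMap", 1): "TM2", ("Window", 1): "TM2",
}


def classify(producer, consumer, placement=PLACEMENT):
    """A connection is local when both ends live in the same TaskManager."""
    return "local" if placement[producer] == placement[consumer] else "remote"


def connections(upstream, up_parallelism, downstream, down_parallelism):
    """Enumerate every producer/consumer pair of an all-to-all edge."""
    return {
        (f"{upstream}[{i}]", f"{downstream}[{j}]"):
            classify((upstream, i), (downstream, j))
        for i, j in product(range(up_parallelism), range(down_parallelism))
    }


shuffle = connections("SourceMap", 2, "Window", 2)
to_sink = connections("Window", 2, "Sink", 1)

for (src, dst), kind in {**shuffle, **to_sink}.items():
    print(f"{src} -> {dst}: {kind}")
```

Under this placement, half of the keyBy shuffle crosses the network, and so does the Window[1] to Sink[0] connection.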
Each remote connection gets its own TCP channel. At higher parallelism, for example parallelism 4 across two TaskManagers offering 2 slots each, multiple subtasks of the same task share a TaskManager. Their remote connections toward the same destination TaskManager are then multiplexed over a single TCP channel, reducing resource usage.
Each subtask's output is a ResultPartition, split into ResultSubpartitions, one per downstream consumer. In the parallelism-4 variant, each SourceMap subtask has a ResultPartition with 4 ResultSubpartitions (one for each Window subtask). Each Window subtask has a ResultPartition with 1 ResultSubpartition (the single Sink subtask).
On the receiving side, each subtask reads from an InputGate containing InputChannels, one per upstream producer. In the same variant, each Window subtask's InputGate has 4 InputChannels (one from each SourceMap subtask), and the Sink's InputGate has 4 InputChannels (one from each Window subtask).
At this layer, Flink no longer deals with individual records. Data is serialized and packed into network buffers. Each subtask has its own local buffer pool, one on the sending side and one on the receiving side, bounded by: #channels × buffers-per-channel + floating-buffers-per-gate
With defaults of 2 exclusive buffers per channel and 8 floating buffers per gate, each Window subtask's receiving buffer pool is capped at 4 × 2 + 8 = 16 buffers. These are drawn from the NetworkBufferPool covered in the Memory Model section.
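The bound is easy to turn into a helper for trying other parallelism values. This is a sketch using the defaults quoted above and the 32 KB buffer size mentioned later in this post. The function name is hypothetical, and like the text it counts every channel rather than distinguishing local from remote ones.

```python
BUFFER_BYTES = 32 * 1024  # buffer size assumed for this sketch


def max_receive_buffers(channels, exclusive_per_channel=2, floating_per_gate=8):
    """Upper bound on buffers in a receiving local buffer pool for one gate."""
    return channels * exclusive_per_channel + floating_per_gate


window_cap = max_receive_buffers(channels=4)
window_cap_kib = window_cap * BUFFER_BYTES // 1024

print(f"Window subtask: up to {window_cap} buffers ({window_cap_kib} KiB)")
print(f"At 100 upstream channels: {max_receive_buffers(100)} buffers")
```

The exclusive part grows linearly with the number of upstream subtasks while the floating part stays fixed per gate, so network memory demand at high parallelism is dominated by the exclusive buffers.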
5.3.2. Credit-based Flow Control
Since all logical channels between two TaskManagers are multiplexed over a single TCP connection, a slow receiver on one channel could stall the connection entirely, throttling every other subtask sharing the wire. Credit-based flow control solves this by tracking buffer availability per logical channel, keeping backpressure isolated.
The core rule: a sender may only forward a buffer if the receiver has announced capacity for it. 1 buffer = 1 credit.
On the receiving side, each remote input channel has two kinds of buffers:
- Exclusive buffers (2 per channel): permanently assigned, never shared.
- Floating buffers (8 per gate): shared across all channels in the gate, borrowed on demand.
If there are not enough floating buffers available globally, each buffer pool gets a share proportional to its capacity of whatever is available.
The cycle:
1. When a channel is established, the receiver announces its exclusive buffers as initial credits.
2. The sender tracks the credit count per subpartition. Each sent buffer decrements the credit by one. No credit, no sending.
3. Each buffer sent also carries the sender's current backlog size: how many buffers are still waiting in that subpartition's queue.
4. The receiver uses the backlog to request floating buffers from the gate's shared pool. It may get all, some, or none. If none are available, it registers as a listener and gets notified when one is recycled.
5. Every newly acquired buffer is announced back to the sender as a fresh credit, and the cycle continues.
If a receiver falls behind, its credits eventually hit 0. The sender stops forwarding buffers for that channel only. The TCP connection stays open, and other channels on it continue normally. In the example: if one Window subtask on TM2 falls behind, its credit drops to 0. The SourceMap subtasks stop sending to it but keep sending to every other Window subtask. The shared TCP connection between TM1 and TM2 is never blocked.
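The credit cycle and the per-channel stall can be simulated in a few dozen lines. This is a toy model, not Flink's implementation: `Gate` and `Channel` are hypothetical classes, sender and receiver state are collapsed into one object, there is no real network, and the listener registration for recycled floating buffers is left out.

```python
from collections import deque


class Gate:
    """Receiver-side gate holding the floating buffers shared by its channels."""
    def __init__(self, floating=8):
        self.floating_free = floating


class Channel:
    """One logical channel: sender subpartition plus receiver input channel."""
    def __init__(self, gate, exclusive=2):
        self.gate = gate
        self.credit = exclusive   # initial credit = exclusive buffers
        self.backlog = deque()    # buffers queued on the sender side
        self.received = deque()   # buffers held by the receiver until consumed
        self.floating_held = 0

    def enqueue(self, buf):
        self.backlog.append(buf)

    def try_send(self):
        """Send one buffer if credit allows; the backlog travels with it."""
        if self.credit == 0 or not self.backlog:
            return False
        self.received.append(self.backlog.popleft())
        self.credit -= 1
        self._request_floating(len(self.backlog))
        return True

    def _request_floating(self, backlog):
        """Receiver asks the gate for floating buffers to cover the backlog."""
        wanted = max(0, backlog - self.credit)
        granted = min(wanted, self.gate.floating_free)
        self.gate.floating_free -= granted
        self.floating_held += granted
        self.credit += granted    # each acquired buffer becomes a new credit

    def consume(self):
        """Receiver processes one buffer and recycles it."""
        if not self.received:
            return None
        buf = self.received.popleft()
        if self.floating_held > 0:
            self.floating_held -= 1       # floating buffer returns to the gate
            self.gate.floating_free += 1
        else:
            self.credit += 1              # exclusive buffer is re-announced
        return buf


gate = Gate(floating=8)
slow, fast = Channel(gate), Channel(gate)
for i in range(20):
    slow.enqueue(f"slow-{i}")
for i in range(5):
    fast.enqueue(f"fast-{i}")

while slow.try_send():   # the slow receiver never consumes
    pass

delivered = []
while fast.backlog or fast.received:
    if not fast.try_send():
        delivered.append(fast.consume())

print(f"slow: credit={slow.credit}, stuck in backlog={len(slow.backlog)}")
print(f"fast: delivered={delivered}")
```

In this run the slow channel takes its 2 exclusive and all 8 floating buffers and then stalls at zero credit with 10 buffers still queued. The fast channel keeps making progress on its exclusive buffers alone.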
Because one channel in a multiplex can no longer block another, overall resource utilization improves. Full control over how much data is "on the wire" also improves checkpoint alignment. Without flow control, a stalled receiver would still have the lower network stack's internal buffers filling up, and checkpoint barriers would queue behind all of that data, waiting for it to drain before alignment could begin. With credit-based control, there is far less data sitting in transit, so barriers propagate faster.
5.3.3. Buffer Flushing
The RecordWriter serializes each record into bytes on the heap, then writes those bytes into the network buffer currently assigned to the target subpartition. If the record doesn't fit, the remaining bytes spill into a new buffer. The deserializer on the receiving side (SpillingAdaptiveSpanningRecordDeserializer) handles reassembly, including records that span multiple 32 KB buffers.
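The spanning behaviour can be sketched as a byte stream that is cut into fixed-size buffers and stitched back together on the other side. This is a simplified model, not the actual RecordWriter or deserializer: the 4-byte length prefix and the function names are assumptions of this sketch, and spilling to disk is not modelled.

```python
import struct

BUFFER_SIZE = 32 * 1024  # 32 KB network buffers


def write_records(records, buffer_size=BUFFER_SIZE):
    """Length-prefix each record and pack the byte stream into fixed-size buffers."""
    stream = b"".join(struct.pack(">I", len(r)) + r for r in records)
    # A record that does not fit simply continues in the next buffer.
    return [stream[i:i + buffer_size] for i in range(0, len(stream), buffer_size)]


def read_records(buffers):
    """Reassemble records, carrying partial records across buffer boundaries."""
    pending = b""
    records = []
    for buf in buffers:
        pending += buf
        while len(pending) >= 4:
            (length,) = struct.unpack(">I", pending[:4])
            if len(pending) < 4 + length:
                break  # record continues in a later buffer
            records.append(pending[4:4 + length])
            pending = pending[4 + length:]
    return records


records = [b"a" * 10, b"b" * 70_000, b"c" * 5]
buffers = write_records(records)

print(f"{len(records)} records packed into {len(buffers)} buffers")
print("round trip ok:", read_records(buffers) == records)
```

The 70,000-byte record is larger than two buffers, so it starts in the first buffer, fills the second, and ends in the third, yet the reader returns it as a single record.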
A buffer becomes available for Netty to consume in three situations:
- Buffer full: the writer finishes the buffer and requests a new one. The finished buffer is added to the subpartition queue, which notifies Netty.
- Buffer timeout: a background thread (OutputFlusher) periodically calls flush (default: every 100ms, configured via execution.buffer-timeout.interval). This notifies Netty to consume whatever has been written so far without closing the buffer. The buffer stays in the queue and keeps accumulating more data from the writer side.
- Special event: checkpoint barriers, end-of-partition events, etc. These finish all in-progress buffers immediately and add the event to every subpartition.
The buffer is added to the subpartition queue while still being written to (via the BufferBuilder / BufferConsumer pair). The writer appends through the BufferBuilder, and Netty reads through the BufferConsumer. This avoids synchronization on every record: the two sides only coordinate through the buffer's reader and writer indices.
In low-throughput scenarios, the output flusher drives latency. In high-throughput scenarios, buffers fill up before the flusher fires and the system self-adjusts.
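A back-of-the-envelope model makes the two regimes concrete. This is a rough approximation rather than a measurement: it assumes a single subpartition, a steady record rate, a 32 KB buffer, and the 100ms flush interval. The function name and the example rates are illustrative.

```python
def buffering_delay_ms(records_per_sec, record_bytes,
                       buffer_bytes=32 * 1024, flush_interval_ms=100):
    """Approximate upper bound on how long a record waits before Netty sees it."""
    # Time for the writer to fill one buffer at the given rate.
    fill_ms = buffer_bytes / (records_per_sec * record_bytes) * 1000
    # Whichever happens first wins: the buffer fills up or the flusher fires.
    return min(fill_ms, flush_interval_ms)


low = buffering_delay_ms(records_per_sec=10, record_bytes=100)
high = buffering_delay_ms(records_per_sec=1_000_000, record_bytes=100)

print(f"low throughput:  ~{low:.2f} ms (flusher bound)")
print(f"high throughput: ~{high:.2f} ms (buffer-full bound)")
```

At 10 records per second the buffer would take over 30 seconds to fill, so the flusher sets the delay. At a million records per second the buffer fills in a fraction of a millisecond and the timeout never comes into play.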
Cite this article as: Adesh Nalpet Adimurthy. (Feb 21, 2026). Apache Flink Internals. PyBlog. https://www.pyblog.xyz/flink-internals