feat(inkless): Set migrating partitions in KRaft metadata #580
Pull request overview
Adds a “migration pending” state for classic→diskless partition migration and wires it through controller metadata updates and broker fetch behavior so partitions can be marked pending in KRaft metadata before the actual diskless start offset is committed.
Changes:
- Introduce `CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2` as a new sentinel for `classicToDisklessStartOffset`.
- Accept `classicToDisklessStartOffset = -2` in `InitDisklessLog` and add controller logic to mark partitions pending when enabling diskless via incremental alter configs.
- Update `ReplicaManager` fetch routing and add unit tests covering pending-state round trips, merges, and fetch/init behavior.
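The `InitDisklessLog` acceptance rule can be sketched as a check on the current offset value. This is a minimal sketch, not the code in this PR: only `CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2` comes from the change itself, while the class `DisklessInitCheck`, the constant `NO_DISKLESS_START_OFFSET` (standing in for the pre-existing `-1` value), and both helper methods are hypothetical. It assumes that any non-negative offset means a diskless start offset has already been committed, so both negative sentinels remain eligible for initialization.

```java
/**
 * Hypothetical sketch: how InitDisklessLog could decide whether a partition's
 * classicToDisklessStartOffset may still be initialized.
 */
final class DisklessInitCheck {
    // Pre-existing "no diskless start offset" value (hypothetical constant name).
    static final long NO_DISKLESS_START_OFFSET = -1L;
    // Sentinel introduced by this change: classic -> diskless migration is pending.
    static final long CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2L;

    private DisklessInitCheck() { }

    /** A non-negative offset means a diskless start offset was already committed. */
    static boolean isInitialized(long classicToDisklessStartOffset) {
        return classicToDisklessStartOffset >= 0;
    }

    /** Negative sentinels (-1 and -2) can be initialized; committed offsets are rejected. */
    static boolean canInitDisklessLog(long classicToDisklessStartOffset) {
        return !isInitialized(classicToDisklessStartOffset);
    }

    public static void main(String[] args) {
        System.out.println(canInitDisklessLog(CLASSIC_TO_DISKLESS_MIGRATION_PENDING)); // true
        System.out.println(canInitDisklessLog(NO_DISKLESS_START_OFFSET));              // true
        System.out.println(canInitDisklessLog(42L));                                    // false
    }
}
```

Treating "already initialized" as `offset >= 0` rather than `offset != -1` is what lets a new negative sentinel pass through without further special-casing.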
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java | Adds a new sentinel constant for “migration pending”. |
| metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java | Treats non-negative offsets as “already initialized” and introduces a helper to mark partitions migration-pending. |
| metadata/src/main/java/org/apache/kafka/controller/QuorumController.java | Appends migration-pending partition records during incrementalAlterConfigs when diskless is enabled. |
| core/src/main/scala/kafka/server/ReplicaManager.scala | Routes fetches to UnifiedLog when migration is pending and adjusts control-plane init trigger to handle any negative→nonnegative transition. |
| metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java | Adds tests for pending-state record round-trip and merge semantics. |
| metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java | Adds tests ensuring InitDisklessLog accepts pending partitions and rejects already-initialized partitions. |
| core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | Adds fetch behavior tests for migration-pending partitions under managed/unmanaged replicas. |
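The adjusted control-plane init trigger in `ReplicaManager` can be pictured as a check on the transition between the previous and current values of `classicToDisklessStartOffset`. The production code is Scala; this is a minimal Java sketch in which `InitTrigger` and `shouldTriggerInit` are hypothetical names, and the rule itself ("fire on any negative to non-negative transition") is taken from the per-file summary for `ReplicaManager.scala`.

```java
/** Hypothetical sketch of the "negative -> non-negative" init trigger. */
final class InitTrigger {
    private InitTrigger() { }

    /**
     * Fires when the offset moves from any negative sentinel (-1, or the new
     * migration-pending -2) to a committed, non-negative diskless start offset.
     */
    static boolean shouldTriggerInit(long previousOffset, long currentOffset) {
        return previousOffset < 0 && currentOffset >= 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldTriggerInit(-2L, 100L));  // true: pending -> committed
        System.out.println(shouldTriggerInit(-1L, 100L));  // true: unset -> committed
        System.out.println(shouldTriggerInit(-1L, -2L));   // false: only marked pending
        System.out.println(shouldTriggerInit(100L, 100L)); // false: already initialized
    }
}
```

Checking `previousOffset < 0` instead of `previousOffset == -1` is the point of the adjustment: a trigger written against `-1` alone would never fire for partitions that went through the pending state.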
```java
List<ApiMessageAndVersion> allRecords = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
allRecords.addAll(result.records());
allRecords.addAll(migrationRecords);
return ControllerResult.of(allRecords, result.response());
```
There seems to be a subtle bug here that causes the JDK 17 CI failure while JDK 25 passes.
`configurationControl.incrementalAlterConfigs` uses `ControllerResult.atomicOf` rather than `ControllerResult.of`; I'm trying to reproduce this locally.
Changed to use `ControllerResult.atomicOf`; let's see how CI goes.
Everything seems to pass now.
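The fix in this thread keeps the merged result atomic, so the config-change records and the migration-pending partition records are treated as one unit instead of the extra records silently downgrading the result to non-atomic. The sketch below models only that merging concern. It deliberately does not use Kafka's classes: `Result` is a hypothetical stand-in for `ControllerResult`, records are plain strings, and `mergeRecords` is an illustrative helper that carries over the original flag (the PR itself simply switched to `atomicOf`). It does not attempt to explain why the failure only showed up on JDK 17.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a controller result: records, a response, and an atomicity flag. */
final class Result<T> {
    final List<String> records;
    final T response;
    final boolean atomic;

    private Result(List<String> records, T response, boolean atomic) {
        this.records = Collections.unmodifiableList(new ArrayList<>(records));
        this.response = response;
        this.atomic = atomic;
    }

    static <T> Result<T> of(List<String> records, T response) {
        return new Result<>(records, response, false);
    }

    static <T> Result<T> atomicOf(List<String> records, T response) {
        return new Result<>(records, response, true);
    }

    /**
     * Appends extra records to an existing result while keeping its atomicity.
     * Rebuilding unconditionally with of(...) would drop the atomic flag, which
     * is the kind of issue raised in the review thread.
     */
    static <T> Result<T> mergeRecords(Result<T> base, List<String> extra) {
        List<String> all = new ArrayList<>(base.records);
        all.addAll(extra);
        return base.atomic ? atomicOf(all, base.response) : of(all, base.response);
    }

    public static void main(String[] args) {
        Result<String> configResult = atomicOf(List.of("config-record"), "ok");
        Result<String> merged = mergeRecords(configResult, List.of("migration-pending-record"));
        System.out.println(merged.records + " atomic=" + merged.atomic);
        // prints "[config-record, migration-pending-record] atomic=true"
    }
}
```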
When serving fetch requests, it is necessary to distinguish between a topic that is being migrated from classic to diskless but has not yet committed its `disklessStartOffset` to KRaft (in this case, fetches must still go through `UnifiedLog`) and a fully diskless topic (fetches go through the diskless read path).

Introduce a new sentinel value (`-2`) for `PartitionRegistration.classicToDisklessStartOffset` indicating that the partition is being migrated from classic to diskless.

Before these changes:
1. `diskless.enable=false`, `PartitionRegistration.classicToDisklessStartOffset = -1`
2. `diskless.enable=true`, `PartitionRegistration.classicToDisklessStartOffset = -1`
3. `diskless.enable=true`, `PartitionRegistration.classicToDisklessStartOffset = -1`
4. `diskless.enable=true`, `PartitionRegistration.classicToDisklessStartOffset = $disklessStartOffset`

Cases 2 and 3 are indistinguishable, even though case 2 must read from `UnifiedLog` and case 3 must read from the diskless path.
After the change:
1. `diskless.enable=false`, `PartitionRegistration.classicToDisklessStartOffset = -1`
2. `diskless.enable=true`, `PartitionRegistration.classicToDisklessStartOffset = -1`
3. `diskless.enable=true`, `PartitionRegistration.classicToDisklessStartOffset = -2`
4. `diskless.enable=true`, `PartitionRegistration.classicToDisklessStartOffset = $disklessStartOffset`

All cases can be individually distinguished.
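To make the distinguishability concrete, here is a minimal sketch that maps each `(diskless.enable, classicToDisklessStartOffset)` pair from the "after" list to a distinct state. The enum labels (`CLASSIC`, `DISKLESS`, `MIGRATION_PENDING`, `MIGRATED`), the class `DisklessState`, and the constant name `NO_DISKLESS_START_OFFSET` are hypothetical. Rejecting combinations the description does not list is also an assumption of this sketch, and it intentionally does not encode fetch routing for the committed-offset case, which the description leaves unspecified.

```java
/** Hypothetical classification of the partition states listed in the PR description. */
final class DisklessState {
    enum Kind { CLASSIC, DISKLESS, MIGRATION_PENDING, MIGRATED }

    static final long NO_DISKLESS_START_OFFSET = -1L;              // hypothetical name for -1
    static final long CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2L; // sentinel from this change

    private DisklessState() { }

    static Kind classify(boolean disklessEnable, long classicToDisklessStartOffset) {
        if (!disklessEnable && classicToDisklessStartOffset == NO_DISKLESS_START_OFFSET) {
            return Kind.CLASSIC;           // case 1: classic topic, read via UnifiedLog
        }
        if (disklessEnable && classicToDisklessStartOffset == NO_DISKLESS_START_OFFSET) {
            return Kind.DISKLESS;          // case 2: read via the diskless path
        }
        if (disklessEnable && classicToDisklessStartOffset == CLASSIC_TO_DISKLESS_MIGRATION_PENDING) {
            return Kind.MIGRATION_PENDING; // case 3: still read via UnifiedLog
        }
        if (disklessEnable && classicToDisklessStartOffset >= 0) {
            return Kind.MIGRATED;          // case 4: diskless start offset committed
        }
        // Combinations not listed in the description (assumption: treat as invalid).
        throw new IllegalArgumentException("unexpected state: diskless.enable=" + disklessEnable
                + ", classicToDisklessStartOffset=" + classicToDisklessStartOffset);
    }

    public static void main(String[] args) {
        System.out.println(classify(false, -1L)); // CLASSIC
        System.out.println(classify(true, -1L));  // DISKLESS
        System.out.println(classify(true, -2L));  // MIGRATION_PENDING
        System.out.println(classify(true, 500L)); // MIGRATED
    }
}
```

Before the change, the pending case also carried `-1`, so `classify` would have had no way to tell it apart from a fully diskless partition.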