Skip to content

Commit 35c2440

Browse files
committed
feat(controller:diskless): enable immediate partition reassignment (#537)
* feat(controller:diskless): enable immediate partition reassignment. For diskless topics, partition reassignment now completes immediately, without the staged process (addingReplicas/removingReplicas). Since data is stored in object storage rather than on local disk, there is nothing to sync between replicas. Changes: - Skip setting targetRemoving/targetAdding for diskless topics. - Use the target replicas directly instead of the merged replica list. - Update tests to expect immediate completion (no ongoing reassignment). (cherry picked from commit 1982f0e) # Conflicts: # metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
1 parent eeb3e5c commit 35c2440

2 files changed

Lines changed: 256 additions & 50 deletions

File tree

metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

Lines changed: 81 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -539,10 +539,14 @@ public void replay(PartitionRecord record) {
539539
isReassignmentInProgress(prevPartInfo), isReassignmentInProgress(newPartInfo));
540540
}
541541

542-
if (newPartInfo.hasPreferredLeader()) {
543-
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
544-
} else {
545-
imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
542+
// Diskless topics are excluded: the metadata transformer handles leader routing,
543+
// so tracking preferred leader imbalance is unnecessary.
544+
if (!isDisklessTopic(topicInfo.name)) {
545+
if (newPartInfo.hasPreferredLeader()) {
546+
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
547+
} else {
548+
imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
549+
}
546550
}
547551
}
548552

@@ -585,10 +589,12 @@ public void replay(PartitionChangeRecord record) {
585589
record.topicId();
586590
newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo);
587591

588-
if (newPartitionInfo.hasPreferredLeader()) {
589-
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
590-
} else {
591-
imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
592+
if (!isDisklessTopic(topicInfo.name)) {
593+
if (newPartitionInfo.hasPreferredLeader()) {
594+
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
595+
} else {
596+
imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
597+
}
592598
}
593599

594600
if (record.removingReplicas() != null || record.addingReplicas() != null) {
@@ -1446,8 +1452,7 @@ ControllerResult<InitDisklessLogResponseData> initDisklessLog(
14461452
}
14471453

14481454
TopicControlInfo topic = topics.get(topicId);
1449-
boolean isDisklessTopic = Boolean.parseBoolean(
1450-
configurationControl.currentTopicConfig(topic.name).getOrDefault(DISKLESS_ENABLE_CONFIG, "false"));
1455+
boolean isDisklessTopic = isDisklessTopic(topic.name);
14511456
if (isDisklessTopic) {
14521457
for (InitDisklessLogRequestData.PartitionData partitionData : topicData.partitions()) {
14531458
partitionResponses.add(new InitDisklessLogResponseData.PartitionResponse()
@@ -2115,6 +2120,16 @@ void maybeTriggerLeaderChangeForPartitionsWithoutPreferredLeader(
21152120
continue;
21162121
}
21172122

2123+
// Skip diskless topics: the metadata transformer handles leader routing,
2124+
// so controller-level preferred leader election is unnecessary.
2125+
// After reassignment, the leader may not match the preferred replica (replicas[0])
2126+
// because PartitionChangeBuilder.electLeader() prefers keeping the current leader.
2127+
// This is intentional — the metadata transformer routes requests independently of
2128+
// the preferred replica order.
2129+
if (isDisklessTopic(topic.name)) {
2130+
continue;
2131+
}
2132+
21182133
PartitionRegistration partition = topic.parts.get(topicPartition.partitionId());
21192134
if (partition == null) {
21202135
log.error("Skipping unknown imbalanced partition {}", topicPartition);
@@ -2444,9 +2459,7 @@ void generateLeaderAndIsrUpdates(String context,
24442459
.setAllowReplicationFactorChange(allowRFChange);
24452460
int successfulAlterations = 0, totalAlterations = 0;
24462461
for (ReassignableTopic topic : request.topics()) {
2447-
boolean effectiveRFChange = allowRFChange
2448-
&& !Boolean.parseBoolean(configurationControl.currentTopicConfig(topic.name()).getOrDefault(
2449-
DISKLESS_ENABLE_CONFIG, "false"));
2462+
boolean effectiveRFChange = allowRFChange && !isDisklessTopic(topic.name());
24502463
ReassignableTopicResponse topicResponse = new ReassignableTopicResponse().
24512464
setName(topic.name());
24522465
for (ReassignablePartition partition : topic.partitions()) {
@@ -2581,23 +2594,63 @@ Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp,
25812594
List<Integer> currentReplicas = Replicas.toList(part.replicas);
25822595
PartitionReassignmentReplicas reassignment =
25832596
new PartitionReassignmentReplicas(currentAssignment, targetAssignment);
2597+
2598+
boolean isDiskless = isDisklessTopic(topics.get(tp.topicId()).name);
2599+
2600+
// Diskless topics don't use local directories — skip the directory check in leader election.
2601+
// Any active (unfenced, not in controlled shutdown) broker can lead a diskless partition,
2602+
// since it reads all data from object storage rather than local disk.
2603+
IntPredicate leaderAcceptor = isDiskless
2604+
? clusterControl::isActive
2605+
: new LeaderAcceptor(clusterControl, part);
2606+
25842607
PartitionChangeBuilder builder = new PartitionChangeBuilder(
25852608
part,
25862609
tp.topicId(),
25872610
tp.partitionId(),
2588-
new LeaderAcceptor(clusterControl, part),
2611+
leaderAcceptor,
25892612
featureControl.metadataVersionOrThrow(),
25902613
getTopicEffectiveMinIsr(topics.get(tp.topicId()).name)
25912614
);
25922615
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
2593-
if (!reassignment.replicas().equals(currentReplicas)) {
2594-
builder.setTargetReplicas(reassignment.replicas());
2595-
}
2596-
if (!reassignment.removing().isEmpty()) {
2597-
builder.setTargetRemoving(reassignment.removing());
2598-
}
2599-
if (!reassignment.adding().isEmpty()) {
2600-
builder.setTargetAdding(reassignment.adding());
2616+
2617+
if (isDiskless) {
2618+
// Diskless: data is in object storage, no replica sync needed.
2619+
// Apply target replicas directly — skip the staged adding/removing process
2620+
// (no addingReplicas/removingReplicas). This is safe because:
2621+
//
2622+
// 1. No offline risk: the reassignment is rejected if all target brokers are
2623+
// fenced (see activeIsr check below). As long as at least one target broker
2624+
// is active, PartitionChangeBuilder.electLeader() will elect it as leader
2625+
// since the leaderAcceptor above only requires isActive (no directory check).
2626+
//
2627+
// 2. Cache warming: new leaders start with a cold InklessMetadataView cache,
2628+
// which may cause higher fetch latency and increased object storage reads
2629+
// until the cache is populated. This is a transient effect — the architecture
2630+
// already assumes brokers are interchangeable since all data lives in object
2631+
// storage. A future optimization could pre-warm target brokers' caches before
2632+
// completing the reassignment.
2633+
if (!target.replicas().equals(currentReplicas)) {
2634+
List<Integer> activeIsr = target.replicas().stream()
2635+
.filter(clusterControl::isActive)
2636+
.toList();
2637+
if (activeIsr.isEmpty()) {
2638+
throw new InvalidReplicaAssignmentException(
2639+
"None of the target replicas " + target.replicas() + " are active.");
2640+
}
2641+
builder.setTargetReplicas(target.replicas());
2642+
builder.setTargetIsr(activeIsr);
2643+
}
2644+
} else {
2645+
if (!reassignment.replicas().equals(currentReplicas)) {
2646+
builder.setTargetReplicas(reassignment.replicas());
2647+
}
2648+
if (!reassignment.removing().isEmpty()) {
2649+
builder.setTargetRemoving(reassignment.removing());
2650+
}
2651+
if (!reassignment.adding().isEmpty()) {
2652+
builder.setTargetAdding(reassignment.adding());
2653+
}
26012654
}
26022655
return builder.setDefaultDirProvider(clusterDescriber).build();
26032656
}
@@ -2820,6 +2873,12 @@ private void validatePartitionReplicationFactorUnchanged(PartitionRegistration p
28202873
}
28212874
}
28222875

2876+
/**
 * Reports whether the named topic is configured as a diskless topic.
 *
 * Reads DISKLESS_ENABLE_CONFIG from the map returned by
 * configurationControl.currentTopicConfig(topicName). When the key is absent it falls
 * back to "false". Boolean.parseBoolean yields true only for a case-insensitive "true";
 * any other value (including null) yields false.
 *
 * @param topicName the topic whose current configuration is consulted
 * @return true if the topic's diskless flag parses as true, false otherwise
 */
private boolean isDisklessTopic(String topicName) {
2877+
return Boolean.parseBoolean(
2878+
configurationControl.currentTopicConfig(topicName)
2879+
.getOrDefault(DISKLESS_ENABLE_CONFIG, "false"));
2880+
}
2881+
28232882
private static final class IneligibleReplica {
28242883
private final int replicaId;
28252884
private final String reason;

0 commit comments

Comments
 (0)