@@ -539,10 +539,14 @@ public void replay(PartitionRecord record) {
539539 isReassignmentInProgress (prevPartInfo ), isReassignmentInProgress (newPartInfo ));
540540 }
541541
542- if (newPartInfo .hasPreferredLeader ()) {
543- imbalancedPartitions .remove (new TopicIdPartition (record .topicId (), record .partitionId ()));
544- } else {
545- imbalancedPartitions .add (new TopicIdPartition (record .topicId (), record .partitionId ()));
542+ // Diskless topics are excluded: the metadata transformer handles leader routing,
543+ // so tracking preferred leader imbalance is unnecessary.
544+ if (!isDisklessTopic (topicInfo .name )) {
545+ if (newPartInfo .hasPreferredLeader ()) {
546+ imbalancedPartitions .remove (new TopicIdPartition (record .topicId (), record .partitionId ()));
547+ } else {
548+ imbalancedPartitions .add (new TopicIdPartition (record .topicId (), record .partitionId ()));
549+ }
546550 }
547551 }
548552
@@ -585,10 +589,12 @@ public void replay(PartitionChangeRecord record) {
585589 record .topicId ();
586590 newPartitionInfo .maybeLogPartitionChange (log , topicPart , prevPartitionInfo );
587591
588- if (newPartitionInfo .hasPreferredLeader ()) {
589- imbalancedPartitions .remove (new TopicIdPartition (record .topicId (), record .partitionId ()));
590- } else {
591- imbalancedPartitions .add (new TopicIdPartition (record .topicId (), record .partitionId ()));
592+ if (!isDisklessTopic (topicInfo .name )) {
593+ if (newPartitionInfo .hasPreferredLeader ()) {
594+ imbalancedPartitions .remove (new TopicIdPartition (record .topicId (), record .partitionId ()));
595+ } else {
596+ imbalancedPartitions .add (new TopicIdPartition (record .topicId (), record .partitionId ()));
597+ }
592598 }
593599
594600 if (record .removingReplicas () != null || record .addingReplicas () != null ) {
@@ -1446,8 +1452,7 @@ ControllerResult<InitDisklessLogResponseData> initDisklessLog(
14461452 }
14471453
14481454 TopicControlInfo topic = topics .get (topicId );
1449- boolean isDisklessTopic = Boolean .parseBoolean (
1450- configurationControl .currentTopicConfig (topic .name ).getOrDefault (DISKLESS_ENABLE_CONFIG , "false" ));
1455+ boolean isDisklessTopic = isDisklessTopic (topic .name );
14511456 if (isDisklessTopic ) {
14521457 for (InitDisklessLogRequestData .PartitionData partitionData : topicData .partitions ()) {
14531458 partitionResponses .add (new InitDisklessLogResponseData .PartitionResponse ()
@@ -2115,6 +2120,16 @@ void maybeTriggerLeaderChangeForPartitionsWithoutPreferredLeader(
21152120 continue ;
21162121 }
21172122
2123+ // Skip diskless topics: the metadata transformer handles leader routing,
2124+ // so controller-level preferred leader election is unnecessary.
2125+ // After reassignment, the leader may not match the preferred replica (replicas[0])
2126+ // because PartitionChangeBuilder.electLeader() prefers keeping the current leader.
2127+ // This is intentional — the metadata transformer routes requests independently of
2128+ // the preferred replica order.
2129+ if (isDisklessTopic (topic .name )) {
2130+ continue ;
2131+ }
2132+
21182133 PartitionRegistration partition = topic .parts .get (topicPartition .partitionId ());
21192134 if (partition == null ) {
21202135 log .error ("Skipping unknown imbalanced partition {}" , topicPartition );
@@ -2444,9 +2459,7 @@ void generateLeaderAndIsrUpdates(String context,
24442459 .setAllowReplicationFactorChange (allowRFChange );
24452460 int successfulAlterations = 0 , totalAlterations = 0 ;
24462461 for (ReassignableTopic topic : request .topics ()) {
2447- boolean effectiveRFChange = allowRFChange
2448- && !Boolean .parseBoolean (configurationControl .currentTopicConfig (topic .name ()).getOrDefault (
2449- DISKLESS_ENABLE_CONFIG , "false" ));
2462+ boolean effectiveRFChange = allowRFChange && !isDisklessTopic (topic .name ());
24502463 ReassignableTopicResponse topicResponse = new ReassignableTopicResponse ().
24512464 setName (topic .name ());
24522465 for (ReassignablePartition partition : topic .partitions ()) {
@@ -2581,23 +2594,63 @@ Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp,
25812594 List <Integer > currentReplicas = Replicas .toList (part .replicas );
25822595 PartitionReassignmentReplicas reassignment =
25832596 new PartitionReassignmentReplicas (currentAssignment , targetAssignment );
2597+
2598+ boolean isDiskless = isDisklessTopic (topics .get (tp .topicId ()).name );
2599+
2600+ // Diskless topics don't use local directories — skip the directory check in leader election.
2601+ // Any active (unfenced, not in controlled shutdown) broker can lead a diskless partition,
2602+ // since it reads all data from object storage rather than local disk.
2603+ IntPredicate leaderAcceptor = isDiskless
2604+ ? clusterControl ::isActive
2605+ : new LeaderAcceptor (clusterControl , part );
2606+
25842607 PartitionChangeBuilder builder = new PartitionChangeBuilder (
25852608 part ,
25862609 tp .topicId (),
25872610 tp .partitionId (),
2588- new LeaderAcceptor ( clusterControl , part ) ,
2611+ leaderAcceptor ,
25892612 featureControl .metadataVersionOrThrow (),
25902613 getTopicEffectiveMinIsr (topics .get (tp .topicId ()).name )
25912614 );
25922615 builder .setEligibleLeaderReplicasEnabled (featureControl .isElrFeatureEnabled ());
2593- if (!reassignment .replicas ().equals (currentReplicas )) {
2594- builder .setTargetReplicas (reassignment .replicas ());
2595- }
2596- if (!reassignment .removing ().isEmpty ()) {
2597- builder .setTargetRemoving (reassignment .removing ());
2598- }
2599- if (!reassignment .adding ().isEmpty ()) {
2600- builder .setTargetAdding (reassignment .adding ());
2616+
2617+ if (isDiskless ) {
2618+ // Diskless: data is in object storage, no replica sync needed.
2619+ // Apply target replicas directly — skip the staged adding/removing process
2620+ // (no addingReplicas/removingReplicas). This is safe because:
2621+ //
2622+ // 1. No offline risk: the reassignment is rejected if all target brokers are
2623+ // fenced (see activeIsr check below). As long as at least one target broker
2624+ // is active, PartitionChangeBuilder.electLeader() will elect it as leader
2625+ // since the leaderAcceptor above only requires isActive (no directory check).
2626+ //
2627+ // 2. Cache warming: new leaders start with a cold InklessMetadataView cache,
2628+ // which may cause higher fetch latency and increased object storage reads
2629+ // until the cache is populated. This is a transient effect — the architecture
2630+ // already assumes brokers are interchangeable since all data lives in object
2631+ // storage. A future optimization could pre-warm target brokers' caches before
2632+ // completing the reassignment.
2633+ if (!target .replicas ().equals (currentReplicas )) {
2634+ List <Integer > activeIsr = target .replicas ().stream ()
2635+ .filter (clusterControl ::isActive )
2636+ .toList ();
2637+ if (activeIsr .isEmpty ()) {
2638+ throw new InvalidReplicaAssignmentException (
2639+ "None of the target replicas " + target .replicas () + " are active." );
2640+ }
2641+ builder .setTargetReplicas (target .replicas ());
2642+ builder .setTargetIsr (activeIsr );
2643+ }
2644+ } else {
2645+ if (!reassignment .replicas ().equals (currentReplicas )) {
2646+ builder .setTargetReplicas (reassignment .replicas ());
2647+ }
2648+ if (!reassignment .removing ().isEmpty ()) {
2649+ builder .setTargetRemoving (reassignment .removing ());
2650+ }
2651+ if (!reassignment .adding ().isEmpty ()) {
2652+ builder .setTargetAdding (reassignment .adding ());
2653+ }
26012654 }
26022655 return builder .setDefaultDirProvider (clusterDescriber ).build ();
26032656 }
@@ -2820,6 +2873,12 @@ private void validatePartitionReplicationFactorUnchanged(PartitionRegistration p
28202873 }
28212874 }
28222875
2876+ private boolean isDisklessTopic (String topicName ) {
2877+ return Boolean .parseBoolean (
2878+ configurationControl .currentTopicConfig (topicName )
2879+ .getOrDefault (DISKLESS_ENABLE_CONFIG , "false" ));
2880+ }
2881+
28232882 private static final class IneligibleReplica {
28242883 private final int replicaId ;
28252884 private final String reason ;
0 commit comments