2525import org .apache .kafka .common .config .ConfigResource ;
2626import org .apache .kafka .common .errors .ApiException ;
2727import org .apache .kafka .common .errors .BrokerIdNotRegisteredException ;
28+ import org .apache .kafka .common .errors .BrokerNotAvailableException ;
2829import org .apache .kafka .common .errors .InvalidPartitionsException ;
2930import org .apache .kafka .common .errors .InvalidReplicaAssignmentException ;
3031import org .apache .kafka .common .errors .InvalidReplicationFactorException ;
@@ -162,6 +163,7 @@ static class Builder {
162163 private int defaultNumPartitions = 1 ;
163164 private boolean defaultDisklessEnable = false ;
164165 private boolean isDisklessStorageSystemEnabled = false ;
166+ private boolean isDisklessManagedReplicasEnabled = false ;
165167
166168 private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE ;
167169 private ConfigurationControlManager configurationControl = null ;
@@ -199,6 +201,11 @@ public Builder setDisklessStorageSystemEnabled(boolean isDisklessStorageSystemEn
199201 return this ;
200202 }
201203
204+ public Builder setDisklessManagedReplicasEnabled (boolean isDisklessManagedReplicasEnabled ) {
205+ this .isDisklessManagedReplicasEnabled = isDisklessManagedReplicasEnabled ;
206+ return this ;
207+ }
208+
202209 Builder setMaxElectionsPerImbalance (int maxElectionsPerImbalance ) {
203210 this .maxElectionsPerImbalance = maxElectionsPerImbalance ;
204211 return this ;
@@ -241,6 +248,7 @@ ReplicationControlManager build() {
241248 defaultNumPartitions ,
242249 defaultDisklessEnable ,
243250 isDisklessStorageSystemEnabled ,
251+ isDisklessManagedReplicasEnabled ,
244252 maxElectionsPerImbalance ,
245253 configurationControl ,
246254 clusterControl ,
@@ -318,8 +326,21 @@ static Map<String, String> translateCreationConfigs(CreatableTopicConfigCollecti
318326 */
319327 private final boolean defaultDisklessEnable ;
320328
329+ /**
330+ * When true, the diskless storage system is enabled, allowing diskless topics to be created.
331+ */
321332 private final boolean isDisklessStorageSystemEnabled ;
322333
334+ /**
335+ * When true, diskless topics use managed replicas with RF = rack_count (one replica per rack).
336+ * When false, diskless topics use legacy RF=1 behavior.
337+ *
338+ * <p>Phase 1 limitation: this config only affects topic creation. Adding partitions to an existing
339+ * topic reuses the RF of its existing partitions, which keeps the RF consistent within the topic.
340+ * See DISKLESS_MANAGED_RF.md for the full implementation roadmap.
341+ */
342+ private final boolean isDisklessManagedReplicasEnabled ;
343+
323344 /**
324345 * Maximum number of leader elections to perform during one partition leader balancing operation.
325346 */
@@ -410,6 +431,7 @@ private ReplicationControlManager(
410431 int defaultNumPartitions ,
411432 boolean defaultDisklessEnable ,
412433 boolean isDisklessStorageSystemEnabled ,
434+ boolean isDisklessManagedReplicasEnabled ,
413435 int maxElectionsPerImbalance ,
414436 ConfigurationControlManager configurationControl ,
415437 ClusterControlManager clusterControl ,
@@ -422,6 +444,7 @@ private ReplicationControlManager(
422444 this .defaultNumPartitions = defaultNumPartitions ;
423445 this .defaultDisklessEnable = defaultDisklessEnable ;
424446 this .isDisklessStorageSystemEnabled = isDisklessStorageSystemEnabled ;
447+ this .isDisklessManagedReplicasEnabled = isDisklessManagedReplicasEnabled ;
425448 this .maxElectionsPerImbalance = maxElectionsPerImbalance ;
426449 this .configurationControl = configurationControl ;
427450 this .createTopicPolicy = createTopicPolicy ;
@@ -761,6 +784,8 @@ private ApiError createTopic(ControllerRequestContext context,
761784 "when the diskless storage system is disabled. " +
762785 "Please enable the diskless storage system to create diskless topics." );
763786 }
787+ // Diskless RF: only -1 (system-computed from rack count) or 1 (backward compat) allowed.
788+ // Explicit RF > 1 rejected: users shouldn't need to know rack topology.
764789 if (Math .abs (topic .replicationFactor ()) != 1 ) {
765790 return new ApiError (Errors .INVALID_REPLICATION_FACTOR ,
766791 "Replication factor for diskless topics must be 1 or -1 to use the default value (1)." );
@@ -778,7 +803,7 @@ private ApiError createTopic(ControllerRequestContext context,
778803 "A manual partition assignment was specified, but numPartitions " +
779804 "was not set to -1." );
780805 }
781- if (disklessEnabled ) {
806+ if (disklessEnabled && ! isDisklessManagedReplicasEnabled ) {
782807 return new ApiError (INVALID_REQUEST ,
783808 "A manual partition assignment cannot be specified for diskless topics." );
784809 }
@@ -826,12 +851,15 @@ private ApiError createTopic(ControllerRequestContext context,
826851 } else {
827852 int numPartitions = topic .numPartitions () == -1 ?
828853 defaultNumPartitions : topic .numPartitions ();
829- short replicationFactor = topic .replicationFactor () == -1 ?
830- defaultReplicationFactor : topic .replicationFactor ();
854+ short classicReplicationFactor = topic .replicationFactor () == -1 ? defaultReplicationFactor : topic .replicationFactor ();
855+ short disklessReplicationFactor = disklessEnabled && isDisklessManagedReplicasEnabled ? rackCardinality () : 1 ;
856+ short replicationFactor = disklessEnabled ? disklessReplicationFactor : classicReplicationFactor ;
831857 try {
832858 TopicAssignment topicAssignment ;
833859 Predicate <Integer > brokerFilter ;
834- if (!disklessEnabled ) {
860+ // Diskless managed-replicas is equivalent to classic topic assignment,
861+ // but RF is defined by number of racks
862+ if (!disklessEnabled || isDisklessManagedReplicasEnabled ) {
835863 topicAssignment = clusterControl .replicaPlacer ().place (new PlacementSpec (
836864 0 ,
837865 numPartitions ,
@@ -901,6 +929,31 @@ private ApiError createTopic(ControllerRequestContext context,
901929 return ApiError .NONE ;
902930 }
903931
932+ /**
933+ * Computes the replication factor for diskless topics based on rack topology.
934+ * Returns the number of distinct racks in the cluster, ensuring one replica per rack.
935+ * If brokers have no rack configured, each broker is treated as its own "rack" (RF = broker count).
936+ *
937+ * @return the number of distinct racks as a short
938+ * @throws BrokerNotAvailableException if no brokers are registered
939+ * @throws InvalidReplicationFactorException if rack count exceeds Short.MAX_VALUE
940+ */
941+ private short rackCardinality () {
942+ final Collection <BrokerRegistration > brokerRegistrations = clusterControl .brokerRegistrations ().values ();
943+ final long racks = brokerRegistrations .stream ()
944+ .map (BrokerRegistration ::rack )
945+ .distinct ()
946+ .count ();
947+ if (racks > Short .MAX_VALUE ) {
948+ // Unfeasible but technically possible scenario.
949+ // Would require more than 32,768 brokers and each with a different rack
950+ throw new InvalidReplicationFactorException ("Unexpected scenario: rack cardinality is not within short range (" + racks + "). Failing topic creation." );
951+ }
952+ if (racks == 0 )
953+ throw new BrokerNotAvailableException ("No brokers available to create diskless topic." );
954+ return (short ) racks ;
955+ }
956+
904957 private List <ApiMessageAndVersion > validConfigRecords (CreatableTopic topic , List <ApiMessageAndVersion > configRecords , boolean disklessEnabled ) {
905958 final List <ApiMessageAndVersion > validConfigRecord = new ArrayList <>();
906959 boolean isDisklessEnableDefined = false ;
0 commit comments