Skip to content

Commit 259d178

Browse files
committed
feat(metadata:diskless): implement managed replicas for diskless topics
When diskless.managed.rf.enable=true, new diskless topics are created with RF=rack_count using standard KRaft replica placement instead of legacy RF=1. Changes: - Compute RF from rack cardinality via rackCardinality() - Use standard replicaPlacer.place() for rack-aware assignment - Allow manual replica assignments when managed replicas enabled - Add checkstyle suppression for extended createTopic method Phase 1 limitations: - Add Partitions inherits RF from existing partitions (Phase 3) - Transformer not updated, uses legacy routing (Phase 2) - Integration tests deferred to Phase 2 (See #478 docs/inkless/ts-unification/DISKLESS_MANAGED_RF.md)
1 parent 1f7d557 commit 259d178

4 files changed

Lines changed: 811 additions & 6 deletions

File tree

checkstyle/suppressions.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,9 @@
317317
files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager|MetaPropertiesEnsemble).java"/>
318318
<suppress checks="NPathComplexity"
319319
files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver|ScramControlManager|ClusterControlManager|MetadataDelta|MetaPropertiesEnsemble).java"/>
320+
<!-- ReplicationControlManager.createTopic method extended for diskless managed replicas (Phase 1) -->
321+
<suppress checks="MethodLength"
322+
files="ReplicationControlManager.java"/>
320323
<suppress checks="BooleanExpressionComplexity"
321324
files="(MetadataImage).java"/>
322325
<suppress checks="ImportControl"

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1573,6 +1573,7 @@ private QuorumController(
15731573
setDefaultNumPartitions(defaultNumPartitions).
15741574
setDefaultDisklessEnable(defaultDisklessEnable).
15751575
setDisklessStorageSystemEnabled(disklessStorageSystemEnabled).
1576+
setDisklessManagedReplicasEnabled(disklessManagedReplicasEnabled).
15761577
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
15771578
setConfigurationControl(configurationControl).
15781579
setClusterControl(clusterControl).

metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.kafka.common.config.ConfigResource;
2626
import org.apache.kafka.common.errors.ApiException;
2727
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
28+
import org.apache.kafka.common.errors.BrokerNotAvailableException;
2829
import org.apache.kafka.common.errors.InvalidPartitionsException;
2930
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
3031
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
@@ -162,6 +163,7 @@ static class Builder {
162163
private int defaultNumPartitions = 1;
163164
private boolean defaultDisklessEnable = false;
164165
private boolean isDisklessStorageSystemEnabled = false;
166+
private boolean isDisklessManagedReplicasEnabled = false;
165167

166168
private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
167169
private ConfigurationControlManager configurationControl = null;
@@ -199,6 +201,11 @@ public Builder setDisklessStorageSystemEnabled(boolean isDisklessStorageSystemEn
199201
return this;
200202
}
201203

204+
public Builder setDisklessManagedReplicasEnabled(boolean isDisklessManagedReplicasEnabled) {
205+
this.isDisklessManagedReplicasEnabled = isDisklessManagedReplicasEnabled;
206+
return this;
207+
}
208+
202209
Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
203210
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
204211
return this;
@@ -241,6 +248,7 @@ ReplicationControlManager build() {
241248
defaultNumPartitions,
242249
defaultDisklessEnable,
243250
isDisklessStorageSystemEnabled,
251+
isDisklessManagedReplicasEnabled,
244252
maxElectionsPerImbalance,
245253
configurationControl,
246254
clusterControl,
@@ -318,8 +326,21 @@ static Map<String, String> translateCreationConfigs(CreatableTopicConfigCollecti
318326
*/
319327
private final boolean defaultDisklessEnable;
320328

329+
/**
330+
* When true, the diskless storage system is enabled, allowing diskless topics to be created.
331+
*/
321332
private final boolean isDisklessStorageSystemEnabled;
322333

334+
/**
335+
* When true, diskless topics use managed replicas with RF = rack_count (one replica per rack).
336+
* When false, diskless topics use legacy RF=1 behavior.
337+
*
338+
* <p>Phase 1 limitation: This config only affects topic creation. Add Partitions inherits
339+
* RF from existing partitions (correct behavior - maintains consistency within the topic).
340+
* See DISKLESS_MANAGED_RF.md for the full implementation roadmap.
341+
*/
342+
private final boolean isDisklessManagedReplicasEnabled;
343+
323344
/**
324345
* Maximum number of leader elections to perform during one partition leader balancing operation.
325346
*/
@@ -410,6 +431,7 @@ private ReplicationControlManager(
410431
int defaultNumPartitions,
411432
boolean defaultDisklessEnable,
412433
boolean isDisklessStorageSystemEnabled,
434+
boolean isDisklessManagedReplicasEnabled,
413435
int maxElectionsPerImbalance,
414436
ConfigurationControlManager configurationControl,
415437
ClusterControlManager clusterControl,
@@ -422,6 +444,7 @@ private ReplicationControlManager(
422444
this.defaultNumPartitions = defaultNumPartitions;
423445
this.defaultDisklessEnable = defaultDisklessEnable;
424446
this.isDisklessStorageSystemEnabled = isDisklessStorageSystemEnabled;
447+
this.isDisklessManagedReplicasEnabled = isDisklessManagedReplicasEnabled;
425448
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
426449
this.configurationControl = configurationControl;
427450
this.createTopicPolicy = createTopicPolicy;
@@ -761,6 +784,8 @@ private ApiError createTopic(ControllerRequestContext context,
761784
"when the diskless storage system is disabled. " +
762785
"Please enable the diskless storage system to create diskless topics.");
763786
}
787+
// Diskless RF: only -1 (system-computed from rack count) or 1 (backward compat) allowed.
788+
// Explicit RF > 1 rejected: users shouldn't need to know rack topology.
764789
if (Math.abs(topic.replicationFactor()) != 1) {
765790
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
766791
"Replication factor for diskless topics must be 1 or -1 to use the default value (1).");
@@ -778,7 +803,7 @@ private ApiError createTopic(ControllerRequestContext context,
778803
"A manual partition assignment was specified, but numPartitions " +
779804
"was not set to -1.");
780805
}
781-
if (disklessEnabled) {
806+
if (disklessEnabled && !isDisklessManagedReplicasEnabled) {
782807
return new ApiError(INVALID_REQUEST,
783808
"A manual partition assignment cannot be specified for diskless topics.");
784809
}
@@ -826,12 +851,15 @@ private ApiError createTopic(ControllerRequestContext context,
826851
} else {
827852
int numPartitions = topic.numPartitions() == -1 ?
828853
defaultNumPartitions : topic.numPartitions();
829-
short replicationFactor = topic.replicationFactor() == -1 ?
830-
defaultReplicationFactor : topic.replicationFactor();
854+
short classicReplicationFactor = topic.replicationFactor() == -1 ? defaultReplicationFactor : topic.replicationFactor();
855+
short disklessReplicationFactor = disklessEnabled && isDisklessManagedReplicasEnabled ? rackCardinality() : 1;
856+
short replicationFactor = disklessEnabled ? disklessReplicationFactor : classicReplicationFactor;
831857
try {
832858
TopicAssignment topicAssignment;
833859
Predicate<Integer> brokerFilter;
834-
if (!disklessEnabled) {
860+
// Diskless managed-replicas is equivalent to classic topic assignment,
861+
// but RF is defined by number of racks
862+
if (!disklessEnabled || isDisklessManagedReplicasEnabled) {
835863
topicAssignment = clusterControl.replicaPlacer().place(new PlacementSpec(
836864
0,
837865
numPartitions,
@@ -901,6 +929,31 @@ private ApiError createTopic(ControllerRequestContext context,
901929
return ApiError.NONE;
902930
}
903931

932+
/**
933+
* Computes the replication factor for diskless topics based on rack topology.
934+
* Returns the number of distinct racks in the cluster, ensuring one replica per rack.
935+
* If brokers have no rack configured, each broker is treated as its own "rack" (RF = broker count).
936+
*
937+
* @return the number of distinct racks as a short
938+
* @throws BrokerNotAvailableException if no brokers are registered
939+
* @throws InvalidReplicationFactorException if rack count exceeds Short.MAX_VALUE
940+
*/
941+
private short rackCardinality() {
942+
final Collection<BrokerRegistration> brokerRegistrations = clusterControl.brokerRegistrations().values();
943+
final long racks = brokerRegistrations.stream()
944+
.map(BrokerRegistration::rack)
945+
.distinct()
946+
.count();
947+
if (racks > Short.MAX_VALUE) {
948+
// Unfeasible but technically possible scenario.
949+
// Would require more than 32,768 brokers and each with a different rack
950+
throw new InvalidReplicationFactorException("Unexpected scenario: rack cardinality is not within short range (" + racks + "). Failing topic creation.");
951+
}
952+
if (racks == 0)
953+
throw new BrokerNotAvailableException("No brokers available to create diskless topic.");
954+
return (short) racks;
955+
}
956+
904957
private List<ApiMessageAndVersion> validConfigRecords(CreatableTopic topic, List<ApiMessageAndVersion> configRecords, boolean disklessEnabled) {
905958
final List<ApiMessageAndVersion> validConfigRecord = new ArrayList<>();
906959
boolean isDisklessEnableDefined = false;

0 commit comments

Comments
 (0)