Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -539,10 +539,14 @@ public void replay(PartitionRecord record) {
isReassignmentInProgress(prevPartInfo), isReassignmentInProgress(newPartInfo));
}

if (newPartInfo.hasPreferredLeader()) {
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
} else {
imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
// Diskless topics are excluded: the metadata transformer handles leader routing,
// so tracking preferred leader imbalance is unnecessary.
if (!isDisklessTopic(topicInfo.name)) {
if (newPartInfo.hasPreferredLeader()) {
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
} else {
imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
}
}
}

Expand Down Expand Up @@ -585,10 +589,12 @@ public void replay(PartitionChangeRecord record) {
record.topicId();
newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo);

if (newPartitionInfo.hasPreferredLeader()) {
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
} else {
imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
if (!isDisklessTopic(topicInfo.name)) {
if (newPartitionInfo.hasPreferredLeader()) {
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
} else {
imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
}
}

if (record.removingReplicas() != null || record.addingReplicas() != null) {
Expand Down Expand Up @@ -1452,8 +1458,7 @@ ControllerResult<InitDisklessLogResponseData> initDisklessLog(
}

TopicControlInfo topic = topics.get(topicId);
boolean isDisklessTopic = Boolean.parseBoolean(
configurationControl.currentTopicConfig(topic.name).getOrDefault(DISKLESS_ENABLE_CONFIG, "false"));
boolean isDisklessTopic = isDisklessTopic(topic.name);
if (isDisklessTopic) {
for (InitDisklessLogRequestData.PartitionData partitionData : topicData.partitions()) {
partitionResponses.add(new InitDisklessLogResponseData.PartitionResponse()
Expand Down Expand Up @@ -2120,6 +2125,16 @@ void maybeTriggerLeaderChangeForPartitionsWithoutPreferredLeader(
continue;
}

// Skip diskless topics: the metadata transformer handles leader routing,
// so controller-level preferred leader election is unnecessary.
// After reassignment, the leader may not match the preferred replica (replicas[0])
// because PartitionChangeBuilder.electLeader() prefers keeping the current leader.
// This is intentional — the metadata transformer routes requests independently of
// the preferred replica order.
if (isDisklessTopic(topic.name)) {
continue;
}

PartitionRegistration partition = topic.parts.get(topicPartition.partitionId());
if (partition == null) {
log.error("Skipping unknown imbalanced partition {}", topicPartition);
Expand Down Expand Up @@ -2449,9 +2464,7 @@ void generateLeaderAndIsrUpdates(String context,
.setAllowReplicationFactorChange(allowRFChange);
int successfulAlterations = 0, totalAlterations = 0;
for (ReassignableTopic topic : request.topics()) {
boolean effectiveRFChange = allowRFChange
&& !Boolean.parseBoolean(configurationControl.currentTopicConfig(topic.name()).getOrDefault(
DISKLESS_ENABLE_CONFIG, "false"));
boolean effectiveRFChange = allowRFChange && !isDisklessTopic(topic.name());
ReassignableTopicResponse topicResponse = new ReassignableTopicResponse().
setName(topic.name());
for (ReassignablePartition partition : topic.partitions()) {
Expand Down Expand Up @@ -2586,23 +2599,63 @@ Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp,
List<Integer> currentReplicas = Replicas.toList(part.replicas);
PartitionReassignmentReplicas reassignment =
new PartitionReassignmentReplicas(currentAssignment, targetAssignment);

boolean isDiskless = isDisklessTopic(topics.get(tp.topicId()).name);

// Diskless topics don't use local directories — skip the directory check in leader election.
// Any active (unfenced, not in controlled shutdown) broker can lead a diskless partition,
// since it reads all data from object storage rather than local disk.
IntPredicate leaderAcceptor = isDiskless
? clusterControl::isActive
: new LeaderAcceptor(clusterControl, part);

PartitionChangeBuilder builder = new PartitionChangeBuilder(
part,
tp.topicId(),
tp.partitionId(),
new LeaderAcceptor(clusterControl, part),
leaderAcceptor,
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topics.get(tp.topicId()).name)
);
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
if (!reassignment.replicas().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.replicas());
}
if (!reassignment.removing().isEmpty()) {
builder.setTargetRemoving(reassignment.removing());
}
if (!reassignment.adding().isEmpty()) {
builder.setTargetAdding(reassignment.adding());

if (isDiskless) {
// Diskless: data is in object storage, no replica sync needed.
// Apply target replicas directly — skip the staged adding/removing process
// (no addingReplicas/removingReplicas). This is safe because:
//
// 1. No offline risk: the reassignment is rejected if all target brokers are
// fenced (see activeIsr check below). As long as at least one target broker
// is active, PartitionChangeBuilder.electLeader() will elect it as leader
// since the leaderAcceptor above only requires isActive (no directory check).
//
// 2. Cache warming: new leaders start with a cold InklessMetadataView cache,
// which may cause higher fetch latency and increased object storage reads
// until the cache is populated. This is a transient effect — the architecture
// already assumes brokers are interchangeable since all data lives in object
// storage. A future optimization could pre-warm target brokers' caches before
// completing the reassignment.
if (!target.replicas().equals(currentReplicas)) {
List<Integer> activeIsr = target.replicas().stream()
.filter(clusterControl::isActive)
.toList();
if (activeIsr.isEmpty()) {
throw new InvalidReplicaAssignmentException(
"None of the target replicas " + target.replicas() + " are active.");
}
builder.setTargetReplicas(target.replicas());
builder.setTargetIsr(activeIsr);
}
} else {
if (!reassignment.replicas().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.replicas());
}
if (!reassignment.removing().isEmpty()) {
builder.setTargetRemoving(reassignment.removing());
}
if (!reassignment.adding().isEmpty()) {
builder.setTargetAdding(reassignment.adding());
}
}
return builder.setDefaultDirProvider(clusterDescriber).build();
}
Expand Down Expand Up @@ -2825,6 +2878,12 @@ private void validatePartitionReplicationFactorUnchanged(PartitionRegistration p
}
}

/**
 * Reports whether the given topic is configured as a diskless topic.
 *
 * <p>The topic's current configuration is consulted for {@code DISKLESS_ENABLE_CONFIG}; a missing
 * entry is treated as {@code "false"}. The value is interpreted with
 * {@link Boolean#parseBoolean(String)}, so only a case-insensitive {@code "true"} enables it.
 *
 * @param topicName name of the topic whose configuration is inspected
 * @return {@code true} if diskless storage is enabled for the topic, {@code false} otherwise
 */
private boolean isDisklessTopic(String topicName) {
    final String disklessSetting = configurationControl
        .currentTopicConfig(topicName)
        .getOrDefault(DISKLESS_ENABLE_CONFIG, "false");
    return Boolean.parseBoolean(disklessSetting);
}
Comment thread
jeqo marked this conversation as resolved.

private record IneligibleReplica(int replicaId, String reason) {

@Override
Expand Down
Loading
Loading