From 3440b23ce421b55fb5517453e3ea3ad9ddf71102 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 27 Mar 2026 11:26:31 -0500 Subject: [PATCH] fix(inkless:migration): bypass diskless/remote-storage mutual exclusion for classic-to-diskless migration When diskless.allow.from.classic.enable is true and a topic is being migrated from classic (remote.storage.enable=true) to diskless, skip the mutual exclusion validation that prevents both configs from being set simultaneously. The bypass only triggers when all conditions are met: - diskless.allow.from.classic.enable is true (broker config) - diskless.enable is being explicitly set to true (request) - remote.storage.enable is not set in the same request (request) - remote.storage.enable was previously set to true (existing topic config) Co-Authored-By: Claude Opus 4.6 --- .../test/scala/unit/kafka/log/LogConfigTest.scala | 7 +++++-- .../kafka/storage/internals/log/LogConfig.java | 14 +++++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 7af1a6ee0b..d0c29568a3 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -569,9 +569,12 @@ class LogConfigTest { assertValid(existingWithoutDisklessOrRemote, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true) assertValid(existingWithDisklessFalse, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true) assertValid(existingWithDisklessTrue, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true) - // Mutual exclusion still applies even with allowFromClassic + // Mutual exclusion still applies when existing remote.storage.enable=false assertInvalid(existingWithRemoteFalse, setDisklessTrue, mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true) - assertInvalid(existingWithRemoteTrue, setDisklessTrue, mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true) + // 
Classic-to-diskless migration: setting diskless.enable=true on a topic with remote.storage.enable=true is valid. + // In the controller flow, only the requested configs (here, diskless.enable=true) are passed as props; the + // existing configs (here, remote.storage.enable=true) come from existingWithRemoteTrue. + assertValid(existingWithRemoteTrue, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true) // Case 2: set diskless.enable=false with allowFromClassic=true - disabling diskless is still forbidden val setDisklessFalse = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "false") diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index ceae6b0653..77e4260d85 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -522,9 +522,17 @@ private static void validateDiskless(Map existingConfigs, validateDisklessTransition(isCreation, isDisklessExplicitlySet, isDisklessEnabled, wasDisklessEnabled, isDisklessAllowFromClassicEnabled); // Only one between diskless.enable and remote.storage.enable can be set, no matter the value. - final boolean hasExplicitDiskless = isDisklessExplicitlySet || wasDisklessExplicitlySet; - final boolean hasExplicitRemoteStorage = isRemoteStorageExplicitlySet || wasRemoteStorageExplicitlySet; - validateDisklessAndRemoteStorageMutualExclusion(isDisklessExplicitlySet, isRemoteStorageExplicitlySet, hasExplicitDiskless, hasExplicitRemoteStorage); + // Exception: when classic-to-diskless migration is allowed, we permit setting diskless.enable + // on a topic that already has remote.storage.enable (the migration flow handles this). 
+ final boolean wasRemoteStorageEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); + final boolean isClassicToDisklessMigration = isDisklessAllowFromClassicEnabled + && isDisklessExplicitlySet && isDisklessEnabled && !isRemoteStorageExplicitlySet + && wasRemoteStorageExplicitlySet && wasRemoteStorageEnabled; + if (!isClassicToDisklessMigration) { + final boolean hasExplicitDiskless = isDisklessExplicitlySet || wasDisklessExplicitlySet; + final boolean hasExplicitRemoteStorage = isRemoteStorageExplicitlySet || wasRemoteStorageExplicitlySet; + validateDisklessAndRemoteStorageMutualExclusion(isDisklessExplicitlySet, isRemoteStorageExplicitlySet, hasExplicitDiskless, hasExplicitRemoteStorage); + } } private static void validateDisklessTransition(boolean isCreation,