Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/inkless-nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ jobs:
timeout ${TIMEOUT_MINUTES}m \
./gradlew ${GRADLE_ARGS} :storage:inkless:test :storage:inkless:integrationTest && \
./gradlew ${GRADLE_ARGS} :metadata:test --tests "org.apache.kafka.controller.*" && \
./gradlew ${GRADLE_ARGS} :core:test --tests "*Inkless*" && \
./gradlew ${GRADLE_ARGS} :core:test --tests "*Inkless*" --tests "*Diskless*" && \
./gradlew ${GRADLE_ARGS} :core:test --tests "kafka.server.*" && \
./gradlew ${GRADLE_ARGS} :core:test --tests "kafka.api.*"
./gradlew ${GRADLE_ARGS} :core:test --tests "kafka.api.*" && \
./gradlew ${GRADLE_ARGS} :core:test --tests "kafka.log.*"
exitcode="$?"
echo "exitcode=$exitcode" >> $GITHUB_OUTPUT
- name: Archive JUnit HTML reports
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/inkless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ jobs:
timeout ${TIMEOUT_MINUTES}m \
./gradlew ${GRADLE_ARGS} :storage:inkless:test :storage:inkless:integrationTest && \
./gradlew ${GRADLE_ARGS} :metadata:test --tests "org.apache.kafka.controller.*" && \
./gradlew ${GRADLE_ARGS} :core:test --tests "*Inkless*"
./gradlew ${GRADLE_ARGS} :core:test --tests "*Inkless*" --tests "*Diskless*" --tests "kafka.log.LogConfigTest"
Comment thread
jeqo marked this conversation as resolved.
exitcode="$?"
echo "exitcode=$exitcode" >> $GITHUB_OUTPUT
- name: Archive JUnit HTML reports
Expand Down
47 changes: 44 additions & 3 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,50 @@ class LogConfigTest {
// Mutual exclusion still applies when existing remote.storage.enable=false
assertInvalid(existingWithRemoteFalse, setDisklessTrue, mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true)
// Classic-to-diskless migration: setting diskless.enable=true on a topic with remote.storage.enable=true is valid.
// In the controller flow, only the requested configs (here, diskless.enable=true) are passed as props; the
// existing configs (here, remote.storage.enable=true) come from existingWithRemoteTrue.
assertValid(existingWithRemoteTrue, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true)
// In the real controller flow (ConfigurationControlManager.validateAlterConfig), props contains the merged
// state of existing overrides + requested changes, so remote.storage.enable=true is included in props.
val setDisklessTrueWithExistingRemoteTrue = topicProps(
TopicConfig.DISKLESS_ENABLE_CONFIG -> "true",
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true"
)
assertValid(existingWithRemoteTrue, setDisklessTrueWithExistingRemoteTrue, kafkaConfig, disklessAllowFromClassic = true)
// Migration not allowed without allowFromClassic even with merged props
assertInvalid(existingWithRemoteTrue, setDisklessTrueWithExistingRemoteTrue,
"It is invalid to enable diskless on an already existing topic.", kafkaConfig, disklessAllowFromClassic = false)

Comment thread
jeqo marked this conversation as resolved.
// Updating both diskless.enable=true and remote.storage.enable=false in the same request must be rejected
// by mutual exclusion, even when allowFromClassic=true.
val setDisklessTrueAndRemoteFalse = topicProps(
TopicConfig.DISKLESS_ENABLE_CONFIG -> "true",
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "false"
)
assertInvalid(existingWithRemoteTrue, setDisklessTrueAndRemoteFalse,
mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true)

// Case 1c: Steady state after migration — a migrated topic has both diskless.enable=true and
// remote.storage.enable=true in existing configs. Updating an unrelated config (e.g. retention.ms)
// must not be blocked by mutual exclusion. In the controller path, props is the merged state,
// so both configs appear in requestedConfigs.
val existingMigrated = util.Map.of(
TopicConfig.DISKLESS_ENABLE_CONFIG, "true",
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"
)
val setRetentionOnMigrated = topicProps(
TopicConfig.DISKLESS_ENABLE_CONFIG -> "true",
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true",
TopicConfig.RETENTION_MS_CONFIG -> "86400000"
)
assertValid(existingMigrated, setRetentionOnMigrated, kafkaConfig, disklessAllowFromClassic = true)

Comment thread
jeqo marked this conversation as resolved.
// Case 1d: Enabling diskless while simultaneously disabling remote storage (with delete) must still
// be rejected. The migration bypass only applies when remote storage remains enabled.
val setDisklessTrueAndRemoteFalseWithDelete = topicProps(
TopicConfig.DISKLESS_ENABLE_CONFIG -> "true",
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "false",
TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG -> "true"
)
assertInvalid(existingWithRemoteTrue, setDisklessTrueAndRemoteFalseWithDelete,
mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true)

// Case 2: set diskless.enable=false with allowFromClassic=true - disabling diskless is still forbidden
val setDisklessFalse = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,19 @@ private static void validateDiskless(Map<String, String> existingConfigs,
validateDisklessTransition(isCreation, isDisklessExplicitlySet, isDisklessEnabled, wasDisklessEnabled, isDisklessAllowFromClassicEnabled);

// Only one of diskless.enable and remote.storage.enable may be set, regardless of its value.
// Exception: when classic-to-diskless migration is allowed, we permit setting diskless.enable
// on a topic that already has remote.storage.enable (the migration flow handles this).
// Exception: when classic-to-diskless migration is allowed, we permit diskless.enable=true
// on a topic that already had remote.storage.enable=true — both during the migration itself
// and in steady state afterward (migrated topics retain both configs).
// Note: in the controller path, requestedConfigs is the merged state (existing + changes),
// so we cannot distinguish "client set this" from "already existed". We detect the exception
// by checking that diskless is (or will be) enabled and remote storage was and remains enabled.
final boolean wasRemoteStorageEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"));
final boolean isClassicToDisklessMigration = isDisklessAllowFromClassicEnabled
&& isDisklessExplicitlySet && isDisklessEnabled && !isRemoteStorageExplicitlySet
&& wasRemoteStorageExplicitlySet && wasRemoteStorageEnabled;
if (!isClassicToDisklessMigration) {
final boolean requestedRemoteStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
final boolean isDisklessWithRemoteStorageLegacy = isDisklessAllowFromClassicEnabled
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not getting why this is called Legacy

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Maybe isMigratedFromClassicWithRemoteStorage or similar is better. @viktorsomogyi maybe this can be fixed as part of #556?

&& isDisklessEnabled
&& wasRemoteStorageExplicitlySet && wasRemoteStorageEnabled
&& requestedRemoteStorageEnabled;
Comment thread
jeqo marked this conversation as resolved.
if (!isDisklessWithRemoteStorageLegacy) {
final boolean hasExplicitDiskless = isDisklessExplicitlySet || wasDisklessExplicitlySet;
final boolean hasExplicitRemoteStorage = isRemoteStorageExplicitlySet || wasRemoteStorageExplicitlySet;
validateDisklessAndRemoteStorageMutualExclusion(isDisklessExplicitlySet, isRemoteStorageExplicitlySet, hasExplicitDiskless, hasExplicitRemoteStorage);
Comment thread
jeqo marked this conversation as resolved.
Expand Down
Loading