diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4198077627..4ae4201a38 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -57,9 +57,8 @@ import org.apache.kafka.common.utils.{Exit, Time, Utils}
 import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig}
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataDelta, MetadataImage, TopicsDelta}
 import org.apache.kafka.logger.StateChangeLogger
-import org.apache.kafka.metadata.LeaderAndIsr
+import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache, PartitionRegistration}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
-import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TransactionVersion}
 import org.apache.kafka.server.log.remote.TopicPartitionLog
 import org.apache.kafka.server.config.ReplicationConfigs
@@ -1871,13 +1870,58 @@ class ReplicaManager(val config: KafkaConfig,
       return
     }
 
-    val (disklessFetchInfosWithoutTopicId, classicFetchInfos) = fetchInfos.partition { case (k, _) => _inklessMetadataView.isDisklessTopic(k.topic()) }
+    val disklessFetchInfosWithoutTopicId = new mutable.ArrayBuffer[(TopicIdPartition, PartitionData)]()
+    val classicFetchInfos = new mutable.ArrayBuffer[(TopicIdPartition, PartitionData)]()
+    val invalidDisklessFetchResponses = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionData)]()
+
+    fetchInfos.foreach { case (tp, partitionData) =>
+      val fetchInfo = tp -> partitionData
+      if (!_inklessMetadataView.isDisklessTopic(tp.topic())) {
+        classicFetchInfos += fetchInfo
+      } else {
+        val disklessStartOffset = _inklessMetadataView.getDisklessStartOffset(tp.topicPartition())
+        // Offsets below the partition's diskless start offset are never handed to the diskless fetch path.
+        val shouldReadFromUnifiedLog =
+          disklessStartOffset != PartitionRegistration.NO_DISKLESS_START_OFFSET &&
+            partitionData.fetchOffset < disklessStartOffset
+
+        (shouldReadFromUnifiedLog, config.disklessManagedReplicasEnabled) match {
+          case (false, _) =>
+            disklessFetchInfosWithoutTopicId += fetchInfo
+          case (true, false) =>
+            // Cannot read from UnifiedLog on a diskless topic if diskless managed replicas are not enabled.
+            invalidDisklessFetchResponses += tp -> new FetchPartitionData(
+              Errors.INVALID_REQUEST,
+              UnifiedLog.UNKNOWN_OFFSET,
+              UnifiedLog.UNKNOWN_OFFSET,
+              MemoryRecords.EMPTY,
+              Optional.empty(),
+              OptionalLong.empty(),
+              Optional.empty(),
+              OptionalInt.empty(),
+              false
+            )
+          case (true, true) =>
+            classicFetchInfos += fetchInfo
+        }
+      }
+    }
+
+    // Replies below should go through `respond` so the INVALID_REQUEST entries collected above are not dropped.
+    def respond(response: Seq[(TopicIdPartition, FetchPartitionData)]): Unit =
+      responseCallback(response ++ invalidDisklessFetchResponses)
+
+    if (classicFetchInfos.isEmpty && disklessFetchInfosWithoutTopicId.isEmpty && invalidDisklessFetchResponses.nonEmpty) {
+      respond(Seq.empty)
+      return
+    }
+
     inklessSharedState match {
       case None =>
         if (disklessFetchInfosWithoutTopicId.nonEmpty) {
           error(s"Received diskless fetch request for topics ${disklessFetchInfosWithoutTopicId.map(_._1.topic()).distinct.mkString(", ")} but diskless storage system is not enabled. " +
             s"Replying an empty response.")
-          responseCallback(Seq.empty)
+          respond(Seq.empty)
           return
         }
       case Some(_) =>
@@ -1890,7 +1934,7 @@ class ReplicaManager(val config: KafkaConfig,
         _inklessMetadataView.getTopicId(topicIdPartition.topic()) match {
           case Uuid.ZERO_UUID =>
             error(s"Got null topic id from KRaft metadata for diskless topic ${topicIdPartition.topic()}")
-            responseCallback(Seq.empty)
+            respond(Seq.empty)
             return
           case topicId =>
             new TopicIdPartition(topicId, topicIdPartition.topicPartition()) -> partitionData
@@ -1900,10 +1944,10 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
 
-
-    if (params.isFromFollower && disklessFetchInfos.nonEmpty) {
-      warn("Diskless topics are not supported for follower fetch requests. " +
-        s"Request from follower ${params.replicaId} contains diskless topics: ${disklessFetchInfos.map(_._1.topic()).mkString(", ")}")
-      responseCallback(Seq.empty)
+    if (!config.disklessManagedReplicasEnabled && params.isFromFollower && disklessFetchInfos.nonEmpty) {
+      warn(s"Follower fetch from replica ${params.replicaId} for diskless topics " +
+        s"${disklessFetchInfos.map(_._1.topic()).distinct.mkString(", ")} " +
+        s"rejected: managed replicas are not enabled.")
+      respond(Seq.empty)
       return
     }
@@ -1929,7 +1973,7 @@ class ReplicaManager(val config: KafkaConfig,
         quota = quota,
         maxWaitMs = Some(maxWaitMs),
         minBytes = Some(minBytes),
-        responseCallback = responseCallback,
+        responseCallback = respond,
       )
 
       // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
@@ -1994,7 +2038,7 @@ class ReplicaManager(val config: KafkaConfig,
     // 6) has a preferred read replica
     if (remoteFetchInfos.isEmpty && disklessFetchInfos.isEmpty && (params.maxWaitMs <= 0 || bytesReadable >= params.minBytes || errorReadingData ||
       hasDivergingEpoch || hasPreferredReadReplica)) {
-      responseCallback(fetchPartitionData)
+      respond(fetchPartitionData)
     } else {
       // construct the fetch results from the read results
       val fetchPartitionStatus = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]
@@ -2036,10 +2080,10 @@ class ReplicaManager(val config: KafkaConfig,
           }
         }
         logReadResultMap.putAll(disklessFetchResults)
-        processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResultMap, fetchPartitionStatus)
+        processRemoteFetches(remoteFetchInfos, params, respond, logReadResultMap, fetchPartitionStatus)
       } else {
         if (disklessFetchInfos.isEmpty && (bytesReadable >= params.minBytes || params.maxWaitMs <= 0)) {
-          responseCallback(fetchPartitionData)
+          respond(fetchPartitionData)
         } else {
           delayedResponse(fetchPartitionStatus)
         }
diff --git a/core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala b/core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala
index 80ea4cfe9f..a2fde62e20 100644
--- a/core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala
+++ b/core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala
@@ -21,7 +21,8 @@ package kafka.server.metadata
 import io.aiven.inkless.control_plane.MetadataView
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.{Node, TopicIdPartition, Uuid}
+import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.metadata.PartitionRegistration
 import org.apache.kafka.storage.internals.log.LogConfig
 
 import java.util
@@ -82,6 +83,17 @@ class InklessMetadataView(val metadataCache: KRaftMetadataCache, val defaultConf
       .collect(Collectors.toSet[TopicIdPartition]())
   }
 
+  /**
+   * Returns the diskless start offset recorded for `topicPartition` in the current metadata image, or
+   * [[PartitionRegistration.NO_DISKLESS_START_OFFSET]] when the image has no such topic or partition.
+   */
+  def getDisklessStartOffset(topicPartition: TopicPartition): Long = {
+    Option(metadataCache.currentImage().topics().getTopic(topicPartition.topic()))
+      .flatMap(topicImage => Option(topicImage.partitions().get(topicPartition.partition())))
+      .map(_.disklessStartOffset)
+      .getOrElse(PartitionRegistration.NO_DISKLESS_START_OFFSET)
+  }
+
   override def getTopicConfig(topicName: String): LogConfig = topicConfigs.computeIfAbsent(topicName, t => {
     val props = metadataCache.topicConfig(t)
     if (props.isEmpty) new LogConfig(getDefaultConfig)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 037eca7d7a..aa4ed6a830 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -66,7 +66,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache, PartitionRegistration}
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition, TransactionVersion}
-import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.TopicPartitionLog
 import org.apache.kafka.server.log.remote.storage._
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
@@ -6338,25 +6338,183 @@ class ReplicaManagerTest {
   }
 
   @Test
-  def testFetchFailDisklessWhenFromReplica(): Unit = {
-    // Given a topic partition that is diskless, we should not be able to fetch from it from a follower.
-    val replicaManager = createReplicaManager(List(disklessTopicPartition.topic()))
-    val fetchParams = new FetchParams(
-      1, 1L, // follower fetch
-      10L, 100, 200, FetchIsolation.HIGH_WATERMARK, Optional.empty()
-    )
-    val fetchInfos = Seq(
-      disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 10L, 0L, 123, Optional.empty())
+  def testFetchDisklessBelowStartOffsetReadsFromClassicLogWhenManagedReplicasEnabled(): Unit = {
+    val fetchHandlerCtor = mockFetchHandler(Map.empty)
+    try {
+      val cp = mock(classOf[ControlPlane])
+      // Given managed replicas enabled
+      val replicaManager = spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp), disklessManagedReplicasEnabled = true))
+
+      // Given a diskless topic with disklessStartOffset = 100
+      when(replicaManager.inklessMetadataView().getDisklessStartOffset(disklessTopicPartition.topicPartition())).thenReturn(100L)
+
+      // Given a classic log read result for offsets below diskless start offset
+      doReturn(Seq(disklessTopicPartition ->
+        new LogReadResult(
+          new FetchDataInfo(new LogOffsetMetadata(1L, 0L, 0), RECORDS),
+          Optional.empty(), 10L, 0L, 10L, 0L, 0L, OptionalLong.empty(), Errors.NONE
+        ))
+      ).when(replicaManager).readFromLog(any(), any(), any(), any())
+
+      // When fetching messages below the diskless start offset
+      val fetchParams = new FetchParams(
+        -1, -1L,
+        0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()
+      )
+      val fetchInfos = Seq(
+        disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty())
+      )
+
+      @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
+      val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
+        responseData = response.toMap
+      }
+      replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)
+
+      // Then the request is served from the unified log and not from diskless fetch handler
+      assertNotNull(responseData)
+      assertEquals(1, responseData.size)
+      assertEquals(RECORDS, responseData(disklessTopicPartition).records)
+      verify(replicaManager, times(1)).readFromLog(any(), any(), any(), any())
+      verify(fetchHandlerCtor.constructed().get(0), never()).handle(any(), any())
+      verify(cp, never()).findBatches(any(), any(), any())
+    } finally {
+      fetchHandlerCtor.close()
+    }
+  }
+
+  @Test
+  def testFetchDisklessBelowStartOffsetFailsWhenManagedReplicasDisabled(): Unit = {
+    val fetchHandlerCtor = mockFetchHandler(Map.empty)
+    try {
+      val cp = mock(classOf[ControlPlane])
+      // Given managed replicas are disabled
+      val replicaManager = spy(createReplicaManager(
+        List(disklessTopicPartition.topic()),
+        controlPlane = Some(cp),
+        disklessManagedReplicasEnabled = false,
+      ))
+
+      // Given a diskless topic with disklessStartOffset = 100
+      when(replicaManager.inklessMetadataView().getDisklessStartOffset(disklessTopicPartition.topicPartition())).thenReturn(100L)
+
+      // When fetching messages below the diskless start offset
+      val fetchParams = new FetchParams(
+        -1, -1L,
+        0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()
+      )
+      val fetchInfos = Seq(
+        disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty())
+      )
+
+      @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
+      val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
+        responseData = response.toMap
+      }
+      replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)
+
+      // Then the request fails
+      assertNotNull(responseData)
+      assertEquals(1, responseData.size)
+      assertEquals(Errors.INVALID_REQUEST, responseData(disklessTopicPartition).error)
+      assertEquals(MemoryRecords.EMPTY, responseData(disklessTopicPartition).records)
+      verify(replicaManager, never()).readFromLog(any(), any(), any(), any())
+      verify(fetchHandlerCtor.constructed().get(0), never()).handle(any(), any())
+      verify(cp, never()).findBatches(any(), any(), any())
+    } finally {
+      fetchHandlerCtor.close()
+    }
+  }
+
+  @Test
+  def testFetchFailDisklessWhenFromReplicaAndUnmanagedReplicas(): Unit = {
+    val fetchHandlerCtor = mockFetchHandler(Map.empty)
+    try {
+      // Given a topic partition that is diskless and managed replicas are disabled
+      val replicaManager = createReplicaManager(List(disklessTopicPartition.topic()), disklessManagedReplicasEnabled = false)
+      val fetchParams = new FetchParams(
+        1, 1L, // follower fetch
+        10L, 100, 200, FetchIsolation.HIGH_WATERMARK, Optional.empty()
+      )
+      val fetchInfos = Seq(
+        disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 10L, 0L, 123, Optional.empty())
+      )
+
+      @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
+      val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
+        responseData = response.toMap
+      }
+
+      // When we try to fetch messages from the diskless topic partition with a follower fetch,
+      replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)
+
+      // Then we should get an immediate empty response.
+      assertNotNull(responseData)
+      assertEquals(0, responseData.size)
+      verify(fetchHandlerCtor.constructed().get(0), never()).handle(any(), any())
+    } finally {
+      fetchHandlerCtor.close()
+    }
+  }
+
+  @ParameterizedTest(name = "testFetchDisklessAtOrAboveDisklessStartOffset with managedReplicasEnabled: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testFetchDisklessAtOrAboveDisklessStartOffset(managedReplicasEnabled: Boolean): Unit = {
+    val disklessResponse = Map(disklessTopicPartition ->
+      new FetchPartitionData(
+        Errors.NONE,
+        110L, 100L,
+        RECORDS,
+        Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)
     )
-    @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
-    val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
-      responseData = response.toMap
+    val fetchHandlerCtor = mockFetchHandler(disklessResponse)
+    try {
+      val batchMetadata = mock(classOf[BatchMetadata])
+      when(batchMetadata.topicIdPartition()).thenReturn(disklessTopicPartition)
+      val batch = mock(classOf[BatchInfo])
+      when(batch.metadata()).thenReturn(batchMetadata)
+      val findBatchResponse = mock(classOf[FindBatchResponse])
+      when(findBatchResponse.batches()).thenReturn(util.List.of(batch))
+      when(findBatchResponse.highWatermark()).thenReturn(110L)
+      when(findBatchResponse.estimatedByteSize(100L)).thenReturn(RECORDS.sizeInBytes())
+      when(findBatchResponse.errors()).thenReturn(Errors.NONE)
+      val cp = mock(classOf[ControlPlane])
+      when(cp.findBatches(any(), any(), any())).thenReturn(util.List.of(findBatchResponse))
+      val replicaManager = spy(createReplicaManager(
+        List(disklessTopicPartition.topic()),
+        controlPlane = Some(cp),
+        disklessManagedReplicasEnabled = managedReplicasEnabled,
+      ))
+
+      // Given a diskless topic with disklessStartOffset = 100
+      when(replicaManager.inklessMetadataView().getDisklessStartOffset(disklessTopicPartition.topicPartition())).thenReturn(100L)
+
+      // When fetching messages at the diskless start offset
+      val fetchParams = new FetchParams(
+        -1, -1L,
+        0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()
+      )
+      val fetchInfos = Seq(
+        disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 100L, 0L, 1024, Optional.empty())
+      )
+
+      @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
+      val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
+        responseData = response.toMap
+      }
+      replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)
+
+      // Then the request is served from diskless fetch handler and not from the unified log
+      assertNotNull(responseData)
+      assertEquals(1, responseData.size)
+      assertEquals(Errors.NONE, responseData(disklessTopicPartition).error)
+      assertEquals(RECORDS, responseData(disklessTopicPartition).records)
+      verify(replicaManager, never()).readFromLog(any(), any(), any(), any())
+      verify(fetchHandlerCtor.constructed().get(0), times(1)).handle(any(), any())
+      verify(cp, times(1)).findBatches(any(), any(), any())
+    } finally {
+      fetchHandlerCtor.close()
     }
-    // When we try to fetch messages from the diskless topic partition with a follower fetch,
-    replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)
-    // Then we should get an immediate empty response.
-    assertNotNull(responseData)
-    assertEquals(0, responseData.size)
   }
 
   @Test
@@ -6925,9 +7083,11 @@ class ReplicaManagerTest {
   private def createReplicaManager(
     disklessTopics: Seq[String],
     controlPlane: Option[ControlPlane] = None,
-    topicIdMapping: Map[String, Uuid] = Map.empty
+    topicIdMapping: Map[String, Uuid] = Map.empty,
+    disklessManagedReplicasEnabled: Boolean = false
   ): ReplicaManager = {
     val props = TestUtils.createBrokerConfig(1, logDirCount = 2)
+    props.put(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, disklessManagedReplicasEnabled.toString)
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties()))
     val sharedState = mock(classOf[SharedState], Answers.RETURNS_DEEP_STUBS)