Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 55 additions & 13 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataDelta, MetadataImage, TopicsDelta}
import org.apache.kafka.logger.StateChangeLogger
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache, PartitionRegistration}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TransactionVersion}
import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.config.ReplicationConfigs
Expand Down Expand Up @@ -1871,13 +1870,56 @@ class ReplicaManager(val config: KafkaConfig,
return
}

val (disklessFetchInfosWithoutTopicId, classicFetchInfos) = fetchInfos.partition { case (k, _) => _inklessMetadataView.isDisklessTopic(k.topic()) }
val disklessFetchInfosWithoutTopicId = new mutable.ArrayBuffer[(TopicIdPartition, PartitionData)]()
val classicFetchInfos = new mutable.ArrayBuffer[(TopicIdPartition, PartitionData)]()
val invalidDisklessFetchResponses = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionData)]()

fetchInfos.foreach { case (tp, partitionData) =>
val fetchInfo = tp -> partitionData
if (!_inklessMetadataView.isDisklessTopic(tp.topic())) {
classicFetchInfos += fetchInfo
} else {
val disklessStartOffset = _inklessMetadataView.getDisklessStartOffset(tp.topicPartition())
val shouldReadFromUnifiedLog =
disklessStartOffset != PartitionRegistration.NO_DISKLESS_START_OFFSET &&
partitionData.fetchOffset < disklessStartOffset

(shouldReadFromUnifiedLog, config.disklessManagedReplicasEnabled) match {
case (false, _) =>
disklessFetchInfosWithoutTopicId += fetchInfo
// Cannot read from UnifiedLog on a diskless topic if diskless managed replicas are not enabled.
case (true, false) =>
invalidDisklessFetchResponses += tp -> new FetchPartitionData(
Errors.INVALID_REQUEST,
UnifiedLog.UNKNOWN_OFFSET,
UnifiedLog.UNKNOWN_OFFSET,
MemoryRecords.EMPTY,
Optional.empty(),
OptionalLong.empty(),
Optional.empty(),
OptionalInt.empty(),
false
)
case (true, true) =>
classicFetchInfos += fetchInfo
}
}
Comment thread
giuseppelillo marked this conversation as resolved.
}

def respond(response: Seq[(TopicIdPartition, FetchPartitionData)]): Unit =
responseCallback(response ++ invalidDisklessFetchResponses)

if (classicFetchInfos.isEmpty && disklessFetchInfosWithoutTopicId.isEmpty && invalidDisklessFetchResponses.nonEmpty) {
respond(Seq.empty)
return
}

inklessSharedState match {
case None =>
if (disklessFetchInfosWithoutTopicId.nonEmpty) {
error(s"Received diskless fetch request for topics ${disklessFetchInfosWithoutTopicId.map(_._1.topic()).distinct.mkString(", ")} but diskless storage system is not enabled. " +
s"Replying an empty response.")
responseCallback(Seq.empty)
respond(Seq.empty)
return
}
case Some(_) =>
Expand All @@ -1890,7 +1932,7 @@ class ReplicaManager(val config: KafkaConfig,
_inklessMetadataView.getTopicId(topicIdPartition.topic()) match {
case Uuid.ZERO_UUID =>
error(s"Got null topic id from KRaft metadata for diskless topic ${topicIdPartition.topic()}")
responseCallback(Seq.empty)
respond(Seq.empty)
return
case topicId =>
new TopicIdPartition(topicId, topicIdPartition.topicPartition()) -> partitionData
Expand All @@ -1900,10 +1942,10 @@ class ReplicaManager(val config: KafkaConfig,
}
}


if (params.isFromFollower && disklessFetchInfos.nonEmpty) {
warn("Diskless topics are not supported for follower fetch requests. " +
s"Request from follower ${params.replicaId} contains diskless topics: ${disklessFetchInfos.map(_._1.topic()).mkString(", ")}")
if (!config.disklessManagedReplicasEnabled && params.isFromFollower && disklessFetchInfos.nonEmpty) {
warn(s"Follower fetch from replica ${params.replicaId} for diskless topics " +
s"${disklessFetchInfos.map(_._1.topic()).distinct.mkString(", ")} " +
s"rejected: managed replicas are not enabled.")
responseCallback(Seq.empty)
return
}
Expand All @@ -1929,7 +1971,7 @@ class ReplicaManager(val config: KafkaConfig,
quota = quota,
maxWaitMs = Some(maxWaitMs),
minBytes = Some(minBytes),
responseCallback = responseCallback,
responseCallback = respond,
)

// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
Expand Down Expand Up @@ -1994,7 +2036,7 @@ class ReplicaManager(val config: KafkaConfig,
// 6) has a preferred read replica
if (remoteFetchInfos.isEmpty && disklessFetchInfos.isEmpty && (params.maxWaitMs <= 0 || bytesReadable >= params.minBytes || errorReadingData ||
hasDivergingEpoch || hasPreferredReadReplica)) {
responseCallback(fetchPartitionData)
respond(fetchPartitionData)
} else {
// construct the fetch results from the read results
val fetchPartitionStatus = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]
Expand Down Expand Up @@ -2036,10 +2078,10 @@ class ReplicaManager(val config: KafkaConfig,
}
}
logReadResultMap.putAll(disklessFetchResults)
processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResultMap, fetchPartitionStatus)
processRemoteFetches(remoteFetchInfos, params, respond, logReadResultMap, fetchPartitionStatus)
} else {
if (disklessFetchInfos.isEmpty && (bytesReadable >= params.minBytes || params.maxWaitMs <= 0)) {
responseCallback(fetchPartitionData)
respond(fetchPartitionData)
} else {
delayedResponse(fetchPartitionStatus)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ package kafka.server.metadata
import io.aiven.inkless.control_plane.MetadataView
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Node, TopicIdPartition, Uuid}
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.storage.internals.log.LogConfig

import java.util
Expand Down Expand Up @@ -82,6 +83,13 @@ class InklessMetadataView(val metadataCache: KRaftMetadataCache, val defaultConf
.collect(Collectors.toSet[TopicIdPartition]())
}

/**
 * Looks up the diskless start offset recorded in the current KRaft metadata image
 * for the given partition.
 *
 * @param topicPartition the partition to resolve
 * @return the partition's disklessStartOffset, or
 *         [[PartitionRegistration.NO_DISKLESS_START_OFFSET]] when the topic or the
 *         partition is absent from the metadata image (both lookups may yield null).
 */
def getDisklessStartOffset(topicPartition: TopicPartition): Long = {
  // Either lookup can return null for unknown topics/partitions; wrap in Option
  // and thread through a for-comprehension instead of a flatMap/map chain.
  val startOffset = for {
    topicImage <- Option(metadataCache.currentImage().topics().getTopic(topicPartition.topic()))
    partitionRegistration <- Option(topicImage.partitions().get(topicPartition.partition()))
  } yield partitionRegistration.disklessStartOffset
  startOffset.getOrElse(PartitionRegistration.NO_DISKLESS_START_OFFSET)
}

override def getTopicConfig(topicName: String): LogConfig = topicConfigs.computeIfAbsent(topicName, t => {
val props = metadataCache.topicConfig(t)
if (props.isEmpty) new LogConfig(getDefaultConfig)
Expand Down
198 changes: 179 additions & 19 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache, PartitionRegistration}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition, TransactionVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
Expand Down Expand Up @@ -6338,25 +6338,183 @@ class ReplicaManagerTest {
}

@Test
def testFetchFailDisklessWhenFromReplica(): Unit = {
// Given a topic partition that is diskless, we should not be able to fetch from it from a follower.
val replicaManager = createReplicaManager(List(disklessTopicPartition.topic()))
val fetchParams = new FetchParams(
1, 1L, // follower fetch
10L, 100, 200, FetchIsolation.HIGH_WATERMARK, Optional.empty()
)
val fetchInfos = Seq(
disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 10L, 0L, 123, Optional.empty())
// Verifies that, with diskless managed replicas ENABLED, a fetch whose offset is
// below the partition's disklessStartOffset is routed to the classic unified log
// (readFromLog) and never touches the diskless fetch handler or the control plane.
def testFetchDisklessBelowStartOffsetReadsFromClassicLogWhenManagedReplicasEnabled(): Unit = {
// mockFetchHandler intercepts construction of the diskless fetch handler so we can
// assert it is never invoked; it must be closed in the finally block below.
val fetchHandlerCtor = mockFetchHandler(Map.empty)
try {
val cp = mock(classOf[ControlPlane])
// Given managed replicas enabled
val replicaManager = spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp), disklessManagedReplicasEnabled = true))

// Given a diskless topic with disklessStartOffset = 100
when(replicaManager.inklessMetadataView().getDisklessStartOffset(disklessTopicPartition.topicPartition())).thenReturn(100L)

// Given a classic log read result for offsets below diskless start offset
doReturn(Seq(disklessTopicPartition ->
new LogReadResult(
new FetchDataInfo(new LogOffsetMetadata(1L, 0L, 0), RECORDS),
Optional.empty(), 10L, 0L, 10L, 0L, 0L, OptionalLong.empty(), Errors.NONE
))
).when(replicaManager).readFromLog(any(), any(), any(), any())

// When fetching messages below the diskless start offset
// replicaId = -1 marks this as a consumer (non-follower) fetch.
val fetchParams = new FetchParams(
-1, -1L,
0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()
)
// Fetch offset 50 < disklessStartOffset 100, so the classic path must be taken.
val fetchInfos = Seq(
disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty())
)

// Capture the response; @volatile because the callback may run on another thread.
@volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
responseData = response.toMap
}
replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)

// Then the request is served from the unified log and not from diskless fetch handler
assertNotNull(responseData)
assertEquals(1, responseData.size)
assertEquals(RECORDS, responseData(disklessTopicPartition).records)
verify(replicaManager, times(1)).readFromLog(any(), any(), any(), any())
verify(fetchHandlerCtor.constructed().get(0), never()).handle(any(), any())
verify(cp, never()).findBatches(any(), any(), any())
} finally {
fetchHandlerCtor.close()
}
}

@Test
// Verifies that, with diskless managed replicas DISABLED, a fetch whose offset is
// below the partition's disklessStartOffset is rejected with INVALID_REQUEST and
// empty records, without touching the unified log, the diskless fetch handler, or
// the control plane.
def testFetchDisklessBelowStartOffsetFailsWhenManagedReplicasDisabled(): Unit = {
// Intercept diskless fetch handler construction to assert it is never used.
val fetchHandlerCtor = mockFetchHandler(Map.empty)
try {
val cp = mock(classOf[ControlPlane])
// Given managed replicas are disabled
val replicaManager = spy(createReplicaManager(
List(disklessTopicPartition.topic()),
controlPlane = Some(cp),
disklessManagedReplicasEnabled = false,
))

// Given a diskless topic with disklessStartOffset = 100
when(replicaManager.inklessMetadataView().getDisklessStartOffset(disklessTopicPartition.topicPartition())).thenReturn(100L)

// When fetching messages below the diskless start offset
// replicaId = -1 marks this as a consumer (non-follower) fetch.
val fetchParams = new FetchParams(
-1, -1L,
0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()
)
// Fetch offset 50 < disklessStartOffset 100: classic read would be required,
// which is not permitted while managed replicas are disabled.
val fetchInfos = Seq(
disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty())
)

// Capture the response; @volatile because the callback may run on another thread.
@volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
responseData = response.toMap
}
replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)

// Then the request fails
assertNotNull(responseData)
assertEquals(1, responseData.size)
assertEquals(Errors.INVALID_REQUEST, responseData(disklessTopicPartition).error)
assertEquals(MemoryRecords.EMPTY, responseData(disklessTopicPartition).records)
verify(replicaManager, never()).readFromLog(any(), any(), any(), any())
verify(fetchHandlerCtor.constructed().get(0), never()).handle(any(), any())
verify(cp, never()).findBatches(any(), any(), any())
} finally {
fetchHandlerCtor.close()
}
}

@Test
// Verifies that a FOLLOWER fetch for a diskless topic is answered with an immediate
// empty response when diskless managed replicas are disabled, and that the diskless
// fetch handler is never invoked.
def testFetchFailDisklessWhenFromReplicaAndUnmanagedReplicas(): Unit = {
// Intercept diskless fetch handler construction to assert it is never used.
val fetchHandlerCtor = mockFetchHandler(Map.empty)
try {
// Given a topic partition that is diskless and managed replicas are disabled
val replicaManager = createReplicaManager(List(disklessTopicPartition.topic()), disklessManagedReplicasEnabled = false)
// replicaId = 1 marks this as a follower fetch.
val fetchParams = new FetchParams(
1, 1L, // follower fetch
10L, 100, 200, FetchIsolation.HIGH_WATERMARK, Optional.empty()
)
val fetchInfos = Seq(
disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 10L, 0L, 123, Optional.empty())
)

// Capture the response; @volatile because the callback may run on another thread.
@volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
responseData = response.toMap
}

// When we try to fetch messages from the diskless topic partition with a follower fetch,
replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)

// Then we should get an immediate empty response.
assertNotNull(responseData)
assertEquals(0, responseData.size)
verify(fetchHandlerCtor.constructed().get(0), never()).handle(any(), any())
} finally {
fetchHandlerCtor.close()
}
}

@ParameterizedTest(name = "testFetchDisklessAtOrAboveStartOffsetUsesDiskless with managedReplicasEnabled: {0}")
@ValueSource(booleans = Array(true, false))
def testFetchDisklessAtOrAboveDisklessStartOffset(managedReplicasEnabled: Boolean): Unit = {
val disklessResponse = Map(disklessTopicPartition ->
new FetchPartitionData(
Errors.NONE,
110L, 100L,
RECORDS,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)
)
@volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
responseData = response.toMap
val fetchHandlerCtor = mockFetchHandler(disklessResponse)
try {
val batchMetadata = mock(classOf[BatchMetadata])
when(batchMetadata.topicIdPartition()).thenReturn(disklessTopicPartition)
val batch = mock(classOf[BatchInfo])
when(batch.metadata()).thenReturn(batchMetadata)
val findBatchResponse = mock(classOf[FindBatchResponse])
when(findBatchResponse.batches()).thenReturn(util.List.of(batch))
when(findBatchResponse.highWatermark()).thenReturn(110L)
when(findBatchResponse.estimatedByteSize(100L)).thenReturn(RECORDS.sizeInBytes())
when(findBatchResponse.errors()).thenReturn(Errors.NONE)
val cp = mock(classOf[ControlPlane])
when(cp.findBatches(any(), any(), any())).thenReturn(util.List.of(findBatchResponse))
val replicaManager = spy(createReplicaManager(
List(disklessTopicPartition.topic()),
controlPlane = Some(cp),
disklessManagedReplicasEnabled = managedReplicasEnabled,
))

// Given a diskless topic with disklessStartOffset = 100
when(replicaManager.inklessMetadataView().getDisklessStartOffset(disklessTopicPartition.topicPartition())).thenReturn(100L)

// When fetching messages at the diskless start offset
val fetchParams = new FetchParams(
-1, -1L,
0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()
)
val fetchInfos = Seq(
disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 100L, 0L, 1024, Optional.empty())
)

@volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
responseData = response.toMap
}
replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)

// Then the request is served from diskless fetch handler and not from the unified log
assertNotNull(responseData)
assertEquals(1, responseData.size)
assertEquals(Errors.NONE, responseData(disklessTopicPartition).error)
assertEquals(RECORDS, responseData(disklessTopicPartition).records)
verify(replicaManager, never()).readFromLog(any(), any(), any(), any())
verify(fetchHandlerCtor.constructed().get(0), times(1)).handle(any(), any())
verify(cp, times(1)).findBatches(any(), any(), any())
} finally {
fetchHandlerCtor.close()
}
// When we try to fetch messages from the diskless topic partition with a follower fetch,
replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)
// Then we should get an immediate empty response.
assertNotNull(responseData)
assertEquals(0, responseData.size)
}

@Test
Expand Down Expand Up @@ -6925,9 +7083,11 @@ class ReplicaManagerTest {
private def createReplicaManager(
disklessTopics: Seq[String],
controlPlane: Option[ControlPlane] = None,
topicIdMapping: Map[String, Uuid] = Map.empty
topicIdMapping: Map[String, Uuid] = Map.empty,
disklessManagedReplicasEnabled: Boolean = false
): ReplicaManager = {
val props = TestUtils.createBrokerConfig(1, logDirCount = 2)
props.put(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, disklessManagedReplicasEnabled.toString)
Comment thread
giuseppelillo marked this conversation as resolved.
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties()))
val sharedState = mock(classOf[SharedState], Answers.RETURNS_DEEP_STUBS)
Expand Down
Loading