From 1068083f0daf624ea4f2e8d5e730839e30d94c1f Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 27 Mar 2026 10:41:36 -0500 Subject: [PATCH] fix(inkless:consume): handle error path in lastOffsetForLeaderEpoch for diskless topics When FetchOffsetHandler returns an error from the control plane, timestampAndOffset is empty. Check for the error first and use UNDEFINED_EPOCH_OFFSET as fallback instead of calling Optional.get() on an empty value. Co-Authored-By: Claude Opus 4.6 --- .../scala/kafka/server/ReplicaManager.scala | 14 +++-- .../kafka/server/ReplicaManagerTest.scala | 54 +++++++++++++++++-- 2 files changed, 60 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 32ee4069eb4..41980776270 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -41,6 +41,7 @@ import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartit import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult} +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse import org.apache.kafka.common.message.{DescribeLogDirsResponseData, DescribeProducersResponseData} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName @@ -2586,14 +2587,19 @@ class ReplicaManager(val config: KafkaConfig, inklessFetchOffsetHandlerJob.get .add(tp, partitionRequest) .thenApply[EpochEndOffset](epochEndOffset => { - val endOffset = epochEndOffset.timestampAndOffset().get().offset - val errorCode = epochEndOffset.exception() + val error = epochEndOffset.exception() .map[Errors](e => Errors.forException(e)) .orElse(Errors.NONE) - .code + if (error != Errors.NONE) { + warn(s"Error fetching offset for leader epoch from control plane for $tp: $error", + epochEndOffset.exception().orElse(null)) + } + val endOffset = epochEndOffset.timestampAndOffset() + .map[Long](_.offset) + .orElse(OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET) new EpochEndOffset() .setPartition(tp.partition()) - .setErrorCode(errorCode) + .setErrorCode(error.code) .setLeaderEpoch(leaderEpoch) .setEndOffset(endOffset) }) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index d4571d14e0c..037eca7d7a8 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -20,7 +20,7 @@ package kafka.server import com.yammer.metrics.core.{Gauge, Meter, Timer} import io.aiven.inkless.common.SharedState import io.aiven.inkless.config.InklessConfig -import io.aiven.inkless.consume.FetchHandler +import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler} import io.aiven.inkless.control_plane.{BatchInfo, BatchMetadata, ControlPlane, ControlPlaneException, FindBatchResponse} import kafka.server.metadata.InklessMetadataView import io.aiven.inkless.produce.AppendHandler @@ -40,9 +40,10 @@ import org.apache.kafka.clients.consumer.ShareAcquireMode import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.TopicConfig -import org.apache.kafka.common.errors.InvalidPidMappingException +import org.apache.kafka.common.errors.{InvalidPidMappingException, UnknownTopicOrPartitionException} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.{DeleteRecordsResponseData, FetchResponseData, ShareFetchResponseData} +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic} import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RemoveTopicRecord, TopicRecord} import org.apache.kafka.common.metrics.Metrics @@ -83,7 +84,7 @@ import org.apache.kafka.server.util.timer.{MockTimer, SystemTimer} import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler} import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadResult, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadResult, LogSegments, OffsetResultHolder, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Nested, Test} @@ -93,7 +94,7 @@ import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer -import org.mockito.{Answers, ArgumentCaptor, ArgumentMatchers, MockedConstruction} +import org.mockito.{Answers, ArgumentCaptor, ArgumentMatchers, MockedConstruction, Mockito} import java.io.{ByteArrayInputStream, File} import java.net.InetAddress @@ -6861,6 +6862,51 @@ class ReplicaManagerTest { // TODO: Add more fetch tests combinations, edge cases ara not covered yet. + @Test + def testLastOffsetForLeaderEpochDisklessWithControlPlaneError(): Unit = { + val errorResult = new OffsetResultHolder.FileRecordsOrError( + Optional.of(new UnknownTopicOrPartitionException("not found")), + Optional.empty() + ) + + val jobMock = Mockito.mock(classOf[FetchOffsetHandler.Job]) + when(jobMock.mustHandle(any())).thenReturn(true) + when(jobMock.add(any(), any())).thenReturn(CompletableFuture.completedFuture(errorResult)) + doNothing().when(jobMock).start() + + val fetchOffsetHandlerCtorInit: MockedConstruction.MockInitializer[FetchOffsetHandler] = { + case (handlerMock, _) => + when(handlerMock.createJob()).thenReturn(jobMock) + } + val fetchOffsetHandlerCtor = mockConstruction(classOf[FetchOffsetHandler], fetchOffsetHandlerCtorInit) + + val replicaManager = try { + createReplicaManager(List(disklessTopicPartition.topic())) + } finally { + fetchOffsetHandlerCtor.close() + } + + val requestedEpochInfo = Seq( + new OffsetForLeaderTopic() + .setTopic(disklessTopicPartition.topic()) + .setPartitions(util.List.of( + new OffsetForLeaderPartition() + .setPartition(disklessTopicPartition.partition()) + .setLeaderEpoch(0) + )) + ) + + val result = replicaManager.lastOffsetForLeaderEpoch(requestedEpochInfo) + + assertEquals(1, result.size) + val topicResult = result.head + assertEquals(disklessTopicPartition.topic(), topicResult.topic()) + assertEquals(1, topicResult.partitions().size()) + val partitionResult = topicResult.partitions().get(0) + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, partitionResult.errorCode()) + assertEquals(OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET, partitionResult.endOffset()) + } + private def mockFetchHandler(disklessResponse: Map[TopicIdPartition, FetchPartitionData]) = { // We use constructor mocking here to inject a FetchHandler mock into ReplicaManager, // because ReplicaManager internally constructs its own FetchHandler instance and does not