Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,10 @@ class BrokerServer(
retryTimeoutMs = 60000
)
initDisklessLogChannelManager.start()
maybeInitDisklessLogManager = sharedServer.inklessControlPlane.map { _ =>
maybeInitDisklessLogManager = sharedServer.inklessControlPlane.map { controlPlane =>
new InitDisklessLogManager(
controllerChannelManager = initDisklessLogChannelManager,
controlPlane = controlPlane,
scheduler = kafkaScheduler,
brokerId = config.brokerId,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch
Expand Down
128 changes: 123 additions & 5 deletions core/src/main/scala/kafka/server/InitDisklessLogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package kafka.server

import io.aiven.inkless.control_plane.{ControlPlane, InitDisklessLogProducerState => CpProducerState, InitDisklessLogRequest => CpInitRequest}
import kafka.cluster.Partition
import kafka.utils.Logging
import org.apache.kafka.clients.ClientResponse
Expand Down Expand Up @@ -49,11 +50,19 @@ private[server] case class InitPartitionState(
partition: Partition,
topicId: Uuid,
state: InitState,
metadataPayload: Option[DisklessInitMetadata] = None,
retryAttempt: Int = 0
)

/**
 * Committed diskless init metadata carried alongside a tracked partition,
 * used to build the control-plane InitDisklessLogRequest once the partition
 * reaches the AwaitingMetadata state.
 */
private[server] case class DisklessInitMetadata(
  topicName: String,          // topic name placed in the control-plane init payload
  disklessStartOffset: Long,  // committed diskless start offset; callers reject negative values before constructing this
  producerStates: util.List[CpProducerState] // producer state snapshots used to seed the destination replica state
)

class InitDisklessLogManager(
controllerChannelManager: NodeToControllerChannelManager,
controlPlane: ControlPlane,
scheduler: Scheduler,
brokerId: Int,
brokerEpochSupplier: () => Long
Expand Down Expand Up @@ -81,6 +90,54 @@ class InitDisklessLogManager(
private[server] def getInitState(tp: TopicPartition): Option[InitState] =
Option(tracked.get(tp)).map(_.state)

/**
 * Handles already-applied diskless init metadata for a partition.
 * This moves/keeps the partition in AwaitingMetadata and triggers a prompt
 * control-plane init send, since metadata is committed and visible.
 *
 * @param partition Partition instance for the topic-partition to initialize.
 * @param topicId Topic ID currently assigned to the partition.
 * @param topicName Topic name used in the control-plane init payload.
 * @param disklessStartOffset Start offset for diskless initialization; negative means absent.
 * @param producerStates Producer state snapshots to seed the destination replica state.
 */
def initOnControlPlane(
  partition: Partition,
  topicId: Uuid,
  topicName: String,
  disklessStartOffset: Long,
  producerStates: util.List[CpProducerState]
): Unit = {
  val tp = partition.topicPartition
  if (disklessStartOffset < 0) {
    // Fixed log message: the previous form interpolated the whole Partition object next to
    // the topic name ("$topicName:$partition"), duplicating the topic and printing the
    // Partition's full toString. The TopicPartition identifies the partition unambiguously.
    warn(s"Received a negative disklessStartOffset ($disklessStartOffset) from the Controller for $tp, skipping init on Diskless Coordinator.")
    return
  }

  val payload = DisklessInitMetadata(topicName, disklessStartOffset, producerStates)
  val newState = InitPartitionState(
    partition = partition,
    topicId = topicId,
    state = InitState.AwaitingMetadata,
    metadataPayload = Some(payload)
  )

  // NOTE(review): there is a small check-then-act window between putIfAbsent and
  // computeIfPresent — if the entry is removed concurrently in between, this update is
  // dropped. Confirm whether a single tracked.compute(...) should be used instead.
  if (tracked.putIfAbsent(tp, newState) == null) {
    // Fresh entry: start listening for partition events.
    partition.maybeAddListener(this)
  } else {
    // Entry already tracked: replace it with the freshly-built state, preserving only
    // the retry counter accumulated so far (previously each field was copied by hand,
    // which risked drifting out of sync with newState's construction).
    tracked.computeIfPresent(tp, (_, current) =>
      newState.copy(retryAttempt = current.retryAttempt))
  }

  // Metadata is already committed and visible; trigger CP init promptly.
  scheduleBatchSend(0L)
}

/**
* Register a sealed partition for migration. Registers this manager as a
* PartitionListener to receive HW advancement notifications. If HW already
Expand Down Expand Up @@ -196,13 +253,17 @@ class InitDisklessLogManager(
* and send a single InitDisklessLog request to the controller.
*/
private[server] def sendBatch(): Unit = {
val ready = tracked.asScala.filter { case (_, initPartitionState) =>
val readyForController = tracked.asScala.filter { case (_, initPartitionState) =>
initPartitionState.state == InitState.SendingToController
}.toMap

if (ready.isEmpty) return
val readyForControlPlane = tracked.asScala.filter { case (_, initPartitionState) =>
initPartitionState.state == InitState.AwaitingMetadata && initPartitionState.metadataPayload.isDefined
}.toMap

if (readyForController.isEmpty && readyForControlPlane.isEmpty) return

val validPartitions = ready.filter { case (tp, mps) =>
val validPartitions = readyForController.filter { case (tp, mps) =>
if (!mps.partition.isLeader) {
info(s"Partition $tp is no longer leader, removing from migration tracking")
tracked.remove(tp)
Expand All @@ -216,7 +277,10 @@ class InitDisklessLogManager(
}
}

if (validPartitions.isEmpty) return
if (validPartitions.isEmpty) {
if (readyForControlPlane.nonEmpty) initOnControlPlane(readyForControlPlane)
return
}

val topicDataMap = new util.LinkedHashMap[Uuid, util.List[InitDisklessLogRequestData.PartitionData]]()

Expand Down Expand Up @@ -271,6 +335,8 @@ class InitDisklessLogManager(
handleBatchException(partitionKeys, new RuntimeException("InitDisklessLog request timed out"))
}
})

if (readyForControlPlane.nonEmpty) initOnControlPlane(readyForControlPlane)
}

private def extractProducerStates(
Expand Down Expand Up @@ -306,7 +372,7 @@ class InitDisklessLogManager(
case Errors.NONE =>
info(s"InitDisklessLog succeeded for partition $tp, transitioning to AwaitingMetadata")
tracked.computeIfPresent(tp, (_, mps) =>
mps.copy(state = InitState.AwaitingMetadata, retryAttempt = 0))
mps.copy(state = InitState.AwaitingMetadata, metadataPayload = None, retryAttempt = 0))

case Errors.FENCED_LEADER_EPOCH | Errors.INVALID_REQUEST =>
info(s"InitDisklessLog for partition $tp returned permanent error $error, removing from tracking")
Expand Down Expand Up @@ -369,4 +435,56 @@ class InitDisklessLogManager(
/**
 * Exponential backoff with a cap: doubles the initial backoff per prior attempt,
 * clamping the exponent to 14 (to avoid shift overflow) and the result to
 * maxRetryBackoffMs. Attempt values below 1 are treated as the first attempt.
 */
private def computeBackoff(attempt: Int): Long = {
  val shift = Math.min(Math.max(attempt - 1, 0), 14)
  val uncapped = initialRetryBackoffMs * (1L << shift)
  Math.min(uncapped, maxRetryBackoffMs)
}

/**
 * Performs the control-plane InitDisklessLog call for each partition whose committed
 * metadata payload is available. Successful (or permanently failed) partitions are
 * removed from tracking; transiently failed ones get their retry counter bumped and a
 * batch resend is scheduled using the minimum backoff across the failed partitions.
 *
 * @param readyForControlPlane partitions in AwaitingMetadata with a metadata payload present
 *                             (the caller guarantees metadataPayload.isDefined).
 */
private def initOnControlPlane(readyForControlPlane: Map[TopicPartition, InitPartitionState]): Unit = {
  // Method-scope import keeps the fix self-contained.
  import scala.util.control.NonFatal

  val retriableAttempts = mutable.Map[TopicPartition, Int]()

  // Bumps the retry counter for tp (if it is still tracked) and records the new attempt
  // count so a backoff can be computed below. Extracted: this logic was previously
  // duplicated in four places.
  def recordRetry(tp: TopicPartition): Unit = {
    val updated = tracked.computeIfPresent(tp, (_, current) => current.copy(retryAttempt = current.retryAttempt + 1))
    if (updated != null) retriableAttempts.put(tp, updated.retryAttempt)
  }

  readyForControlPlane.foreach { case (tp, mps) =>
    // Caller filters on metadataPayload.isDefined, so .get is safe here.
    val metadata = mps.metadataPayload.get
    mps.partition.log match {
      case None =>
        warn(s"Partition $tp has no log while applying diskless metadata, scheduling retry")
        recordRetry(tp)
      case Some(log) =>
        val request = new CpInitRequest(
          mps.topicId,
          metadata.topicName,
          tp.partition(),
          log.logStartOffset,
          metadata.disklessStartOffset,
          metadata.producerStates
        )

        try {
          val responses = controlPlane.initDisklessLog(util.List.of(request))
          Option(responses).flatMap(_.asScala.headOption) match {
            // INVALID_REQUEST is treated as permanent: stop tracking rather than retry.
            case Some(r) if r.error() == Errors.NONE || r.error() == Errors.INVALID_REQUEST =>
              info(s"Control-plane InitDisklessLog completed for $tp with ${r.error()}. Removing from tracking.")
              tracked.remove(tp)
            case Some(r) =>
              warn(s"Control-plane InitDisklessLog for $tp returned retriable error ${r.error()}")
              recordRetry(tp)
            case None =>
              warn(s"Control-plane InitDisklessLog for $tp returned no response, scheduling retry")
              recordRetry(tp)
          }
        } catch {
          // Was `case t: Throwable`, which would swallow fatal errors (OutOfMemoryError,
          // InterruptedException, ...) into the retry loop. NonFatal lets those propagate.
          case NonFatal(t) =>
            warn(s"Control-plane InitDisklessLog for $tp failed with exception, scheduling retry", t)
            recordRetry(tp)
        }
    }
  }

  if (retriableAttempts.nonEmpty) {
    // Use the smallest backoff among the failed partitions so no partition waits
    // longer than its own computed backoff.
    val minBackoff = retriableAttempts.values.map(computeBackoff).min
    warn(s"Scheduling control-plane InitDisklessLog retry for ${retriableAttempts.size} partition(s) in ${minBackoff}ms")
    scheduleBatchSend(minBackoff)
  }
}
}
66 changes: 65 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kafka.server
import com.yammer.metrics.core.Meter
import io.aiven.inkless.common.SharedState
import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler}
import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse}
import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse, InitDisklessLogProducerState}
import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer}
import io.aiven.inkless.merge.FileMerger
import io.aiven.inkless.produce.AppendHandler
Expand Down Expand Up @@ -2820,6 +2820,70 @@ class ReplicaManager(val config: KafkaConfig,
localChanges.directoryIds.forEach(maybeUpdateTopicAssignment)
}
}

initDisklessLogOnControlPlane(delta)
}

/**
 * Scans a metadata delta for partitions that just became diskless-initialized and asks
 * the InitDisklessLogManager to complete the control-plane side of the migration.
 * Only fires on the transition NO_DISKLESS_START_OFFSET -> (>= 0), and only for
 * partitions with a local online Partition instance.
 *
 * @param delta the topics delta from the applied metadata image.
 */
private def initDisklessLogOnControlPlane(delta: TopicsDelta): Unit = {
  // No-op unless a diskless init manager was configured for this broker.
  initDisklessLogManager.foreach { manager =>
    delta.changedTopics().forEach { (topicId, topicDelta) =>
      val topicName = topicDelta.name()
      topicDelta.partitionChanges().forEach { (partitionId, partitionRegistration) =>
        // Notify only on the specific transition from "not diskless-initialized"
        // to "diskless-initialized" in metadata.
        // NOTE(review): assumes delta.image() returns the PRE-delta image so this lookup
        // yields the previous registration — confirm against TopicsDelta semantics.
        val previousPartition = Option(delta.image().getTopic(topicId)).flatMap { topicImage =>
          Option(topicImage.partitions().get(partitionId))
        }
        val shouldInitDisklessLogOnControlPlane = previousPartition match {
          case None => false // topic not in the previous image: no transition to detect
          case Some(previous) =>
            previous.disklessStartOffset == PartitionRegistration.NO_DISKLESS_START_OFFSET &&
            partitionRegistration.disklessStartOffset >= 0
        }
        val tp = new TopicPartition(topicName, partitionId)
        if (shouldInitDisklessLogOnControlPlane) {
          // Send init only for partitions that currently have a local Partition instance.
          onlinePartition(tp) match {
            case Some(partition) =>
              // Translate metadata producer snapshots into the control-plane representation.
              val producerStates = partitionRegistration.disklessProducerStates.asScala.map { producerState =>
                new InitDisklessLogProducerState(
                  producerState.producerId(),
                  producerState.producerEpoch(),
                  producerState.baseSequence(),
                  producerState.lastSequence(),
                  producerState.assignedOffset(),
                  producerState.batchMaxTimestamp()
                )
              }.asJava
              manager.initOnControlPlane(
                partition = partition,
                topicId = topicId,
                topicName = topicName,
                disklessStartOffset = partitionRegistration.disklessStartOffset,
                producerStates = producerStates
              )
            case None =>
              stateChangeLogger.info(
                s"Skipping diskless init on control plane for $tp because the partition is not online locally."
              )
          }
        } else {
          // NOTE(review): this branch logs at info level for EVERY partition change that is
          // not the diskless transition (e.g. ordinary leader changes) — potentially very
          // noisy on busy clusters; consider debug level.
          previousPartition match {
            case None =>
              stateChangeLogger.info(
                s"Skipping diskless init on control plane for $tp because no previous partition registration was found."
              )
            case Some(previous) =>
              stateChangeLogger.info(
                s"Skipping diskless init on control plane for $tp because transition did not match " +
                s"${PartitionRegistration.NO_DISKLESS_START_OFFSET} -> >=0 " +
                s"(previous=${previous.disklessStartOffset}, current=${partitionRegistration.disklessStartOffset})."
              )
          }
        }
      }
    }
  }
}

private def applyLocalLeadersDelta(
Expand Down
Loading
Loading