Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/inkless/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,22 @@ Under ``inkless.``
* Valid Values: [0,...]
* Importance: low

``fetch.hedge.total.time.threshold.ms``
Total time threshold in milliseconds to trigger a hedge request. When a storage fetch has not completed within this threshold, a competing hedge request is submitted. The first request to complete wins; the other continues in the background and its result is ignored. Set to 0 to disable total-time-based hedging. When both hedging thresholds are enabled, this value must be strictly greater than fetch.hedge.ttfb.threshold.ms.

* Type: long
* Default: 0
* Valid Values: [0,...]
* Importance: low

``fetch.hedge.ttfb.threshold.ms``
Time-to-first-byte threshold in milliseconds to trigger a hedge request. When a storage fetch has not received its first byte within this threshold, a competing hedge request is submitted. This catches stuck connections early, before the total-time threshold. Set to 0 to disable TTFB-based hedging. When both hedging thresholds are enabled, fetch.hedge.total.time.threshold.ms must be strictly greater than this value.

Comment thread
jeqo marked this conversation as resolved.
* Type: long
* Default: 0
* Valid Values: [0,...]
* Importance: low
Comment thread
jeqo marked this conversation as resolved.

``fetch.lagging.consumer.thread.pool.size``
Thread pool size for lagging consumer fetch requests (consumers reading old data). Set to 0 to disable the lagging consumer feature (all requests will use the recent data path). The default value of 16 is designed as approximately half of the default fetch.data.thread.pool.size (32), providing sufficient capacity for typical cold storage access patterns while leaving headroom for the hot path. The queue capacity is automatically set to thread.pool.size * 100, providing burst buffering (e.g., 16 threads = 1600 queue capacity ≈ 8 seconds buffer at 200 req/s). Tune based on lagging consumer SLA and expected load patterns.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,23 @@ public class InklessConfig extends AbstractConfig {
// Tune based on storage backend capacity and budget constraints.
private static final int FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_DEFAULT = 200;

// Hedge trigger 1 — time-to-first-byte: fires a competing fetch when the first byte of a
// storage response has not arrived within the threshold. Default 0 = trigger disabled.
public static final String FETCH_HEDGE_TTFB_THRESHOLD_MS_CONFIG = "fetch.hedge.ttfb.threshold.ms";
public static final String FETCH_HEDGE_TTFB_THRESHOLD_MS_DOC = "Time-to-first-byte threshold in milliseconds to trigger a hedge request. "
+ "When a storage fetch has not received its first byte within this threshold, a competing hedge request is submitted. "
+ "This catches stuck connections early, before the total-time threshold. "
+ "Set to 0 to disable TTFB-based hedging. "
+ "When both hedging thresholds are enabled, fetch.hedge.total.time.threshold.ms must be strictly greater than this value.";
private static final long FETCH_HEDGE_TTFB_THRESHOLD_MS_DEFAULT = 0;

// Hedge trigger 2 — total time: fires a competing fetch when the whole request has not
// completed within the threshold. Default 0 = trigger disabled. When both triggers are
// enabled, validate() requires total-time > TTFB so the early trigger stays meaningful.
public static final String FETCH_HEDGE_TOTAL_TIME_THRESHOLD_MS_CONFIG = "fetch.hedge.total.time.threshold.ms";
public static final String FETCH_HEDGE_TOTAL_TIME_THRESHOLD_MS_DOC = "Total time threshold in milliseconds to trigger a hedge request. "
+ "When a storage fetch has not completed within this threshold, a competing hedge request is submitted. "
+ "The first request to complete wins; the other continues in the background and its result is ignored. "
+ "Set to 0 to disable total-time-based hedging. "
+ "When both hedging thresholds are enabled, this value must be strictly greater than "
+ FETCH_HEDGE_TTFB_THRESHOLD_MS_CONFIG + ".";
private static final long FETCH_HEDGE_TOTAL_TIME_THRESHOLD_MS_DEFAULT = 0;

public static final String FETCH_FIND_BATCHES_MAX_BATCHES_PER_PARTITION_CONFIG = "fetch.find.batches.max.per.partition";
public static final String FETCH_FIND_BATCHES_MAX_BATCHES_PER_PARTITION_DOC = "The maximum number of batches to find per partition when processing a fetch request. "
+ "A value of 0 means all available batches are fetched. "
Expand Down Expand Up @@ -406,6 +423,22 @@ public static ConfigDef configDef() {
ConfigDef.Importance.MEDIUM,
FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_DOC
);
configDef.define(
FETCH_HEDGE_TOTAL_TIME_THRESHOLD_MS_CONFIG,
ConfigDef.Type.LONG,
FETCH_HEDGE_TOTAL_TIME_THRESHOLD_MS_DEFAULT,
ConfigDef.Range.atLeast(0),
ConfigDef.Importance.LOW,
FETCH_HEDGE_TOTAL_TIME_THRESHOLD_MS_DOC
);
configDef.define(
FETCH_HEDGE_TTFB_THRESHOLD_MS_CONFIG,
ConfigDef.Type.LONG,
FETCH_HEDGE_TTFB_THRESHOLD_MS_DEFAULT,
ConfigDef.Range.atLeast(0),
ConfigDef.Importance.LOW,
FETCH_HEDGE_TTFB_THRESHOLD_MS_DOC
);
configDef.define(
FETCH_FIND_BATCHES_MAX_BATCHES_PER_PARTITION_CONFIG,
ConfigDef.Type.INT,
Expand Down Expand Up @@ -508,6 +541,24 @@ private static ConfigDef validate(final Map<String, ?> props) {
);
}

final long hedgeTtfbMs =
((Number) parsedProps.get(FETCH_HEDGE_TTFB_THRESHOLD_MS_CONFIG)).longValue();
final long hedgeTotalTimeMs =
((Number) parsedProps.get(FETCH_HEDGE_TOTAL_TIME_THRESHOLD_MS_CONFIG)).longValue();

// When both hedging triggers are enabled, total-time threshold must be > TTFB threshold.
// TTFB fires early to catch stuck connections; total-time is a broader safety net.
// If total-time <= TTFB, the total-time timer would fire first (or simultaneously),
// making TTFB redundant and defeating the two-tier design.
if (hedgeTtfbMs > 0 && hedgeTotalTimeMs > 0 && hedgeTotalTimeMs <= hedgeTtfbMs) {
throw new ConfigException(
FETCH_HEDGE_TOTAL_TIME_THRESHOLD_MS_CONFIG,
hedgeTotalTimeMs,
FETCH_HEDGE_TOTAL_TIME_THRESHOLD_MS_CONFIG + " (" + hedgeTotalTimeMs + "ms) must be greater than "
+ FETCH_HEDGE_TTFB_THRESHOLD_MS_CONFIG + " (" + hedgeTtfbMs + "ms) when both are enabled."
);
}

return configDef;
}

Expand Down Expand Up @@ -633,6 +684,14 @@ public int fetchLaggingConsumerRequestRateLimit() {
return getInt(FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_CONFIG);
}

/**
 * Returns the time-to-first-byte hedge threshold in milliseconds.
 * A value of 0 disables TTFB-based hedging.
 */
public long fetchHedgeTtfbThresholdMs() {
    return getLong(FETCH_HEDGE_TTFB_THRESHOLD_MS_CONFIG);
}

/**
 * Returns the total-time hedge threshold in milliseconds.
 * A value of 0 disables total-time-based hedging.
 */
public long fetchHedgeTotalTimeThresholdMs() {
    return getLong(FETCH_HEDGE_TOTAL_TIME_THRESHOLD_MS_CONFIG);
}

/**
 * Returns the maximum number of batches to find per partition for a fetch request.
 * Per the config doc, 0 means all available batches are fetched.
 */
public int maxBatchesPerPartitionToFind() {
    return getInt(FETCH_FIND_BATCHES_MAX_BATCHES_PER_PARTITION_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public FetchHandler(final SharedState state) {
state.config().fetchLaggingConsumerThresholdMs(),
state.config().fetchLaggingConsumerRequestRateLimit(),
state.config().fetchLaggingConsumerThreadPoolSize(),
state.config().fetchHedgeTtfbThresholdMs(),
state.config().fetchHedgeTotalTimeThresholdMs(),
state.config().maxBatchesPerPartitionToFind()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,19 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -76,6 +83,10 @@ public class FetchPlanner implements Supplier<List<FetchPlanner.FetchRequestWith
private final ObjectFetcher laggingObjectFetcher;
private final long laggingConsumerThresholdMs;
private final Bucket laggingRateLimiter;
// Dedicated scheduler for hedge timers; null disables hedging entirely (see withHedge).
private final ScheduledExecutorService hedgeScheduler;
// Time-to-first-byte hedge threshold in ms; 0 disables the TTFB trigger.
private final long hedgeTtfbThresholdMs;
// Total-time hedge threshold in ms; 0 disables the total-time trigger.
private final long hedgeTotalTimeThresholdMs;
// Per-primary hedge guard: at most one hedge fires per in-flight primary future,
// even when cache dedup hands the same future to many callers. Entries are removed
// when the primary completes.
private final ConcurrentHashMap<CompletableFuture<?>, AtomicBoolean> hedgeGuards;
private final Map<TopicIdPartition, FindBatchResponse> batchCoordinates;
private final InklessFetchMetrics metrics;

Expand All @@ -90,6 +101,10 @@ public FetchPlanner(
long laggingConsumerThresholdMs,
Bucket laggingRateLimiter,
ExecutorService laggingFetchDataExecutor,
ScheduledExecutorService hedgeScheduler,
long hedgeTtfbThresholdMs,
long hedgeTotalTimeThresholdMs,
ConcurrentHashMap<CompletableFuture<?>, AtomicBoolean> hedgeGuards,
Map<TopicIdPartition, FindBatchResponse> batchCoordinates,
InklessFetchMetrics metrics
) {
Expand All @@ -103,6 +118,10 @@ public FetchPlanner(
this.laggingFetchDataExecutor = laggingFetchDataExecutor;
this.laggingConsumerThresholdMs = laggingConsumerThresholdMs;
this.laggingRateLimiter = laggingRateLimiter;
this.hedgeScheduler = hedgeScheduler;
this.hedgeTtfbThresholdMs = hedgeTtfbThresholdMs;
this.hedgeTotalTimeThresholdMs = hedgeTotalTimeThresholdMs;
this.hedgeGuards = hedgeGuards;
this.batchCoordinates = batchCoordinates;
this.metrics = metrics;
}
Expand Down Expand Up @@ -229,24 +248,31 @@ private CompletableFuture<FileExtent> submitSingleRequest(final ObjectFetchReque
if (!request.lagging()) {
// Hot path: up-to-date consumers use cache + recentDataExecutor
metrics.recordRecentDataRequest();
return cache.computeIfAbsent(
// Per-caller TTFB signal: set by the load function's TTFB callback when first byte arrives.
// On a cache dedup (computeIfAbsent returns an existing in-flight future), the load function
// doesn't run, so this flag stays false — but with per-key hedge dedup (hedgeGuards),
// at most one hedge fires per primary regardless of how many callers have stale TTFB flags.
final AtomicBoolean firstByteReceived = new AtomicBoolean(false);
final CompletableFuture<FileExtent> primary = cache.computeIfAbsent(
request.toCacheKey(),
k -> fetchFileExtent(objectFetcher, request),
k -> fetchFileExtent(objectFetcher, request, firstByteReceived),
fetchDataExecutor
);
return withHedge(primary, objectFetcher, request, fetchDataExecutor, firstByteReceived);
Comment thread
jeqo marked this conversation as resolved.
} else {
// Cold path: lagging consumers bypass cache, use dedicated executor with rate limiting.
// Cache bypass rationale: Objects are multi-partition blobs, caching them would evict hot data
// and provide little benefit to the lagging consumer. Backpressure via AbortPolicy: queue full
// → RejectedExecutionException → Kafka error handler → consumer backs off (fetch purgatory).
metrics.recordLaggingConsumerRequest();
try {
return CompletableFuture.supplyAsync(() -> {
final AtomicBoolean firstByteReceived = new AtomicBoolean(false);
final CompletableFuture<FileExtent> primary = CompletableFuture.supplyAsync(() -> {
// Apply rate limiting if configured (rate limit > 0)
if (laggingRateLimiter != null) {
applyRateLimit(); // InterruptedException here is wrapped in FetchException
}
return fetchFileExtent(laggingObjectFetcher, request);
return fetchFileExtent(laggingObjectFetcher, request, firstByteReceived);
},
laggingFetchDataExecutor
).whenComplete((result, throwable) -> {
Expand All @@ -265,6 +291,7 @@ private CompletableFuture<FileExtent> submitSingleRequest(final ObjectFetchReque
}
}
});
return withHedge(primary, laggingObjectFetcher, request, laggingFetchDataExecutor, firstByteReceived);
} catch (final RejectedExecutionException e) {
// Sync rejection (executor shut down or queue full at submission) - return failed future
// instead of propagating exception. This allows allOfFileExtents to handle the failure
Expand All @@ -277,6 +304,118 @@ private CompletableFuture<FileExtent> submitSingleRequest(final ObjectFetchReque
}
}

// Wraps a primary future with hedging: schedules timer(s) that fire a competing fetch if the
// primary is too slow. Two triggers: TTFB (stuck connection) and total-time (slow transfer).
// Timers run on the hedge scheduler; actual fetches run on the data executor.
//
// Race resolution: hedge calls primary.complete(value) — CF.complete() is a CAS, first caller wins.
// This makes hedging transparent to the cache (cache holds a reference to primary).
//
// Per-key dedup: hedgeFired is shared across all callers of the same primary (via hedgeGuards map).
// On cache dedup, N concurrent callers share the same primary and the same guard — at most one
// hedge fires per primary, preventing hedge storms under high fan-out. The guard is removed
// when the primary completes.
//
// Early exits: returns primary as-is when disabled (null scheduler) or already complete (cache hit).
// Fast failures propagate immediately — hedging mitigates slow responses, not errors.
private CompletableFuture<FileExtent> withHedge(
final CompletableFuture<FileExtent> primary,
final ObjectFetcher fetcher,
final ObjectFetchRequest request,
final ExecutorService executor,
final AtomicBoolean firstByteReceived
) {
if (hedgeScheduler == null || primary.isDone()) {
return primary;
}

// Per-key dedup: all callers sharing the same primary (via cache dedup) share one guard.
// At most one hedge fires per primary, regardless of concurrent caller count.
final AtomicBoolean hedgeFired = hedgeGuards.computeIfAbsent(primary, k -> {
k.whenComplete((v, e) -> hedgeGuards.remove(k));
return new AtomicBoolean(false);
});
final List<ScheduledFuture<?>> timers = new ArrayList<>(2);

try {
// TTFB trigger: fire hedge if first byte hasn't arrived within threshold
if (hedgeTtfbThresholdMs > 0) {
timers.add(hedgeScheduler.schedule(() -> {
if (!primary.isDone() && !firstByteReceived.get()) {
tryFireHedge(hedgeFired, primary, fetcher, request, executor,
metrics::recordHedgeTtfbTriggered);
}
}, hedgeTtfbThresholdMs, TimeUnit.MILLISECONDS));
Comment thread
jeqo marked this conversation as resolved.
}

// Total-time trigger: fire hedge if primary hasn't completed within threshold
if (hedgeTotalTimeThresholdMs > 0) {
timers.add(hedgeScheduler.schedule(() -> {
if (!primary.isDone()) {
tryFireHedge(hedgeFired, primary, fetcher, request, executor,
metrics::recordHedgeTotalTimeTriggered);
}
}, hedgeTotalTimeThresholdMs, TimeUnit.MILLISECONDS));
}
} catch (final RejectedExecutionException e) {
// Scheduler shut down — cancel any timer that was already scheduled before the failure,
// then fall back to primary without hedging.
timers.forEach(timer -> timer.cancel(false));
return primary;
}

if (timers.isEmpty()) {
// Both thresholds are 0 — no hedging
return primary;
}

// Cancel timers when primary completes (naturally or via hedge)
primary.whenComplete((value, error) ->
timers.forEach(timer -> timer.cancel(false))
);

return primary;
}

// Fires a single hedge request. Runs on the hedge scheduler thread — must never block.
// All operations are non-blocking: CAS, meter marks, and supplyAsync (only enqueues a task;
// actual I/O runs on the data executor). Executor-full → RejectedExecutionException thrown
// immediately and caught.
//
// The losing fetch (primary or hedge) is not cancelled — CF.cancel() doesn't interrupt S3/GCS
// I/O, so the request runs to completion regardless. The loser's complete() is a no-op.
private void tryFireHedge(
final AtomicBoolean hedgeFired,
final CompletableFuture<FileExtent> primary,
final ObjectFetcher fetcher,
final ObjectFetchRequest request,
final ExecutorService executor,
final Runnable triggerMetric
) {
// There is a small race between the timer's !primary.isDone() check and this CAS —
// the primary can complete in between, causing an unnecessary hedge. This is intentionally
// non-blocking: eliminating the race would require synchronizing with primary completion,
// adding contention on the hot path. The trade-off is an occasional redundant fetch whose
// primary.complete() is a no-op. HedgeWonRate remains accurate; HedgeRequestRate may be
// slightly inflated, which is acceptable for monitoring purposes.
if (hedgeFired.compareAndSet(false, true)) {
metrics.recordHedgeRequest();
triggerMetric.run();
try {
final CompletableFuture<FileExtent> hedge = CompletableFuture.supplyAsync(
Comment thread
jeqo marked this conversation as resolved.
() -> fetchFileExtent(fetcher, request), executor
);
hedge.whenComplete((value, error) -> {
if (error == null && primary.complete(value)) {
metrics.recordHedgeWon();
}
});
Comment thread
jeqo marked this conversation as resolved.
} catch (final RejectedExecutionException ignored) {
// Executor full — fall back to primary only
}
}
}

// Applies request-based rate limiting by blocking executor thread until token available.
// Always records wait time (including zero-wait) for accurate latency histogram.
// Note: If interrupted, the duration is still recorded before the exception is thrown.
Expand All @@ -300,6 +439,37 @@ private void applyRateLimit() {
}, metrics::recordRateLimitWaitTime);
}

/**
 * Fetches a file extent from remote storage, signaling when the first byte is received.
 * The {@code firstByteReceived} flag is set by the TTFB callback before the metrics callback,
 * enabling the TTFB hedge trigger to detect stuck connections.
 */
private FileExtent fetchFileExtent(
    final ObjectFetcher fetcher,
    final ObjectFetchRequest request,
    final AtomicBoolean firstByteReceived
) {
    try {
        final FileFetchJob job = new FileFetchJob(
            time,
            fetcher,
            request.objectKey(),
            request.byteRange(),
            metrics::fetchFileFinished,
            ttfbMs -> {
                // Flip the flag before recording the metric so the hedge timer
                // observes first-byte arrival as early as possible.
                firstByteReceived.set(true);
                metrics.fetchFirstByteFinished(ttfbMs);
            }
        );
        final FileExtent extent = job.call();
        metrics.cacheEntrySize(extent.data().length);
        return extent;
    } catch (final Exception e) {
        throw new FileFetchException(e);
    }
}

/**
* Fetches a file extent from remote storage.
*
Expand Down
Loading
Loading