feat(inkless:consume): Add request hedging for storage reads #582
Conversation
Pull request overview
Adds request hedging to Inkless storage reads to reduce tail latency by issuing a competing fetch when TTFB and/or total-time thresholds are exceeded, and records new hedge-related metrics.
Changes:
- Introduces hedging logic in FetchPlanner with independent TTFB and total-time triggers (max one hedge per primary).
- Adds new config options and validation for hedging thresholds, with accompanying tests and docs updates.
- Adds hedge metrics to InklessFetchMetrics and wires hedging config through FetchHandler/Reader.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java | Implements hedging triggers and hedge request execution + metrics. |
| storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java | Creates/shuts down a hedge scheduler and passes thresholds into planning. |
| storage/inkless/src/main/java/io/aiven/inkless/consume/InklessFetchMetrics.java | Adds meters + record methods for hedge request/trigger/win metrics. |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java | Threads new config values into Reader construction. |
| storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java | Defines new configs and validates total-time > TTFB when both enabled. |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java | Adds hedging-focused tests and updates constructor call sites. |
| storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java | Updates Reader construction for new hedging parameters. |
| storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java | Adds validation tests for hedging threshold relationships. |
| docs/inkless/configs.rst | Documents new hedging configuration keys. |
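The startup validation described for InklessConfig.java (total-time must exceed TTFB when both triggers are enabled, and 0 disables a trigger) could be sketched as follows. This is a hypothetical standalone sketch: the class name HedgeConfigValidator and its signature are invented for illustration and are not the PR's actual code.

```java
// Hypothetical sketch of the threshold validation; key names are from the PR,
// the class and method are illustrative.
final class HedgeConfigValidator {
    static void validate(long ttfbThresholdMs, long totalTimeThresholdMs) {
        // 0 disables a trigger; the ordering constraint only applies when both are enabled.
        if (ttfbThresholdMs > 0 && totalTimeThresholdMs > 0
                && totalTimeThresholdMs <= ttfbThresholdMs) {
            throw new IllegalArgumentException(
                "fetch.hedge.total.time.threshold.ms must be greater than "
                + "fetch.hedge.ttfb.threshold.ms when both are enabled");
        }
    }
}
```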
feat(inkless:consume): Add request hedging for storage reads to mitigate tail latency

When a storage fetch (S3/GCS/Azure) hasn't completed within a configurable threshold, a competing hedge request is fired. First to complete wins; the loser is best-effort cancelled. Applies to both hot (cached) and cold (lagging consumer) paths.

Two independent hedge triggers (at most one hedge per primary):
- TTFB (fetch.hedge.ttfb.threshold.ms): fires when the first byte hasn't arrived, catching stuck connections early (default 0 = disabled)
- Total time (fetch.hedge.total.time.threshold.ms): fires when the entire fetch is slow (default 0 = disabled)

When both are enabled, total-time must be greater than TTFB (validated at startup).

Metrics: HedgeRequestRate, HedgeWonRate

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
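The trigger mechanism described in the commit message can be sketched with standard java.util.concurrent primitives. This is a minimal hypothetical illustration, not the PR's actual FetchPlanner code: withHedge is mentioned later in the thread, but the signature, the HedgedFetch class, and the refetch supplier here are assumptions. The sketch shows a timer that fires a competing request after a threshold, with first-completion-wins semantics.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical sketch of the hedging pattern; names are illustrative.
final class HedgedFetch {
    static <T> CompletableFuture<T> withHedge(
            CompletableFuture<T> primary,
            Supplier<T> refetch,          // how to issue the competing request (assumed)
            long thresholdMs,
            ScheduledExecutorService scheduler,   // only schedules timer callbacks
            ExecutorService dataExecutor) {       // does the actual fetch work
        final AtomicBoolean hedgeFired = new AtomicBoolean(false);
        final ScheduledFuture<?> timer = scheduler.schedule(() -> {
            // At most one hedge per primary: compareAndSet guards double-firing.
            if (!primary.isDone() && hedgeFired.compareAndSet(false, true)) {
                dataExecutor.submit(() -> {
                    try {
                        // First completion wins; complete() is itself a CAS,
                        // so a late primary result becomes a no-op.
                        primary.complete(refetch.get());
                    } catch (Exception e) {
                        // Best effort: the primary may still succeed on its own.
                    }
                });
            }
        }, thresholdMs, TimeUnit.MILLISECONDS);
        // If the primary finishes first, the hedge timer is no longer needed.
        primary.whenComplete((value, error) -> timer.cancel(false));
        return primary;
    }
}
```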
Code under review:

    return primary;
    }

    final AtomicBoolean hedgeFired = new AtomicBoolean(false);
Reviewer:
So this is created per caller and not per primary future? Or am I missing something?
Author:
Per-caller, yes. But each caller maps to a single primary future.
Reviewer:
So it is possible to spawn multiple hedging futures here? But the thread pool is single-threaded, right? Should there be one per primary future?
Author:
Right, not within the same request (that is guarded by the compare-and-set), but it can fire multiple hedge requests for the same primary: one per concurrent caller sharing the cached future. They all resolve correctly because primary.complete(value) is also a compare-and-set, so the first completion wins and the rest are no-ops. It's an edge case, but it should be handled gracefully.
The scheduler is single-threaded, yes, but it only schedules the timer callbacks; there is no need for one per primary future. The actual work is done on the data executors, so the scheduler only triggers tasks.
I'm improving the docs to clarify this a bit better, and adding a test to cover the scenario where multiple hedge requests are fired.
Reviewer:
So what will be the boundary for the number of concurrent hedge fetches if multiple consumers or groups try to fetch exactly the same offsets? Do I understand correctly that every request will spawn an object storage fetch without any particular limit on their number? Or am I missing something? Could this lead to some kind of hedge storm in the worst case?
Author:
You are right. I was treating this as an edge case, but given the cost implications it could carry, and since the complexity needed to handle it is small, that doesn't justify leaving it out. I'm adding a concurrent map of hedge guards that maps each primary future to a hedge-fired flag, so the additional object storage fetches for concurrent requests on the same key are bounded: the primary fetch plus at most one hedge, regardless of caller count.
Overall, the maximum number of hedge requests will now be bounded to one per in-flight primary request.
When multiple consumers read the same partition/offset range concurrently, cache dedup means they share one primary future. Without a shared hedge guard, each caller could independently spawn a hedge, up to N hedges for the same byte range.

Adds a ConcurrentHashMap<CompletableFuture, AtomicBoolean> (hedgeGuards) in Reader, shared across all FetchPlanner instances. All callers of the same primary share one guard via computeIfAbsent, so at most one hedge fires per primary. Entries are cleaned up on primary completion.

Overhead: ~48 bytes per in-flight unique primary and one uncontended CAS per withHedge() call. Bounded by distinct in-flight cache keys, not caller count.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
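The hedgeGuards map described in the commit message could look roughly like this. This is a hypothetical sketch: the HedgeGuards class and its method names are invented for illustration; only the ConcurrentHashMap of AtomicBoolean flags, the computeIfAbsent sharing, and the cleanup-on-completion idea come from the commit message above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of per-primary hedge guards. All callers sharing the
// same cached primary future resolve to one AtomicBoolean via computeIfAbsent,
// so at most one hedge fires per in-flight primary, regardless of caller count.
final class HedgeGuards {
    private final ConcurrentHashMap<CompletableFuture<?>, AtomicBoolean> guards =
        new ConcurrentHashMap<>();

    /** Returns true for exactly one caller per primary future. */
    boolean tryFireHedge(CompletableFuture<?> primary) {
        final AtomicBoolean guard =
            guards.computeIfAbsent(primary, f -> new AtomicBoolean(false));
        // One uncontended CAS per call; only the first caller wins.
        return guard.compareAndSet(false, true);
    }

    /** Call when the primary completes, so entries don't leak. */
    void release(CompletableFuture<?> primary) {
        guards.remove(primary);
    }
}
```

The map is bounded by the number of distinct in-flight primaries, not by the number of concurrent callers, which is what caps the worst-case hedge count discussed in the thread.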