-
Notifications
You must be signed in to change notification settings - Fork 10
refactor(inkless:metrics): only add topic-type tag on all topic stats #472
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
5a8f3ff
ee5b0d9
ca2dda9
679a580
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
|
|
||
| import com.yammer.metrics.core.Meter; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
|
|
@@ -32,9 +33,7 @@ | |
| public final class BrokerTopicMetrics { | ||
| public static final String MESSAGE_IN_PER_SEC = "MessagesInPerSec"; | ||
| public static final String BYTES_IN_PER_SEC = "BytesInPerSec"; | ||
| public static final String BYTES_IN_PER_SEC_DISKLESS_TOPIC = "BytesInPerSecDisklessTopic"; // The metric is BYTES_IN_PER_SEC | ||
| public static final String BYTES_OUT_PER_SEC = "BytesOutPerSec"; | ||
| public static final String BYTES_OUT_PER_SEC_DISKLESS_TOPIC = "BytesOutPerSecDisklessTopic"; // The metric is BYTES_OUT_PER_SEC | ||
| public static final String BYTES_REJECTED_PER_SEC = "BytesRejectedPerSec"; | ||
| public static final String REPLICATION_BYTES_IN_PER_SEC = "ReplicationBytesInPerSec"; | ||
| public static final String REPLICATION_BYTES_OUT_PER_SEC = "ReplicationBytesOutPerSec"; | ||
|
|
@@ -60,10 +59,9 @@ public final class BrokerTopicMetrics { | |
| // For backward compatibility, we keep the old package name as metric group name. | ||
| private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "BrokerTopicMetrics"); | ||
| private final Map<String, String> tags; | ||
| private final Map<String, String> tagsForClassicTopic; | ||
| private final Map<String, String> tagsForDisklessTopic; | ||
| private final Map<String, MeterWrapper> metricTypeMap = new java.util.HashMap<>(); | ||
| private final Map<String, GaugeWrapper> metricGaugeTypeMap = new java.util.HashMap<>(); | ||
| private final boolean isAllTopicsStats; | ||
| private final Map<String, MeterWrapper> metricTypeMap = new HashMap<>(); | ||
| private final Map<String, GaugeWrapper> metricGaugeTypeMap = new HashMap<>(); | ||
|
|
||
| public BrokerTopicMetrics(boolean remoteStorageEnabled) { | ||
| this(Optional.empty(), remoteStorageEnabled); | ||
|
|
@@ -75,14 +73,11 @@ public BrokerTopicMetrics(String name, boolean remoteStorageEnabled) { | |
|
|
||
| private BrokerTopicMetrics(Optional<String> name, boolean remoteStorageEnabled) { | ||
| this.tags = name.map(s -> Map.of("topic", s)).orElse(Map.of()); | ||
| this.tagsForClassicTopic = name.map(s -> Map.of("topic", s, "topicType", "classic")).orElse(Map.of("topicType", "classic")); | ||
| this.tagsForDisklessTopic = name.map(s -> Map.of("topic", s, "topicType", "diskless")).orElse(Map.of("topicType", "diskless")); | ||
| this.isAllTopicsStats = name.isEmpty(); | ||
|
|
||
| metricTypeMap.put(MESSAGE_IN_PER_SEC, new MeterWrapper(MESSAGE_IN_PER_SEC, "messages")); | ||
| metricTypeMap.put(BYTES_IN_PER_SEC, new MeterWrapper(BYTES_IN_PER_SEC, "bytes", tagsForClassicTopic)); | ||
| metricTypeMap.put(BYTES_IN_PER_SEC_DISKLESS_TOPIC, new MeterWrapper(BYTES_IN_PER_SEC, "bytes", tagsForDisklessTopic)); | ||
| metricTypeMap.put(BYTES_OUT_PER_SEC, new MeterWrapper(BYTES_OUT_PER_SEC, "bytes", tagsForClassicTopic)); | ||
| metricTypeMap.put(BYTES_OUT_PER_SEC_DISKLESS_TOPIC, new MeterWrapper(BYTES_OUT_PER_SEC, "bytes", tagsForDisklessTopic)); | ||
| metricTypeMap.put(BYTES_IN_PER_SEC, new MeterWrapper(BYTES_IN_PER_SEC, "bytes")); | ||
| metricTypeMap.put(BYTES_OUT_PER_SEC, new MeterWrapper(BYTES_OUT_PER_SEC, "bytes")); | ||
| metricTypeMap.put(BYTES_REJECTED_PER_SEC, new MeterWrapper(BYTES_REJECTED_PER_SEC, "bytes")); | ||
| metricTypeMap.put(FAILED_PRODUCE_REQUESTS_PER_SEC, new MeterWrapper(FAILED_PRODUCE_REQUESTS_PER_SEC, "requests")); | ||
| metricTypeMap.put(FAILED_FETCH_REQUESTS_PER_SEC, new MeterWrapper(FAILED_FETCH_REQUESTS_PER_SEC, "requests")); | ||
|
|
@@ -133,15 +128,6 @@ public void closeMetric(String metricName) { | |
| if (mw != null) mw.close(); | ||
| GaugeWrapper mg = metricGaugeTypeMap.get(metricName); | ||
| if (mg != null) mg.close(); | ||
|
|
||
| if (BYTES_IN_PER_SEC.equals(metricName)) { | ||
| mw = metricTypeMap.get(BYTES_IN_PER_SEC_DISKLESS_TOPIC); | ||
| if (mw != null) mw.close(); | ||
| } | ||
| if (BYTES_OUT_PER_SEC.equals(metricName)) { | ||
| mw = metricTypeMap.get(BYTES_OUT_PER_SEC_DISKLESS_TOPIC); | ||
| if (mw != null) mw.close(); | ||
| } | ||
| } | ||
|
|
||
| public void close() { | ||
|
|
@@ -162,20 +148,82 @@ public Meter messagesInRate() { | |
| return metricTypeMap.get(MESSAGE_IN_PER_SEC).meter(); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the bytes in rate meter for topic-specific metrics. | ||
| * <p> | ||
| * This is a convenience method equivalent to calling {@link #bytesInRate(boolean)} with {@code false}. | ||
| * It is primarily intended for topic-specific metrics (when a topic name was specified in the constructor), | ||
| * where the {@code isDiskless} parameter is ignored and the meter is tagged only by topic name. | ||
| * <p> | ||
| * For all-topics stats (broker-level aggregation), prefer using {@link #bytesInRate(boolean)} explicitly | ||
| * to make the topic type tagging intention clear. | ||
| * | ||
| * @return the bytes in rate meter | ||
| */ | ||
| public Meter bytesInRate() { | ||
| return metricTypeMap.get(BYTES_IN_PER_SEC).meter(); | ||
| } | ||
|
|
||
| public Meter bytesInRateDisklessTopicType() { | ||
| return metricTypeMap.get(BYTES_IN_PER_SEC_DISKLESS_TOPIC).meter(); | ||
| } | ||
|
|
||
| return bytesInRate(false); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the bytes in rate meter, optionally tagged by topic type. | ||
| * <p> | ||
| * Behavior depends on whether this is an all-topics aggregated metric or topic-specific: | ||
| * <ul> | ||
| * <li><b>All-topics stats</b> (when no topic name was specified in constructor): | ||
| * Returns a meter tagged with {@code topicType=classic} or {@code topicType=diskless}. | ||
| * Separate meters are maintained for each topic type to allow independent tracking | ||
| * of classic vs diskless topic throughput at the broker level.</li> | ||
| * <li><b>Topic-specific stats</b> (when a topic name was specified in constructor): | ||
| * The {@code isDiskless} parameter is ignored. Returns the same meter regardless | ||
| * of the parameter value, as topic-specific metrics are only tagged by the topic | ||
| * name and do not include a {@code topicType} tag.</li> | ||
| * </ul> | ||
| * | ||
| * @param isDiskless if true, returns the diskless-tagged meter (for all-topics stats only); | ||
| * if false, returns the classic-tagged meter (for all-topics stats only) | ||
| * @return the bytes in rate meter | ||
| * @throws IllegalStateException if the underlying meter wrapper has already been closed | ||
| */ | ||
| public Meter bytesInRate(boolean isDiskless) { | ||
| return metricTypeMap.get(BYTES_IN_PER_SEC).meter(isDiskless); | ||
| } | ||
|
jeqo marked this conversation as resolved.
|
||
|
|
||
| /** | ||
| * Returns the bytes out rate meter for topic-specific metrics. | ||
| * <p> | ||
| * This is a convenience method equivalent to calling {@link #bytesOutRate(boolean)} with {@code false}. | ||
| * It is primarily intended for topic-specific metrics (when a topic name was specified in the constructor), | ||
| * where the {@code isDiskless} parameter is ignored and the meter is tagged only by topic name. | ||
| * <p> | ||
| * For all-topics stats (broker-level aggregation), prefer using {@link #bytesOutRate(boolean)} explicitly | ||
| * to make the topic type tagging intention clear. | ||
| * | ||
| * @return the bytes out rate meter | ||
| */ | ||
| public Meter bytesOutRate() { | ||
| return metricTypeMap.get(BYTES_OUT_PER_SEC).meter(); | ||
| } | ||
|
|
||
| public Meter bytesOutRateDisklessTopicType() { | ||
| return metricTypeMap.get(BYTES_OUT_PER_SEC_DISKLESS_TOPIC).meter(); | ||
| return bytesOutRate(false); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the bytes out rate meter, optionally tagged by topic type. | ||
| * <p> | ||
| * Behavior depends on whether this is an all-topics aggregated metric or topic-specific: | ||
| * <ul> | ||
| * <li><b>All-topics stats</b> (when no topic name was specified in constructor): | ||
| * Returns a meter tagged with {@code topicType=classic} or {@code topicType=diskless}. | ||
| * Separate meters are maintained for each topic type to allow independent tracking | ||
| * of classic vs diskless topic throughput at the broker level.</li> | ||
| * <li><b>Topic-specific stats</b> (when a topic name was specified in constructor): | ||
| * The {@code isDiskless} parameter is ignored. Returns the same meter regardless | ||
| * of the parameter value, as topic-specific metrics are only tagged by the topic | ||
| * name and do not include a {@code topicType} tag.</li> | ||
| * </ul> | ||
| * | ||
| * @param isDiskless if true, returns the diskless-tagged meter (for all-topics stats only); | ||
| * if false, returns the classic-tagged meter (for all-topics stats only) | ||
| * @return the bytes out rate meter | ||
| * @throws IllegalStateException if the underlying meter wrapper has already been closed | ||
| */ | ||
| public Meter bytesOutRate(boolean isDiskless) { | ||
| return metricTypeMap.get(BYTES_OUT_PER_SEC).meter(isDiskless); | ||
| } | ||
|
jeqo marked this conversation as resolved.
jeqo marked this conversation as resolved.
|
||
|
|
||
| public Meter bytesRejectedRate() { | ||
|
|
@@ -375,8 +423,19 @@ private class MeterWrapper { | |
| private final String eventType; | ||
| private final Map<String, String> metricTags; | ||
| private volatile Meter lazyMeter; | ||
| private final ConcurrentHashMap<String, Meter> lazyMetersWithTopicType = new ConcurrentHashMap<>(); | ||
| private final Lock meterLock = new ReentrantLock(); | ||
|
|
||
| // Closed flag to prevent race condition between meter() and close(): | ||
| // Without this flag, the following race is possible: | ||
| // Thread A: meter() -> computeIfAbsent() creates a new meter | ||
| // Thread B: close() -> forEach() removes meters -> clear() | ||
| // If Thread A's computeIfAbsent() executes after Thread B's forEach() but before clear(), | ||
| // a new meter would be created and registered with metricsGroup but never removed, | ||
| // causing a metric leak. The closed flag ensures that once close() starts, no new | ||
| // meters can be created. | ||
| private volatile boolean closed = false; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

I'm not a huge fan of adding this. Although adding another `volatile` […], I think we could combine these two into:

```java
private class LazyMeterWrapper {
    private Meter lazyMeter;
    private boolean closed;
}
```

And instead of the two fields we'd have:

```java
private volatile LazyMeterWrapper lazyMeterWrapper;
```

This would save one read of a `volatile` field.
||
|
|
||
| public MeterWrapper(String metricType, String eventType) { /* Convenience constructor: tags the meter with the enclosing BrokerTopicMetrics instance's tags. */ | ||
| this(metricType, eventType, BrokerTopicMetrics.this.tags); | ||
| } | ||
|
|
@@ -386,19 +445,67 @@ public MeterWrapper(String metricType, | |
| Map<String, String> metricTags) { | ||
| this.metricType = metricType; | ||
| this.eventType = eventType; | ||
| this.metricTags = new java.util.HashMap<>(metricTags); | ||
| this.metricTags = new HashMap<>(metricTags); | ||
| if (this.metricTags.isEmpty()) { | ||
| meter(); // greedily initialize the general topic metrics | ||
|
jeqo marked this conversation as resolved.
|
||
| } else if (this.metricTags.size() == 1 && this.metricTags.containsKey("topicType")) { | ||
| meter(); // the metrics that only has topicType tag are also global | ||
| } | ||
| } | ||
|
|
||
| public Meter meter() { /* Convenience overload of meter(boolean): for all-topics stats this yields the "classic"-tagged meter; for topic-specific stats the flag is ignored. */ | ||
| return meter(false); | ||
| } | ||
|
|
||
| public Meter meter(boolean isDiskless) { | ||
| // Check closed flag first for both all-topics and topic-specific metrics. | ||
| // Throwing IllegalStateException prevents NPE at call sites (e.g., .mark()) | ||
| // and makes debugging easier by providing a clear error message. | ||
| if (closed) { | ||
| throw new IllegalStateException( | ||
| "Cannot access meter for metric '" + metricType + "' after BrokerTopicMetrics has been closed"); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Is this really needed? If the case of […] Let's move this? I don't see where an NPE could be thrown in any code path in this method, if moved. If the meter has been closed, […] Also, the error message looks the same even if thrown later.
||
|
|
||
| // Only add topicType tag for allTopicsStats (when no topic name was specified) | ||
| if (isAllTopicsStats) { | ||
| String topicType = isDiskless ? "diskless" : "classic"; | ||
| // Fast path: check if meter already exists (lock-free read) | ||
| Meter meter = lazyMetersWithTopicType.get(topicType); | ||
| if (meter != null) { | ||
| return meter; | ||
| } | ||
|
|
||
| // Slow path: create meter if not exists, protected by lock to prevent | ||
| // race with close() and avoid meter leaks | ||
| meterLock.lock(); | ||
| try { | ||
| // Double-check after acquiring lock | ||
| if (closed) { | ||
| throw new IllegalStateException( | ||
| "Cannot access meter for metric '" + metricType + "' after BrokerTopicMetrics has been closed"); | ||
| } | ||
| // Use computeIfAbsent for atomic insert, but we're already under lock | ||
| // so this is safe from the close() race | ||
| return lazyMetersWithTopicType.computeIfAbsent(topicType, tt -> { | ||
| Map<String, String> tagsWithTopicType = new HashMap<>(metricTags); | ||
| tagsWithTopicType.put("topicType", tt); | ||
| return metricsGroup.newMeter(metricType, eventType, TimeUnit.SECONDS, tagsWithTopicType); | ||
| }); | ||
| } finally { | ||
| meterLock.unlock(); | ||
| } | ||
| } | ||
|
|
||
| // For topic-specific metrics, use the meter without topicType tag. | ||
| // The isDiskless parameter is intentionally ignored here as documented | ||
| // in the public bytesInRate(boolean) and bytesOutRate(boolean) methods. | ||
| Meter meter = lazyMeter; | ||
| if (meter == null) { | ||
| meterLock.lock(); | ||
| try { | ||
| // Double-check after acquiring lock | ||
| if (closed) { | ||
| throw new IllegalStateException( | ||
| "Cannot access meter for metric '" + metricType + "' after BrokerTopicMetrics has been closed"); | ||
| } | ||
| meter = lazyMeter; | ||
| if (meter == null) { | ||
| meter = metricsGroup.newMeter(metricType, eventType, TimeUnit.SECONDS, metricTags); | ||
|
|
@@ -414,10 +521,20 @@ public Meter meter() { | |
| public void close() { | ||
| meterLock.lock(); | ||
| try { | ||
| // Set closed flag first to prevent new meters from being created | ||
| closed = true; | ||
|
|
||
| if (lazyMeter != null) { | ||
| metricsGroup.removeMetric(metricType, metricTags); | ||
| lazyMeter = null; | ||
| } | ||
| // Close all topic-type-specific meters | ||
| lazyMetersWithTopicType.forEach((topicType, meter) -> { | ||
| Map<String, String> tagsWithTopicType = new HashMap<>(metricTags); | ||
| tagsWithTopicType.put("topicType", topicType); | ||
| metricsGroup.removeMetric(metricType, tagsWithTopicType); | ||
| }); | ||
| lazyMetersWithTopicType.clear(); | ||
| } finally { | ||
| meterLock.unlock(); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make this consistent, `bytesInRate()` in both.