/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class MetricsIntegrationTest {
    private static final int NUM_BROKERS = 1;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final long timeout = 60000L;
    private static final String APPLICATION_ID_VALUE = "stream-metrics-test";
    private static final String STREAM_CLIENT_NODE_METRICS = "stream-metrics";
    private static final String STREAM_THREAD_NODE_METRICS = "stream-metrics";
    private static final String STREAM_TASK_NODE_METRICS = "stream-task-metrics";
    private static final String STREAM_PROCESSOR_NODE_METRICS = "stream-processor-node-metrics";
    private static final String STREAM_CACHE_NODE_METRICS = "stream-record-cache-metrics";
    private static final String STREAM_STORE_IN_MEMORY_STATE_METRICS = "stream-in-memory-state-metrics";
    private static final String STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS = "stream-in-memory-lru-state-metrics";
    private static final String STREAM_STORE_ROCKSDB_STATE_METRICS = "stream-rocksdb-state-metrics";
    private static final String STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS = "stream-rocksdb-window-state-metrics";
    private static final String STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS = "stream-rocksdb-session-state-metrics";
    private static final String VERSION = "version";
    private static final String COMMIT_ID = "commit-id";
    private static final String APPLICATION_ID = "application-id";
    private static final String TOPOLOGY_DESCRIPTION = "topology-description";
    private static final String STATE = "state";
    private static final String PUT_LATENCY_AVG = "put-latency-avg";
    private static final String PUT_LATENCY_MAX = "put-latency-max";
    private static final String PUT_IF_ABSENT_LATENCY_AVG = "put-if-absent-latency-avg";
    private static final String PUT_IF_ABSENT_LATENCY_MAX = "put-if-absent-latency-max";
    private static final String GET_LATENCY_AVG = "get-latency-avg";
    private static final String GET_LATENCY_MAX = "get-latency-max";
    private static final String DELETE_LATENCY_AVG = "delete-latency-avg";
    private static final String DELETE_LATENCY_MAX = "delete-latency-max";
    private static final String PUT_ALL_LATENCY_AVG = "put-all-latency-avg";
    private static final String PUT_ALL_LATENCY_MAX = "put-all-latency-max";
    private static final String ALL_LATENCY_AVG = "all-latency-avg";
    private static final String ALL_LATENCY_MAX = "all-latency-max";
    private static final String RANGE_LATENCY_AVG = "range-latency-avg";
    private static final String RANGE_LATENCY_MAX = "range-latency-max";
    private static final String FLUSH_LATENCY_AVG = "flush-latency-avg";
    private static final String FLUSH_LATENCY_MAX = "flush-latency-max";
    private static final String RESTORE_LATENCY_AVG = "restore-latency-avg";
    private static final String RESTORE_LATENCY_MAX = "restore-latency-max";
    private static final String PUT_RATE = "put-rate";
    private static final String PUT_TOTAL = "put-total";
    private static final String PUT_IF_ABSENT_RATE = "put-if-absent-rate";
    private static final String PUT_IF_ABSENT_TOTAL = "put-if-absent-total";
    private static final String GET_RATE = "get-rate";
    private static final String DELETE_RATE = "delete-rate";
    private static final String DELETE_TOTAL = "delete-total";
    private static final String PUT_ALL_RATE = "put-all-rate";
    private static final String PUT_ALL_TOTAL = "put-all-total";
    private static final String ALL_RATE = "all-rate";
    private static final String ALL_TOTAL = "all-total";
    private static final String RANGE_RATE = "range-rate";
    private static final String RANGE_TOTAL = "range-total";
    private static final String FLUSH_RATE = "flush-rate";
    private static final String FLUSH_TOTAL = "flush-total";
    private static final String RESTORE_RATE = "restore-rate";
    private static final String RESTORE_TOTAL = "restore-total";
    private static final String PROCESS_LATENCY_AVG = "process-latency-avg";
    private static final String PROCESS_LATENCY_MAX = "process-latency-max";
    private static final String PUNCTUATE_LATENCY_AVG = "punctuate-latency-avg";
    private static final String PUNCTUATE_LATENCY_MAX = "punctuate-latency-max";
    private static final String CREATE_LATENCY_AVG = "create-latency-avg";
    private static final String CREATE_LATENCY_MAX = "create-latency-max";
    private static final String DESTROY_LATENCY_AVG = "destroy-latency-avg";
    private static final String DESTROY_LATENCY_MAX = "destroy-latency-max";
    private static final String PROCESS_RATE = "process-rate";
    private static final String PROCESS_TOTAL = "process-total";
    private static final String PUNCTUATE_RATE = "punctuate-rate";
    private static final String PUNCTUATE_TOTAL = "punctuate-total";
    private static final String CREATE_RATE = "create-rate";
    private static final String CREATE_TOTAL = "create-total";
    private static final String DESTROY_RATE = "destroy-rate";
    private static final String DESTROY_TOTAL = "destroy-total";
    private static final String FORWARD_TOTAL = "forward-total";
    private static final String STREAM_STRING = "stream";
    private static final String COMMIT_LATENCY_AVG = "commit-latency-avg";
    private static final String COMMIT_LATENCY_MAX = "commit-latency-max";
    private static final String POLL_LATENCY_AVG = "poll-latency-avg";
    private static final String POLL_LATENCY_MAX = "poll-latency-max";
    private static final String COMMIT_RATE = "commit-rate";
    private static final String COMMIT_TOTAL = "commit-total";
    private static final String POLL_RATE = "poll-rate";
    private static final String POLL_TOTAL = "poll-total";
    private static final String TASK_CREATED_RATE = "task-created-rate";
    private static final String TASK_CREATED_TOTAL = "task-created-total";
    private static final String TASK_CLOSED_RATE = "task-closed-rate";
    private static final String TASK_CLOSED_TOTAL = "task-closed-total";
    private static final String SKIPPED_RECORDS_RATE = "skipped-records-rate";
    private static final String SKIPPED_RECORDS_TOTAL = "skipped-records-total";
    private static final String RECORD_LATENESS_AVG = "record-lateness-avg";
    private static final String RECORD_LATENESS_MAX = "record-lateness-max";
    private static final String HIT_RATIO_AVG_BEFORE_24 = "hitRatio-avg";
    private static final String HIT_RATIO_MIN_BEFORE_24 = "hitRatio-min";
    private static final String HIT_RATIO_MAX_BEFORE_24 = "hitRatio-max";
    private static final String HIT_RATIO_AVG = "hit-ratio-avg";
    private static final String HIT_RATIO_MIN = "hit-ratio-min";
    private static final String HIT_RATIO_MAX = "hit-ratio-max";
    private static final String BYTES_WRITTEN_RATE = "bytes-written-rate";
    private static final String BYTES_WRITTEN_TOTAL = "bytes-written-total";
    private static final String BYTES_READ_RATE = "bytes-read-rate";
    private static final String BYTES_READ_TOTAL = "bytes-read-total";
    private static final String MEMTABLE_BYTES_FLUSHED_RATE = "memtable-bytes-flushed-rate";
    private static final String MEMTABLE_BYTES_FLUSHED_TOTAL = "memtable-bytes-flushed-total";
    private static final String MEMTABLE_HIT_RATIO = "memtable-hit-ratio";
    private static final String MEMTABLE_FLUSH_TIME_AVG = "memtable-flush-time-avg";
    private static final String MEMTABLE_FLUSH_TIME_MIN = "memtable-flush-time-min";
    private static final String MEMTABLE_FLUSH_TIME_MAX = "memtable-flush-time-max";
    private static final String WRITE_STALL_DURATION_AVG = "write-stall-duration-avg";
    private static final String WRITE_STALL_DURATION_TOTAL = "write-stall-duration-total";
    private static final String BLOCK_CACHE_DATA_HIT_RATIO = "block-cache-data-hit-ratio";
    private static final String BLOCK_CACHE_INDEX_HIT_RATIO = "block-cache-index-hit-ratio";
    private static final String BLOCK_CACHE_FILTER_HIT_RATIO = "block-cache-filter-hit-ratio";
    private static final String BYTES_READ_DURING_COMPACTION_RATE = "bytes-read-compaction-rate";
    private static final String BYTES_WRITTEN_DURING_COMPACTION_RATE = "bytes-written-compaction-rate";
    private static final String COMPACTION_TIME_AVG = "compaction-time-avg";
    private static final String COMPACTION_TIME_MIN = "compaction-time-min";
    private static final String COMPACTION_TIME_MAX = "compaction-time-max";
    private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
    private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors-total";
    private static final String TIME_WINDOWED_AGGREGATED_STREAM_STORE = "time-windowed-aggregated-stream-store";
    private static final String SESSION_AGGREGATED_STREAM_STORE = "session-aggregated-stream-store";
    private static final String MY_STORE_IN_MEMORY = "myStoreInMemory";
    private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
    private static final String MY_STORE_LRU_MAP = "myStoreLruMap";
    private static final String STREAM_INPUT = "STREAM_INPUT";
    private static final String STREAM_OUTPUT_1 = "STREAM_OUTPUT_1";
    private static final String STREAM_OUTPUT_2 = "STREAM_OUTPUT_2";
    private static final String STREAM_OUTPUT_3 = "STREAM_OUTPUT_3";
    private static final String STREAM_OUTPUT_4 = "STREAM_OUTPUT_4";
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;

    @Before
    public void before() throws InterruptedException {
        this.builder = new StreamsBuilder();
        CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
        this.streamsConfiguration = new Properties();
        this.streamsConfiguration.put("application.id", APPLICATION_ID_VALUE);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("metrics.recording.level", Sensor.RecordingLevel.DEBUG.name);
        this.streamsConfiguration.put("cache.max.bytes.buffering", (Object)0xA00000L);
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
    }

    @After
    public void after() throws InterruptedException {
        CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
    }

    private void startApplication() throws InterruptedException {
        Topology topology = this.builder.build();
        this.kafkaStreams = new KafkaStreams(topology, this.streamsConfiguration);
        this.verifyStateMetric(KafkaStreams.State.CREATED);
        this.verifyTopologyDescriptionMetric(topology.describe().toString());
        this.verifyApplicationIdMetric(APPLICATION_ID_VALUE);
        this.kafkaStreams.start();
        TestUtils.waitForCondition(() -> this.kafkaStreams.state() == KafkaStreams.State.RUNNING, (long)60000L, () -> "Kafka Streams application did not reach state RUNNING in 60000 ms");
    }

    private void produceRecordsForTwoSegments(Duration segmentInterval) throws Exception {
        MockTime mockTime = new MockTime(Math.max(segmentInterval.toMillis(), 60000L));
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(STREAM_INPUT, Collections.singletonList(new KeyValue((Object)1, (Object)"A")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties()), mockTime.milliseconds());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(STREAM_INPUT, Collections.singletonList(new KeyValue((Object)1, (Object)"B")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties()), mockTime.milliseconds());
    }

    private void waitUntilAllRecordsAreConsumed() throws Exception {
        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), (String)"consumerApp", LongDeserializer.class, LongDeserializer.class, (Properties)new Properties()), STREAM_OUTPUT_1, 2);
    }

    private void closeApplication() throws Exception {
        this.kafkaStreams.close();
        this.kafkaStreams.cleanUp();
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
        long timeout = 60000L;
        TestUtils.waitForCondition(() -> this.kafkaStreams.state() == KafkaStreams.State.NOT_RUNNING, (long)60000L, () -> "Kafka Streams application did not reach state NOT_RUNNING in 60000 ms");
    }

    @Test
    public void shouldAddMetricsOnAllLevelsWithBuiltInMetricsVersion0100To23() throws Exception {
        this.shouldAddMetricsOnAllLevels("0.10.0-2.3");
    }

    private void shouldAddMetricsOnAllLevels(String builtInMetricsVersion) throws Exception {
        this.builder.stream(STREAM_INPUT, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).to(STREAM_OUTPUT_1, Produced.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        this.builder.table(STREAM_OUTPUT_1, Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)MY_STORE_IN_MEMORY)).withCachingEnabled()).toStream().to(STREAM_OUTPUT_2);
        this.builder.table(STREAM_OUTPUT_2, Materialized.as((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled()).toStream().to(STREAM_OUTPUT_3);
        this.builder.table(STREAM_OUTPUT_3, Materialized.as((KeyValueBytesStoreSupplier)Stores.lruMap((String)MY_STORE_LRU_MAP, (int)10000)).withCachingEnabled()).toStream().to(STREAM_OUTPUT_4);
        this.startApplication();
        this.verifyStateMetric(KafkaStreams.State.RUNNING);
        this.checkThreadLevelMetrics();
        this.checkTaskLevelMetrics();
        this.checkProcessorLevelMetrics();
        this.checkKeyValueStoreMetricsByGroup(STREAM_STORE_IN_MEMORY_STATE_METRICS);
        this.checkKeyValueStoreMetricsByGroup(STREAM_STORE_ROCKSDB_STATE_METRICS);
        this.checkKeyValueStoreMetricsByGroup(STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS);
        this.checkRocksDBMetricsByTag("rocksdb-state-id");
        this.checkCacheMetrics(builtInMetricsVersion);
        this.closeApplication();
        this.checkMetricsDeregistration();
    }

    @Test
    public void shouldAddMetricsForWindowStore() throws Exception {
        Duration windowSize = Duration.ofMillis(50L);
        this.builder.stream(STREAM_INPUT, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).groupByKey().windowedBy((Windows)TimeWindows.of((Duration)windowSize).grace(Duration.ZERO)).aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue, Materialized.as((String)TIME_WINDOWED_AGGREGATED_STREAM_STORE).withValueSerde(Serdes.Long()).withRetention(windowSize)).toStream().map((key, value) -> KeyValue.pair((Object)value, (Object)value)).to(STREAM_OUTPUT_1, Produced.with((Serde)Serdes.Long(), (Serde)Serdes.Long()));
        this.produceRecordsForTwoSegments(windowSize);
        this.startApplication();
        this.verifyStateMetric(KafkaStreams.State.RUNNING);
        this.waitUntilAllRecordsAreConsumed();
        this.checkWindowStoreMetrics();
        this.checkRocksDBMetricsByTag("rocksdb-window-state-id");
        this.closeApplication();
        this.checkMetricsDeregistration();
    }

    @Test
    public void shouldAddMetricsForSessionStore() throws Exception {
        Duration inactivityGap = Duration.ofMillis(50L);
        this.builder.stream(STREAM_INPUT, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).groupByKey().windowedBy(SessionWindows.with((Duration)inactivityGap).grace(Duration.ZERO)).aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue, (aggKey, leftAggValue, rightAggValue) -> leftAggValue, Materialized.as((String)SESSION_AGGREGATED_STREAM_STORE).withValueSerde(Serdes.Long()).withRetention(inactivityGap)).toStream().map((key, value) -> KeyValue.pair((Object)value, (Object)value)).to(STREAM_OUTPUT_1, Produced.with((Serde)Serdes.Long(), (Serde)Serdes.Long()));
        this.produceRecordsForTwoSegments(inactivityGap);
        this.startApplication();
        this.verifyStateMetric(KafkaStreams.State.RUNNING);
        this.waitUntilAllRecordsAreConsumed();
        this.checkSessionStoreMetrics();
        this.checkRocksDBMetricsByTag("rocksdb-session-state-id");
        this.closeApplication();
        this.checkMetricsDeregistration();
    }

    private void verifyStateMetric(KafkaStreams.State state) {
        List metricsList = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().name().equals(STATE) && m.metricName().group().equals("stream-metrics")).collect(Collectors.toList());
        MatcherAssert.assertThat((Object)metricsList.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((Metric)metricsList.get(0)).metricValue(), (Matcher)CoreMatchers.is((Object)state));
    }

    private void verifyTopologyDescriptionMetric(String topologyDescription) {
        List metricsList = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().name().equals(TOPOLOGY_DESCRIPTION) && m.metricName().group().equals("stream-metrics")).collect(Collectors.toList());
        MatcherAssert.assertThat((Object)metricsList.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((Metric)metricsList.get(0)).metricValue(), (Matcher)CoreMatchers.is((Object)topologyDescription));
    }

    private void verifyApplicationIdMetric(String applicationId) {
        List metricsList = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().name().equals(APPLICATION_ID) && m.metricName().group().equals("stream-metrics")).collect(Collectors.toList());
        MatcherAssert.assertThat((Object)metricsList.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((Metric)metricsList.get(0)).metricValue(), (Matcher)CoreMatchers.is((Object)applicationId));
    }

    private void checkThreadLevelMetrics() {
        List<Metric> listMetricThread = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals("stream-metrics")).collect(Collectors.toList());
        this.checkMetricByName(listMetricThread, VERSION, 1);
        this.checkMetricByName(listMetricThread, COMMIT_ID, 1);
        this.checkMetricByName(listMetricThread, APPLICATION_ID, 1);
        this.checkMetricByName(listMetricThread, TOPOLOGY_DESCRIPTION, 1);
        this.checkMetricByName(listMetricThread, STATE, 1);
        this.checkMetricByName(listMetricThread, COMMIT_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricThread, COMMIT_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricThread, POLL_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricThread, POLL_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricThread, PROCESS_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricThread, PROCESS_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricThread, PUNCTUATE_LATENCY_AVG, 1);
        this.checkMetricByName(listMetricThread, PUNCTUATE_LATENCY_MAX, 1);
        this.checkMetricByName(listMetricThread, COMMIT_RATE, 1);
        this.checkMetricByName(listMetricThread, COMMIT_TOTAL, 1);
        this.checkMetricByName(listMetricThread, POLL_RATE, 1);
        this.checkMetricByName(listMetricThread, POLL_TOTAL, 1);
        this.checkMetricByName(listMetricThread, PROCESS_RATE, 1);
        this.checkMetricByName(listMetricThread, PROCESS_TOTAL, 1);
        this.checkMetricByName(listMetricThread, PUNCTUATE_RATE, 1);
        this.checkMetricByName(listMetricThread, PUNCTUATE_TOTAL, 1);
        this.checkMetricByName(listMetricThread, TASK_CREATED_RATE, 1);
        this.checkMetricByName(listMetricThread, TASK_CREATED_TOTAL, 1);
        this.checkMetricByName(listMetricThread, TASK_CLOSED_RATE, 1);
        this.checkMetricByName(listMetricThread, TASK_CLOSED_TOTAL, 1);
        this.checkMetricByName(listMetricThread, SKIPPED_RECORDS_RATE, 1);
        this.checkMetricByName(listMetricThread, SKIPPED_RECORDS_TOTAL, 1);
    }

    private void checkTaskLevelMetrics() {
        List<Metric> listMetricTask = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS)).collect(Collectors.toList());
        this.checkMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5);
        this.checkMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5);
        this.checkMetricByName(listMetricTask, COMMIT_RATE, 5);
        this.checkMetricByName(listMetricTask, COMMIT_TOTAL, 5);
        this.checkMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4);
        this.checkMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4);
    }

    private void checkProcessorLevelMetrics() {
        List<Metric> listMetricProcessor = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS)).collect(Collectors.toList());
        this.checkMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, 18);
        this.checkMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, 18);
        this.checkMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, 18);
        this.checkMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_MAX, 18);
        this.checkMetricByName(listMetricProcessor, CREATE_LATENCY_AVG, 18);
        this.checkMetricByName(listMetricProcessor, CREATE_LATENCY_MAX, 18);
        this.checkMetricByName(listMetricProcessor, DESTROY_LATENCY_AVG, 18);
        this.checkMetricByName(listMetricProcessor, DESTROY_LATENCY_MAX, 18);
        this.checkMetricByName(listMetricProcessor, PROCESS_RATE, 18);
        this.checkMetricByName(listMetricProcessor, PROCESS_TOTAL, 18);
        this.checkMetricByName(listMetricProcessor, PUNCTUATE_RATE, 18);
        this.checkMetricByName(listMetricProcessor, PUNCTUATE_TOTAL, 18);
        this.checkMetricByName(listMetricProcessor, CREATE_RATE, 18);
        this.checkMetricByName(listMetricProcessor, CREATE_TOTAL, 18);
        this.checkMetricByName(listMetricProcessor, DESTROY_RATE, 18);
        this.checkMetricByName(listMetricProcessor, DESTROY_TOTAL, 18);
        this.checkMetricByName(listMetricProcessor, FORWARD_TOTAL, 18);
    }

    private void checkRocksDBMetricsByTag(String tag) {
        List<Metric> listMetricStore = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals("stream-state-metrics") && m.metricName().tags().containsKey(tag)).collect(Collectors.toList());
        this.checkMetricByName(listMetricStore, BYTES_WRITTEN_RATE, 1);
        this.checkMetricByName(listMetricStore, BYTES_WRITTEN_TOTAL, 1);
        this.checkMetricByName(listMetricStore, BYTES_READ_RATE, 1);
        this.checkMetricByName(listMetricStore, BYTES_READ_TOTAL, 1);
        this.checkMetricByName(listMetricStore, MEMTABLE_BYTES_FLUSHED_RATE, 1);
        this.checkMetricByName(listMetricStore, MEMTABLE_BYTES_FLUSHED_TOTAL, 1);
        this.checkMetricByName(listMetricStore, MEMTABLE_HIT_RATIO, 1);
        this.checkMetricByName(listMetricStore, WRITE_STALL_DURATION_AVG, 1);
        this.checkMetricByName(listMetricStore, WRITE_STALL_DURATION_TOTAL, 1);
        this.checkMetricByName(listMetricStore, BLOCK_CACHE_DATA_HIT_RATIO, 1);
        this.checkMetricByName(listMetricStore, BLOCK_CACHE_INDEX_HIT_RATIO, 1);
        this.checkMetricByName(listMetricStore, BLOCK_CACHE_FILTER_HIT_RATIO, 1);
        this.checkMetricByName(listMetricStore, BYTES_READ_DURING_COMPACTION_RATE, 1);
        this.checkMetricByName(listMetricStore, BYTES_WRITTEN_DURING_COMPACTION_RATE, 1);
        this.checkMetricByName(listMetricStore, NUMBER_OF_OPEN_FILES, 1);
        this.checkMetricByName(listMetricStore, NUMBER_OF_FILE_ERRORS, 1);
    }

    private void checkKeyValueStoreMetricsByGroup(String group) {
        List<Metric> listMetricStore = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(group)).collect(Collectors.toList());
        this.checkMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, GET_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, GET_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, DELETE_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, DELETE_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, ALL_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, ALL_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, RANGE_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, RANGE_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, PUT_RATE, 2);
        this.checkMetricByName(listMetricStore, PUT_TOTAL, 2);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 2);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 2);
        this.checkMetricByName(listMetricStore, GET_RATE, 2);
        this.checkMetricByName(listMetricStore, DELETE_RATE, 2);
        this.checkMetricByName(listMetricStore, DELETE_TOTAL, 2);
        this.checkMetricByName(listMetricStore, PUT_ALL_RATE, 2);
        this.checkMetricByName(listMetricStore, PUT_ALL_TOTAL, 2);
        this.checkMetricByName(listMetricStore, ALL_RATE, 2);
        this.checkMetricByName(listMetricStore, ALL_TOTAL, 2);
        this.checkMetricByName(listMetricStore, RANGE_RATE, 2);
        this.checkMetricByName(listMetricStore, RANGE_TOTAL, 2);
        this.checkMetricByName(listMetricStore, FLUSH_RATE, 2);
        this.checkMetricByName(listMetricStore, FLUSH_TOTAL, 2);
        this.checkMetricByName(listMetricStore, RESTORE_RATE, 2);
        this.checkMetricByName(listMetricStore, RESTORE_TOTAL, 2);
    }

    private void checkMetricsDeregistration() {
        List listMetricAfterClosingApp = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList());
        MatcherAssert.assertThat((Object)listMetricAfterClosingApp.size(), (Matcher)CoreMatchers.is((Object)0));
    }

    private void checkCacheMetrics(String builtInMetricsVersion) {
        List<Metric> listMetricCache = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS)).collect(Collectors.toList());
        this.checkMetricByName(listMetricCache, builtInMetricsVersion.equals("latest") ? HIT_RATIO_AVG : HIT_RATIO_AVG_BEFORE_24, builtInMetricsVersion.equals("latest") ? 3 : 6);
        this.checkMetricByName(listMetricCache, builtInMetricsVersion.equals("latest") ? HIT_RATIO_MIN : HIT_RATIO_MIN_BEFORE_24, builtInMetricsVersion.equals("latest") ? 3 : 6);
        this.checkMetricByName(listMetricCache, builtInMetricsVersion.equals("latest") ? HIT_RATIO_MAX : HIT_RATIO_MAX_BEFORE_24, builtInMetricsVersion.equals("latest") ? 3 : 6);
    }

    private void checkWindowStoreMetrics() {
        List<Metric> listMetricStore = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS)).collect(Collectors.toList());
        this.checkMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, PUT_RATE, 2);
        this.checkMetricByName(listMetricStore, PUT_TOTAL, 2);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
        this.checkMetricByName(listMetricStore, GET_RATE, 0);
        this.checkMetricByName(listMetricStore, DELETE_RATE, 0);
        this.checkMetricByName(listMetricStore, DELETE_TOTAL, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_RATE, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
        this.checkMetricByName(listMetricStore, ALL_RATE, 0);
        this.checkMetricByName(listMetricStore, ALL_TOTAL, 0);
        this.checkMetricByName(listMetricStore, RANGE_RATE, 0);
        this.checkMetricByName(listMetricStore, RANGE_TOTAL, 0);
        this.checkMetricByName(listMetricStore, FLUSH_RATE, 2);
        this.checkMetricByName(listMetricStore, FLUSH_TOTAL, 2);
        this.checkMetricByName(listMetricStore, RESTORE_RATE, 2);
        this.checkMetricByName(listMetricStore, RESTORE_TOTAL, 2);
    }

    private void checkSessionStoreMetrics() {
        List<Metric> listMetricStore = new ArrayList(this.kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS)).collect(Collectors.toList());
        this.checkMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
        this.checkMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
        this.checkMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
        this.checkMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
        this.checkMetricByName(listMetricStore, PUT_RATE, 2);
        this.checkMetricByName(listMetricStore, PUT_TOTAL, 2);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
        this.checkMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
        this.checkMetricByName(listMetricStore, GET_RATE, 0);
        this.checkMetricByName(listMetricStore, DELETE_RATE, 0);
        this.checkMetricByName(listMetricStore, DELETE_TOTAL, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_RATE, 0);
        this.checkMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
        this.checkMetricByName(listMetricStore, ALL_RATE, 0);
        this.checkMetricByName(listMetricStore, ALL_TOTAL, 0);
        this.checkMetricByName(listMetricStore, RANGE_RATE, 0);
        this.checkMetricByName(listMetricStore, RANGE_TOTAL, 0);
        this.checkMetricByName(listMetricStore, FLUSH_RATE, 2);
        this.checkMetricByName(listMetricStore, FLUSH_TOTAL, 2);
        this.checkMetricByName(listMetricStore, RESTORE_RATE, 2);
        this.checkMetricByName(listMetricStore, RESTORE_TOTAL, 2);
    }

    private void checkMetricByName(List<Metric> listMetric, String metricName, int numMetric) {
        List metrics = listMetric.stream().filter(m -> m.metricName().name().equals(metricName)).collect(Collectors.toList());
        Assert.assertEquals((String)("Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size()), (long)numMetric, (long)metrics.size());
        for (Metric m2 : metrics) {
            Assert.assertNotNull((String)("Metric:'" + m2.metricName() + "' must be not null"), (Object)m2.metricValue());
        }
    }
}

