/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Timeout(value=600L)
@Tag(value="integration")
public class MetricsReporterIntegrationTest {
    private static final int NUM_BROKERS = 1;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final String STREAM_INPUT = "STREAM_INPUT";
    private static final String STREAM_OUTPUT = "STREAM_OUTPUT";
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    static final Map<String, Object> METRIC_NAME_TO_INITIAL_VALUE = new HashMap<String, Object>();

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void before(TestInfo testInfo) throws InterruptedException {
        this.builder = new StreamsBuilder();
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), testInfo);
        String appId = "app-" + safeTestName;
        this.streamsConfiguration = new Properties();
        this.streamsConfiguration.put("application.id", appId);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("metric.reporters", MetricReporterImpl.class.getName());
    }

    @Test
    public void shouldBeAbleToProvideInitialMetricValueToMetricsReporter() {
        this.builder.stream(STREAM_INPUT, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).to(STREAM_OUTPUT, Produced.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        Topology topology = this.builder.build();
        KafkaStreams kafkaStreams = new KafkaStreams(topology, this.streamsConfiguration);
        kafkaStreams.metrics().keySet().forEach(metricName -> {
            Object initialMetricValue = METRIC_NAME_TO_INITIAL_VALUE.get(metricName.name());
            MatcherAssert.assertThat((Object)initialMetricValue, (Matcher)CoreMatchers.notNullValue());
        });
    }

    public static class MetricReporterImpl
    implements MetricsReporter {
        public void configure(Map<String, ?> configs) {
        }

        public void init(List<KafkaMetric> metrics) {
        }

        public void metricChange(KafkaMetric metric) {
            METRIC_NAME_TO_INITIAL_VALUE.put(metric.metricName().name(), metric.metricValue());
        }

        public void metricRemoval(KafkaMetric metric) {
        }

        public void close() {
        }
    }
}

