/*
 * Decompiled with CFR 0.152.
 */
package kafka.admin;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.admin.ConfigCommand;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.telemetry.ClientTelemetry;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
import org.junit.jupiter.api.Assertions;

public class ClientTelemetryTest {
    @ClusterTest(types={Type.KRAFT}, brokers=3, serverProperties={@ClusterConfigProperty(key="metric.reporters", value="kafka.admin.ClientTelemetryTest$GetIdClientTelemetry")})
    public void testClientInstanceId(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", clusterInstance.bootstrapServers());
        configs.put("enable.metrics.push", true);
        try (Admin admin = Admin.create(configs);){
            String testTopicName = "test_topic";
            admin.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, 1)));
            clusterInstance.waitForTopic(testTopicName, 1);
            HashMap<String, String> producerConfigs = new HashMap<String, String>();
            producerConfigs.put("bootstrap.servers", clusterInstance.bootstrapServers());
            producerConfigs.put("key.serializer", StringSerializer.class.getName());
            producerConfigs.put("value.serializer", StringSerializer.class.getName());
            try (KafkaProducer producer = new KafkaProducer(producerConfigs);){
                producer.send(new ProducerRecord(testTopicName, Integer.valueOf(0), null, (Object)"bar")).get();
                producer.flush();
                Uuid producerClientId = producer.clientInstanceId(Duration.ofSeconds(3L));
                Assertions.assertNotNull((Object)producerClientId);
                Assertions.assertEquals((Object)producerClientId, (Object)producer.clientInstanceId(Duration.ofSeconds(3L)));
            }
            HashMap<String, String> consumerConfigs = new HashMap<String, String>();
            consumerConfigs.put("bootstrap.servers", clusterInstance.bootstrapServers());
            consumerConfigs.put("group.id", UUID.randomUUID().toString());
            consumerConfigs.put("key.deserializer", StringDeserializer.class.getName());
            consumerConfigs.put("value.deserializer", StringDeserializer.class.getName());
            try (KafkaConsumer consumer = new KafkaConsumer(consumerConfigs);){
                consumer.assign(Collections.singletonList(new TopicPartition(testTopicName, 0)));
                consumer.seekToBeginning(Collections.singletonList(new TopicPartition(testTopicName, 0)));
                Uuid consumerClientId = consumer.clientInstanceId(Duration.ofSeconds(5L));
                Assertions.assertNull((Object)consumerClientId);
                ArrayList<String> values = new ArrayList<String>();
                ConsumerRecords records = consumer.poll(Duration.ofSeconds(1L));
                for (ConsumerRecord record : records) {
                    values.add((String)record.value());
                }
                Assertions.assertEquals((int)1, (int)values.size());
                Assertions.assertEquals((Object)"bar", values.get(0));
                consumerClientId = consumer.clientInstanceId(Duration.ofSeconds(3L));
                Assertions.assertNotNull((Object)consumerClientId);
                Assertions.assertEquals((Object)consumerClientId, (Object)consumer.clientInstanceId(Duration.ofSeconds(3L)));
            }
            Uuid uuid = admin.clientInstanceId(Duration.ofSeconds(3L));
            Assertions.assertNotNull((Object)uuid);
            Assertions.assertEquals((Object)uuid, (Object)admin.clientInstanceId(Duration.ofSeconds(3L)));
        }
    }

    @ClusterTest(types={Type.CO_KRAFT, Type.KRAFT})
    public void testIntervalMsParser(ClusterInstance clusterInstance) {
        List<String> alterOpts = Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--alter", "--entity-type", "client-metrics", "--entity-name", "test", "--add-config", "interval.ms=bbb");
        try (Admin client = clusterInstance.admin();){
            ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(ClientTelemetryTest.toArray(alterOpts));
            Throwable e = Assertions.assertThrows(ExecutionException.class, () -> ConfigCommand.alterConfig((Admin)client, (ConfigCommand.ConfigCommandOptions)addOpts));
            Assertions.assertTrue((boolean)e.getMessage().contains(InvalidConfigurationException.class.getSimpleName()));
        }
    }

    @ClusterTest(types={Type.KRAFT})
    public void testMetrics(ClusterInstance clusterInstance) {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", clusterInstance.bootstrapServers());
        List<String> expectedMetricsName = Arrays.asList("request-size-max", "io-wait-ratio", "response-total", "version", "io-time-ns-avg", "network-io-rate");
        try (Admin admin = Admin.create(configs);){
            Set actualMetricsName = admin.metrics().keySet().stream().map(MetricName::name).collect(Collectors.toSet());
            expectedMetricsName.forEach(expectedName -> Assertions.assertTrue((boolean)actualMetricsName.contains(expectedName), (String)String.format("actual metrics name: %s dont contains expected: %s", actualMetricsName, expectedName)));
            Assertions.assertTrue((boolean)actualMetricsName.containsAll(expectedMetricsName));
        }
    }

    private static String[] toArray(List<String> ... lists) {
        return (String[])Stream.of(lists).flatMap(Collection::stream).toArray(String[]::new);
    }

    public static class GetIdClientTelemetry
    implements ClientTelemetry,
    MetricsReporter {
        public void init(List<KafkaMetric> metrics) {
        }

        public void metricChange(KafkaMetric metric) {
        }

        public void metricRemoval(KafkaMetric metric) {
        }

        public void close() {
        }

        public void configure(Map<String, ?> configs) {
        }

        public ClientTelemetryReceiver clientReceiver() {
            return (context, payload) -> {};
        }
    }
}

