/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.sink.kafka.util;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.flume.sink.kafka.util.KafkaLocal;
import org.apache.flume.sink.kafka.util.ZooKeeperLocal;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestUtil {
    private static final Logger logger = LoggerFactory.getLogger(TestUtil.class);
    private static final TestUtil instance = new TestUtil();
    private KafkaLocal kafkaServer;
    private boolean externalServers = true;
    private String kafkaServerUrl;
    private String kafkaServerSslUrl;
    private String zkServerUrl;
    private int kafkaLocalPort;
    private int kafkaLocalSslPort;
    private Properties clientProps;
    private int zkLocalPort;
    private KafkaConsumer<String, String> consumer;
    private AdminClient adminClient;

    private TestUtil() {
        this.init();
    }

    public static TestUtil getInstance() {
        return instance;
    }

    private void init() {
        try {
            Properties settings = new Properties();
            InputStream in = Class.class.getResourceAsStream("/testutil.properties");
            if (in != null) {
                settings.load(in);
            }
            this.externalServers = "true".equalsIgnoreCase(settings.getProperty("external-servers"));
            if (this.externalServers) {
                this.kafkaServerUrl = settings.getProperty("kafka-server-url");
                this.zkServerUrl = settings.getProperty("zk-server-url");
            } else {
                String hostname = InetAddress.getLocalHost().getHostName();
                this.zkLocalPort = this.getNextPort();
                this.kafkaLocalPort = this.getNextPort();
                this.kafkaLocalSslPort = this.getNextPort();
                this.kafkaServerUrl = hostname + ":" + this.kafkaLocalPort;
                this.kafkaServerSslUrl = hostname + ":" + this.kafkaLocalSslPort;
                this.zkServerUrl = hostname + ":" + this.zkLocalPort;
            }
            this.clientProps = this.createClientProperties();
        }
        catch (Exception e) {
            logger.error("Unexpected error", (Throwable)e);
            throw new RuntimeException("Unexpected error", e);
        }
    }

    private boolean startEmbeddedKafkaServer() {
        Properties kafkaProperties = new Properties();
        Properties zkProperties = new Properties();
        logger.info("Starting kafka server.");
        try {
            zkProperties.load(Class.class.getResourceAsStream("/zookeeper.properties"));
            zkProperties.setProperty("clientPort", Integer.toString(this.zkLocalPort));
            new ZooKeeperLocal(zkProperties);
            logger.info("ZooKeeper instance is successfully started on port " + this.zkLocalPort);
            kafkaProperties.load(Class.class.getResourceAsStream("/kafka-server.properties"));
            kafkaProperties.setProperty("zookeeper.connect", this.getZkUrl());
            kafkaProperties.put("listeners", String.format("PLAINTEXT://%s,SSL://%s", this.getKafkaServerUrl(), this.getKafkaServerSslUrl()));
            kafkaProperties.put("ssl.truststore.location", "src/test/resources/truststorefile.jks");
            kafkaProperties.put("ssl.truststore.password", "password");
            kafkaProperties.put("ssl.keystore.location", "src/test/resources/keystorefile.jks");
            kafkaProperties.put("ssl.keystore.password", "password");
            this.kafkaServer = new KafkaLocal(kafkaProperties);
            this.kafkaServer.start();
            logger.info("Kafka Server is successfully started on port " + this.kafkaLocalPort);
            return true;
        }
        catch (Exception e) {
            logger.error("Error starting the Kafka Server.", (Throwable)e);
            return false;
        }
    }

    private AdminClient getAdminClient() {
        if (this.adminClient == null) {
            Properties adminClientProps = this.createAdminClientProperties();
            this.adminClient = AdminClient.create((Properties)adminClientProps);
        }
        return this.adminClient;
    }

    private Properties createClientProperties() {
        Properties props = this.createAdminClientProperties();
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("consumer.timeout.ms", "10000");
        props.put("max.poll.interval.ms", "10000");
        return props;
    }

    private Properties createAdminClientProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.getKafkaServerUrl());
        props.put("group.id", "group_1");
        return props;
    }

    public void initTopicList(List<String> topics) {
        this.consumer = new KafkaConsumer(this.clientProps);
        this.consumer.subscribe(topics);
    }

    public void createTopics(List<String> topicNames, int numPartitions) {
        ArrayList<NewTopic> newTopics = new ArrayList<NewTopic>();
        for (String topicName : topicNames) {
            NewTopic newTopic = new NewTopic(topicName, numPartitions, 1);
            newTopics.add(newTopic);
        }
        CreateTopicsResult result = this.getAdminClient().createTopics(newTopics);
        Exception throwable = null;
        for (int i = 0; i < 10; ++i) {
            try {
                result.all().get(1L, TimeUnit.SECONDS);
                throwable = null;
                break;
            }
            catch (Exception e) {
                throwable = e;
                continue;
            }
        }
        if (throwable != null) {
            throw new RuntimeException("Error getting topic info", throwable);
        }
    }

    public void deleteTopic(String topicName) {
        this.getAdminClient().deleteTopics(Collections.singletonList(topicName));
    }

    public ConsumerRecords<String, String> getNextMessageFromConsumer(String topic) {
        return this.consumer.poll(Duration.ofMillis(1000L));
    }

    public void prepare() {
        if (this.externalServers) {
            return;
        }
        boolean startStatus = this.startEmbeddedKafkaServer();
        if (!startStatus) {
            throw new RuntimeException("Error starting the server!");
        }
        try {
            Thread.sleep(3000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        logger.info("Completed the prepare phase.");
    }

    public void tearDown() {
        logger.info("Shutting down the Kafka Consumer.");
        if (this.consumer != null) {
            this.consumer.close();
        }
        if (this.adminClient != null) {
            this.adminClient.close();
            this.adminClient = null;
        }
        try {
            Thread.sleep(3000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        if (this.kafkaServer != null) {
            logger.info("Shutting down the kafka Server.");
            this.kafkaServer.stop();
        }
        logger.info("Completed the tearDown phase.");
    }

    private synchronized int getNextPort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0);){
            int n = socket.getLocalPort();
            return n;
        }
    }

    public String getZkUrl() {
        return this.zkServerUrl;
    }

    public String getKafkaServerUrl() {
        return this.kafkaServerUrl;
    }

    public String getKafkaServerSslUrl() {
        return this.kafkaServerSslUrl;
    }
}

