/*
 * Decompiled with CFR 0.152.
 */
package com.github.charithe.kafka;

import com.github.charithe.kafka.EphemeralKafkaBroker;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.server.KafkaConfig;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

/**
 * An in-process Kafka cluster made of one ZooKeeper {@link TestingServer} and a
 * number of {@link EphemeralKafkaBroker} instances, intended for tests.
 *
 * <p>Instances are obtained through the static {@code create} factories, which start
 * ZooKeeper and every broker before returning. The cluster is {@link AutoCloseable};
 * closing it stops the brokers and then ZooKeeper.
 */
public class EphemeralKafkaCluster
implements AutoCloseable {
    /** ZooKeeper port value passed to {@link TestingServer} when the caller does not pick a port. */
    private static final int ALLOCATE_RANDOM_PORT = -1;

    /** Number of brokers requested at creation; also drives replication-related settings. */
    private final int numBroker;
    private final TestingServer zookeeper;
    /** Brokers that were started successfully, in start order. */
    private final List<EphemeralKafkaBroker> brokers = new ArrayList<EphemeralKafkaBroker>();

    /**
     * Starts ZooKeeper and then {@code numBroker} brokers, one after another.
     *
     * <p>If any broker fails to start, everything started so far (tracked brokers and
     * ZooKeeper) is stopped before the original failure is rethrown, so a failed
     * construction does not leave servers running. A failure during that cleanup is
     * attached to the original exception as a suppressed exception.
     *
     * @param numBroker        number of brokers to start
     * @param zookeeperPort    port for the ZooKeeper test server
     * @param brokerProperties broker settings that override the defaults; may be {@code null}
     * @throws Exception if ZooKeeper or any broker fails to start
     */
    private EphemeralKafkaCluster(int numBroker, int zookeeperPort, Properties brokerProperties) throws Exception {
        this.zookeeper = new TestingServer(zookeeperPort);
        this.numBroker = numBroker;
        try {
            for (int i = 0; i < numBroker; ++i) {
                this.addBroker(brokerProperties);
            }
        } catch (Exception startupFailure) {
            try {
                this.stop();
            } catch (Exception cleanupFailure) {
                startupFailure.addSuppressed(cleanupFailure);
            }
            throw startupFailure;
        }
    }

    /**
     * Creates and starts a cluster, passing {@link #ALLOCATE_RANDOM_PORT} as the ZooKeeper port.
     *
     * @param numBroker number of brokers to start
     * @return the started cluster
     * @throws Exception if ZooKeeper or any broker fails to start
     */
    public static EphemeralKafkaCluster create(int numBroker) throws Exception {
        return EphemeralKafkaCluster.create(numBroker, ALLOCATE_RANDOM_PORT);
    }

    /**
     * Creates and starts a cluster with default broker settings.
     *
     * @param numBroker     number of brokers to start
     * @param zookeeperPort port for the ZooKeeper test server
     * @return the started cluster
     * @throws Exception if ZooKeeper or any broker fails to start
     */
    public static EphemeralKafkaCluster create(int numBroker, int zookeeperPort) throws Exception {
        return EphemeralKafkaCluster.create(numBroker, zookeeperPort, new Properties());
    }

    /**
     * Creates and starts a cluster.
     *
     * @param numBroker        number of brokers to start
     * @param zookeeperPort    port for the ZooKeeper test server
     * @param brokerProperties broker settings applied on top of the built-in defaults
     * @return the started cluster
     * @throws Exception if ZooKeeper or any broker fails to start
     */
    public static EphemeralKafkaCluster create(int numBroker, int zookeeperPort, Properties brokerProperties) throws Exception {
        return new EphemeralKafkaCluster(numBroker, zookeeperPort, brokerProperties);
    }

    /**
     * @return {@code true} when every tracked broker reports that it is running
     *         (also {@code true} when no broker is tracked)
     */
    public boolean isHealthy() {
        return this.brokers.stream().allMatch(EphemeralKafkaBroker::isRunning);
    }

    /**
     * @return {@code true} when at least one tracked broker reports that it is running
     */
    public boolean isRunning() {
        return this.brokers.stream().anyMatch(EphemeralKafkaBroker::isRunning);
    }

    /**
     * Stops all brokers concurrently, waits for them to finish, then stops ZooKeeper.
     *
     * <p>ZooKeeper is stopped even when stopping the brokers fails, so the test server
     * is not leaked. The broker list is only cleared after all brokers stopped successfully.
     *
     * @throws IOException          if stopping ZooKeeper fails
     * @throws ExecutionException   if stopping a broker fails
     * @throws InterruptedException if interrupted while waiting for the brokers to stop
     */
    public void stop() throws IOException, ExecutionException, InterruptedException {
        try {
            CompletableFuture<?>[] pendingStops = this.brokers.stream()
                    .map(EphemeralKafkaBroker::stopBrokerAsync)
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(pendingStops).get();
            this.brokers.clear();
        } finally {
            this.zookeeper.stop();
        }
    }

    /**
     * Configures, starts and registers one additional broker on a random port.
     *
     * <p>The broker id is the current number of tracked brokers. The broker is only
     * added to the tracked list after it started successfully.
     *
     * @param overrideBrokerProperties settings applied on top of the defaults; may be {@code null}
     * @return the started broker
     * @throws Exception if the broker fails to start
     */
    private EphemeralKafkaBroker addBroker(Properties overrideBrokerProperties) throws Exception {
        int brokerPort = InstanceSpec.getRandomPort();
        Properties brokerConfigProperties = new Properties();
        brokerConfigProperties.setProperty(KafkaConfig.BrokerIdProp(), String.valueOf(this.brokers.size()));
        brokerConfigProperties.setProperty(KafkaConfig.ZkConnectProp(), this.zookeeper.getConnectString());
        brokerConfigProperties.setProperty(KafkaConfig.ControlledShutdownEnableProp(), "false");
        brokerConfigProperties.setProperty(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
        brokerConfigProperties.setProperty(KafkaConfig.DeleteTopicEnableProp(), "true");
        // NOTE(review): "false" is not a protocol name; this value looks unintended but is
        // kept as-is to avoid changing broker behavior - confirm against the Kafka config docs.
        brokerConfigProperties.setProperty(KafkaConfig.SslEnabledProtocolsProp(), "false");
        brokerConfigProperties.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "true");
        brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "300");
        brokerConfigProperties.setProperty(KafkaConfig.ReplicaFetchWaitMaxMsProp(), "100");
        brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "10");
        brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(this.numBroker));
        brokerConfigProperties.setProperty(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp(), "1");
        brokerConfigProperties.setProperty(KafkaConfig.ZkSessionTimeoutMsProp(), "200");
        // Use the property key, not its documentation text, so the delay is actually applied.
        brokerConfigProperties.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "200");
        brokerConfigProperties.setProperty(KafkaConfig.AdvertisedListenersProp(), "PLAINTEXT://localhost:" + brokerPort);
        brokerConfigProperties.setProperty(KafkaConfig.MinInSyncReplicasProp(), String.valueOf(Math.max(1, this.numBroker - 1)));
        if (overrideBrokerProperties != null && !overrideBrokerProperties.isEmpty()) {
            // Caller-supplied settings win over the defaults above.
            brokerConfigProperties.putAll(overrideBrokerProperties);
        }
        EphemeralKafkaBroker broker = new EphemeralKafkaBroker(this.zookeeper, brokerPort, brokerConfigProperties);
        broker.start().get();
        this.brokers.add(broker);
        return broker;
    }

    /**
     * @return a read-only view of the tracked brokers
     */
    public List<EphemeralKafkaBroker> getBrokers() {
        return Collections.unmodifiableList(this.brokers);
    }

    /**
     * Builds the bootstrap server list from the brokers that are currently running.
     *
     * <p>A broker that reports running but has no broker list available is skipped
     * rather than causing a {@link java.util.NoSuchElementException}.
     *
     * @return comma-separated broker lists of the running brokers; empty if none is running
     */
    public String connectionString() {
        return this.brokers.stream()
                .filter(EphemeralKafkaBroker::isRunning)
                .map(EphemeralKafkaBroker::getBrokerList)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.joining(","));
    }

    /**
     * @return producer settings pointing at this cluster's {@link #connectionString()}
     */
    public Properties producerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.connectionString());
        props.put("acks", "all");
        props.put("batch.size", "100");
        props.put("client.id", "kafka-junit");
        props.put("request.timeout.ms", "5000");
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("retries", Integer.MAX_VALUE);
        props.put("linger.ms", 0);
        return props;
    }

    /**
     * @return consumer settings with auto-commit enabled
     * @see #consumerConfig(boolean)
     */
    public Properties consumerConfig() {
        return this.consumerConfig(true);
    }

    /**
     * @param enableAutoCommit value for {@code enable.auto.commit}
     * @return consumer settings pointing at this cluster's {@link #connectionString()}
     */
    public Properties consumerConfig(boolean enableAutoCommit) {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.connectionString());
        props.put("group.id", "kafka-junit-consumer");
        props.put("enable.auto.commit", String.valueOf(enableAutoCommit));
        props.put("auto.commit.interval.ms", "100");
        props.put("auto.offset.reset", "earliest");
        props.put("heartbeat.interval.ms", "100");
        props.put("session.timeout.ms", "200");
        props.put("fetch.max.wait.ms", "500");
        props.put("metadata.max.age.ms", "100");
        return props;
    }

    /**
     * Creates the given topics and waits for the creation to complete.
     *
     * <p>Each topic is created with {@code numBroker} partitions and a replication
     * factor of {@code numBroker}.
     *
     * @param topics names of the topics to create
     * @throws ExecutionException   if topic creation fails
     * @throws InterruptedException if interrupted while waiting for the result
     */
    public void createTopics(String ... topics) throws ExecutionException, InterruptedException {
        Map<String, Object> adminConfigs = new HashMap<String, Object>();
        adminConfigs.put("bootstrap.servers", this.connectionString());
        try (AdminClient admin = AdminClient.create(adminConfigs)) {
            List<NewTopic> newTopics = Stream.of(topics)
                    .map(t -> new NewTopic(t, this.numBroker, (short) this.numBroker))
                    .collect(Collectors.toList());
            CreateTopicsResult createTopics = admin.createTopics(newTopics);
            createTopics.all().get();
        }
    }

    /**
     * Stops the cluster; equivalent to {@link #stop()}.
     *
     * @throws Exception if stopping the brokers or ZooKeeper fails
     */
    @Override
    public void close() throws Exception {
        this.stop();
    }
}

