/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import kafka.common.KafkaException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.NotRunning;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IterableAssert;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import scala.Option;
import scala.collection.Seq;

public class EmbeddedKafkaBroker
implements InitializingBean,
DisposableBean {
    private static final Log logger = LogFactory.getLog(EmbeddedKafkaBroker.class);
    public static final String BEAN_NAME = "embeddedKafka";
    public static final String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";
    public static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT = "spring.embedded.zookeeper.connect";
    private static final int DEFAULT_ADMIN_TIMEOUT = 30;
    private final int count;
    private final boolean controlledShutdown;
    private final Set<String> topics;
    private final int partitionsPerTopic;
    private final List<KafkaServer> kafkaServers = new ArrayList<KafkaServer>();
    private final Map<String, Object> brokerProperties = new HashMap<String, Object>();
    private EmbeddedZookeeper zookeeper;
    private ZkClient zookeeperClient;
    private String zkConnect;
    private int[] kafkaPorts;
    private int adminTimeout = 30;

    public EmbeddedKafkaBroker(int count) {
        this(count, false, new String[0]);
    }

    public EmbeddedKafkaBroker(int count, boolean controlledShutdown, String ... topics) {
        this(count, controlledShutdown, 2, topics);
    }

    public EmbeddedKafkaBroker(int count, boolean controlledShutdown, int partitions, String ... topics) {
        this.count = count;
        this.kafkaPorts = new int[this.count];
        this.controlledShutdown = controlledShutdown;
        this.topics = topics != null ? new HashSet<String>(Arrays.asList(topics)) : new HashSet<String>();
        this.partitionsPerTopic = partitions;
    }

    public EmbeddedKafkaBroker brokerProperties(Map<String, String> brokerProperties) {
        this.brokerProperties.putAll(brokerProperties);
        return this;
    }

    public EmbeddedKafkaBroker brokerProperty(String property, Object value) {
        this.brokerProperties.put(property, value);
        return this;
    }

    public EmbeddedKafkaBroker kafkaPorts(int ... kafkaPorts) {
        Assert.isTrue((kafkaPorts.length == this.count ? 1 : 0) != 0, (String)("A port must be provided for each instance [" + this.count + "], provided: " + Arrays.toString(kafkaPorts) + ", use 0 for a random port"));
        this.kafkaPorts = kafkaPorts;
        return this;
    }

    public void setAdminTimeout(int adminTimeout) {
        this.adminTimeout = adminTimeout;
    }

    public void afterPropertiesSet() {
        this.zookeeper = new EmbeddedZookeeper();
        int zkConnectionTimeout = 6000;
        int zkSessionTimeout = 6000;
        this.zkConnect = "127.0.0.1:" + this.zookeeper.port();
        this.zookeeperClient = new ZkClient(this.zkConnect, zkSessionTimeout, zkConnectionTimeout, (ZkSerializer)ZKStringSerializer$.MODULE$);
        this.kafkaServers.clear();
        for (int i = 0; i < this.count; ++i) {
            Properties brokerConfigProperties = this.createBrokerProperties(i);
            brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
            brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
            brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
            brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(), String.valueOf(Long.MAX_VALUE));
            if (this.brokerProperties != null) {
                this.brokerProperties.forEach(brokerConfigProperties::put);
            }
            KafkaServer server = TestUtils.createServer((KafkaConfig)new KafkaConfig((Map)brokerConfigProperties), (Time)Time.SYSTEM);
            this.kafkaServers.add(server);
            if (this.kafkaPorts[i] != 0) continue;
            this.kafkaPorts[i] = TestUtils.boundPort((KafkaServer)server, (SecurityProtocol)SecurityProtocol.PLAINTEXT);
        }
        this.createKafkaTopics(this.topics);
        System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, this.getBrokersAsString());
        System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, this.getZookeeperConnectionString());
    }

    private Properties createBrokerProperties(int i) {
        return TestUtils.createBrokerConfig((int)i, (String)this.zkConnect, (boolean)this.controlledShutdown, (boolean)true, (int)this.kafkaPorts[i], (Option)Option.apply(null), (Option)Option.apply(null), (Option)Option.apply(null), (boolean)true, (boolean)false, (int)0, (boolean)false, (int)0, (boolean)false, (int)0, (Option)Option.apply(null), (int)1, (boolean)false);
    }

    public void addTopics(String ... topics) {
        Assert.notNull((Object)this.zookeeper, (String)"Broker must be started before this method can be called");
        HashSet<String> set = new HashSet<String>(Arrays.asList(topics));
        this.createKafkaTopics(set);
        this.topics.addAll(set);
    }

    public void addTopics(NewTopic ... topics) {
        Assert.notNull((Object)this.zookeeper, (String)"Broker must be started before this method can be called");
        for (NewTopic topic : topics) {
            Assert.isTrue((boolean)this.topics.add(topic.name()), () -> "topic already exists: " + topic);
            Assert.isTrue((topic.replicationFactor() <= this.count && (topic.replicasAssignments() == null || topic.replicasAssignments().size() <= this.count) ? 1 : 0) != 0, () -> "Embedded kafka does not support the requested replication factor: " + topic);
        }
        this.doWithAdmin(admin -> this.createTopics((AdminClient)admin, Arrays.asList(topics)));
    }

    private void createKafkaTopics(Set<String> topics) {
        this.doWithAdmin(admin -> this.createTopics((AdminClient)admin, topics.stream().map(t -> new NewTopic(t, this.partitionsPerTopic, (short)this.count)).collect(Collectors.toList())));
    }

    private void createTopics(AdminClient admin, List<NewTopic> newTopics) {
        CreateTopicsResult createTopics = admin.createTopics(newTopics);
        try {
            createTopics.all().get((long)this.adminTimeout, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            throw new KafkaException((Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) {
        HashMap<String, String> adminConfigs = new HashMap<String, String>();
        adminConfigs.put("bootstrap.servers", this.getBrokersAsString());
        AdminClient admin = null;
        try {
            admin = AdminClient.create(adminConfigs);
            callback.accept(admin);
        }
        finally {
            if (admin != null) {
                admin.close((long)this.adminTimeout, TimeUnit.SECONDS);
            }
        }
    }

    public void destroy() {
        System.getProperties().remove(SPRING_EMBEDDED_KAFKA_BROKERS);
        System.getProperties().remove(SPRING_EMBEDDED_ZOOKEEPER_CONNECT);
        for (KafkaServer kafkaServer : this.kafkaServers) {
            try {
                if (kafkaServer.brokerState().currentState() != NotRunning.state()) {
                    kafkaServer.shutdown();
                    kafkaServer.awaitShutdown();
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
            try {
                CoreUtils.delete((Seq)kafkaServer.config().logDirs());
            }
            catch (Exception exception) {}
        }
        try {
            this.zookeeperClient.close();
        }
        catch (ZkInterruptedException zkInterruptedException) {
            // empty catch block
        }
        try {
            this.zookeeper.shutdown();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    public Set<String> getTopics() {
        return new HashSet<String>(this.topics);
    }

    public List<KafkaServer> getKafkaServers() {
        return this.kafkaServers;
    }

    public KafkaServer getKafkaServer(int id) {
        return this.kafkaServers.get(id);
    }

    public EmbeddedZookeeper getZookeeper() {
        return this.zookeeper;
    }

    public ZkClient getZkClient() {
        return this.zookeeperClient;
    }

    public String getZookeeperConnectionString() {
        return this.zkConnect;
    }

    public BrokerAddress getBrokerAddress(int i) {
        KafkaServer kafkaServer = this.kafkaServers.get(i);
        return new BrokerAddress("127.0.0.1", kafkaServer.config().port());
    }

    public BrokerAddress[] getBrokerAddresses() {
        ArrayList<BrokerAddress> addresses = new ArrayList<BrokerAddress>();
        for (int i = 0; i < this.kafkaPorts.length; ++i) {
            addresses.add(new BrokerAddress("127.0.0.1", this.kafkaPorts[i]));
        }
        return addresses.toArray(new BrokerAddress[addresses.size()]);
    }

    public int getPartitionsPerTopic() {
        return this.partitionsPerTopic;
    }

    public void bounce(BrokerAddress brokerAddress) {
        for (KafkaServer kafkaServer : this.getKafkaServers()) {
            if (!brokerAddress.equals(new BrokerAddress(kafkaServer.config().hostName(), kafkaServer.config().port()))) continue;
            kafkaServer.shutdown();
            kafkaServer.awaitShutdown();
        }
    }

    public void restart(int index) throws Exception {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(10, Collections.singletonMap(Exception.class, true));
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(100L);
        backOffPolicy.setMaxInterval(1000L);
        backOffPolicy.setMultiplier(2.0);
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy((RetryPolicy)retryPolicy);
        retryTemplate.setBackOffPolicy((BackOffPolicy)backOffPolicy);
        retryTemplate.execute(context -> {
            this.kafkaServers.get(index).startup();
            return null;
        });
    }

    public String getBrokersAsString() {
        StringBuilder builder = new StringBuilder();
        for (BrokerAddress brokerAddress : this.getBrokerAddresses()) {
            builder.append(brokerAddress.toString()).append(',');
        }
        return builder.substring(0, builder.length() - 1);
    }

    public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer) throws Exception {
        this.consumeFromEmbeddedTopics(consumer, this.topics.toArray(new String[0]));
    }

    public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) throws Exception {
        this.consumeFromEmbeddedTopics(consumer, topic);
    }

    public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String ... topics) throws Exception {
        HashSet<String> diff = new HashSet<String>(Arrays.asList(topics));
        diff.removeAll(new HashSet<String>(this.topics));
        ((IterableAssert)Assertions.assertThat(this.topics).as("topic(s):'" + diff + "' are not in embedded topic list", new Object[0])).containsAll(new HashSet<String>(Arrays.asList(topics)));
        final AtomicBoolean assigned = new AtomicBoolean();
        consumer.subscribe(Arrays.asList(topics), new ConsumerRebalanceListener(){

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                assigned.set(true);
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("partitions assigned: " + partitions));
                }
            }
        });
        ConsumerRecords records = null;
        int n = 0;
        while (!assigned.get() && n++ < 600) {
            records = consumer.poll(Duration.ofMillis(100L));
        }
        if (records != null && records.count() > 0) {
            ConsumerRecords theRecords = records;
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Records received on initial poll for assignment; re-seeking to beginning; " + records.partitions().stream().flatMap(p -> theRecords.records(p).stream()).map(r -> r.topic() + "-" + r.partition() + "@" + r.offset()).collect(Collectors.toList())));
            }
            consumer.seekToBeginning((Collection)records.partitions());
        }
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)assigned.get()).as("Failed to be assigned partitions from the embedded topics", new Object[0])).isTrue();
        logger.debug((Object)"Subscription Initiated");
    }
}

