/*
 * Decompiled with CFR 0.152.
 */
package net.mguenther.kafka.junit.provider;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.mguenther.kafka.junit.LeaderAndIsr;
import net.mguenther.kafka.junit.TopicConfig;
import net.mguenther.kafka.junit.TopicManager;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultTopicManager
implements TopicManager {
    private static final Logger log = LoggerFactory.getLogger(DefaultTopicManager.class);
    private static final int TIMEOUT_IN_MILLIS = 10000;
    private final Properties props;

    public DefaultTopicManager(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("client.id", "kafka-junit-admin-client");
        this.props = props;
    }

    @Override
    public void createTopic(TopicConfig config) {
        try (AdminClient client = AdminClient.create((Properties)this.props);){
            NewTopic newTopic = TopicAdmin.defineTopic((String)config.getTopic()).partitions(config.getNumberOfPartitions()).replicationFactor((short)config.getNumberOfReplicas()).config(config.getPropertiesMap()).build();
            client.createTopics(Collections.singletonList(newTopic)).all().get(10000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            String message = "Caught an unexpected InterruptedException while trying to create topic '%s'.";
            throw new RuntimeException(String.format("Caught an unexpected InterruptedException while trying to create topic '%s'.", config.getTopic()), e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                String message = "The topic '%s' already exists.";
                throw new RuntimeException(String.format("The topic '%s' already exists.", config.getTopic()), e.getCause());
            }
            String message = "Unable to create topic '%s'.";
            throw new RuntimeException(String.format("Unable to create topic '%s'.", config.getTopic()), e.getCause());
        }
        catch (TimeoutException e) {
            String message = "A timeout occurred while trying to create topic '%s'.";
            throw new RuntimeException(String.format("A timeout occurred while trying to create topic '%s'.", config.getTopic()), e);
        }
    }

    @Override
    public void deleteTopic(String topic) {
        try (AdminClient client = AdminClient.create((Properties)this.props);){
            client.deleteTopics(Collections.singletonList(topic)).all().get(10000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            String message = "Caught an unexpected InterruptedException while trying to delete topic '%s'.";
            throw new RuntimeException(String.format("Caught an unexpected InterruptedException while trying to delete topic '%s'.", topic), e);
        }
        catch (ExecutionException e) {
            String message = "Unable to delete the topic '%s'.";
            throw new RuntimeException(String.format("Unable to delete the topic '%s'.", topic), e.getCause());
        }
        catch (TimeoutException e) {
            String message = "A timeout occurred while trying to delete topic '%s'.";
            throw new RuntimeException(String.format("A timeout occurred while trying to delete topic '%s'.", topic), e);
        }
    }

    @Override
    public boolean exists(String topic) {
        boolean exists = false;
        try (AdminClient client = AdminClient.create((Properties)this.props);){
            exists = this.getTopicNames(client).anyMatch(existingTopic -> existingTopic.equals(topic));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            String message = "Caught an unexpected InterruptedException while trying to determine if topic '%s' exists.";
            throw new RuntimeException(String.format("Caught an unexpected InterruptedException while trying to determine if topic '%s' exists.", topic), e);
        }
        catch (ExecutionException e) {
            String message = "Unable to query the state of topic '%s'";
            throw new RuntimeException(String.format("Unable to query the state of topic '%s'", topic), e.getCause());
        }
        catch (TimeoutException e) {
            String message = "A timeout occurred while trying to determine if topic '%s' exists.";
            throw new RuntimeException(String.format("A timeout occurred while trying to determine if topic '%s' exists.", topic), e);
        }
        return exists;
    }

    private Stream<String> getTopicNames(AdminClient client) throws InterruptedException, ExecutionException, TimeoutException {
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(false);
        return ((Collection)client.listTopics(options).listings().get(10000L, TimeUnit.MILLISECONDS)).stream().map(TopicListing::name);
    }

    @Override
    public Map<Integer, LeaderAndIsr> fetchLeaderAndIsr(String topic) {
        HashMap<Integer, LeaderAndIsr> leaderAndIsrByPartition = new HashMap<Integer, LeaderAndIsr>();
        try (AdminClient client = AdminClient.create((Properties)this.props);){
            DescribeTopicsResult result = client.describeTopics(Collections.singletonList(topic));
            TopicDescription topicDescription = (TopicDescription)((Map)result.all().get(10000L, TimeUnit.MILLISECONDS)).get(topic);
            for (TopicPartitionInfo topicPartitionInfo : topicDescription.partitions()) {
                int partition = topicPartitionInfo.partition();
                LeaderAndIsr leaderAndIsr = new LeaderAndIsr(topicPartitionInfo.leader().id(), topicPartitionInfo.isr().stream().map(Node::id).collect(Collectors.toSet()));
                leaderAndIsrByPartition.put(partition, leaderAndIsr);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            String message = "Caught an unexpected InterruptedException while trying to fetch the leader and ISR for topic '%s'.";
            throw new RuntimeException(String.format("Caught an unexpected InterruptedException while trying to fetch the leader and ISR for topic '%s'.", topic), e);
        }
        catch (ExecutionException e) {
            String message = "Unable to fetch the leader and ISR for topic '%s'.";
            throw new RuntimeException(String.format("Unable to fetch the leader and ISR for topic '%s'.", topic), e.getCause());
        }
        catch (TimeoutException e) {
            String message = "A timeout occurred while trying to fetch the leader and ISR for topic '%s'.";
            throw new RuntimeException(String.format("A timeout occurred while trying to fetch the leader and ISR for topic '%s'.", topic), e);
        }
        return Collections.unmodifiableMap(leaderAndIsrByPartition);
    }

    @Override
    public Properties fetchTopicConfig(String topic) {
        Properties topicConfig = new Properties();
        try (AdminClient client = AdminClient.create((Properties)this.props);){
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
            Map configsByConfigResource = (Map)client.describeConfigs(Collections.singletonList(resource)).all().get(10000L, TimeUnit.MILLISECONDS);
            Config config = (Config)configsByConfigResource.get(resource);
            for (ConfigEntry configEntry : config.entries()) {
                topicConfig.put(configEntry.name(), configEntry.value());
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            String message = "Caught an unexpected InterruptedException while trying to fetch the topic configuration for topic '%s'.";
            throw new RuntimeException(String.format("Caught an unexpected InterruptedException while trying to fetch the topic configuration for topic '%s'.", topic), e);
        }
        catch (ExecutionException e) {
            String message = "Unable to retrieve the topic configuration for topic '%s'.";
            throw new RuntimeException(String.format("Unable to retrieve the topic configuration for topic '%s'.", topic), e.getCause());
        }
        catch (TimeoutException e) {
            String message = "A timeout occurred while trying to fetch the topic configuration for topic '%s'.";
            throw new RuntimeException(String.format("A timeout occurred while trying to fetch the topic configuration for topic '%s'.", topic), e);
        }
        return topicConfig;
    }

    public DefaultTopicManager(Properties props) {
        this.props = props;
    }
}

