/*
 * Decompiled with CFR 0.152.
 */
package io.strimzi.test.container;

import com.groupcdg.pitest.annotations.DoNotMutate;
import io.strimzi.test.container.KafkaVersionService;
import io.strimzi.test.container.StrimziConnectContainer;
import io.strimzi.test.container.StrimziKafkaCluster;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;

/**
 * A group of {@link StrimziConnectContainer} workers that share one {@code group.id} and are
 * attached to the network of a {@link StrimziKafkaCluster}.
 *
 * <p>Instances are created through {@link StrimziConnectClusterBuilder}. All worker containers are
 * configured in the constructor; none of them is started until {@link #start()} is called.
 */
public class StrimziConnectCluster {
    private static final String NETWORK_ALIAS_PREFIX = "connect-";
    /** Container port exposed to the host and used to build the REST endpoint. */
    private static final int CONNECT_PORT = 8083;
    /** Port of the second listener, which is bound to the worker's network alias. */
    private static final int INTER_WORKER_PORT = 8084;
    private final StrimziKafkaCluster kafkaCluster;
    private final Map<String, String> additionalConnectConfiguration;
    private final String kafkaVersion;
    private final boolean includeFileConnectors;
    private final String groupId;
    private final List<StrimziConnectContainer> workers;

    /**
     * Creates the cluster and configures (but does not start) one container per requested worker.
     *
     * <p>Prefer {@link StrimziConnectClusterBuilder#build()}, which validates the builder state
     * before delegating here; this constructor performs no validation of its own.
     *
     * @param builder the builder holding the cluster settings
     */
    public StrimziConnectCluster(StrimziConnectClusterBuilder builder) {
        this.kafkaCluster = builder.kafkaCluster;
        // Snapshot the caller-supplied map so later changes to it cannot leak into this instance.
        this.additionalConnectConfiguration = new HashMap<>(builder.additionalConnectConfiguration);
        // Fall back to the latest release known to KafkaVersionService when no version was set.
        this.kafkaVersion = builder.kafkaVersion == null
                ? KafkaVersionService.getInstance().latestRelease().getVersion()
                : builder.kafkaVersion;
        this.includeFileConnectors = builder.includeFileConnectors;
        this.groupId = builder.groupId;

        String imageName = KafkaVersionService.strimziTestContainerImageName(this.kafkaVersion);
        this.workers = new ArrayList<>();
        for (int i = 0; i < builder.workersNum; ++i) {
            this.workers.add(this.createWorker(imageName, NETWORK_ALIAS_PREFIX + i));
        }
    }

    /**
     * Builds a single, not yet started worker container.
     *
     * @param imageName the container image to run
     * @param host the network alias of the worker, also used in its {@code listeners} setting
     * @return the configured worker container
     */
    private StrimziConnectContainer createWorker(String imageName, String host) {
        Properties configs = this.buildConfigs(host);
        return new StrimziConnectContainer(imageName, this.kafkaCluster, configs)
                .withNetwork(this.kafkaCluster.getNetwork())
                .withNetworkAliases(host)
                .withExposedPorts(CONNECT_PORT)
                .withEnv("LOG_DIR", "/tmp")
                .waitForRunning()
                .waitingFor(Wait.forHttp("/").forStatusCode(200));
    }

    /**
     * Assembles the worker configuration for the worker reachable under {@code host}.
     *
     * <p>Defaults are written first, then the additional configuration supplied through the
     * builder is applied on top, so user-provided keys override the defaults. When file connectors
     * are enabled, the connect-file jar matching the Kafka version is appended to
     * {@code plugin.path} (or becomes its only entry when the key is absent).
     *
     * @param host the network alias of the worker
     * @return the properties for the worker
     */
    private Properties buildConfigs(String host) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.kafkaCluster.getNetworkBootstrapServers());
        properties.setProperty("group.id", this.groupId);
        properties.setProperty("key.converter", "org.apache.kafka.connect.storage.StringConverter");
        properties.setProperty("value.converter", "org.apache.kafka.connect.storage.StringConverter");
        properties.setProperty("offset.storage.topic", "connect-offsets");
        properties.setProperty("offset.storage.replication.factor", "-1");
        properties.setProperty("config.storage.topic", "connect-configs");
        properties.setProperty("config.storage.replication.factor", "-1");
        properties.setProperty("status.storage.topic", "connect-status");
        properties.setProperty("status.storage.replication.factor", "-1");
        // Two listeners: one without a host name on CONNECT_PORT and one bound to the worker's
        // network alias on INTER_WORKER_PORT.
        properties.setProperty("listeners",
                "http://:" + CONNECT_PORT + ",http://" + host + ":" + INTER_WORKER_PORT);
        properties.putAll(this.additionalConnectConfiguration);
        if (this.includeFileConnectors) {
            String connectFileJar = "/opt/kafka/libs/connect-file-" + this.kafkaVersion + ".jar";
            String pluginPath = properties.getProperty("plugin.path");
            // Keep any user-supplied plugin path and append the file connector jar to it.
            properties.setProperty("plugin.path",
                    pluginPath == null ? connectFileJar : pluginPath + "," + connectFileJar);
        }
        return properties;
    }

    /**
     * Returns the worker containers of this cluster.
     *
     * @return a new collection holding the workers; changes to the returned collection do not
     *         affect this cluster
     */
    @DoNotMutate
    public Collection<GenericContainer<?>> getWorkers() {
        return new ArrayList<>(this.workers);
    }

    /** Starts every worker, one after another, in the order they were created. */
    @DoNotMutate
    public void start() {
        this.workers.forEach(GenericContainer::start);
    }

    /** Stops every worker, one after another, in the order they were created. */
    @DoNotMutate
    public void stop() {
        this.workers.forEach(GenericContainer::stop);
    }

    /**
     * Returns the REST endpoint of the first worker that reports itself as running.
     *
     * @return a URL of the form {@code http://<host>:<mapped port>}
     * @throws IllegalStateException if no worker is running
     */
    @DoNotMutate
    public String getRestEndpoint() {
        for (StrimziConnectContainer worker : this.workers) {
            if (worker.isRunning()) {
                return "http://" + worker.getHost() + ":" + worker.getMappedPort(CONNECT_PORT);
            }
        }
        throw new IllegalStateException("No workers are running and healthy");
    }

    /**
     * Returns the Kafka version used to select the container image.
     *
     * @return the version set on the builder, or the latest release resolved at construction time
     */
    String getKafkaVersion() {
        return this.kafkaVersion;
    }

    /**
     * Builder for {@link StrimziConnectCluster}.
     *
     * <p>A Kafka cluster and a group id are mandatory. By default the cluster has one worker, no
     * additional configuration and file connectors enabled.
     */
    public static class StrimziConnectClusterBuilder {
        private Map<String, String> additionalConnectConfiguration = new HashMap<>();
        private boolean includeFileConnectors = true;
        private int workersNum = 1;
        private String kafkaVersion;
        private StrimziKafkaCluster kafkaCluster;
        private String groupId;

        /**
         * Sets the Kafka cluster the Connect workers attach to.
         *
         * @param kafkaCluster the Kafka cluster; must not be {@code null} when {@link #build()} is called
         * @return this builder
         */
        public StrimziConnectClusterBuilder withKafkaCluster(StrimziKafkaCluster kafkaCluster) {
            this.kafkaCluster = kafkaCluster;
            return this;
        }

        /**
         * Sets the number of Connect workers.
         *
         * @param workersNum the number of workers; must be greater than 0 when {@link #build()} is called
         * @return this builder
         */
        public StrimziConnectClusterBuilder withNumberOfWorkers(int workersNum) {
            this.workersNum = workersNum;
            return this;
        }

        /**
         * Sets configuration entries that are applied on top of the default worker configuration.
         *
         * @param additionalConnectConfiguration the extra configuration; must not be {@code null}
         *        when {@link #build()} is called
         * @return this builder
         */
        public StrimziConnectClusterBuilder withAdditionalConnectConfiguration(Map<String, String> additionalConnectConfiguration) {
            this.additionalConnectConfiguration = additionalConnectConfiguration;
            return this;
        }

        /**
         * Sets the Kafka version used to select the container image.
         *
         * @param kafkaVersion the Kafka version, or {@code null} to use the latest known release
         * @return this builder
         */
        public StrimziConnectClusterBuilder withKafkaVersion(String kafkaVersion) {
            this.kafkaVersion = kafkaVersion;
            return this;
        }

        /**
         * Disables adding the connect-file jar to {@code plugin.path}.
         *
         * @return this builder
         */
        public StrimziConnectClusterBuilder withoutFileConnectors() {
            this.includeFileConnectors = false;
            return this;
        }

        /**
         * Sets the {@code group.id} shared by all workers.
         *
         * @param groupId the group id; must not be {@code null} when {@link #build()} is called
         * @return this builder
         */
        public StrimziConnectClusterBuilder withGroupId(String groupId) {
            this.groupId = groupId;
            return this;
        }

        /**
         * Validates the builder state and creates the cluster.
         *
         * @return the configured, not yet started Connect cluster
         * @throws IllegalArgumentException if the Kafka cluster, the group id or the additional
         *         configuration is {@code null}, or if the number of workers is not positive
         */
        public StrimziConnectCluster build() {
            if (this.kafkaCluster == null) {
                throw new IllegalArgumentException("A Kafka cluster must be specified");
            }
            if (this.groupId == null) {
                throw new IllegalArgumentException("The Connect cluster group.id configuration must be specified");
            }
            if (this.workersNum <= 0) {
                throw new IllegalArgumentException("The number of workers in the Connect cluster must be greater than 0");
            }
            if (this.additionalConnectConfiguration == null) {
                throw new IllegalArgumentException("The additional configuration must be specified");
            }
            return new StrimziConnectCluster(this);
        }
    }
}

