/*
 * Decompiled with CFR 0.152.
 */
package net.mguenther.kafka.junit;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import net.mguenther.kafka.junit.EmbeddedConnectConfig;
import net.mguenther.kafka.junit.EmbeddedLifecycle;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Embedded Kafka Connect instance running in distributed mode.
 *
 * <p>The constructor wires a {@link Worker}, the Kafka-backed offset, status and
 * config stores and a {@link DistributedHerder}. {@link #start()} starts the
 * worker and the herder and then deploys every connector configuration that was
 * supplied via the {@link EmbeddedConnectConfig}. {@link #stop()} shuts the
 * herder and the worker down; only its first invocation has an effect.
 */
public class EmbeddedConnect implements EmbeddedLifecycle {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedConnect.class);

    /** Maximum time to wait for a single connector deployment to be acknowledged. */
    private static final int REQUEST_TIMEOUT_MS = 120000;

    /** Flipped to {@code true} by the first call to {@link #stop()}. */
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final List<Properties> connectorConfigs;
    private final DistributedConfig config;
    private final KafkaOffsetBackingStore offsetBackingStore;
    private final Worker worker;
    private final StatusBackingStore statusBackingStore;
    private final ConfigBackingStore configBackingStore;
    private final DistributedHerder herder;

    /**
     * Wires all Kafka Connect components; nothing is started here.
     *
     * @param connectConfig supplies the worker properties, the worker ID and the
     *                      connector configurations to deploy on {@link #start()}
     * @param brokerList    value used for {@code bootstrap.servers}
     * @param clusterId     Kafka cluster ID handed to the herder
     */
    public EmbeddedConnect(EmbeddedConnectConfig connectConfig, String brokerList, String clusterId) {
        Supplier<String> clientIdGenerator = EmbeddedConnect.constantClientIdBase();
        ConnectorClientConfigOverridePolicy policy = new AllConnectorClientConfigOverridePolicy();
        // NOTE(review): this writes into the Properties object returned by connectConfig;
        // if that getter exposes internal state, the caller's config is modified as well.
        Properties effectiveWorkerConfig = connectConfig.getConnectProperties();
        effectiveWorkerConfig.put("bootstrap.servers", brokerList);
        this.connectorConfigs = connectConfig.getConnectors();
        this.config = new DistributedConfig(Utils.propsToStringMap(effectiveWorkerConfig));
        this.offsetBackingStore = new KafkaOffsetBackingStore(EmbeddedConnect.getTopicAdminSupplier(brokerList), clientIdGenerator);
        this.worker = new Worker(connectConfig.getWorkerId(), Time.SYSTEM, new Plugins(new HashMap<>()), this.config, this.offsetBackingStore, policy);
        this.statusBackingStore = new KafkaStatusBackingStore(Time.SYSTEM, this.worker.getInternalValueConverter(), EmbeddedConnect.getTopicAdminSupplier(brokerList), clientIdGenerator.get());
        this.configBackingStore = new KafkaConfigBackingStore(this.worker.getInternalValueConverter(), this.config, new WorkerConfigTransformer(this.worker, Collections.emptyMap()), EmbeddedConnect.getTopicAdminSupplier(brokerList), clientIdGenerator.get());
        this.herder = new DistributedHerder(this.config, Time.SYSTEM, this.worker, clusterId, this.statusBackingStore, this.configBackingStore, "", new RestClient(new DistributedConfig(EmbeddedConnect.toMap(effectiveWorkerConfig))), policy, new AutoCloseable[0]);
    }

    /**
     * Configures the offset and status stores, starts the worker and the herder
     * and deploys all configured connectors.
     *
     * @throws RuntimeException if the worker, the herder or the deployment loop fails
     */
    @Override
    public void start() {
        this.offsetBackingStore.configure(this.config);
        this.statusBackingStore.configure(this.config);
        try {
            log.info("Embedded Kafka Connect is starting.");
            this.worker.start();
            this.herder.start();
            log.info("Embedded Kafka Connect started.");
            log.info("Found {} connectors to deploy.", this.connectorConfigs.size());
            this.connectorConfigs.forEach(this::deployConnector);
        } catch (Exception e) {
            throw new RuntimeException("Unable to start Embedded Kafka Connect.", e);
        }
    }

    /**
     * Submits a single connector configuration to the herder and waits up to
     * {@link #REQUEST_TIMEOUT_MS} milliseconds for the outcome. Failures are logged
     * and not propagated, so one failing connector does not abort the others.
     *
     * @param connectorConfig connector properties; the connector name is read from
     *                        the {@code name} property
     */
    private void deployConnector(Properties connectorConfig) {
        // Left as a raw type: the callback's payload type is not imported in this file.
        FutureCallback callback = new FutureCallback();
        String connectorName = connectorConfig.getProperty("name");
        log.info("Deploying connector {}.", connectorName);
        this.herder.putConnectorConfig(connectorName, Utils.propsToStringMap(connectorConfig), true, (Callback) callback);
        try {
            callback.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so that callers further up can react to it.
            Thread.currentThread().interrupt();
            log.error("Failed to deploy connector {}.", connectorName, e);
        } catch (ExecutionException | TimeoutException e) {
            log.error("Failed to deploy connector {}.", connectorName, e);
        }
    }

    /**
     * Stops the herder and then the worker. Only the first call does any work;
     * subsequent calls return immediately.
     *
     * @throws RuntimeException if stopping the herder or the worker fails
     */
    @Override
    public void stop() {
        if (this.shutdown.getAndSet(true)) {
            return;
        }
        try {
            log.info("Embedded Kafka Connect is stopping.");
            try {
                this.herder.stop();
            } finally {
                // Stop the worker even if the herder failed to stop cleanly.
                this.worker.stop();
            }
            log.info("Embedded Kafka Connect stopped.");
        } catch (Exception e) {
            throw new RuntimeException("Unable to stop Embedded Kafka Connect.", e);
        }
    }

    /**
     * Returns a supplier that creates a new {@link TopicAdmin} on every call, each
     * configured with the given broker list as {@code bootstrap.servers}.
     */
    private static Supplier<TopicAdmin> getTopicAdminSupplier(String brokerList) {
        Map<String, Object> adminConfig = new HashMap<>();
        adminConfig.put("bootstrap.servers", brokerList);
        return () -> new TopicAdmin(adminConfig);
    }

    /**
     * Returns a supplier that always yields the same client ID of the form
     * {@code kafka-junit-<suffix>}; the suffix is the first seven characters of a
     * random UUID chosen once when this method is called.
     */
    private static Supplier<String> constantClientIdBase() {
        String randomSuffix = UUID.randomUUID().toString().substring(0, 7);
        return () -> String.format("kafka-junit-%s", randomSuffix);
    }

    /**
     * Copies all string-valued properties (as reported by
     * {@link Properties#stringPropertyNames()}) into an unmodifiable map.
     */
    private static Map<String, String> toMap(Properties props) {
        Map<String, String> propertyMap = new HashMap<>();
        for (String propertyName : props.stringPropertyNames()) {
            propertyMap.put(propertyName, props.getProperty(propertyName));
        }
        return Collections.unmodifiableMap(propertyMap);
    }
}

