/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.connect.kafka;

import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.env.NetworkResolution;
import com.couchbase.client.core.env.SeedNode;
import com.couchbase.client.core.util.ConnectionString;
import com.couchbase.client.dcp.config.HostAndPort;
import com.couchbase.client.dcp.util.PartitionSet;
import com.couchbase.client.java.Bucket;
import com.couchbase.connect.kafka.ConnectorLifecycle;
import com.couchbase.connect.kafka.CouchbaseSourceTask;
import com.couchbase.connect.kafka.KafkaCouchbaseClient;
import com.couchbase.connect.kafka.config.source.CouchbaseSourceConfig;
import com.couchbase.connect.kafka.config.source.CouchbaseSourceTaskConfig;
import com.couchbase.connect.kafka.util.CouchbaseHelper;
import com.couchbase.connect.kafka.util.ListHelper;
import com.couchbase.connect.kafka.util.SeedNodeHelper;
import com.couchbase.connect.kafka.util.Version;
import com.couchbase.connect.kafka.util.config.ConfigHelper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CouchbaseSourceConnector
extends SourceConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSourceConnector.class);
    private Map<String, String> configProperties;
    private CouchbaseBucketConfig bucketConfig;
    private Set<SeedNode> seedNodes;
    private final ConnectorLifecycle lifecycle = new ConnectorLifecycle();

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> properties) {
        this.lifecycle.logConnectorStarted(properties.get("name"));
        try {
            this.configProperties = properties;
            CouchbaseSourceConfig config = ConfigHelper.parse(CouchbaseSourceConfig.class, properties);
            try (KafkaCouchbaseClient client = new KafkaCouchbaseClient(config);){
                Bucket bucket = client.bucket();
                if (bucket == null) {
                    throw new ConnectException("Cannot start CouchbaseSourceConnector because bucket name is not present");
                }
                this.bucketConfig = (CouchbaseBucketConfig)CouchbaseHelper.getConfig(bucket, config.bootstrapTimeout());
                ConnectionString connectionString = ConnectionString.create((String)String.join((CharSequence)",", config.seedNodes()));
                NetworkResolution network = NetworkResolution.valueOf((String)config.network());
                this.seedNodes = SeedNodeHelper.getKvNodes(bucket, connectionString, config.enableTls(), network, config.bootstrapTimeout());
            }
        }
        catch (ConfigException e) {
            throw new ConnectException("Cannot start CouchbaseSourceConnector due to configuration error", (Throwable)e);
        }
    }

    public Class<? extends Task> taskClass() {
        return CouchbaseSourceTask.class;
    }

    private List<PartitionSet> splitPartitions(int maxTasks) {
        List partitions = IntStream.range(0, this.bucketConfig.numberOfPartitions()).boxed().collect(Collectors.toList());
        return ListHelper.chunks(partitions, maxTasks).stream().filter(list -> !list.isEmpty()).map(PartitionSet::from).collect(Collectors.toList());
    }

    public List<Map<String, String>> taskConfigs(int maxTasks) {
        String seedNodes = this.seedNodes.stream().map(n -> {
            int port = (Integer)n.kvPort().orElseThrow(() -> new AssertionError((Object)"seed node must have kv port"));
            return new HostAndPort(n.address(), port).format();
        }).collect(Collectors.joining(","));
        List<PartitionSet> partitionsGrouped = this.splitPartitions(maxTasks);
        this.lifecycle.logPartitionsAssigned(partitionsGrouped);
        String partitionsKey = ConfigHelper.keyName(CouchbaseSourceTaskConfig.class, CouchbaseSourceTaskConfig::partitions);
        String dcpSeedNodesKey = ConfigHelper.keyName(CouchbaseSourceTaskConfig.class, CouchbaseSourceTaskConfig::dcpSeedNodes);
        String taskIdKey = ConfigHelper.keyName(CouchbaseSourceTaskConfig.class, CouchbaseSourceTaskConfig::maybeTaskId);
        ArrayList<Map<String, String>> taskConfigs = new ArrayList<Map<String, String>>();
        int taskId = 0;
        for (PartitionSet taskPartitions : partitionsGrouped) {
            String formattedPartitions = taskPartitions.format();
            HashMap<String, String> taskProps = new HashMap<String, String>(this.configProperties);
            taskProps.put(partitionsKey, formattedPartitions);
            taskProps.put(dcpSeedNodesKey, seedNodes);
            taskProps.put(taskIdKey, "maybe-" + taskId++);
            taskConfigs.add(taskProps);
        }
        return taskConfigs;
    }

    public void stop() {
        this.lifecycle.logConnectorStopped();
    }

    public ConfigDef config() {
        return ConfigHelper.define(CouchbaseSourceConfig.class);
    }
}

