/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.connect.kafka;

import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.dcp.util.PartitionSet;
import com.couchbase.client.java.Bucket;
import com.couchbase.connect.kafka.ConnectorLifecycle;
import com.couchbase.connect.kafka.CouchbaseSourceTask;
import com.couchbase.connect.kafka.KafkaCouchbaseClient;
import com.couchbase.connect.kafka.config.source.CouchbaseSourceConfig;
import com.couchbase.connect.kafka.config.source.CouchbaseSourceTaskConfig;
import com.couchbase.connect.kafka.util.CouchbaseHelper;
import com.couchbase.connect.kafka.util.ListHelper;
import com.couchbase.connect.kafka.util.Version;
import com.couchbase.connect.kafka.util.config.ConfigHelper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;

public class CouchbaseSourceConnector
extends SourceConnector {
    private Map<String, String> configProperties;
    private int numPartitions;
    private final ConnectorLifecycle lifecycle = new ConnectorLifecycle();

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> properties) {
        this.lifecycle.logConnectorStarted(properties.get("name"));
        try {
            this.configProperties = properties;
            CouchbaseSourceConfig config = ConfigHelper.parse(CouchbaseSourceConfig.class, properties);
            try (KafkaCouchbaseClient client = new KafkaCouchbaseClient(config);){
                Bucket bucket = client.bucket();
                if (bucket == null) {
                    throw new ConnectException("Cannot start CouchbaseSourceConnector because bucket name is not present");
                }
                this.numPartitions = ((CouchbaseBucketConfig)CouchbaseHelper.getConfig(bucket, config.bootstrapTimeout())).numberOfPartitions();
            }
        }
        catch (ConfigException e) {
            throw new ConnectException("Cannot start CouchbaseSourceConnector due to configuration error", (Throwable)e);
        }
    }

    public Class<? extends Task> taskClass() {
        return CouchbaseSourceTask.class;
    }

    private List<PartitionSet> splitPartitions(int maxTasks) {
        List partitions = IntStream.range(0, this.numPartitions).boxed().collect(Collectors.toList());
        return ListHelper.chunks(partitions, maxTasks).stream().filter(list -> !list.isEmpty()).map(PartitionSet::from).collect(Collectors.toList());
    }

    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<PartitionSet> partitionsGrouped = this.splitPartitions(maxTasks);
        this.lifecycle.logPartitionsAssigned(partitionsGrouped);
        String partitionsKey = ConfigHelper.keyName(CouchbaseSourceTaskConfig.class, CouchbaseSourceTaskConfig::partitions);
        String taskIdKey = ConfigHelper.keyName(CouchbaseSourceTaskConfig.class, CouchbaseSourceTaskConfig::maybeTaskId);
        ArrayList<Map<String, String>> taskConfigs = new ArrayList<Map<String, String>>();
        int taskId = 0;
        for (PartitionSet taskPartitions : partitionsGrouped) {
            String formattedPartitions = taskPartitions.format();
            HashMap<String, String> taskProps = new HashMap<String, String>(this.configProperties);
            taskProps.put(partitionsKey, formattedPartitions);
            taskProps.put(taskIdKey, "maybe-" + taskId++);
            taskConfigs.add(taskProps);
        }
        return taskConfigs;
    }

    public void stop() {
        this.lifecycle.logConnectorStopped();
    }

    public ConfigDef config() {
        return ConfigHelper.define(CouchbaseSourceConfig.class);
    }
}

