/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.connect.kafka;

import com.couchbase.client.core.logging.LogRedaction;
import com.couchbase.client.core.logging.RedactionLevel;
import com.couchbase.client.core.util.CbCollections;
import com.couchbase.client.core.util.CbStrings;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.connect.kafka.KafkaCouchbaseClient;
import com.couchbase.connect.kafka.config.common.ConnectionConfig;
import com.couchbase.connect.kafka.config.sink.CouchbaseSinkConfig;
import com.couchbase.connect.kafka.config.sink.SinkBehaviorConfig;
import com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler;
import com.couchbase.connect.kafka.handler.sink.SinkAction;
import com.couchbase.connect.kafka.handler.sink.SinkDocument;
import com.couchbase.connect.kafka.handler.sink.SinkHandler;
import com.couchbase.connect.kafka.handler.sink.SinkHandlerContext;
import com.couchbase.connect.kafka.handler.sink.SinkHandlerParams;
import com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler;
import com.couchbase.connect.kafka.util.BatchBuilder;
import com.couchbase.connect.kafka.util.DocumentIdExtractor;
import com.couchbase.connect.kafka.util.DocumentPathExtractor;
import com.couchbase.connect.kafka.util.DurabilitySetter;
import com.couchbase.connect.kafka.util.KafkaRetryHelper;
import com.couchbase.connect.kafka.util.Keyspace;
import com.couchbase.connect.kafka.util.TopicMap;
import com.couchbase.connect.kafka.util.Version;
import com.couchbase.connect.kafka.util.config.ConfigHelper;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CouchbaseSinkTask
extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSinkTask.class);
    private Keyspace defaultDestCollection;
    private Map<String, Keyspace> topicToCollection;
    private KafkaCouchbaseClient client;
    private JsonConverter converter;
    private DocumentIdExtractor documentIdExtractor;
    private SinkHandler sinkHandler;
    private boolean sinkHandlerUsesKvConnections;
    private KafkaRetryHelper retryHelper;
    private DurabilitySetter durabilitySetter;
    private Optional<Duration> documentExpiry;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> properties) {
        CouchbaseSinkConfig config;
        try {
            config = ConfigHelper.parse(CouchbaseSinkConfig.class, properties);
        }
        catch (ConfigException e) {
            throw new ConnectException("Couldn't start CouchbaseSinkTask due to configuration error", (Throwable)e);
        }
        HashMap<String, String> clusterEnvProperties = new HashMap<String, String>();
        properties.forEach((key, value) -> {
            if (key.startsWith("couchbase.env.") && !CbStrings.isNullOrEmpty((String)value)) {
                clusterEnvProperties.put(CbStrings.removeStart((String)key, (String)"couchbase.env."), (String)value);
            }
        });
        LOGGER.info("Custom ClusterEnvironment properties: {}", clusterEnvProperties);
        LogRedaction.setRedactionLevel((RedactionLevel)config.logRedaction());
        this.client = new KafkaCouchbaseClient(config, clusterEnvProperties);
        this.defaultDestCollection = Keyspace.parse(config.defaultCollection(), config.bucket());
        this.topicToCollection = TopicMap.parseTopicToCollection(config.topicToCollection(), config.bucket());
        this.converter = new JsonConverter();
        this.converter.configure(CbCollections.mapOf((Object)"schemas.enable", (Object)false), false);
        String docIdPointer = config.documentId();
        if (docIdPointer != null && !docIdPointer.isEmpty()) {
            this.documentIdExtractor = new DocumentIdExtractor(docIdPointer, config.removeDocumentId());
        }
        Class<? extends SinkHandler> sinkHandlerClass = config.sinkHandler();
        SinkBehaviorConfig.DocumentMode documentMode = config.documentMode();
        if (documentMode != SinkBehaviorConfig.DocumentMode.DOCUMENT) {
            sinkHandlerClass = documentMode == SinkBehaviorConfig.DocumentMode.N1QL ? N1qlSinkHandler.class : SubDocumentSinkHandler.class;
            LOGGER.warn("Forcing sink handler to {} because document mode is {}. The `couchbase.document.mode` config property is deprecated; please use `couchbase.sink.handler` instead.", sinkHandlerClass, (Object)documentMode);
        }
        this.sinkHandler = (SinkHandler)Utils.newInstance(sinkHandlerClass);
        this.sinkHandler.init(new SinkHandlerContext(this.client.cluster().reactive(), Collections.unmodifiableMap(properties)));
        this.sinkHandlerUsesKvConnections = this.sinkHandler.usesKvCollections();
        if (this.sinkHandlerUsesKvConnections && config.bucket().isEmpty()) {
            String propertyName = ConfigHelper.keyName(CouchbaseSinkConfig.class, ConnectionConfig::bucket);
            throw new ConfigException("Missing required config property: " + propertyName);
        }
        LOGGER.info("Using sink handler: {}", (Object)this.sinkHandler);
        this.durabilitySetter = DurabilitySetter.create(config);
        this.documentExpiry = config.documentExpiration().isZero() ? Optional.empty() : Optional.of(config.documentExpiration());
        this.retryHelper = new KafkaRetryHelper("CouchbaseSinkTask.put()", config.retryTimeout());
        if (this.usingLongKvTimeouts()) {
            String retryTimeoutName = ConfigHelper.keyName(CouchbaseSinkConfig.class, SinkBehaviorConfig::retryTimeout);
            LOGGER.warn("The specified KV timeout is very long, and might cause problems for the Kafka consumer session.  Consider using the '" + retryTimeoutName + "' config property instead of setting a long KV timeout. The retry timeout handles more kinds of write failures and can safely be set to a duration longer than Kafka consumer session timeout.");
        }
    }

    private boolean usingLongKvTimeouts() {
        Duration actualKvTimeout = this.client.cluster().environment().timeoutConfig().kvTimeout();
        Duration actualKvDurableTimeout = this.client.cluster().environment().timeoutConfig().kvDurableTimeout();
        Duration threshold = Duration.ofSeconds(20L);
        return actualKvTimeout.compareTo(threshold) > 0 || actualKvDurableTimeout.compareTo(threshold) > 0;
    }

    public void put(Collection<SinkRecord> records) {
        this.retryHelper.runWithRetry(() -> this.doPut(records));
    }

    private void doPut(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        SinkRecord first = records.iterator().next();
        int recordsCount = records.size();
        LOGGER.trace("Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to the Couchbase...", new Object[]{recordsCount, first.topic(), first.kafkaPartition(), first.kafkaOffset()});
        ArrayList<SinkHandlerParams> paramsList = new ArrayList<SinkHandlerParams>();
        for (SinkRecord record : records) {
            Keyspace destCollectionSpec = this.topicToCollection.getOrDefault(record.topic(), this.defaultDestCollection);
            ReactiveCollection destCollection = this.sinkHandlerUsesKvConnections ? this.client.collection(destCollectionSpec).reactive() : null;
            SinkHandlerParams params = new SinkHandlerParams(this.client.cluster().reactive(), destCollection, destCollectionSpec, record, this.toSinkDocument(record), this.documentExpiry, this.durabilitySetter);
            paramsList.add(params);
        }
        List<SinkAction> actions = this.sinkHandler.handleBatch(paramsList);
        CouchbaseSinkTask.execute(actions);
    }

    private static void execute(List<SinkAction> actions) {
        Duration timeout = Duration.ofMinutes(10L);
        CouchbaseSinkTask.toMono(actions).block(timeout);
    }

    static Mono<Void> toMono(List<SinkAction> actions) {
        BatchBuilder<Mono<Void>> batchBuilder = new BatchBuilder<Mono<Void>>();
        for (SinkAction action : actions) {
            batchBuilder.add(action.action(), action.concurrencyHint());
        }
        Stream<Mono> batches = batchBuilder.build().stream().map(batch -> Flux.fromIterable((Iterable)batch).flatMap(it -> it).then());
        return Flux.fromStream(batches).concatMap(it -> it).then();
    }

    private SinkDocument toSinkDocument(SinkRecord record) {
        if (record.value() == null) {
            return null;
        }
        byte[] valueAsJsonBytes = this.converter.fromConnectData(record.topic(), record.valueSchema(), record.value());
        try {
            if (this.documentIdExtractor != null) {
                return this.documentIdExtractor.extractDocumentId(valueAsJsonBytes);
            }
        }
        catch (DocumentPathExtractor.DocumentPathNotFoundException e) {
            LOGGER.warn(e.getMessage() + "; letting sink handler use fallback ID");
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return new SinkDocument(null, valueAsJsonBytes);
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    public void stop() {
        if (this.retryHelper != null) {
            this.retryHelper.close();
            this.retryHelper = null;
        }
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
    }
}

