/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.connect.kafka;

import com.couchbase.client.core.logging.LogRedaction;
import com.couchbase.client.core.util.CbStrings;
import com.couchbase.client.dcp.core.logging.RedactionLevel;
import com.couchbase.client.dcp.highlevel.DocumentChange;
import com.couchbase.client.dcp.util.PartitionSet;
import com.couchbase.connect.kafka.CouchbaseReader;
import com.couchbase.connect.kafka.SourceDocumentLifecycle;
import com.couchbase.connect.kafka.SourceOffset;
import com.couchbase.connect.kafka.SourceTaskLifecycle;
import com.couchbase.connect.kafka.config.source.CouchbaseSourceTaskConfig;
import com.couchbase.connect.kafka.filter.Filter;
import com.couchbase.connect.kafka.handler.source.CollectionMetadata;
import com.couchbase.connect.kafka.handler.source.CouchbaseSourceRecord;
import com.couchbase.connect.kafka.handler.source.DocumentEvent;
import com.couchbase.connect.kafka.handler.source.SourceHandler;
import com.couchbase.connect.kafka.handler.source.SourceHandlerParams;
import com.couchbase.connect.kafka.handler.source.SourceRecordBuilder;
import com.couchbase.connect.kafka.util.ScopeAndCollection;
import com.couchbase.connect.kafka.util.TopicMap;
import com.couchbase.connect.kafka.util.Version;
import com.couchbase.connect.kafka.util.config.ConfigHelper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CouchbaseSourceTask
extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSourceTask.class);
    private static final long STOP_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10L);
    private String connectorName;
    private CouchbaseReader couchbaseReader;
    private BlockingQueue<DocumentChange> queue;
    private BlockingQueue<Throwable> errorQueue;
    private String defaultTopicTemplate;
    private Map<ScopeAndCollection, String> collectionToTopic;
    private String bucket;
    private Filter filter;
    private SourceHandler sourceHandler;
    private int batchSizeMax;
    private boolean connectorNameInOffsets;
    private boolean noValue;
    private SourceDocumentLifecycle lifecycle;
    private Optional<String> blackHoleTopic;
    private final SourceTaskLifecycle taskLifecycle = new SourceTaskLifecycle();

    public String version() {
        return Version.getVersion();
    }

    public void initialize(SourceTaskContext context) {
        super.initialize(context);
        this.taskLifecycle.logTaskInitialized((String)context.configs().get("name"));
    }

    public void commit() throws InterruptedException {
        super.commit();
        this.taskLifecycle.logOffsetCommitHook();
    }

    public void start(Map<String, String> properties) {
        CouchbaseSourceTaskConfig config;
        this.connectorName = properties.get("name");
        try {
            config = ConfigHelper.parse(CouchbaseSourceTaskConfig.class, properties);
            if (CbStrings.isNullOrEmpty((String)this.connectorName)) {
                throw new ConfigException("Connector must have a non-blank 'name' config property.");
            }
        }
        catch (ConfigException e) {
            throw new ConnectException("Couldn't start CouchbaseSourceTask due to configuration error", (Throwable)e);
        }
        LogRedaction.setRedactionLevel((com.couchbase.client.core.logging.RedactionLevel)config.logRedaction());
        RedactionLevel.set((RedactionLevel)this.toDcp(config.logRedaction()));
        Map<String, String> unmodifiableProperties = Collections.unmodifiableMap(properties);
        this.lifecycle = SourceDocumentLifecycle.create(config);
        this.filter = (Filter)Utils.newInstance(config.eventFilter());
        this.filter.init(unmodifiableProperties);
        this.sourceHandler = (SourceHandler)Utils.newInstance(config.sourceHandler());
        this.sourceHandler.init(unmodifiableProperties);
        this.blackHoleTopic = Optional.ofNullable(CbStrings.emptyToNull((String)config.blackHoleTopic().trim()));
        this.defaultTopicTemplate = config.topic();
        this.collectionToTopic = TopicMap.parseCollectionToTopic(config.collectionToTopic());
        this.bucket = config.bucket();
        this.connectorNameInOffsets = config.connectorNameInOffsets();
        this.batchSizeMax = config.batchSizeMax();
        this.noValue = config.noValue();
        PartitionSet partitionSet = PartitionSet.parse((String)config.partitions());
        this.taskLifecycle.logTaskStarted(this.connectorName, partitionSet);
        List partitions = partitionSet.toList();
        Map<Integer, SourceOffset> partitionToSavedSeqno = this.readSourceOffsets(partitions);
        HashSet partitionsWithoutSavedOffsets = new HashSet(partitions);
        partitionsWithoutSavedOffsets.removeAll(partitionToSavedSeqno.keySet());
        this.taskLifecycle.logSourceOffsetsRead(partitionToSavedSeqno, PartitionSet.from(partitionsWithoutSavedOffsets));
        this.queue = new LinkedBlockingQueue<DocumentChange>();
        this.errorQueue = new LinkedBlockingQueue<Throwable>(1);
        this.couchbaseReader = new CouchbaseReader(config, this.connectorName, this.queue, this.errorQueue, partitions, partitionToSavedSeqno, this.lifecycle);
        this.couchbaseReader.start();
    }

    private RedactionLevel toDcp(com.couchbase.client.core.logging.RedactionLevel level) {
        switch (level) {
            case FULL: {
                return RedactionLevel.FULL;
            }
            case NONE: {
                return RedactionLevel.NONE;
            }
            case PARTIAL: {
                return RedactionLevel.PARTIAL;
            }
        }
        throw new IllegalArgumentException("Unrecognized redaction level: " + level);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<SourceRecord> poll() throws InterruptedException {
        this.checkErrorQueue();
        DocumentChange firstEvent = this.queue.poll(1L, TimeUnit.SECONDS);
        if (firstEvent == null) {
            LOGGER.debug("Poll returns 0 results");
            return null;
        }
        ArrayList<DocumentChange> events = new ArrayList<DocumentChange>();
        try {
            events.add(firstEvent);
            this.queue.drainTo(events, this.batchSizeMax - 1);
            ConversionResult results = this.convertToSourceRecords(events);
            LOGGER.info("Poll returns {} result(s) (filtered out {})", (Object)results.published, (Object)results.dropped);
            List<SourceRecord> list = results.records;
            return list;
        }
        finally {
            events.forEach(DocumentChange::flowControlAck);
        }
    }

    private boolean isSourceOffsetUpdate(SourceRecord record) {
        return this.blackHoleTopic.isPresent() && this.blackHoleTopic.get().equals(record.topic());
    }

    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        if (record instanceof CouchbaseSourceRecord) {
            CouchbaseSourceRecord couchbaseRecord = (CouchbaseSourceRecord)record;
            if (this.isSourceOffsetUpdate(couchbaseRecord)) {
                this.lifecycle.logSourceOffsetUpdateCommittedToBlackHoleTopic(couchbaseRecord, metadata);
            } else {
                this.lifecycle.logCommittedToKafkaTopic(couchbaseRecord, metadata);
            }
        } else {
            LOGGER.warn("Committed a record we didn't create? Record key {}", record.key());
        }
    }

    private void checkErrorQueue() throws ConnectException {
        Throwable fatalError = (Throwable)this.errorQueue.poll();
        if (fatalError != null) {
            throw new ConnectException(fatalError);
        }
    }

    private String getDefaultTopic(DocumentEvent docEvent) {
        CollectionMetadata collectionMetadata = docEvent.collectionMetadata();
        return this.defaultTopicTemplate.replace("${bucket}", this.bucket).replace("${scope}", collectionMetadata.scopeName()).replace("${collection}", collectionMetadata.collectionName()).replace("%", "_");
    }

    private ConversionResult convertToSourceRecords(List<DocumentChange> events) {
        ArrayList<SourceRecord> results = new ArrayList<SourceRecord>(events.size());
        int dropped = 0;
        for (DocumentChange e : events) {
            DocumentEvent docEvent = DocumentEvent.create(e, this.bucket);
            if (!this.filter.pass(docEvent)) {
                this.lifecycle.logSkippedBecauseFilterSaysIgnore(e);
                ++dropped;
                this.blackHoleTopic.ifPresent(topic -> results.add(this.createSourceOffsetUpdateRecord((String)topic, e, docEvent)));
                continue;
            }
            CouchbaseSourceRecord sourceRecord = this.convertToSourceRecord(e, docEvent);
            if (sourceRecord == null) {
                this.lifecycle.logSkippedBecauseHandlerSaysIgnore(e);
                ++dropped;
                this.blackHoleTopic.ifPresent(topic -> results.add(this.createSourceOffsetUpdateRecord((String)topic, e, docEvent)));
                continue;
            }
            this.lifecycle.logConvertedToKafkaRecord(e, sourceRecord);
            results.add(sourceRecord);
        }
        int published = results.size();
        if (this.blackHoleTopic.isPresent()) {
            published -= dropped;
        }
        return new ConversionResult(results, published, dropped);
    }

    private SourceRecord createSourceOffsetUpdateRecord(String topic, DocumentChange change, DocumentEvent docEvent) {
        return new SourceRecordBuilder().key("ignored-" + change.getVbucket()).build(change, this.sourcePartition(docEvent.partition()), CouchbaseSourceTask.sourceOffset(change), topic);
    }

    private CouchbaseSourceRecord convertToSourceRecord(DocumentChange change, DocumentEvent docEvent) {
        String topic = this.collectionToTopic.getOrDefault(CouchbaseSourceTask.scopeAndCollection(docEvent), this.getDefaultTopic(docEvent));
        SourceRecordBuilder builder = this.sourceHandler.handle(new SourceHandlerParams(docEvent, topic, this.noValue));
        if (builder == null) {
            return null;
        }
        return builder.build(change, this.sourcePartition(docEvent.partition()), CouchbaseSourceTask.sourceOffset(change), topic);
    }

    public void stop() {
        this.taskLifecycle.logTaskStopped();
        if (this.couchbaseReader != null) {
            this.couchbaseReader.shutdown();
            try {
                this.couchbaseReader.join(STOP_TIMEOUT_MILLIS);
                if (this.couchbaseReader.isAlive()) {
                    LOGGER.error("Reader thread is still alive after shutdown request.");
                }
            }
            catch (InterruptedException e) {
                LOGGER.error("Interrupted while joining reader thread.", (Throwable)e);
            }
        }
    }

    private Map<Integer, SourceOffset> readSourceOffsets(Collection<Integer> partitions) {
        Map offsets = this.context.offsetStorageReader().offsets(this.sourcePartitions(partitions));
        LOGGER.debug("Raw source offsets: {}", (Object)offsets);
        HashSet<Integer> missingPartitions = new HashSet<Integer>(partitions);
        TreeMap<Integer, SourceOffset> partitionToSourceOffset = new TreeMap<Integer, SourceOffset>();
        offsets.forEach((partitionIdentifier, offset) -> {
            int partition = Integer.parseInt((String)partitionIdentifier.get("partition"));
            missingPartitions.remove(partition);
            if (offset != null) {
                partitionToSourceOffset.put(partition, SourceOffset.fromMap(offset));
            }
        });
        if (!missingPartitions.isEmpty()) {
            LOGGER.error("Offset storage reader returned no information about these partitions: {}", (Object)PartitionSet.from(missingPartitions));
        }
        return partitionToSourceOffset;
    }

    private List<Map<String, Object>> sourcePartitions(Collection<Integer> partitions) {
        ArrayList<Map<String, Object>> sourcePartitions = new ArrayList<Map<String, Object>>();
        for (Integer partition : partitions) {
            sourcePartitions.add(this.sourcePartition(partition));
        }
        return sourcePartitions;
    }

    private Map<String, Object> sourcePartition(int partition) {
        HashMap<String, Object> sourcePartition = new HashMap<String, Object>(3);
        sourcePartition.put("bucket", this.bucket);
        sourcePartition.put("partition", String.valueOf(partition));
        if (this.connectorNameInOffsets) {
            sourcePartition.put("connector", this.connectorName);
        }
        return sourcePartition;
    }

    private static Map<String, Object> sourceOffset(DocumentChange change) {
        return new SourceOffset(change.getOffset()).toMap();
    }

    private static ScopeAndCollection scopeAndCollection(DocumentEvent docEvent) {
        CollectionMetadata md = docEvent.collectionMetadata();
        return new ScopeAndCollection(md.scopeName(), md.collectionName());
    }

    private static class ConversionResult {
        public final List<SourceRecord> records;
        public final int published;
        public final int dropped;

        public ConversionResult(List<SourceRecord> records, int published, int dropped) {
            this.records = records;
            this.published = published;
            this.dropped = dropped;
        }
    }
}

