/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.connect.kafka;

import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.core.env.SecurityConfig;
import com.couchbase.client.core.util.CbStrings;
import com.couchbase.client.dcp.Authenticator;
import com.couchbase.client.dcp.CertificateAuthenticator;
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.CredentialsProvider;
import com.couchbase.client.dcp.PasswordAuthenticator;
import com.couchbase.client.dcp.SecurityConfig;
import com.couchbase.client.dcp.StaticCredentialsProvider;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.core.env.NetworkResolution;
import com.couchbase.client.dcp.highlevel.DatabaseChangeListener;
import com.couchbase.client.dcp.highlevel.Deletion;
import com.couchbase.client.dcp.highlevel.DocumentChange;
import com.couchbase.client.dcp.highlevel.Mutation;
import com.couchbase.client.dcp.highlevel.StreamFailure;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.metrics.LogLevel;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.util.PartitionSet;
import com.couchbase.connect.kafka.SourceDocumentLifecycle;
import com.couchbase.connect.kafka.SourceOffset;
import com.couchbase.connect.kafka.StreamFrom;
import com.couchbase.connect.kafka.config.source.CouchbaseSourceTaskConfig;
import com.couchbase.connect.kafka.util.ConnectHelper;
import com.couchbase.connect.kafka.util.JmxHelper;
import com.couchbase.connect.kafka.util.Version;
import io.micrometer.core.instrument.MeterRegistry;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import javax.management.ObjectName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CouchbaseReader
extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseReader.class);
    private final Client client;
    private final List<Integer> partitions;
    private final Map<Integer, SourceOffset> partitionToSavedOffset;
    private final StreamFrom streamFrom;
    private final BlockingQueue<Throwable> errorQueue;
    private final MeterRegistry meterRegistry;

    public CouchbaseReader(CouchbaseSourceTaskConfig config, String connectorName, final BlockingQueue<DocumentChange> queue, final BlockingQueue<Throwable> errorQueue, List<Integer> partitions, Map<Integer, SourceOffset> partitionToSavedOffset, final SourceDocumentLifecycle lifecycle) {
        Objects.requireNonNull(connectorName);
        Objects.requireNonNull(lifecycle);
        Objects.requireNonNull(queue);
        this.partitions = Objects.requireNonNull(partitions);
        this.partitionToSavedOffset = Objects.requireNonNull(partitionToSavedOffset);
        this.streamFrom = config.streamFrom();
        this.errorQueue = Objects.requireNonNull(errorQueue);
        PasswordAuthenticator authenticator = CbStrings.isNullOrEmpty((String)config.clientCertificatePath()) ? new PasswordAuthenticator((CredentialsProvider)new StaticCredentialsProvider(config.username(), config.password().value())) : CertificateAuthenticator.fromKeyStore((Path)Paths.get(config.clientCertificatePath(), new String[0]), (String)config.clientCertificatePassword().value());
        Consumer<SecurityConfig.Builder> securityConfig = security -> {
            security.enableTls(config.enableTls()).enableHostnameVerification(config.enableHostnameVerification());
            if (!CbStrings.isNullOrEmpty((String)config.trustStorePath())) {
                security.trustStore(Paths.get(config.trustStorePath(), new String[0]), config.trustStorePassword().value());
            }
            if (!CbStrings.isNullOrEmpty((String)config.trustCertificatePath())) {
                security.trustCertificate(Paths.get(config.trustCertificatePath(), new String[0]));
            }
            if (CbStrings.isNullOrEmpty((String)config.trustStorePath()) && CbStrings.isNullOrEmpty((String)config.trustCertificatePath())) {
                security.trustCertificates(SecurityConfig.defaultCaCertificates());
            }
        };
        this.meterRegistry = CouchbaseReader.newMeterRegistry(connectorName, config);
        Client.Builder builder = Client.builder().userAgent("kafka-connector", Version.getVersion(), new String[]{connectorName}).bootstrapTimeout(config.bootstrapTimeout()).socketConnectTimeout(config.bootstrapTimeout().toMillis()).seedNodes(config.dcpSeedNodes()).networkResolution(NetworkResolution.valueOf((String)config.network())).bucket(config.bucket()).authenticator((Authenticator)authenticator).collectionsAware(true).scopeName(config.scope()).collectionNames(config.collections()).optionalControlParam("change_streams", (Object)true).noValue(config.noValue()).xattrs(config.xattrs()).compression(config.compression()).mitigateRollbacks(config.persistencePollingInterval().toMillis(), TimeUnit.MILLISECONDS).flowControl(config.flowControlBuffer().getByteCountAsSaturatedInt()).bufferAckWatermark(60).securityConfig(securityConfig).meterRegistry(this.meterRegistry);
        if (config.enableDcpTrace()) {
            Pattern p = Pattern.compile(config.dcpTraceDocumentIdRegex());
            builder.trace(LogLevel.INFO, p.pattern().equals(".*") ? id -> true : id -> p.matcher((CharSequence)id).matches());
        }
        this.client = builder.build();
        this.client.nonBlockingListener(new DatabaseChangeListener(){

            public void onMutation(Mutation mutation) {
                this.onChange((DocumentChange)mutation);
            }

            public void onDeletion(Deletion deletion) {
                this.onChange((DocumentChange)deletion);
            }

            private void onChange(DocumentChange change) {
                try {
                    lifecycle.logReceivedFromCouchbase(change);
                    queue.put(change);
                }
                catch (Throwable t) {
                    change.flowControlAck();
                    LOGGER.error("Unable to put DCP request into the queue", t);
                    errorQueue.offer(t);
                }
            }

            public void onFailure(StreamFailure streamFailure) {
                errorQueue.offer(streamFailure.getCause());
            }
        });
    }

    private static MeterRegistry newMeterRegistry(String connectorName, CouchbaseSourceTaskConfig config) {
        String taskId = ConnectHelper.getTaskIdFromLoggingContext().orElse(config.maybeTaskId());
        LinkedHashMap<String, String> commonKeyProperties = new LinkedHashMap<String, String>();
        commonKeyProperties.put("connector", ObjectName.quote(connectorName));
        commonKeyProperties.put("task", taskId);
        return JmxHelper.newJmxMeterRegistry("kafka.connect.couchbase", commonKeyProperties);
    }

    @Override
    public void run() {
        try {
            this.client.connect().block();
            StreamFrom fallbackStreamFrom = this.streamFrom.withoutSavedOffset();
            this.client.initializeState(fallbackStreamFrom.asDcpStreamFrom(), StreamTo.INFINITY).block();
            if (this.streamFrom.isSavedOffset()) {
                this.initFailoverLogs();
                this.restoreSavedOffsets();
            }
            this.client.startStreaming(this.partitions).block();
        }
        catch (Throwable t) {
            this.errorQueue.offer(t);
        }
    }

    private void restoreSavedOffsets() {
        LOGGER.info("Resuming from saved offsets for {} of {} partitions: {}", new Object[]{this.partitionToSavedOffset.size(), this.partitions.size(), PartitionSet.from(this.partitionToSavedOffset.keySet())});
        TreeMap<Integer, Long> partitionToFallbackUuid = new TreeMap<Integer, Long>();
        for (Map.Entry<Integer, SourceOffset> entry : this.partitionToSavedOffset.entrySet()) {
            int partition = entry.getKey();
            SourceOffset offset = entry.getValue();
            StreamOffset streamOffset = offset.asStreamOffset();
            if (streamOffset.getVbuuid() == 0L) {
                long currentVbuuid = this.client.sessionState().get(partition).getLastUuid();
                streamOffset = offset.withVbucketUuid(currentVbuuid).asStreamOffset();
                partitionToFallbackUuid.put(partition, currentVbuuid);
            }
            this.client.sessionState().set(partition, PartitionState.fromOffset((StreamOffset)streamOffset));
        }
        if (!partitionToFallbackUuid.isEmpty()) {
            LOGGER.info("Some source offsets are missing a partition UUID. This is normal if you're upgrading from connector version 3.4.5 or earlier. This message should go away after a document from each partition is published to Kafka. Here is the map from partition number to the latest partition UUID used as a fallback: {}", partitionToFallbackUuid);
        }
    }

    private void initFailoverLogs() {
        this.client.failoverLogs(this.partitions).doOnNext(event -> {
            int partition = DcpFailoverLogResponse.vbucket((ByteBuf)event);
            PartitionState ps = this.client.sessionState().get(partition);
            ps.setFailoverLog(DcpFailoverLogResponse.entries((ByteBuf)event));
            this.client.sessionState().set(partition, ps);
        }).blockLast();
    }

    public void shutdown() {
        try {
            this.client.disconnect().block();
        }
        finally {
            this.meterRegistry.close();
        }
    }
}

