/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.connect.kafka;

import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.util.CbCollections;
import com.couchbase.client.dcp.highlevel.DocumentChange;
import com.couchbase.client.dcp.metrics.LogLevel;
import com.couchbase.connect.kafka.config.common.LoggingConfig;
import com.couchbase.connect.kafka.handler.source.CouchbaseSourceRecord;
import com.couchbase.connect.kafka.util.ConnectHelper;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Logs the milestones a document change passes through between being received
 * from Couchbase and being committed to a Kafka topic.
 *
 * <p>Each {@link Milestone} has its own SLF4J logger whose name is this class's
 * fully-qualified name followed by {@code "."} and the milestone name. Messages
 * are written at {@code INFO} when {@link LoggingConfig#logDocumentLifecycle()}
 * returns {@code true}, otherwise at {@code DEBUG}.
 *
 * <p>Every message is an insertion-ordered map that is encoded to a string with
 * {@link Mapper#encodeAsString}; if encoding fails the map's {@code toString()}
 * is logged instead.
 */
public class SourceDocumentLifecycle {
    private static final Logger log = LoggerFactory.getLogger(SourceDocumentLifecycle.class);

    /** Task id taken from the logging context at construction time, or {@code "?"} if none is present. */
    private final String taskId = ConnectHelper.getTaskIdFromLoggingContext().orElse("?");

    /** Caller-supplied identifier added to every message under the {@code taskUuid} key. */
    private final String taskUuid;

    /** Level at which every milestone message is written. */
    private final LogLevel logLevel;

    /**
     * Creates a lifecycle logger.
     *
     * @param taskUuid identifier included in every logged message; must not be null
     * @param config supplies the flag that selects the log level
     * @return a new instance
     * @throws NullPointerException if {@code taskUuid} is null
     */
    public static SourceDocumentLifecycle create(String taskUuid, LoggingConfig config) {
        return new SourceDocumentLifecycle(taskUuid, config);
    }

    private SourceDocumentLifecycle(String taskUuid, LoggingConfig config) {
        this.taskUuid = Objects.requireNonNull(taskUuid, "taskUuid");
        this.logLevel = config.logDocumentLifecycle() ? LogLevel.INFO : LogLevel.DEBUG;
        log.info("Logging document lifecycle milestones to this category at {} level", this.logLevel);
    }

    /**
     * Tells whether the configured log level is enabled for the given milestone's logger.
     */
    private boolean enabled(Milestone milestone) {
        return this.logLevel.isEnabled(milestone.logger);
    }

    /**
     * Logs {@link Milestone#RECEIVED_FROM_COUCHBASE} together with details read from the event:
     * revision, mutation/deletion type, vbucket, sequence number, content size, and the number
     * of microseconds between the event's timestamp and now.
     *
     * @param event the change that was received
     */
    public void logReceivedFromCouchbase(DocumentChange event) {
        // Guard first so the details map is only built when the message will be written.
        if (!enabled(Milestone.RECEIVED_FROM_COUCHBASE)) {
            return;
        }
        Map<String, Object> details = new LinkedHashMap<>();
        details.put("connectTaskId", this.taskId);
        details.put("revision", event.getRevision());
        details.put("type", event.isMutation() ? "mutation" : "deletion");
        details.put("partition", event.getVbucket());
        details.put("sequenceNumber", event.getOffset().getSeqno());
        details.put("sizeInBytes", event.getContent().length);
        details.put(
                "usSinceCouchbaseChange(might be inaccurate before Couchbase 7)",
                event.getTimestamp().until(Instant.now(), ChronoUnit.MICROS));
        logMilestone(event, Milestone.RECEIVED_FROM_COUCHBASE, details);
    }

    /**
     * Logs {@link Milestone#SKIPPED_BECAUSE_JSONPATH_SAYS_IGNORE} for the event, with no extra details.
     *
     * @param event the change being reported
     */
    public void logSkippedBecauseJsonpathFilterSaysIgnore(DocumentChange event) {
        logMilestone(event, Milestone.SKIPPED_BECAUSE_JSONPATH_SAYS_IGNORE);
    }

    /**
     * Logs {@link Milestone#SKIPPED_BECAUSE_FILTER_SAYS_IGNORE} for the event, with no extra details.
     *
     * @param event the change being reported
     */
    public void logSkippedBecauseFilterSaysIgnore(DocumentChange event) {
        logMilestone(event, Milestone.SKIPPED_BECAUSE_FILTER_SAYS_IGNORE);
    }

    /**
     * Logs {@link Milestone#SKIPPED_BECAUSE_HANDLER_SAYS_IGNORE} for the event, with no extra details.
     *
     * @param event the change being reported
     */
    public void logSkippedBecauseHandlerSaysIgnore(DocumentChange event) {
        logMilestone(event, Milestone.SKIPPED_BECAUSE_HANDLER_SAYS_IGNORE);
    }

    /**
     * Logs {@link Milestone#CONVERTED_TO_KAFKA_RECORD} together with the record's topic, key,
     * Kafka partition, source partition and source offset.
     *
     * @param event the change the record was produced from
     * @param record the resulting Kafka Connect source record
     */
    public void logConvertedToKafkaRecord(DocumentChange event, SourceRecord record) {
        if (!enabled(Milestone.CONVERTED_TO_KAFKA_RECORD)) {
            return;
        }
        Map<String, Object> details = new LinkedHashMap<>();
        details.put("topic", record.topic());
        details.put("key", record.key());
        details.put("kafkaPartition", record.kafkaPartition());
        details.put("sourcePartition", record.sourcePartition());
        details.put("sourceOffset", record.sourceOffset());
        logMilestone(event, Milestone.CONVERTED_TO_KAFKA_RECORD, details);
    }

    /**
     * Logs {@link Milestone#COMMITTED_TO_TOPIC} with the record metadata nested under the
     * {@code recordMetadata} key.
     *
     * @param sourceRecord the record being reported
     * @param metadata producer metadata for the record; when null an empty
     *     {@code recordMetadata} object is logged
     */
    public void logCommittedToKafkaTopic(CouchbaseSourceRecord sourceRecord, @Nullable RecordMetadata metadata) {
        // Skip building the metadata maps when the message would be discarded anyway.
        if (enabled(Milestone.COMMITTED_TO_TOPIC)) {
            logMilestone(sourceRecord, Milestone.COMMITTED_TO_TOPIC, toMap(metadata));
        }
    }

    /**
     * Logs {@link Milestone#SOURCE_OFFSET_UPDATE_COMMITTED_TO_BLACK_HOLE_TOPIC} with the record
     * metadata nested under the {@code recordMetadata} key.
     *
     * @param sourceRecord the record being reported
     * @param metadata producer metadata for the record; when null an empty
     *     {@code recordMetadata} object is logged
     */
    public void logSourceOffsetUpdateCommittedToBlackHoleTopic(CouchbaseSourceRecord sourceRecord, @Nullable RecordMetadata metadata) {
        // Skip building the metadata maps when the message would be discarded anyway.
        if (enabled(Milestone.SOURCE_OFFSET_UPDATE_COMMITTED_TO_BLACK_HOLE_TOPIC)) {
            logMilestone(sourceRecord, Milestone.SOURCE_OFFSET_UPDATE_COMMITTED_TO_BLACK_HOLE_TOPIC, toMap(metadata));
        }
    }

    /**
     * Wraps the fields of {@code metadata} in a single-entry map keyed by {@code recordMetadata}.
     *
     * @param metadata the metadata to describe, or null
     * @return a map whose only value is the (possibly empty) map of metadata fields
     */
    private Map<String, Object> toMap(@Nullable RecordMetadata metadata) {
        Map<String, Object> details = new LinkedHashMap<>();
        if (metadata != null) {
            details.put("topic", metadata.topic());
            details.put("partition", metadata.partition());
            details.put("offset", metadata.offset());
            details.put("timestamp", metadata.timestamp());
            details.put("serializedKeySize", metadata.serializedKeySize());
            details.put("serializedValueSize", metadata.serializedValueSize());
        }
        return CbCollections.mapOf("recordMetadata", details);
    }

    /** Logs a milestone for a document change with no milestone-specific details. */
    private void logMilestone(DocumentChange event, Milestone milestone) {
        logMilestone(event, milestone, Collections.emptyMap());
    }

    /** Logs a milestone identified by the source record's tracing token and Couchbase document id. */
    private void logMilestone(CouchbaseSourceRecord sourceRecord, Milestone milestone, Map<String, Object> milestoneDetails) {
        if (enabled(milestone)) {
            writeMilestone(milestone, sourceRecord.getTracingToken(), sourceRecord.getCouchbaseDocumentId(), milestoneDetails);
        }
    }

    /** Logs a milestone identified by the event's tracing token and qualified key. */
    private void logMilestone(DocumentChange event, Milestone milestone, Map<String, Object> milestoneDetails) {
        if (enabled(milestone)) {
            writeMilestone(milestone, event.getTracingToken(), event.getQualifiedKey(), milestoneDetails);
        }
    }

    /**
     * Assembles the message map and writes it to the milestone's logger. Does not check whether
     * the milestone is enabled; callers do that before reading values from their argument.
     *
     * <p>Entries are inserted in a fixed order: milestone, tracing token, connector context
     * (only if the logging context has one), document id, the milestone details, task UUID.
     */
    private void writeMilestone(Milestone milestone, Object tracingToken, Object documentId, Map<String, Object> milestoneDetails) {
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("milestone", milestone);
        message.put("tracingToken", tracingToken);
        ConnectHelper.getConnectorContextFromLoggingContext().ifPresent(it -> message.put("context", it));
        message.put("documentId", documentId);
        message.putAll(milestoneDetails);
        message.put("taskUuid", this.taskUuid);
        doLog(milestone.logger, message);
    }

    /**
     * Writes {@code message} to {@code logger} at the configured level, encoded as a string.
     */
    private void doLog(Logger logger, Object message) {
        try {
            this.logLevel.log(logger, Mapper.encodeAsString(message));
        } catch (Exception ignored) {
            // Best effort: if the encoded form could not be logged, log the plain
            // toString() form instead of dropping the milestone.
            this.logLevel.log(logger, message.toString());
        }
    }

    /**
     * The milestones that can be logged. Each constant owns a dedicated logger so that
     * individual milestones can be configured by logger name.
     */
    public static enum Milestone {
        RECEIVED_FROM_COUCHBASE,
        SKIPPED_BECAUSE_JSONPATH_SAYS_IGNORE,
        SKIPPED_BECAUSE_FILTER_SAYS_IGNORE,
        SKIPPED_BECAUSE_HANDLER_SAYS_IGNORE,
        CONVERTED_TO_KAFKA_RECORD,
        COMMITTED_TO_TOPIC,
        SOURCE_OFFSET_UPDATE_COMMITTED_TO_BLACK_HOLE_TOPIC;

        /** Logger named {@code <SourceDocumentLifecycle class name>.<constant name>}. */
        private final Logger logger = LoggerFactory.getLogger(SourceDocumentLifecycle.class.getName() + "." + this.name());
    }
}

