/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.kafka.connect.sink;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoNamespace;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData;
import com.mongodb.kafka.connect.sink.MongoSinkConfig;
import com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor;
import com.mongodb.kafka.connect.sink.MongoSinkTask;
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.RateLimitSettings;
import com.mongodb.kafka.connect.sink.dlq.AnalyzedBatchFailedWithBulkWriteException;
import com.mongodb.kafka.connect.sink.dlq.ErrorReporter;
import com.mongodb.kafka.connect.util.TimeseriesValidation;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the resources of a sink task that has been started (the sink configuration, the
 * {@link MongoClient} and the {@link ErrorReporter}) and writes batches of sink records to
 * MongoDB with bulk writes.
 *
 * <p>NOTE(review): {@link #checkedTimeseriesNamespaces} is a plain {@link HashSet}; this class
 * presumably relies on being driven by a single thread — confirm against the caller.
 */
public final class StartedMongoSinkTask {
    // Logs under MongoSinkTask's logger name rather than this class's own.
    // NOTE(review): presumably intentional so both classes share one log category — confirm before changing.
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoSinkTask.class);
    private final MongoSinkConfig sinkConfig;
    private final MongoClient mongoClient;
    private final ErrorReporter errorReporter;
    // Namespaces that have already gone through checkTimeseries, so the check runs once per namespace.
    private final Set<MongoNamespace> checkedTimeseriesNamespaces;

    /**
     * Creates the started task around already-constructed collaborators.
     *
     * @param sinkConfig the sink configuration passed to the record processor
     * @param mongoClient the client used for all writes; closed by {@link #stop()}
     * @param errorReporter the reporter handed to the record processor and to bulk-write failure analysis
     */
    StartedMongoSinkTask(MongoSinkConfig sinkConfig, MongoClient mongoClient, ErrorReporter errorReporter) {
        this.sinkConfig = sinkConfig;
        this.mongoClient = mongoClient;
        this.errorReporter = errorReporter;
        this.checkedTimeseriesNamespaces = new HashSet<>();
    }

    /** Closes the underlying {@link MongoClient}. */
    void stop() {
        this.mongoClient.close();
    }

    /**
     * Groups the given records into batches via
     * {@link MongoSinkRecordProcessor#orderedGroupByTopicAndNamespace} and bulk-writes each batch.
     * An empty collection is only logged at debug level.
     *
     * @param records the sink records to write
     */
    void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            LOGGER.debug("No sink records to process for current poll operation");
            return;
        }
        MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(records, this.sinkConfig, this.errorReporter)
                .forEach(this::bulkWriteBatch);
    }

    /**
     * Writes one batch with a single bulk write, then applies the rate limit check.
     *
     * <p>The namespace and topic configuration are taken from the first element of the batch.
     * The time-series check runs before the {@code try} block, so an exception it throws is not
     * routed through {@link #handleTolerableWriteException}.
     *
     * @param batch the processed records to write; an empty batch is ignored
     * @throws DataException if the rate-limit sleep is interrupted, or if a write failure is not tolerated
     */
    private void bulkWriteBatch(List<MongoProcessedSinkRecordData> batch) {
        if (batch.isEmpty()) {
            return;
        }
        MongoProcessedSinkRecordData first = batch.get(0);
        MongoNamespace namespace = first.getNamespace();
        MongoSinkTopicConfig config = first.getConfig();
        this.checkTimeseries(namespace, config);

        List<WriteModel<BsonDocument>> writeModels = batch.stream()
                .map(MongoProcessedSinkRecordData::getWriteModel)
                .collect(Collectors.toList());
        boolean bulkWriteOrdered = config.getBoolean("bulk.write.ordered");
        try {
            LOGGER.debug(
                    "Bulk writing {} document(s) into collection [{}] via an {} bulk write",
                    writeModels.size(),
                    namespace.getFullName(),
                    bulkWriteOrdered ? "ordered" : "unordered");
            BulkWriteResult result = this.mongoClient
                    .getDatabase(namespace.getDatabaseName())
                    .getCollection(namespace.getCollectionName(), BsonDocument.class)
                    .bulkWrite(writeModels, new BulkWriteOptions().ordered(bulkWriteOrdered));
            LOGGER.debug("Mongodb bulk write result: {}", result);
            StartedMongoSinkTask.checkRateLimit(config);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before surfacing the failure.
            Thread.currentThread().interrupt();
            throw new DataException("Rate limiting was interrupted", e);
        } catch (RuntimeException e) {
            List<SinkRecord> sinkRecords = batch.stream()
                    .map(MongoProcessedSinkRecordData::getSinkRecord)
                    .collect(Collectors.toList());
            this.handleTolerableWriteException(
                    sinkRecords, bulkWriteOrdered, e, config.logErrors(), config.tolerateErrors());
        }
    }

    /**
     * Validates the collection for a time-series configuration the first time a namespace is seen.
     *
     * <p>The namespace is remembered whether or not the configuration is time-series. It is recorded
     * only after validation returns, so a namespace whose validation throws is checked again next time.
     *
     * @param namespace the target namespace
     * @param config the topic configuration for that namespace
     */
    private void checkTimeseries(MongoNamespace namespace, MongoSinkTopicConfig config) {
        if (this.checkedTimeseriesNamespaces.contains(namespace)) {
            return;
        }
        if (config.isTimeseries()) {
            TimeseriesValidation.validateCollection(this.mongoClient, namespace, config);
        }
        this.checkedTimeseriesNamespaces.add(namespace);
    }

    /**
     * Sleeps for the configured timeout when the topic's rate limit settings report being triggered.
     *
     * @param config the topic configuration supplying the {@link RateLimitSettings}
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void checkRateLimit(MongoSinkTopicConfig config) throws InterruptedException {
        RateLimitSettings rls = config.getRateLimitSettings();
        if (rls.isTriggered()) {
            LOGGER.debug(
                    "Rate limit settings triggering {}ms defer timeout after processing {} further batches for topic {}",
                    rls.getTimeoutMs(),
                    rls.getEveryN(),
                    config.getTopic());
            Thread.sleep(rls.getTimeoutMs());
        }
    }

    /**
     * Applies the log/tolerate policy to a failure raised while writing a batch.
     *
     * <p>For a {@link MongoBulkWriteException} the batch is analyzed with
     * {@link AnalyzedBatchFailedWithBulkWriteException}; the analysis is logged when
     * {@code logErrors} is set and reported when {@code tolerateErrors} is set. For any other
     * runtime exception the whole batch is logged when {@code logErrors} is set.
     *
     * @param batch the sink records of the failed batch
     * @param ordered whether the bulk write was ordered
     * @param e the exception raised by the write
     * @param logErrors whether the failure should be logged
     * @param tolerateErrors whether the failure should be tolerated instead of rethrown
     * @throws DataException wrapping {@code e} when {@code tolerateErrors} is {@code false}
     */
    private void handleTolerableWriteException(List<SinkRecord> batch, boolean ordered, RuntimeException e, boolean logErrors, boolean tolerateErrors) {
        if (e instanceof MongoBulkWriteException) {
            AnalyzedBatchFailedWithBulkWriteException analyzedBatch = new AnalyzedBatchFailedWithBulkWriteException(
                    batch, ordered, (MongoBulkWriteException) e, this.errorReporter, StartedMongoSinkTask::log);
            if (logErrors) {
                LOGGER.error("Failed to put into the sink some records, see log entries below for the details", e);
                analyzedBatch.log();
            }
            if (!tolerateErrors) {
                throw new DataException(e);
            }
            analyzedBatch.report();
            return;
        }

        if (logErrors) {
            StartedMongoSinkTask.log(batch, e);
        }
        if (!tolerateErrors) {
            throw new DataException(e);
        }
    }

    /**
     * Logs the given records together with the exception at error level.
     *
     * @param records the records that could not be written
     * @param e the failure associated with those records
     */
    private static void log(Collection<SinkRecord> records, RuntimeException e) {
        LOGGER.error("Failed to put into the sink the following records: {}", records, e);
    }
}

