/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;

import com.mongodb.MongoException;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink writer that buffers serialized rows as MongoDB write models and sends them with
 * {@code bulkWrite} once a size or time threshold is reached, on {@link #prepareCommit()},
 * and on {@link #close()}.
 *
 * <p>Failed bulk writes are retried up to {@code maxRetries} additional times, sleeping
 * {@code retryIntervalMs * attemptNumber} between attempts (linear back-off).
 *
 * <p>Only {@link #doBulkWrite()} is synchronized; {@link #write(SeaTunnelRow)} appends to the
 * buffer without holding the lock, so this class does not by itself make concurrent
 * {@code write} calls safe.
 */
public class MongodbWriter
extends AbstractSinkWriter<SeaTunnelRow, Void> {
    private static final Logger log = LoggerFactory.getLogger(MongodbWriter.class);

    /** Provides (and owns) the MongoDB client/collection used for bulk writes. */
    private MongodbClientProvider collectionProvider;
    /** Converts an incoming row into a MongoDB write model. */
    private final DocumentSerializer<SeaTunnelRow> serializer;
    /** Buffered-row count that triggers a flush; {@code -1} disables the size trigger. */
    private long bulkActions;
    /** Write models waiting to be sent in the next bulk write. */
    private final List<WriteModel<BsonDocument>> bulkRequests;
    /** Number of retries after the first failed attempt. */
    private int maxRetries;
    /** Base sleep between retries, multiplied by the 1-based attempt number. */
    private long retryIntervalMs;
    /** Elapsed time since the last send that triggers a flush; {@code -1} disables the time trigger. */
    private long batchIntervalMs;
    /** Wall-clock time (ms) of the most recent bulk-write attempt, or of construction if none yet. */
    private volatile long lastSendTime = 0L;
    private final SinkWriter.Context context;

    /**
     * Creates a writer configured from {@code options}.
     *
     * @param serializer converts rows into write models
     * @param options supplies connection string, database, collection, flush size, batch
     *     interval, and retry settings
     * @param context sink writer context supplied by the engine
     */
    public MongodbWriter(DocumentSerializer<SeaTunnelRow> serializer, MongodbWriterOptions options, SinkWriter.Context context) {
        this.initOptions(options);
        this.context = context;
        this.serializer = serializer;
        this.bulkRequests = new ArrayList<WriteModel<BsonDocument>>();
        // Start the interval clock at construction time. Leaving it at 0 makes the very first
        // write look "overdue" and flushes a batch containing a single row.
        this.lastSendTime = System.currentTimeMillis();
    }

    /** Copies retry/batching settings from {@code options} and builds the collection provider. */
    private void initOptions(MongodbWriterOptions options) {
        this.maxRetries = options.getRetryMax();
        this.retryIntervalMs = options.getRetryInterval();
        this.collectionProvider = MongodbCollectionProvider.builder()
                .connectionString(options.getConnectString())
                .database(options.getDatabase())
                .collection(options.getCollection())
                .build();
        this.bulkActions = options.getFlushSize();
        this.batchIntervalMs = options.getBatchIntervalMs();
    }

    /**
     * Buffers one row and flushes the buffer if the size or time threshold has been reached.
     *
     * @param o the row to write
     * @throws IOException declared by the sink writer contract
     */
    public void write(SeaTunnelRow o) throws IOException {
        this.bulkRequests.add(this.serializer.serializeToWriteModel(o));
        if (this.isOverMaxBatchSizeLimit() || this.isOverMaxBatchIntervalLimit()) {
            this.doBulkWrite();
        }
    }

    /**
     * Flushes any buffered rows.
     *
     * @return always {@link Optional#empty()}; this writer produces no commit info
     */
    @Override
    public Optional<Void> prepareCommit() {
        this.doBulkWrite();
        return Optional.empty();
    }

    /**
     * Flushes any buffered rows and then closes the collection provider. The provider is closed
     * even when the final flush fails, so the underlying client is not leaked.
     *
     * @throws IOException declared by the sink writer contract
     */
    public void close() throws IOException {
        try {
            this.doBulkWrite();
        } finally {
            if (this.collectionProvider != null) {
                this.collectionProvider.close();
            }
        }
    }

    /**
     * Sends all buffered write models as one ordered bulk write, retrying on
     * {@link MongoException}. Does nothing when the buffer is empty. The buffer is cleared only
     * after a successful write, so a failed batch stays buffered.
     *
     * @throws MongodbConnectorException if every attempt fails, if the thread is interrupted
     *     while waiting to retry, or if {@code maxRetries} is negative (no attempt is made)
     */
    synchronized void doBulkWrite() {
        if (this.bulkRequests.isEmpty()) {
            return;
        }
        for (int attempt = 0; attempt <= this.maxRetries; attempt++) {
            try {
                // Recorded before the call, so failed attempts also reset the interval clock.
                this.lastSendTime = System.currentTimeMillis();
                this.collectionProvider.getDefaultCollection().bulkWrite(this.bulkRequests, new BulkWriteOptions().ordered(true));
                this.bulkRequests.clear();
                return;
            } catch (MongoException e) {
                log.debug("Bulk Write to MongoDB failed, retry times = {}", attempt, e);
                if (attempt >= this.maxRetries) {
                    throw new MongodbConnectorException((SeaTunnelErrorCode) CommonErrorCode.WRITER_OPERATION_FAILED, "Bulk Write to MongoDB failed", e);
                }
                try {
                    // Linear back-off: wait longer after each consecutive failure.
                    TimeUnit.MILLISECONDS.sleep(this.retryIntervalMs * (long) (attempt + 1));
                } catch (InterruptedException ex) {
                    // Restore the interrupt flag for callers further up the stack.
                    Thread.currentThread().interrupt();
                    MongodbConnectorException interrupted = new MongodbConnectorException((SeaTunnelErrorCode) CommonErrorCode.WRITER_OPERATION_FAILED, "Unable to flush; interrupted while doing another attempt", e);
                    // Keep the write failure as the cause, but do not lose the interrupt itself.
                    interrupted.addSuppressed(ex);
                    throw interrupted;
                }
            }
        }
        // Reached only when the loop body never ran, i.e. maxRetries is negative.
        throw new MongodbConnectorException((SeaTunnelErrorCode) CommonErrorCode.WRITER_OPERATION_FAILED, "Bulk Write to MongoDB failed after max retries");
    }

    /** Returns whether the size trigger is enabled and the buffer has reached {@code bulkActions}. */
    private boolean isOverMaxBatchSizeLimit() {
        return this.bulkActions != -1L && (long) this.bulkRequests.size() >= this.bulkActions;
    }

    /**
     * Returns whether the time trigger is enabled and at least {@code batchIntervalMs} has
     * elapsed since {@code lastSendTime}.
     */
    private boolean isOverMaxBatchIntervalLimit() {
        long lastSentInterval = System.currentTimeMillis() - this.lastSendTime;
        return this.batchIntervalMs != -1L && lastSentInterval >= this.batchIntervalMs;
    }
}

