/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.mongosync;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoException;
import com.mongodb.bulk.BulkWriteError;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.model.Namespace;
import com.mongodb.mongosync.MongoSyncOptions;
import com.mongodb.shardsync.ShardClient;
import java.util.ArrayList;
import java.util.List;
import org.bson.BsonSerializationException;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.RawBsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for workers that copy the documents of a single namespace from a
 * source cluster to a destination cluster.
 *
 * <p>It holds the source/destination database and collection handles, two
 * buffers (raw documents and their corresponding write models) and the running
 * success / error / duplicate-key tallies. Subclasses implement {@link #run()}
 * and {@link #shutdown()} and are expected to fill the buffers before calling
 * {@link #doInsert()}; this class never clears the buffers itself.
 *
 * <p>The counters are plain {@code long} fields and this class performs no
 * synchronization on them.
 */
public abstract class AbstractCollectionCloneWorker {
    protected static final Logger logger = LoggerFactory.getLogger(AbstractCollectionCloneWorker.class);

    /** Server error code reported for a duplicate key violation. */
    private static final int DUPLICATE_KEY_ERROR_CODE = 11000;

    /** Namespace being cloned; may be {@code null}, in which case no handles are resolved. */
    protected Namespace ns;
    /** Entry for {@link #ns} in the source shard client's collections map; {@code null} when absent. */
    protected Document shardCollection;
    protected ShardClient sourceShardClient;
    protected ShardClient destShardClient;
    protected MongoSyncOptions options;
    protected MongoDatabase sourceDb;
    protected MongoDatabase destDb;
    protected MongoCollection<RawBsonDocument> sourceCollection;
    protected MongoCollection<RawBsonDocument> destCollection;
    /** Raw documents of the current batch; used for the one-by-one retry in {@link #doInsert()}. */
    protected List<RawBsonDocument> docsBuffer;
    /** Write models of the current batch; sent as one bulk write by {@link #doInsert()}. */
    List<WriteModel<RawBsonDocument>> writesBuffer;
    /*
     * NOTE(review): these options are created with the driver defaults and are never
     * configured in this class. The driver default is ordered execution, which stops a
     * batch at the first write error, so the remaining writes of that batch are neither
     * attempted nor counted. Confirm this is intended (doInsert() tallies a *list* of
     * write errors, which suggests unordered execution may have been expected).
     */
    protected static final BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
    /** Number of documents inserted successfully. */
    protected long successCount;
    /** Number of writes that failed for a reason other than a duplicate key. */
    protected long errorCount;
    /** Number of writes rejected with a duplicate key error. */
    protected long duplicateKeyCount;
    /** Not read or written by this class; presumably maintained by subclasses. */
    protected BsonValue lastId;
    /** Not read or written by this class; presumably maintained by subclasses. */
    protected BsonValue previousBatchLastId;

    /** Performs the clone; implemented by subclasses. */
    public abstract void run();

    /** Stops the worker; implemented by subclasses. */
    public abstract void shutdown();

    /**
     * Creates a worker for the given namespace.
     *
     * <p>When {@code ns} is non-null the source/destination databases and
     * collections are resolved from the two shard clients and
     * {@link #shardCollection} is looked up in the source client's collections
     * map. When {@code ns} is {@code null} those fields are left {@code null}.
     * Both buffers are pre-sized to {@code options.getBatchSize()}.
     *
     * @param ns                namespace to clone, or {@code null}
     * @param sourceShardClient client for the source cluster; must be non-null when {@code ns} is non-null
     * @param destShardClient   client for the destination cluster; must be non-null when {@code ns} is non-null
     * @param options           sync options; must be non-null (its batch size is always read)
     */
    public AbstractCollectionCloneWorker(Namespace ns, ShardClient sourceShardClient, ShardClient destShardClient, MongoSyncOptions options) {
        this.ns = ns;
        this.sourceShardClient = sourceShardClient;
        this.destShardClient = destShardClient;
        this.options = options;
        if (ns != null) {
            this.sourceDb = sourceShardClient.getMongoClient().getDatabase(ns.getDatabaseName());
            this.sourceCollection = this.sourceDb.getCollection(ns.getCollectionName(), RawBsonDocument.class);
            this.destDb = destShardClient.getMongoClient().getDatabase(ns.getDatabaseName());
            this.destCollection = this.destDb.getCollection(ns.getCollectionName(), RawBsonDocument.class);
            this.shardCollection = sourceShardClient.getCollectionsMap().get(ns.getNamespace());
        }
        this.docsBuffer = new ArrayList<RawBsonDocument>(options.getBatchSize());
        this.writesBuffer = new ArrayList<WriteModel<RawBsonDocument>>(options.getBatchSize());
    }

    /**
     * Reads the {@code _id} value of a raw document.
     *
     * @param doc document to inspect; must be non-null
     * @return the {@code _id} value, or {@code null} when the lookup yields no
     *         value or the document cannot be decoded
     */
    protected static BsonValue getId(RawBsonDocument doc) {
        try {
            return doc.get("_id");
        }
        catch (BsonSerializationException e) {
            // Best effort: the id is only used for diagnostics, so an undecodable
            // document must not abort the caller. Leave a trace instead of
            // swallowing the failure silently.
            logger.debug("unable to read _id from document: {}", e.getMessage());
            return null;
        }
    }

    /**
     * Counts the duplicate key errors in a list of bulk write errors and logs a
     * warning for every error that is not a duplicate key error.
     *
     * @param errors write errors reported by a failed bulk write
     * @return number of errors whose code is the duplicate key code (11000)
     */
    private int getDuplicateKeyErrorCount(List<BulkWriteError> errors) {
        int count = 0;
        for (BulkWriteError bwe : errors) {
            if (bwe.getCode() == DUPLICATE_KEY_ERROR_CODE) {
                ++count;
            } else {
                logger.warn("{} - insertMany() error : {}", this.ns, bwe.getMessage());
            }
        }
        return count;
    }

    /**
     * Returns the number of documents in the source collection.
     *
     * <p>First asks {@code ShardClient.countDocuments}; if that fails with a
     * {@link MongoException} it falls back to
     * {@code ShardClient.estimatedDocumentCount}.
     *
     * @return the count, or {@code null} when both attempts fail
     */
    protected Number getCount() {
        Number total;
        try {
            total = ShardClient.countDocuments(this.sourceDb, this.sourceCollection);
        }
        catch (MongoException me) {
            logger.warn("error getting collection count for ns: {}. will retry with estimated count, message: {}", this.ns, me.getMessage());
            try {
                total = ShardClient.estimatedDocumentCount(this.sourceDb, this.sourceCollection);
            }
            catch (MongoException me2) {
                logger.warn("2nd attempt, error getting estimated collection count for ns: {}, message: {}", this.ns, me2.getMessage());
                return null;
            }
        }
        logger.debug("{} - count: {} documents", this.ns, total);
        return total;
    }

    /**
     * Writes the buffered batch to the destination collection and updates the
     * success / error / duplicate-key counters.
     *
     * <p>The write models in {@link #writesBuffer} are sent as one bulk write.
     * If the bulk write reports write errors, they are tallied (duplicate keys
     * separately) together with the documents that were still inserted. If the
     * bulk write fails with any other {@link MongoException}, every document in
     * {@link #docsBuffer} is retried with an individual insert.
     *
     * <p>Neither buffer is cleared by this method.
     */
    protected void doInsert() {
        // The driver rejects an empty request list with an IllegalArgumentException,
        // which is not a MongoException and would escape both handlers below.
        if (this.writesBuffer == null || this.writesBuffer.isEmpty()) {
            return;
        }
        try {
            BulkWriteResult result = this.destCollection.bulkWrite(this.writesBuffer, bulkWriteOptions);
            this.successCount += result.getInsertedCount();
        }
        catch (MongoBulkWriteException bwe) {
            List<BulkWriteError> errors = bwe.getWriteErrors();
            int batchDuplicateKeyCount = this.getDuplicateKeyErrorCount(errors);
            int batchErrorCount = errors.size() - batchDuplicateKeyCount;
            this.errorCount += batchErrorCount;
            this.duplicateKeyCount += batchDuplicateKeyCount;
            // Writes that succeeded before/around the failures are reported on the
            // exception's partial result; without this they were never counted.
            BulkWriteResult partial = bwe.getWriteResult();
            if (partial != null && partial.wasAcknowledged()) {
                this.successCount += partial.getInsertedCount();
            }
        }
        catch (MongoException e) {
            logger.warn("{} - insertMany() unexpected error: {}", this.ns, e.getMessage());
            this.insertBufferedDocsIndividually();
        }
    }

    /**
     * Inserts every document of {@link #docsBuffer} one at a time, counting each
     * outcome. Duplicate key failures are expected here when the failed bulk
     * write had already inserted part of the batch, so they are counted as
     * duplicates (as in the bulk path) rather than as errors.
     */
    private void insertBufferedDocsIndividually() {
        int pos = 0;
        BsonValue prevId = null;
        for (RawBsonDocument doc : this.docsBuffer) {
            BsonValue id = AbstractCollectionCloneWorker.getId(doc);
            try {
                this.destCollection.insertOne(doc);
                ++this.successCount;
                // prevId tracks the last document that was inserted successfully.
                prevId = id;
            }
            catch (MongoException me) {
                if (me.getCode() == DUPLICATE_KEY_ERROR_CODE) {
                    ++this.duplicateKeyCount;
                } else {
                    logger.warn("{} - {_id: {}, prevId: {}, pos: {}} retry using insertOne() unexpected error: {}", this.ns, id, prevId, pos, me.getMessage());
                    ++this.errorCount;
                }
            }
            ++pos;
        }
    }
}

