/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.mongosync;

import com.mongodb.client.MongoClient;
import com.mongodb.client.model.WriteModel;
import com.mongodb.model.Namespace;
import com.mongodb.mongosync.ApplyOperationsHelper;
import com.mongodb.mongosync.BulkWriteOutput;
import com.mongodb.mongosync.LookupTransformer;
import com.mongodb.mongosync.MongoSyncOptions;
import com.mongodb.mongosync.OplogBatch;
import com.mongodb.mongosync.OplogQueueEntry;
import com.mongodb.mongosync.OplogTailMonitor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChildOplogWorker
implements Runnable {
    protected static final Logger logger = LoggerFactory.getLogger(ChildOplogWorker.class);
    private String shardId;
    private BlockingQueue<OplogQueueEntry> workQueue;
    private ApplyOperationsHelper applyOperationsHelper;
    private OplogTailMonitor oplogTailMonitor;
    private boolean shutdown = false;
    private Map<String, OplogBatch> oplogBatches;
    private BsonTimestamp lastTimestamp = null;
    private MongoSyncOptions options;
    private Map<String, LookupTransformer> lookupTransformers;
    private long lastFlushMillis;

    public ChildOplogWorker(String shardId, BlockingQueue<OplogQueueEntry> workQueue, ApplyOperationsHelper applyOperationsHelper, OplogTailMonitor oplogTailMonitor, MongoSyncOptions options) {
        this.shardId = shardId;
        this.workQueue = workQueue;
        this.applyOperationsHelper = applyOperationsHelper;
        this.oplogTailMonitor = oplogTailMonitor;
        this.oplogBatches = new HashMap<String, OplogBatch>();
        this.options = options;
        this.initializeTransformers();
    }

    private void initializeTransformers() {
        for (Map.Entry<String, String> entry : this.options.getTransformersMap().entrySet()) {
            if (this.lookupTransformers == null) {
                this.lookupTransformers = new HashMap<String, LookupTransformer>();
            }
            MongoClient sourceClient = this.options.getSourceShardClient().getMongoClient();
            Namespace ns = new Namespace(entry.getKey());
            LookupTransformer transformer = new LookupTransformer(sourceClient, ns.getDatabaseName(), ns.getCollectionName(), entry.getValue());
            this.lookupTransformers.put(entry.getKey(), transformer);
        }
    }

    public void stop() {
        this.shutdown = true;
    }

    private void flush(int minThreshold) {
        for (Map.Entry<String, OplogBatch> entry : this.oplogBatches.entrySet()) {
            Namespace ns = new Namespace(entry.getKey());
            this.flush(minThreshold, ns, entry.getValue());
        }
    }

    private void flush(int minThreshold, Namespace ns, OplogBatch oplogBatch) {
        BulkWriteOutput output = null;
        long current = System.currentTimeMillis();
        long elapsed = current - this.lastFlushMillis;
        int size = oplogBatch.size();
        if (size > minThreshold || size > 0 && elapsed > 15000L) {
            output = this.applyOperationsHelper.applyBulkWriteModelsOnCollection(ns, oplogBatch);
            oplogBatch.clear();
            this.lastFlushMillis = current;
        }
        if (output != null) {
            this.oplogTailMonitor.updateStatus(output);
            this.oplogTailMonitor.setLatestTimestamp(new BsonTimestamp(this.lastTimestamp.getValue()));
        }
    }

    @Override
    public void run() {
        while (!this.shutdown) {
            try {
                WriteModel<BsonDocument> model;
                LookupTransformer tran;
                OplogQueueEntry currentQueueEntry;
                block12: {
                    currentQueueEntry = null;
                    try {
                        currentQueueEntry = this.workQueue.poll(5L, TimeUnit.SECONDS);
                    }
                    catch (InterruptedException e) {
                        if (!this.shutdown) break block12;
                        logger.debug("{}: interruped, breaking", (Object)this.shardId);
                        break;
                    }
                }
                if (currentQueueEntry == null) {
                    this.flush(this.options.getOplogBatchSize());
                    continue;
                }
                BsonDocument currentDocument = currentQueueEntry.doc;
                String ns = currentDocument.getString((Object)"ns").getValue();
                Namespace namespace = new Namespace(ns);
                String op = currentDocument.getString((Object)"op").getValue();
                if (op.equals("c") || namespace.getCollectionName().equals("system.indexes")) continue;
                OplogBatch batch = this.oplogBatches.get(ns);
                if (batch == null) {
                    batch = new OplogBatch(this.options.getOplogBatchSize());
                    this.oplogBatches.put(ns, batch);
                }
                if (this.lookupTransformers != null && op.equals("u") && (tran = this.lookupTransformers.get(ns)) != null) {
                    BsonDocument updateQuery = currentDocument.getDocument((Object)"o2");
                    ObjectId id = updateQuery.getObjectId((Object)"_id").getValue();
                    BsonValue lookupValue = null;
                    try {
                        lookupValue = tran.lookup(id);
                        updateQuery.put(tran.getLookupValueKey(), lookupValue);
                    }
                    catch (Exception e) {
                        logger.warn("ChildOplogWorker exception executing transformation lookup", (Throwable)e);
                    }
                }
                if ((model = ApplyOperationsHelper.getWriteModelForOperation(currentDocument)) != null) {
                    batch.addWriteModel(model, currentQueueEntry.id);
                } else {
                    logger.warn("{}: ignoring oplog entry. could not convert the document to model. Given document is {}", (Object)this.shardId, (Object)currentDocument.toJson());
                }
                this.lastTimestamp = currentDocument.getTimestamp((Object)"ts");
                this.flush(this.options.getOplogBatchSize(), namespace, batch);
            }
            catch (Exception e) {
                logger.error("{}: ChildOplogWorker error", (Object)this.shardId, (Object)e);
            }
        }
        logger.debug("{}: child flush", (Object)this.shardId);
        this.flush(0);
    }
}

