/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.mongosync;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mongodb.CursorType;
import com.mongodb.MongoException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.model.ShardTimestamp;
import com.mongodb.mongosync.AbstractOplogTailWorker;
import com.mongodb.mongosync.ApplyOperationsHelper;
import com.mongodb.mongosync.ChildOplogWorker;
import com.mongodb.mongosync.MongoSyncOptions;
import com.mongodb.mongosync.OplogQueueEntry;
import com.mongodb.mongosync.OplogTailMonitor;
import com.mongodb.mongosync.TimestampFile;
import com.mongodb.shardsync.ShardClient;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.conversions.Bson;

public class MultiBufferOplogTailWorker
extends AbstractOplogTailWorker
implements Runnable {
    private int numChildWorkers;
    private Map<Integer, ChildOplogWorker> childWorkers;
    private List<ExecutorService> childExecutors;
    ScheduledExecutorService monitorExecutor;
    private Map<Integer, BlockingQueue<OplogQueueEntry>> childQueues;

    public MultiBufferOplogTailWorker(ShardTimestamp shardTimestamp, TimestampFile timestampFile, ShardClient sourceShardClient, ShardClient destShardClient, MongoSyncOptions options) throws IOException {
        this.shardId = shardTimestamp.getShardName();
        this.shardTimestamp = shardTimestamp;
        this.sourceShardClient = sourceShardClient;
        this.destShardClient = destShardClient;
        this.options = options;
        this.numChildWorkers = options.getOplogThreads();
        this.childWorkers = new HashMap<Integer, ChildOplogWorker>(this.numChildWorkers);
        this.childExecutors = new ArrayList<ExecutorService>(this.numChildWorkers);
        this.childQueues = new HashMap<Integer, BlockingQueue<OplogQueueEntry>>(this.numChildWorkers);
        this.oplogTailMonitor = new OplogTailMonitor(timestampFile, sourceShardClient, this.childQueues);
        this.applyOperationsHelper = new ApplyOperationsHelper(this.shardId, this.oplogTailMonitor, destShardClient);
    }

    private void startChildExecutors() {
        int i = 0;
        while (i < this.numChildWorkers) {
            LinkedBlockingQueue<OplogQueueEntry> childQueue = new LinkedBlockingQueue<OplogQueueEntry>(this.options.getOplogQueueSize());
            this.childQueues.put(i, childQueue);
            ChildOplogWorker worker = new ChildOplogWorker(this.shardId, childQueue, this.applyOperationsHelper, this.oplogTailMonitor, this.options);
            this.childWorkers.put(i, worker);
            ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("child-oplog-worker_" + this.shardId + "_" + i).setDaemon(true).build());
            this.childExecutors.add(executor);
            executor.execute(worker);
            ++i;
        }
    }

    private void stopChildExecutors() {
        for (ChildOplogWorker worker : this.childWorkers.values()) {
            logger.debug("{}: stopping worker", (Object)this.shardId);
            worker.stop();
        }
        for (ExecutorService executor : this.childExecutors) {
            executor.shutdown();
            while (!executor.isTerminated()) {
                try {
                    Thread.sleep(10000L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
        }
        logger.debug("{}: childExecutors shutdown", (Object)this.shardId);
    }

    @Override
    public void run() {
        block12: {
            this.startChildExecutors();
            this.monitorExecutor = Executors.newScheduledThreadPool(1);
            this.monitorExecutor.scheduleAtFixedRate(this.oplogTailMonitor, 0L, 30L, TimeUnit.SECONDS);
            MongoDatabase local = this.sourceShardClient.getShardMongoClient(this.shardId).getDatabase("local");
            MongoCollection oplog = local.getCollection("oplog.rs", BsonDocument.class);
            MongoCursor cursor = null;
            Set<String> includedNamespaces = this.options.getIncludedNamespaceStrings();
            Set<String> excludedNamespaces = this.options.getExcludedNamespaceStrings();
            Bson query = null;
            query = includedNamespaces.isEmpty() && excludedNamespaces.isEmpty() ? Filters.and((Bson[])new Bson[]{Filters.gt((String)"ts", (Object)this.shardTimestamp.getTimestamp()), Filters.ne((String)"op", (Object)"n")}) : (excludedNamespaces.size() > 0 ? Filters.and((Bson[])new Bson[]{Filters.gt((String)"ts", (Object)this.shardTimestamp.getTimestamp()), Filters.ne((String)"op", (Object)"n"), Filters.nin((String)"ns", excludedNamespaces)}) : Filters.and((Bson[])new Bson[]{Filters.gt((String)"ts", (Object)this.shardTimestamp.getTimestamp()), Filters.ne((String)"op", (Object)"n"), Filters.in((String)"ns", includedNamespaces)}));
            long start = System.currentTimeMillis();
            try {
                try {
                    logger.debug("{}: starting oplog tail query: {}", (Object)this.shardId, (Object)query);
                    long count = 0L;
                    cursor = oplog.find(query).sort(Filters.eq((String)"$natural", (Object)1)).oplogReplay(true).noCursorTimeout(true).cursorType(CursorType.TailableAwait).iterator();
                    while (cursor.hasNext() && !this.shutdown) {
                        BsonDocument doc = (BsonDocument)cursor.next();
                        String op = doc.getString((Object)"op").getValue();
                        if (op.equals("n") || op.equals("c")) continue;
                        this.currentNs = doc.getString((Object)"ns").getValue();
                        if (!this.options.includeNamespace(this.currentNs) || this.currentNs == null || this.currentNs.equals("") || this.currentNs.startsWith("config.")) continue;
                        this.addToBuffer(doc);
                        ++count;
                    }
                    long end = System.currentTimeMillis();
                    Double dur = (double)(end - start) / 1000.0;
                    logger.debug("{}: oplog tail complete, {} operations applied in {} seconds", new Object[]{this.shardId, count, dur});
                }
                catch (Exception e) {
                    logger.error("{}: tail error", (Object)this.shardId, (Object)e);
                    try {
                        cursor.close();
                    }
                    catch (Exception exception) {}
                    break block12;
                }
            }
            catch (Throwable throwable) {
                try {
                    cursor.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
                throw throwable;
            }
            try {
                cursor.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        this.stopChildExecutors();
    }

    private int getCombinedHashModulo(String ns, BsonValue id) {
        int hash = 7;
        hash = 31 * hash + (id == null ? 0 : id.hashCode());
        hash = (hash = 31 * hash + (ns == null ? 0 : ns.hashCode())) >= 0 ? (hash %= this.numChildWorkers) : -hash % this.numChildWorkers;
        return hash;
    }

    @Override
    protected void addToBuffer(BsonDocument doc) throws InterruptedException {
        if (this.currentNs.endsWith(".$cmd")) {
            logger.debug("$cmd: {}", (Object)doc);
        }
        BsonValue id = this.getIdForOperation(doc);
        Integer hashKey = this.getCombinedHashModulo(this.currentNs, id);
        BlockingQueue<OplogQueueEntry> childQueue = this.childQueues.get(hashKey);
        childQueue.put(new OplogQueueEntry(doc, id));
    }

    private BsonValue getIdForOperation(BsonDocument operation) throws MongoException {
        String opType;
        switch (opType = operation.getString((Object)"op").getValue()) {
            case "u": {
                BsonDocument o2 = operation.getDocument((Object)"o2");
                if (o2 != null) {
                    BsonValue id = o2.get((Object)"_id");
                    if (id != null) {
                        return id;
                    }
                    logger.warn("{}: did not find o2._id field for update oplog entry: {}", (Object)this.shardId, (Object)operation);
                    break;
                }
                logger.error("{}: did not find o2 field for update oplog entry: {}", (Object)this.shardId, (Object)operation);
                return null;
            }
            case "d": 
            case "i": {
                BsonDocument oDoc = operation.getDocument((Object)"o");
                if (oDoc != null) {
                    BsonValue id = oDoc.get((Object)"_id");
                    if (id != null) {
                        return id;
                    }
                    logger.warn("{}: did not find o._id field for insert/delete oplog entry: {}", (Object)this.shardId, (Object)operation);
                    break;
                }
                logger.error("{}: did not find o field for insert/delete oplog entry: {}", (Object)this.shardId, (Object)operation);
                break;
            }
            default: {
                logger.error(String.format("{}: unexpected operation %s; op: %s", this.shardId, opType, operation.toJson()));
            }
        }
        return null;
    }

    private Integer getModuloKeyForBsonValue(BsonValue val) {
        Integer key = null;
        int hash = val.hashCode();
        key = hash >= 0 ? Integer.valueOf(hash % this.numChildWorkers) : Integer.valueOf(-hash % this.numChildWorkers);
        return key;
    }

    private Integer getModuloKeyForNamespace(String ns) {
        Integer key = null;
        int hash = ns.hashCode();
        key = hash >= 0 ? Integer.valueOf(hash % this.numChildWorkers) : Integer.valueOf(-hash % this.numChildWorkers);
        return key;
    }

    @Override
    protected synchronized void stop() {
        logger.debug("{}: OplogTailWorker got stop signal", (Object)this.shardId);
        this.shutdown = true;
        for (ChildOplogWorker worker : this.childWorkers.values()) {
            worker.stop();
        }
        int i = 0;
        for (ExecutorService executor : this.childExecutors) {
            try {
                logger.debug("{}: starting shutdown of executor {}", (Object)this.shardId, (Object)i++);
                executor.shutdown();
                executor.awaitTermination(240L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                logger.error("Timed out waiting for executor to complete", (Throwable)e);
            }
        }
        logger.debug("{}: child executors shutdown", (Object)this.shardId);
        try {
            this.monitorExecutor.shutdown();
            this.monitorExecutor.awaitTermination(60L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            logger.error("Timed out waiting for monitor executor to complete", (Throwable)e);
        }
        logger.debug("{}: monitor shutdown", (Object)this.shardId);
    }
}

