/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.mongosync;

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCommandException;
import com.mongodb.client.MongoIterable;
import com.mongodb.model.Namespace;
import com.mongodb.model.Shard;
import com.mongodb.model.ShardTimestamp;
import com.mongodb.mongosync.AbstractCollectionCloneWorker;
import com.mongodb.mongosync.AbstractOplogTailWorker;
import com.mongodb.mongosync.CollectionCloneWorker;
import com.mongodb.mongosync.GetLatestOplogTimestampTask;
import com.mongodb.mongosync.MongoSyncOptions;
import com.mongodb.mongosync.MultiBufferOplogTailWorker;
import com.mongodb.mongosync.ShardedCollectionCloneWorker;
import com.mongodb.mongosync.TimestampFile;
import com.mongodb.shardsync.ShardClient;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.codecs.DocumentCodec;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoSync {
    protected static final Logger logger = LoggerFactory.getLogger(MongoSync.class);
    CodecRegistry registry = CodecRegistries.fromRegistries((CodecRegistry[])new CodecRegistry[]{MongoClientSettings.getDefaultCodecRegistry(), CodecRegistries.fromProviders((CodecProvider[])new CodecProvider[]{PojoCodecProvider.builder().automatic(true).build()})});
    DocumentCodec documentCodec = new DocumentCodec(this.registry);
    private static final String SOURCE_URI = "source";
    private static final String DEST_URI = "dest";
    private static final String SPLIT_CHUNKS = "splitChunks";
    private static final String DROP_DEST_DBS = "dropDestDbs";
    private static final String CLEAN_TIMESTAMPS = "cleanTs";
    private static final String MULTI_OPLOG_WORKER = "multiWorker";
    private static final String OPLOG_THREADS = "oplogThreads";
    private static final String OPLOG_QUEUE_SIZE = "oplogQueueSize";
    private static final String OPLOG_BATCH_SIZE = "oplogBatchSize";
    private static final String INITIAL_SYNC = "initialSync";
    private static final String NAME = "name";
    private static final String OPLOG_TRANSFORMERS = "oplogTransformers";
    private static final String CHUNK_SKIP = "skipMatchingCountChunks";
    private static final String SHARD_LIST = "shardList";
    private static Options options;
    private static CommandLine line;
    private MongoSyncOptions mongoSyncOptions;
    private ShardClient sourceShardClient;
    private ShardClient destShardClient;
    private ExecutorService initialSyncExecutor;
    private AbstractCollectionCloneWorker initialSyncWorker;
    private ExecutorService oplogTailExecutor;
    private List<AbstractOplogTailWorker> oplogTailWorkers;
    private Map<String, TimestampFile> timestampFiles;
    private Map<String, ShardTimestamp> shardTimestamps;
    private boolean doInitialSync = true;

    private void initialize() throws IOException {
        Set<String> shardList = this.mongoSyncOptions.getShardList();
        boolean filtered = false;
        if (shardList != null && shardList.size() > 0) {
            filtered = true;
            logger.debug("shard filtered mode, {} shards", (Object)shardList.size());
            this.sourceShardClient = new ShardClient(SOURCE_URI, this.mongoSyncOptions.getSourceMongoUri(), shardList);
        } else {
            this.sourceShardClient = new ShardClient(SOURCE_URI, this.mongoSyncOptions.getSourceMongoUri());
        }
        this.sourceShardClient.init();
        this.sourceShardClient.populateShardMongoClients();
        this.sourceShardClient.populateCollectionsMap();
        this.mongoSyncOptions.setSourceShardClient(this.sourceShardClient);
        this.destShardClient = new ShardClient(DEST_URI, this.mongoSyncOptions.getDestMongoUri());
        this.destShardClient.init();
        this.destShardClient.populateShardMongoClients();
        this.mongoSyncOptions.setDestShardClient(this.destShardClient);
        this.stopSourceBalancer();
    }

    private void stopSourceBalancer() {
        logger.debug("stopSourceBalancer started");
        try {
            this.sourceShardClient.stopBalancer();
        }
        catch (MongoCommandException mce) {
            logger.error("Could not stop balancer on source shard: " + mce.getMessage());
        }
        logger.debug("stopSourceBalancer complete");
    }

    private void collectOplogLatestTimestamps() throws InterruptedException, ExecutionException, IOException {
        ArrayList<GetLatestOplogTimestampTask> tasks = new ArrayList<GetLatestOplogTimestampTask>();
        this.timestampFiles = new HashMap<String, TimestampFile>();
        this.shardTimestamps = new HashMap<String, ShardTimestamp>();
        for (String shardId : this.sourceShardClient.getShardsMap().keySet()) {
            TimestampFile tsFile = new TimestampFile(shardId);
            this.timestampFiles.put(shardId, tsFile);
            if (tsFile.exists() && !this.mongoSyncOptions.isCleanTimestampFiles()) {
                this.doInitialSync = false;
                try {
                    ShardTimestamp st = tsFile.getShardTimestamp();
                    logger.debug(String.format("timestamp file %s exists: %s", tsFile, st));
                    this.shardTimestamps.put(shardId, st);
                    continue;
                }
                catch (IOException e) {
                    logger.error(String.format("Error reading timestamp file %s", tsFile), (Throwable)e);
                    throw e;
                }
            }
            tasks.add(new GetLatestOplogTimestampTask(shardId, this.sourceShardClient));
        }
        if (tasks.size() > 0) {
            int numThreads = tasks.size();
            ExecutorService executor = Executors.newFixedThreadPool(numThreads);
            try {
                List results = executor.invokeAll(tasks);
                for (Future result : results) {
                    ShardTimestamp shardTimestamp = (ShardTimestamp)result.get();
                    this.shardTimestamps.put(shardTimestamp.getShardName(), shardTimestamp);
                    this.sourceShardClient.getShardsMap().get(shardTimestamp.getShardName()).setSyncStartTimestamp(shardTimestamp);
                    logger.debug("GetLatestOplogTimestampTask result: {}", (Object)shardTimestamp.toString());
                    TimestampFile timestampFile = this.timestampFiles.get(shardTimestamp.getShardName());
                    timestampFile.update(shardTimestamp);
                }
            }
            finally {
                executor.shutdown();
            }
        }
    }

    private void cloneCollection(Namespace ns) {
        CollectionCloneWorker worker = new CollectionCloneWorker(ns, this.sourceShardClient, this.destShardClient, this.mongoSyncOptions);
        this.initialSyncExecutor.execute(worker);
    }

    private void shardedInitialSync() {
        logger.debug("Starting sharded initial sync");
        this.initialSyncWorker = new ShardedCollectionCloneWorker(this.sourceShardClient, this.destShardClient, this.mongoSyncOptions);
        this.initialSyncWorker.run();
        logger.debug("Initial sync complete");
        this.initialSyncWorker = null;
    }

    private boolean excludeDbCheck(String dbName) {
        return ShardClient.excludedSystemDbs.contains(dbName) || this.mongoSyncOptions.excludeDb(dbName);
    }

    private void initialSync() {
        logger.debug("Starting initial sync");
        this.initialSyncExecutor = Executors.newFixedThreadPool(this.mongoSyncOptions.getThreads());
        MongoIterable<String> dbNames = this.sourceShardClient.listDatabaseNames();
        for (String dbName : dbNames) {
            if (this.excludeDbCheck(dbName)) continue;
            if (this.mongoSyncOptions.isDropDestDbs()) {
                this.destShardClient.dropDatabase(dbName);
            }
            ArrayList collectionNames = new ArrayList();
            this.sourceShardClient.listCollectionNames(dbName).into(collectionNames);
            logger.debug("{}: collection count: {}", (Object)dbName, (Object)collectionNames.size());
            for (String collectionName : collectionNames) {
                Namespace ns;
                if (collectionName.equals("system.profile") || collectionName.equals("system.indexes") || !this.mongoSyncOptions.includeCollection(dbName, collectionName) || this.mongoSyncOptions.excludeNamespace(ns = new Namespace(dbName, collectionName))) continue;
                this.cloneCollection(ns);
            }
        }
        this.initialSyncExecutor.shutdown();
        try {
            this.initialSyncExecutor.awaitTermination(999L, TimeUnit.DAYS);
        }
        catch (InterruptedException e) {
            logger.warn("ShardedCollectionCloneWorker interrupted");
            Thread.currentThread().interrupt();
        }
        logger.debug("Initial sync complete");
    }

    private void tailOplogs() {
        Collection<Shard> shards = this.sourceShardClient.getShardsMap().values();
        logger.debug("tailOplogs: {} shards", (Object)shards.size());
        this.oplogTailExecutor = Executors.newFixedThreadPool(shards.size());
        this.oplogTailWorkers = new ArrayList<AbstractOplogTailWorker>(shards.size());
        for (Shard shard : shards) {
            this.createWorker(shard.getId());
        }
        this.oplogTailExecutor.shutdown();
        while (!this.oplogTailExecutor.isTerminated()) {
            try {
                Thread.sleep(10000L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        logger.debug("tailOplogs shutdown");
        logger.debug("Finished all threads");
    }

    private void createWorker(String shardId) {
        ShardTimestamp shardTimestamp = this.shardTimestamps.get(shardId);
        TimestampFile timestampFile = this.timestampFiles.get(shardId);
        try {
            MultiBufferOplogTailWorker worker = new MultiBufferOplogTailWorker(shardTimestamp, timestampFile, this.sourceShardClient, this.destShardClient, this.mongoSyncOptions);
            this.oplogTailWorkers.add(worker);
            this.oplogTailExecutor.execute(worker);
        }
        catch (IOException e) {
            logger.error(String.format("Error creating OplogTailWorker", e));
        }
    }

    private void execute() {
        this.mongoSyncOptions.isCleanTimestampFiles();
        try {
            this.collectOplogLatestTimestamps();
        }
        catch (IOException | InterruptedException | ExecutionException e) {
            logger.error("Error collecting latest oplog timestamps", (Throwable)e);
            return;
        }
        if (this.doInitialSync) {
            this.shardedInitialSync();
        } else {
            logger.debug("Skippping initial sync, timestamp file(s) exist");
        }
        if (this.mongoSyncOptions.isInitialSyncOnly()) {
            logger.debug("Skippping oplog tailing, initialSyncOnly is set");
        } else {
            this.tailOplogs();
        }
    }

    private void shutdown() {
        if (this.initialSyncWorker != null) {
            logger.debug("stopping initial sync worker");
            this.initialSyncWorker.shutdown();
        }
        if (this.oplogTailExecutor != null) {
            for (AbstractOplogTailWorker oplogTailWorker : this.oplogTailWorkers) {
                oplogTailWorker.stop();
            }
            logger.debug("oplog tail workers stopped");
        }
    }

    private void splitChunks() {
        logger.debug("Starting splitChunks");
        for (Document sourceColl : this.sourceShardClient.getCollectionsMap().values()) {
            String nsStr = (String)sourceColl.get((Object)"_id");
            Namespace ns = new Namespace(nsStr);
            if (ShardClient.excludedSystemDbs.contains(ns.getDatabaseName())) continue;
            List<Document> splitPoints = this.sourceShardClient.splitVector(ns, sourceColl);
            logger.debug("ns: {}, splitCount: {}", (Object)ns, (Object)splitPoints.size());
            for (Document split : splitPoints) {
                Document chunk = new Document();
                chunk.put("ns", (Object)nsStr);
                chunk.put("max", (Object)split);
                this.destShardClient.createChunk(chunk.toBsonDocument(BsonDocument.class, this.registry), false, false);
            }
        }
    }

    private static void printHelpAndExit(Options options) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp("replayUtil", options);
        System.exit(-1);
    }

    protected static CommandLine initializeAndParseCommandLineOptions(String[] args) {
        options = new Options();
        options.addOption(new Option("help", "print this message"));
        OptionBuilder.withArgName((String)"Configuration properties file");
        OptionBuilder.hasArg();
        OptionBuilder.withLongOpt((String)"config");
        options.addOption(OptionBuilder.create((String)"c"));
        OptionBuilder.withArgName((String)"source cluster mongo uri");
        OptionBuilder.hasArg();
        OptionBuilder.withLongOpt((String)SOURCE_URI);
        options.addOption(OptionBuilder.create((String)"s"));
        OptionBuilder.withArgName((String)"destination cluster mongo uri");
        OptionBuilder.hasArg();
        OptionBuilder.withLongOpt((String)DEST_URI);
        options.addOption(OptionBuilder.create((String)"d"));
        OptionBuilder.withArgName((String)"split destination chunks");
        OptionBuilder.withLongOpt((String)SPLIT_CHUNKS);
        options.addOption(OptionBuilder.create((String)SPLIT_CHUNKS));
        OptionBuilder.withArgName((String)"initial sync only (no oplog tail)");
        OptionBuilder.withLongOpt((String)INITIAL_SYNC);
        options.addOption(OptionBuilder.create((String)INITIAL_SYNC));
        OptionBuilder.withArgName((String)"skip initial sync of chunks with matching counts");
        OptionBuilder.withLongOpt((String)CHUNK_SKIP);
        options.addOption(OptionBuilder.create((String)CHUNK_SKIP));
        OptionBuilder.withArgName((String)"# threads");
        OptionBuilder.hasArg();
        OptionBuilder.withLongOpt((String)"threads");
        options.addOption(OptionBuilder.create((String)"t"));
        OptionBuilder.withArgName((String)"batch size");
        OptionBuilder.hasArg();
        OptionBuilder.withLongOpt((String)"batchSize");
        options.addOption(OptionBuilder.create((String)"b"));
        OptionBuilder.withArgName((String)"Namespace filter");
        OptionBuilder.hasArgs();
        OptionBuilder.withLongOpt((String)"filter");
        options.addOption(OptionBuilder.create((String)"f"));
        OptionBuilder.withArgName((String)"Exclude namespace");
        OptionBuilder.hasArgs();
        OptionBuilder.withLongOpt((String)"excludeNamespace");
        options.addOption(OptionBuilder.create((String)"x"));
        OptionBuilder.withArgName((String)"Oplog transformer");
        OptionBuilder.hasArg();
        OptionBuilder.withLongOpt((String)OPLOG_TRANSFORMERS);
        options.addOption(OptionBuilder.create((String)"o"));
        OptionBuilder.withArgName((String)"Drop destination databases, but preserve config metadata");
        OptionBuilder.withLongOpt((String)DROP_DEST_DBS);
        options.addOption(OptionBuilder.create((String)DROP_DEST_DBS));
        OptionBuilder.withArgName((String)"Cleanup and previous/old timestamp files");
        OptionBuilder.withLongOpt((String)CLEAN_TIMESTAMPS);
        options.addOption(OptionBuilder.create((String)CLEAN_TIMESTAMPS));
        OptionBuilder.withArgName((String)"Use multi-threaded oplog tail workers");
        OptionBuilder.withLongOpt((String)MULTI_OPLOG_WORKER);
        options.addOption(OptionBuilder.create((String)MULTI_OPLOG_WORKER));
        OptionBuilder.withArgName((String)"# oplog tailing threads (per shard)");
        OptionBuilder.hasArg();
        OptionBuilder.withLongOpt((String)OPLOG_THREADS);
        options.addOption(OptionBuilder.create((String)OPLOG_THREADS));
        OptionBuilder.withArgName((String)"oplog queue size (per shard)");
        OptionBuilder.hasArg();
        OptionBuilder.withLongOpt((String)OPLOG_QUEUE_SIZE);
        options.addOption(OptionBuilder.create((String)OPLOG_QUEUE_SIZE));
        OptionBuilder.withArgName((String)"oplog batch size");
        OptionBuilder.hasArg();
        OptionBuilder.withLongOpt((String)OPLOG_BATCH_SIZE);
        options.addOption(OptionBuilder.create((String)OPLOG_BATCH_SIZE));
        OptionBuilder.withArgName((String)"name for this sync process");
        OptionBuilder.withLongOpt((String)NAME);
        options.addOption(OptionBuilder.create((String)NAME));
        GnuParser parser = new GnuParser();
        try {
            line = parser.parse(options, args);
            if (line.hasOption("help")) {
                MongoSync.printHelpAndExit(options);
            }
        }
        catch (ParseException e) {
            System.out.println(e.getMessage());
            MongoSync.printHelpAndExit(options);
        }
        catch (Exception e) {
            e.printStackTrace();
            MongoSync.printHelpAndExit(options);
        }
        return line;
    }

    private static Configuration readProperties() {
        Configurations configs = new Configurations();
        PropertiesConfiguration defaultConfig = new PropertiesConfiguration();
        File propsFile = null;
        if (line.hasOption("c")) {
            propsFile = new File(line.getOptionValue("c"));
        } else {
            propsFile = new File("shard-sync.properties");
            if (!propsFile.exists()) {
                logger.warn("Default config file shard-sync.properties not found, using command line options only");
                return defaultConfig;
            }
        }
        try {
            PropertiesConfiguration config = configs.properties(propsFile);
            return config;
        }
        catch (ConfigurationException e) {
            logger.error("Error loading properties file: " + propsFile, (Throwable)e);
            return defaultConfig;
        }
    }

    protected void parseArgs() throws ClassNotFoundException {
        String oplogBatchSizeStr;
        String oplogQueueSizeStr;
        String batchSizeStr;
        Configuration config = MongoSync.readProperties();
        this.mongoSyncOptions = new MongoSyncOptions();
        this.mongoSyncOptions.setSourceMongoUri(line.getOptionValue("s", config.getString(SOURCE_URI)));
        this.mongoSyncOptions.setDestMongoUri(line.getOptionValue("d", config.getString(DEST_URI)));
        this.mongoSyncOptions.setOplogTransformers(line.getOptionValue("o", config.getString(OPLOG_TRANSFORMERS)));
        this.mongoSyncOptions.setShardList(line.getOptionValue(SHARD_LIST, config.getString(SHARD_LIST)));
        String threadsStr = line.getOptionValue("t");
        if (threadsStr != null) {
            int threads = Integer.parseInt(threadsStr);
            this.mongoSyncOptions.setThreads(threads);
        }
        if ((batchSizeStr = line.getOptionValue("b")) != null) {
            int batchSize = Integer.parseInt(batchSizeStr);
            this.mongoSyncOptions.setBatchSize(batchSize);
        }
        String[] includes = line.getOptionValues("f");
        String[] excludes = line.getOptionValues("x");
        this.mongoSyncOptions.setIncludesExcludes(includes, excludes);
        this.mongoSyncOptions.setDropDestDbs(line.hasOption(DROP_DEST_DBS));
        this.mongoSyncOptions.setSkipChunkSyncIfMatchingCounts(line.hasOption(CHUNK_SKIP));
        this.mongoSyncOptions.setCleanTimestampFiles(line.hasOption(CLEAN_TIMESTAMPS));
        this.mongoSyncOptions.setUseMultiThreadedOplogTailWorkers(line.hasOption(MULTI_OPLOG_WORKER));
        String oplogThreadsStr = line.getOptionValue(OPLOG_THREADS);
        if (oplogThreadsStr != null) {
            int oplogThreads = Integer.parseInt(oplogThreadsStr);
            this.mongoSyncOptions.setOplogThreads(oplogThreads);
        }
        if ((oplogQueueSizeStr = line.getOptionValue(OPLOG_QUEUE_SIZE)) != null) {
            int oplogQueueSize = Integer.parseInt(oplogQueueSizeStr);
            this.mongoSyncOptions.setOplogQueueSize(oplogQueueSize);
        }
        if ((oplogBatchSizeStr = line.getOptionValue(OPLOG_BATCH_SIZE)) != null) {
            int oplogBatchSize = Integer.parseInt(oplogBatchSizeStr);
            this.mongoSyncOptions.setOplogBatchSize(oplogBatchSize);
        }
    }

    private static void addShutdownHook(final MongoSync sync) {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){

            @Override
            public void run() {
                logger.debug("**** SHUTDOWN *****");
                sync.shutdown();
            }
        }));
    }

    public static void main(String[] args) throws Exception {
        MongoSync.initializeAndParseCommandLineOptions(args);
        MongoSync sync = new MongoSync();
        sync.parseArgs();
        sync.initialize();
        if (line.hasOption(SPLIT_CHUNKS)) {
            sync.splitChunks();
        } else {
            MongoSync.addShutdownHook(sync);
            sync.execute();
        }
    }
}

