/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test.replication;

import com.google.protobuf.GeneratedMessageV3;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.manager.replication.SequentialWorkAssigner;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.miniclusterImpl.ProcessReference;
import org.apache.accumulo.server.replication.ReplicaSystemFactory;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Disabled(value="Replication ITs are not stable and not currently maintained")
@Deprecated
public class MultiInstanceReplicationIT
extends ConfigurableMacBase {
    // Class-wide SLF4J logger used by every test in this class.
    private static final Logger log = LoggerFactory.getLogger(MultiInstanceReplicationIT.class);
    // Single-thread executor: assigned in createExecutor() before each test and shut down in
    // stopExecutor() after it; dataWasReplicatedToThePeer() uses it to bound the drain wait.
    private ExecutorService executor;

    /**
     * Gives each test a fresh single-thread executor.
     */
    @BeforeEach
    public void createExecutor() {
        executor = Executors.newSingleThreadExecutor();
    }

    /**
     * Shuts the per-test executor down immediately; does nothing when no executor was created.
     */
    @AfterEach
    public void stopExecutor() {
        if (executor == null) {
            return;
        }
        executor.shutdownNow();
    }

    /**
     * Applies this test's settings to the supplied mini cluster configuration and Hadoop
     * core-site configuration.
     *
     * @param cfg mini cluster configuration to update in place
     * @param hadoopCoreSite Hadoop configuration to update in place
     */
    @Override
    public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
        final String zkTimeout = "15s";
        final String oneSecond = "1s";

        cfg.setNumTservers(1);

        // ZooKeeper timeouts, client side and server side.
        cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, zkTimeout);
        cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, zkTimeout);

        // Tablet server limits.
        cfg.setProperty(Property.TSERV_WAL_MAX_SIZE, "2M");
        cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");

        // Garbage collector cadence.
        cfg.setProperty(Property.GC_CYCLE_START, oneSecond);
        cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");

        // Replication: this cluster is named "manager" and uses the sequential work assigner.
        cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, oneSecond);
        cfg.setProperty(Property.MANAGER_REPLICATION_SCAN_INTERVAL, oneSecond);
        cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
        cfg.setProperty(Property.REPLICATION_NAME, "manager");
        cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());

        // Use RawLocalFileSystem as the implementation for the "file" scheme.
        hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
    }

    /**
     * Copies the security-related site settings of the primary cluster onto the peer's
     * configuration.
     *
     * <p>If the primary has RPC SSL enabled (value {@code "true"}), the peer's site configuration
     * is set to a map holding the SSL flag, the keystore and truststore paths (both required) and
     * the keystore/truststore passwords when present. Independently, if the primary defines a
     * credential-provider path, that path is added to the peer's current site configuration.
     *
     * @param primaryCfg configuration of the primary cluster; only read
     * @param peerCfg configuration of the peer cluster; updated in place
     */
    private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) {
        Map<String, String> primarySiteConfig = primaryCfg.getSiteConfig();

        if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) {
            Map<String, String> sslSiteConfig = new HashMap<>();
            sslSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true");

            String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey());
            Assertions.assertNotNull(keystorePath, "Keystore Path was null");
            sslSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath);

            String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey());
            Assertions.assertNotNull(truststorePath, "Truststore Path was null");
            sslSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath);

            // Passwords are optional: copy them only when the primary defines them.
            String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey());
            if (keystorePassword != null) {
                sslSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword);
            }
            String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey());
            if (truststorePassword != null) {
                sslSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword);
            }

            // Log the keys only: the values may include the keystore/truststore passwords.
            log.info("Setting site configuration for peer (keys only): {}", sslSiteConfig.keySet());
            // NOTE(review): this hands setSiteConfig a map holding only the SSL keys, whereas the
            // credential-provider branch below merges into getSiteConfig(). Whether setSiteConfig
            // replaces or merges is not visible here - confirm earlier peer properties survive.
            peerCfg.setSiteConfig(sslSiteConfig);
        }

        String credProvider = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey());
        if (credProvider != null) {
            Map<String, String> mergedSiteConfig = peerCfg.getSiteConfig();
            mergedSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider);
            peerCfg.setSiteConfig(mergedSiteConfig);
        }
    }

    /**
     * Verifies that every entry written to a replication-enabled table on the primary cluster
     * shows up, in the same order and with the same value, in the target table on a peer cluster.
     *
     * <p>Flow: start a second mini cluster as the peer; register the peer (name, user, password)
     * on the primary; create the peer table and a source table whose replication target is the
     * peer table id; write 5000 rows of 100 columns; kill and restart the primary's tablet
     * servers; wait up to 60 seconds for {@code drain} to return; finally compare both tables
     * entry by entry. The peer cluster is always stopped on exit.
     *
     * @throws Exception if cluster start-up, any Accumulo operation, or the drain task fails
     */
    @Test
    public void dataWasReplicatedToThePeer() throws Exception {
        String rootPassword = "testRootPassword1";
        MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(MultiInstanceReplicationIT.createTestDir(this.getClass().getName() + "_" + this.testName() + "_peer"), rootPassword);
        peerCfg.setNumTservers(1);
        peerCfg.setInstanceName("peer");
        peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
        this.updatePeerConfigFromPrimary(this.getCluster().getConfig(), peerCfg);
        MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg);
        peerCluster.start();
        try (AccumuloClient clientManager = Accumulo.newClient().from(this.getClientProperties()).build();
             AccumuloClient clientPeer = peerCluster.createAccumuloClient("root", new PasswordToken(rootPassword))) {
            ReplicationTable.setOnline(clientManager);

            // Register the peer and the credentials used to reach it on the primary.
            String peerUserName = "peer";
            String peerPassword = "foo";
            String peerClusterName = "peer";
            clientPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
            clientManager.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
            clientManager.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
            clientManager.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + peerClusterName, ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));

            String managerTable = "manager";
            String peerTable = "peer";

            // Target table on the peer, writable by the replication user.
            clientPeer.tableOperations().create(peerTable, new NewTableConfiguration());
            String peerTableId = clientPeer.tableOperations().tableIdMap().get(peerTable);
            Assertions.assertNotNull(peerTableId);
            clientPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);

            // Source table on the primary, replicating to the peer table id.
            Map<String, String> props = new HashMap<>();
            props.put(Property.TABLE_REPLICATION.getKey(), "true");
            props.put(Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId);
            clientManager.tableOperations().create(managerTable, new NewTableConfiguration().setProperties(props));
            String managerTableId = clientManager.tableOperations().tableIdMap().get(managerTable);
            Assertions.assertNotNull(managerTableId);

            try (BatchWriter bw = clientManager.createBatchWriter(managerTable)) {
                for (int rows = 0; rows < 5000; ++rows) {
                    Mutation m = new Mutation(Integer.toString(rows));
                    for (int cols = 0; cols < 100; ++cols) {
                        String value = Integer.toString(cols);
                        m.put(value, "", value);
                    }
                    bw.addMutation(m);
                }
            }
            log.info("Wrote all data to manager cluster");

            Set<String> filesNeedingReplication = clientManager.replicationOperations().referencedFiles(managerTable);
            log.info("Files to replicate: {}", filesNeedingReplication);

            // Kill every tablet server of the primary, then start a new one.
            for (ProcessReference proc : this.cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
                this.cluster.killProcess(ServerType.TABLET_SERVER, proc);
            }
            this.cluster.exec(TabletServer.class);
            log.info("TabletServer restarted");

            // A full scan of the replication table only completes once a tablet server serves it.
            try (Scanner scanner = ReplicationTable.getScanner(clientManager)) {
                scanner.forEach((k, v) -> {});
            }
            log.info("TabletServer is online");

            while (!ReplicationTable.isOnline(clientManager)) {
                log.info("Replication table still offline, waiting");
                Thread.sleep(5000L);
            }

            logMetadataTableEntries(clientManager);
            logReplicationTableEntries(clientManager);

            // Run drain on the executor so the wait can be bounded.
            Future<Boolean> future = this.executor.submit(() -> {
                long then = System.currentTimeMillis();
                clientManager.replicationOperations().drain(managerTable, filesNeedingReplication);
                long now = System.currentTimeMillis();
                log.info("Drain completed in {}ms", now - then);
                return true;
            });
            try {
                future.get(60L, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                future.cancel(true);
                Assertions.fail("Drain did not finish within 60 seconds");
            } finally {
                this.executor.shutdownNow();
            }
            log.info("drain completed");

            logMetadataTableEntries(clientManager);
            logReplicationTableEntries(clientManager);

            // Walk both tables in lock-step; keys are compared without the timestamp.
            try (Scanner manager = clientManager.createScanner(managerTable, Authorizations.EMPTY);
                 Scanner peer = clientPeer.createScanner(peerTable, Authorizations.EMPTY)) {
                Iterator<Map.Entry<Key, Value>> managerIter = manager.iterator();
                Iterator<Map.Entry<Key, Value>> peerIter = peer.iterator();
                Map.Entry<Key, Value> lastManagerEntry = null;
                Map.Entry<Key, Value> lastPeerEntry = null;
                while (managerIter.hasNext() && peerIter.hasNext()) {
                    Map.Entry<Key, Value> managerEntry = managerIter.next();
                    Map.Entry<Key, Value> peerEntry = peerIter.next();
                    lastManagerEntry = managerEntry;
                    lastPeerEntry = peerEntry;
                    Assertions.assertEquals(0, managerEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS), () -> managerEntry.getKey() + " was not equal to " + peerEntry.getKey());
                    Assertions.assertEquals(managerEntry.getValue(), peerEntry.getValue());
                }
                log.info("Last manager entry: {}", lastManagerEntry);
                log.info("Last peer entry: {}", lastPeerEntry);
                Assertions.assertFalse(managerIter.hasNext(), "Had more data to read from the manager");
                Assertions.assertFalse(peerIter.hasNext(), "Had more data to read from the peer");
            }
        } finally {
            peerCluster.stop();
        }
    }

    /**
     * Logs every entry of the metadata table at INFO level; values in the replication section
     * are decoded as {@code Replication.Status} messages, all other values are logged as-is.
     */
    private static void logMetadataTableEntries(AccumuloClient client) throws Exception {
        log.info("");
        log.info("Fetching metadata records:");
        try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
            for (Map.Entry<Key, Value> kv : scanner) {
                if (MetadataSchema.ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
                    log.info("{} {}", kv.getKey().toStringNoTruncate(), ProtobufUtil.toString(Replication.Status.parseFrom(kv.getValue().get())));
                } else {
                    log.info("{} {}", kv.getKey().toStringNoTruncate(), kv.getValue());
                }
            }
        }
    }

    /**
     * Logs every entry of the replication table at INFO level, decoding each value as a
     * {@code Replication.Status} message.
     */
    private static void logReplicationTableEntries(AccumuloClient client) throws Exception {
        log.info("");
        log.info("Fetching replication records:");
        try (Scanner scanner = ReplicationTable.getScanner(client)) {
            for (Map.Entry<Key, Value> kv : scanner) {
                log.info("{} {}", kv.getKey().toStringNoTruncate(), ProtobufUtil.toString(Replication.Status.parseFrom(kv.getValue().get())));
            }
        }
    }

    /**
     * Verifies that two replicated tables on the primary each land in their own target table on
     * the peer, with no rows crossing over.
     *
     * <p>Flow: start a peer mini cluster and register it on the primary; create {@code peer1} and
     * {@code peer2} on the peer and {@code manager1}/{@code manager2} on the primary, each
     * targeting the matching peer table id; write 2500 rows of 100 columns per source table, with
     * every row id prefixed by the source table name; kill and restart the primary's tablet
     * servers; drain both tables; then check that each peer table holds exactly the number of
     * entries written to its source and that every row carries the source table's prefix. The
     * peer cluster is always stopped on exit.
     *
     * @throws Exception if cluster start-up or any Accumulo operation fails
     */
    @Test
    public void dataReplicatedToCorrectTable() throws Exception {
        String rootPassword = "testRootPassword1";
        MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(MultiInstanceReplicationIT.createTestDir(this.getClass().getName() + "_" + this.testName() + "_peer"), rootPassword);
        peerCfg.setNumTservers(1);
        peerCfg.setInstanceName("peer");
        peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
        this.updatePeerConfigFromPrimary(this.getCluster().getConfig(), peerCfg);
        MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg);
        peer1Cluster.start();
        try (AccumuloClient clientManager = Accumulo.newClient().from(this.getClientProperties()).build();
             AccumuloClient clientPeer = peer1Cluster.createAccumuloClient("root", new PasswordToken(rootPassword))) {
            // Register the peer and the credentials used to reach it on the primary.
            String peerClusterName = "peer";
            String peerUserName = "peer";
            String peerPassword = "foo";
            clientPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
            clientManager.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
            clientManager.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
            clientManager.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + peerClusterName, ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));

            String managerTable1 = "manager1";
            String peerTable1 = "peer1";
            String managerTable2 = "manager2";
            String peerTable2 = "peer2";

            // Target tables on the peer.
            clientPeer.tableOperations().create(peerTable1, new NewTableConfiguration());
            String peerTableId1 = clientPeer.tableOperations().tableIdMap().get(peerTable1);
            Assertions.assertNotNull(peerTableId1);
            clientPeer.tableOperations().create(peerTable2, new NewTableConfiguration());
            String peerTableId2 = clientPeer.tableOperations().tableIdMap().get(peerTable2);
            Assertions.assertNotNull(peerTableId2);

            // Source tables on the primary: manager1 -> peer1, manager2 -> peer2.
            Map<String, String> props1 = new HashMap<>();
            props1.put(Property.TABLE_REPLICATION.getKey(), "true");
            props1.put(Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1);
            clientManager.tableOperations().create(managerTable1, new NewTableConfiguration().setProperties(props1));
            String managerTableId1 = clientManager.tableOperations().tableIdMap().get(managerTable1);
            Assertions.assertNotNull(managerTableId1);

            Map<String, String> props2 = new HashMap<>();
            props2.put(Property.TABLE_REPLICATION.getKey(), "true");
            props2.put(Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2);
            clientManager.tableOperations().create(managerTable2, new NewTableConfiguration().setProperties(props2));
            String managerTableId2 = clientManager.tableOperations().tableIdMap().get(managerTable2);
            Assertions.assertNotNull(managerTableId2);

            clientPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
            clientPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);

            long managerTable1Records = writeRowsPrefixedWithTableName(clientManager, managerTable1, 2500, 100);
            long managerTable2Records = writeRowsPrefixedWithTableName(clientManager, managerTable2, 2500, 100);
            log.info("Wrote all data to manager cluster");

            Set<String> filesFor1 = clientManager.replicationOperations().referencedFiles(managerTable1);
            Set<String> filesFor2 = clientManager.replicationOperations().referencedFiles(managerTable2);
            log.info("Files to replicate for table1: {}", filesFor1);
            log.info("Files to replicate for table2: {}", filesFor2);

            // Kill every tablet server of the primary, then start a new one.
            for (ProcessReference proc : this.cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
                this.cluster.killProcess(ServerType.TABLET_SERVER, proc);
            }
            this.cluster.exec(TabletServer.class);
            log.info("Restarted the tserver");

            // A full scan of the source table only completes once a tablet server serves it.
            try (Scanner scanner = clientManager.createScanner(managerTable1, Authorizations.EMPTY)) {
                scanner.forEach((k, v) -> {});
            }

            while (!ReplicationTable.isOnline(clientManager)) {
                log.info("Replication table still offline, waiting");
                Thread.sleep(5000L);
            }

            log.info("Waiting for {} for {}", filesFor1, managerTable1);
            clientManager.replicationOperations().drain(managerTable1, filesFor1);
            log.info("Waiting for {} for {}", filesFor2, managerTable2);
            clientManager.replicationOperations().drain(managerTable2, filesFor2);

            long countTable = countEntriesCheckingRowPrefix(clientPeer, peerTable1, managerTable1);
            log.info("Found {} records in {}", countTable, peerTable1);
            Assertions.assertEquals(managerTable1Records, countTable);

            countTable = countEntriesCheckingRowPrefix(clientPeer, peerTable2, managerTable2);
            log.info("Found {} records in {}", countTable, peerTable2);
            Assertions.assertEquals(managerTable2Records, countTable);
        } finally {
            peer1Cluster.stop();
        }
    }

    /**
     * Writes {@code numRows} rows to {@code table}; row ids are the table name followed by the
     * row index, and each row gets {@code numCols} columns whose family and value are the column
     * index.
     *
     * @return the number of column updates written ({@code numRows * numCols})
     */
    private static long writeRowsPrefixedWithTableName(AccumuloClient client, String table, int numRows, int numCols) throws Exception {
        long written = 0L;
        try (BatchWriter bw = client.createBatchWriter(table)) {
            for (int rows = 0; rows < numRows; ++rows) {
                Mutation m = new Mutation(table + rows);
                for (int cols = 0; cols < numCols; ++cols) {
                    String value = Integer.toString(cols);
                    m.put(value, "", value);
                    ++written;
                }
                bw.addMutation(m);
            }
        }
        return written;
    }

    /**
     * Scans {@code table}, asserting that every row id starts with {@code expectedRowPrefix}.
     *
     * @return the number of entries scanned
     */
    private static long countEntriesCheckingRowPrefix(AccumuloClient client, String table, String expectedRowPrefix) throws Exception {
        long count = 0L;
        try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
            for (Map.Entry<Key, Value> entry : scanner) {
                ++count;
                // The message is supplied lazily so it is only built when the check fails.
                Assertions.assertTrue(entry.getKey().getRow().toString().startsWith(expectedRowPrefix), () -> "Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue());
            }
        }
        return count;
    }

    /**
     * Verifies that every entry written to a replication-enabled table on the primary cluster
     * shows up, in the same order and with the same value, in the target table on a peer cluster.
     *
     * <p>Flow: start a peer mini cluster and register it on the primary; create the peer table and
     * a source table targeting it; write 5000 rows of 100 columns; kill and restart the primary's
     * tablet servers; wait for the replication table to come online; log the replication records
     * at DEBUG level; call {@code drain}; then compare both tables entry by entry. The peer
     * cluster is always stopped on exit.
     *
     * <p>NOTE(review): despite the method name, this test does call {@code drain} (without a
     * timeout) before comparing - confirm whether that is intended.
     *
     * @throws Exception if cluster start-up or any Accumulo operation fails
     */
    @Test
    public void dataWasReplicatedToThePeerWithoutDrain() throws Exception {
        String rootPassword = "testRootPassword1";
        MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(MultiInstanceReplicationIT.createTestDir(this.getClass().getName() + "_" + this.testName() + "_peer"), rootPassword);
        peerCfg.setNumTservers(1);
        peerCfg.setInstanceName("peer");
        peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
        this.updatePeerConfigFromPrimary(this.getCluster().getConfig(), peerCfg);
        MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg);
        peerCluster.start();
        try (AccumuloClient clientManager = Accumulo.newClient().from(this.getClientProperties()).build();
             AccumuloClient clientPeer = peerCluster.createAccumuloClient("root", new PasswordToken(rootPassword))) {
            // Register the peer and the credentials used to reach it on the primary.
            String peerUserName = "repl";
            String peerPassword = "passwd";
            clientPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
            String peerClusterName = "peer";
            clientManager.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + peerClusterName, ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
            clientManager.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
            clientManager.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);

            String managerTable = "manager";
            String peerTable = "peer";

            // Target table on the peer, writable by the replication user.
            clientPeer.tableOperations().create(peerTable, new NewTableConfiguration());
            String peerTableId = clientPeer.tableOperations().tableIdMap().get(peerTable);
            Assertions.assertNotNull(peerTableId);
            clientPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);

            // Source table on the primary, replicating to the peer table id.
            Map<String, String> props = new HashMap<>();
            props.put(Property.TABLE_REPLICATION.getKey(), "true");
            props.put(Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId);
            clientManager.tableOperations().create(managerTable, new NewTableConfiguration().setProperties(props));
            String managerTableId = clientManager.tableOperations().tableIdMap().get(managerTable);
            Assertions.assertNotNull(managerTableId);

            try (BatchWriter bw = clientManager.createBatchWriter(managerTable)) {
                for (int rows = 0; rows < 5000; ++rows) {
                    Mutation m = new Mutation(Integer.toString(rows));
                    for (int cols = 0; cols < 100; ++cols) {
                        String value = Integer.toString(cols);
                        m.put(value, "", value);
                    }
                    bw.addMutation(m);
                }
            }
            log.info("Wrote all data to manager cluster");

            Set<String> files = clientManager.replicationOperations().referencedFiles(managerTable);
            log.info("Files to replicate:{}", files);

            // Kill every tablet server of the primary, then start a new one.
            for (ProcessReference proc : this.cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
                this.cluster.killProcess(ServerType.TABLET_SERVER, proc);
            }
            this.cluster.exec(TabletServer.class);

            while (!ReplicationTable.isOnline(clientManager)) {
                log.info("Replication table still offline, waiting");
                Thread.sleep(5000L);
            }

            // A full scan of the source table only completes once a tablet server serves it.
            try (Scanner scanner = clientManager.createScanner(managerTable, Authorizations.EMPTY)) {
                scanner.forEach((k, v) -> {});
            }

            // Dump the replication records for debugging.
            try (Scanner scanner = ReplicationTable.getScanner(clientManager)) {
                for (Map.Entry<Key, Value> kv : scanner) {
                    log.debug("{} {}", kv.getKey().toStringNoTruncate(), ProtobufUtil.toString(Replication.Status.parseFrom(kv.getValue().get())));
                }
            }

            clientManager.replicationOperations().drain(managerTable, files);

            // Walk both tables in lock-step; keys are compared without the timestamp.
            try (Scanner manager = clientManager.createScanner(managerTable, Authorizations.EMPTY);
                 Scanner peer = clientPeer.createScanner(peerTable, Authorizations.EMPTY)) {
                Iterator<Map.Entry<Key, Value>> managerIter = manager.iterator();
                Iterator<Map.Entry<Key, Value>> peerIter = peer.iterator();
                while (managerIter.hasNext() && peerIter.hasNext()) {
                    Map.Entry<Key, Value> managerEntry = managerIter.next();
                    Map.Entry<Key, Value> peerEntry = peerIter.next();
                    // Report the manager key on the left so a mismatch shows both sides.
                    Assertions.assertEquals(0, managerEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS), () -> managerEntry.getKey() + " was not equal to " + peerEntry.getKey());
                    Assertions.assertEquals(managerEntry.getValue(), peerEntry.getValue());
                }
                Assertions.assertFalse(managerIter.hasNext(), "Had more data to read from the manager");
                Assertions.assertFalse(peerIter.hasNext(), "Had more data to read from the peer");
            }
        } finally {
            peerCluster.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void dataReplicatedToCorrectTableWithoutDrain() throws Exception {
        MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(MultiInstanceReplicationIT.createTestDir(this.getClass().getName() + "_" + this.testName() + "_peer"), "testRootPassword1");
        peerCfg.setNumTservers(1);
        peerCfg.setInstanceName("peer");
        peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
        this.updatePeerConfigFromPrimary(this.getCluster().getConfig(), peerCfg);
        MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg);
        peer1Cluster.start();
        try (AccumuloClient clientManager = (AccumuloClient)Accumulo.newClient().from(this.getClientProperties()).build();
             AccumuloClient clientPeer = peer1Cluster.createAccumuloClient("root", (AuthenticationToken)new PasswordToken((CharSequence)"testRootPassword1"));){
            int i;
            String value;
            int cols;
            Mutation m;
            int rows;
            String peerClusterName = "peer";
            String peerUserName = "repl";
            String peerPassword = "passwd";
            clientPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken((CharSequence)peerPassword));
            clientManager.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
            clientManager.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
            clientManager.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + peerClusterName, ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, (String)AccumuloReplicaSystem.buildConfiguration((String)peer1Cluster.getInstanceName(), (String)peer1Cluster.getZooKeepers())));
            String managerTable1 = "manager1";
            String peerTable1 = "peer1";
            String managerTable2 = "manager2";
            String peerTable2 = "peer2";
            clientPeer.tableOperations().create(peerTable1, new NewTableConfiguration());
            String peerTableId1 = (String)clientPeer.tableOperations().tableIdMap().get(peerTable1);
            Assertions.assertNotNull((Object)peerTableId1);
            clientPeer.tableOperations().create(peerTable2, new NewTableConfiguration());
            String peerTableId2 = (String)clientPeer.tableOperations().tableIdMap().get(peerTable2);
            Assertions.assertNotNull((Object)peerTableId2);
            HashMap<Object, String> props1 = new HashMap<Object, String>();
            props1.put(Property.TABLE_REPLICATION.getKey(), "true");
            props1.put(Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2);
            clientManager.tableOperations().create(managerTable1, new NewTableConfiguration().setProperties(props1));
            String managerTableId1 = (String)clientManager.tableOperations().tableIdMap().get(managerTable1);
            Assertions.assertNotNull((Object)managerTableId1);
            HashMap<Object, String> props2 = new HashMap<Object, String>();
            props2.put(Property.TABLE_REPLICATION.getKey(), "true");
            props2.put(Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2);
            clientManager.tableOperations().create(managerTable2, new NewTableConfiguration().setProperties(props2));
            String managerTableId2 = (String)clientManager.tableOperations().tableIdMap().get(managerTable2);
            Assertions.assertNotNull((Object)managerTableId2);
            clientPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
            clientPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
            clientManager.tableOperations().setProperty(managerTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1);
            clientManager.tableOperations().setProperty(managerTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2);
            try (Object bw = clientManager.createBatchWriter(managerTable1);){
                for (rows = 0; rows < 2500; ++rows) {
                    m = new Mutation((CharSequence)(managerTable1 + rows));
                    for (cols = 0; cols < 100; ++cols) {
                        value = Integer.toString(cols);
                        m.put((CharSequence)value, (CharSequence)"", (CharSequence)value);
                    }
                    bw.addMutation(m);
                }
            }
            bw = clientManager.createBatchWriter(managerTable2);
            try {
                for (rows = 0; rows < 2500; ++rows) {
                    m = new Mutation((CharSequence)(managerTable2 + rows));
                    for (cols = 0; cols < 100; ++cols) {
                        value = Integer.toString(cols);
                        m.put((CharSequence)value, (CharSequence)"", (CharSequence)value);
                    }
                    bw.addMutation(m);
                }
            }
            finally {
                if (bw != null) {
                    bw.close();
                }
            }
            log.info("Wrote all data to manager cluster");
            for (ProcessReference proc : (Collection)this.cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
                this.cluster.killProcess(ServerType.TABLET_SERVER, proc);
            }
            this.cluster.exec(TabletServer.class, new String[0]);
            while (!ReplicationTable.isOnline((AccumuloClient)clientManager)) {
                log.info("Replication table still offline, waiting");
                Thread.sleep(5000L);
            }
            boolean fullyReplicated = false;
            for (int i2 = 0; i2 < 10 && !fullyReplicated; ++i2) {
                UtilWaitThread.sleepUninterruptibly((long)2L, (TimeUnit)TimeUnit.SECONDS);
                try (Scanner s = ReplicationTable.getScanner((AccumuloClient)clientManager);){
                    ReplicationSchema.WorkSection.limit((ScannerBase)s);
                    for (Map.Entry entry : s) {
                        Replication.Status status = Replication.Status.parseFrom((byte[])((Value)entry.getValue()).get());
                        if (!StatusUtil.isFullyReplicated((Replication.Status)status)) continue;
                        fullyReplicated |= true;
                    }
                    continue;
                }
            }
            Assertions.assertNotEquals((Object)0, (Object)fullyReplicated);
            long countTable = 0L;
            for (i = 0; i < 10; ++i) {
                for (Map.Entry entry : clientPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
                    ++countTable;
                    Assertions.assertTrue((boolean)((Key)entry.getKey()).getRow().toString().startsWith(managerTable1), (String)("Found unexpected key-value" + ((Key)entry.getKey()).toStringNoTruncate() + " " + entry.getValue()));
                }
                log.info("Found {} records in {}", (Object)countTable, (Object)peerTable1);
                if (countTable != 0L) break;
                Thread.sleep(5000L);
            }
            Assertions.assertTrue((countTable > 0L ? 1 : 0) != 0, (String)("Found no records in " + peerTable1 + " in the peer cluster"));
            for (i = 0; i < 10; ++i) {
                countTable = 0L;
                for (Map.Entry entry : clientPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
                    ++countTable;
                    Assertions.assertTrue((boolean)((Key)entry.getKey()).getRow().toString().startsWith(managerTable2), (String)("Found unexpected key-value" + ((Key)entry.getKey()).toStringNoTruncate() + " " + entry.getValue()));
                }
                log.info("Found {} records in {}", (Object)countTable, (Object)peerTable2);
                if (countTable != 0L) break;
                Thread.sleep(5000L);
            }
            Assertions.assertTrue((countTable > 0L ? 1 : 0) != 0, (String)("Found no records in " + peerTable2 + " in the peer cluster"));
        }
        finally {
            peer1Cluster.stop();
        }
    }
}

