/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test.replication;

import com.google.protobuf.GeneratedMessageV3;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ReplicationOperationsImpl;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.ManagerClientServiceHandler;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
import org.easymock.EasyMock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Disabled(value="Replication ITs are not stable and not currently maintained")
@Deprecated
public class ReplicationOperationsImplIT
extends ConfigurableMacBase {
    private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImplIT.class);
    private AccumuloClient client;
    private ServerContext context;

    @BeforeEach
    public void configureInstance() throws Exception {
        this.client = (AccumuloClient)Accumulo.newClient().from(this.getClientProperties()).build();
        this.context = this.getServerContext();
        ReplicationTable.setOnline((AccumuloClient)this.client);
        this.client.securityOperations().grantTablePermission(this.client.whoami(), MetadataTable.NAME, TablePermission.WRITE);
        this.client.securityOperations().grantTablePermission(this.client.whoami(), ReplicationTable.NAME, TablePermission.READ);
        this.client.securityOperations().grantTablePermission(this.client.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
    }

    private ReplicationOperationsImpl getReplicationOperations() {
        Manager manager = (Manager)EasyMock.createMock(Manager.class);
        EasyMock.expect((Object)manager.getContext()).andReturn((Object)this.context).anyTimes();
        EasyMock.replay((Object[])new Object[]{manager});
        final ManagerClientServiceHandler mcsh = new ManagerClientServiceHandler(manager){

            protected TableId getTableId(ClientContext context, String tableName) {
                try {
                    return TableId.of((String)((String)ReplicationOperationsImplIT.this.client.tableOperations().tableIdMap().get(tableName)));
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        };
        ClientContext context = (ClientContext)this.client;
        return new ReplicationOperationsImpl(context){

            protected boolean getManagerDrain(TInfo tinfo, TCredentials rpcCreds, String tableName, Set<String> wals) {
                try {
                    return mcsh.drainReplicationTable(tinfo, rpcCreds, tableName, wals);
                }
                catch (TException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    @Test
    public void waitsUntilEntriesAreReplicated() throws Exception {
        this.client.tableOperations().create("foo");
        TableId tableId = TableId.of((String)((String)this.client.tableOperations().tableIdMap().get("foo")));
        String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
        String file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
        Replication.Status stat = Replication.Status.newBuilder().setBegin(0L).setEnd(10000L).setInfiniteEnd(false).setClosed(false).build();
        BatchWriter bw = ReplicationTable.getBatchWriter((AccumuloClient)this.client);
        Mutation m = new Mutation((CharSequence)file1);
        ReplicationSchema.StatusSection.add((Mutation)m, (TableId)tableId, (Value)ProtobufUtil.toValue((GeneratedMessageV3)stat));
        bw.addMutation(m);
        m = new Mutation((CharSequence)file2);
        ReplicationSchema.StatusSection.add((Mutation)m, (TableId)tableId, (Value)ProtobufUtil.toValue((GeneratedMessageV3)stat));
        bw.addMutation(m);
        bw.close();
        bw = this.client.createBatchWriter(MetadataTable.NAME);
        m = new Mutation((CharSequence)(MetadataSchema.ReplicationSection.getRowPrefix() + file1));
        m.put(MetadataSchema.ReplicationSection.COLF, new Text(tableId.canonical()), ProtobufUtil.toValue((GeneratedMessageV3)stat));
        bw.addMutation(m);
        m = new Mutation((CharSequence)(MetadataSchema.ReplicationSection.getRowPrefix() + file2));
        m.put(MetadataSchema.ReplicationSection.COLF, new Text(tableId.canonical()), ProtobufUtil.toValue((GeneratedMessageV3)stat));
        bw.close();
        AtomicBoolean done = new AtomicBoolean(false);
        AtomicBoolean exception = new AtomicBoolean(false);
        ReplicationOperationsImpl roi = this.getReplicationOperations();
        Thread t = new Thread(() -> {
            try {
                roi.drain("foo");
            }
            catch (Exception e) {
                log.error("Got error", (Throwable)e);
                exception.set(true);
            }
            done.set(true);
        });
        t.start();
        Assertions.assertFalse((boolean)done.get());
        bw = this.client.createBatchWriter(MetadataTable.NAME);
        m = new Mutation((CharSequence)(MetadataSchema.ReplicationSection.getRowPrefix() + file1));
        m.putDelete(MetadataSchema.ReplicationSection.COLF, new Text(tableId.canonical()));
        bw.addMutation(m);
        bw.flush();
        Assertions.assertFalse((boolean)done.get());
        m = new Mutation((CharSequence)(MetadataSchema.ReplicationSection.getRowPrefix() + file2));
        m.putDelete(MetadataSchema.ReplicationSection.COLF, new Text(tableId.canonical()));
        bw.addMutation(m);
        bw.flush();
        bw.close();
        Assertions.assertFalse((boolean)done.get());
        bw = ReplicationTable.getBatchWriter((AccumuloClient)this.client);
        m = new Mutation((CharSequence)file1);
        m.putDelete(ReplicationSchema.StatusSection.NAME, new Text(tableId.canonical()));
        bw.addMutation(m);
        bw.flush();
        Assertions.assertFalse((boolean)done.get());
        m = new Mutation((CharSequence)file2);
        m.putDelete(ReplicationSchema.StatusSection.NAME, new Text(tableId.canonical()));
        bw.addMutation(m);
        bw.flush();
        try {
            t.join(5000L);
        }
        catch (InterruptedException e) {
            Assertions.fail((String)"ReplicationOperations.drain did not complete");
        }
        Assertions.assertTrue((boolean)done.get(), (String)"Drain never finished");
        Assertions.assertFalse((boolean)exception.get(), (String)"Saw unexpected exception");
    }

    @Test
    public void unrelatedReplicationRecordsDontBlockDrain() throws Exception {
        this.client.tableOperations().create("foo");
        this.client.tableOperations().create("bar");
        TableId tableId1 = TableId.of((String)((String)this.client.tableOperations().tableIdMap().get("foo")));
        TableId tableId2 = TableId.of((String)((String)this.client.tableOperations().tableIdMap().get("bar")));
        String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
        String file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
        Replication.Status stat = Replication.Status.newBuilder().setBegin(0L).setEnd(10000L).setInfiniteEnd(false).setClosed(false).build();
        BatchWriter bw = ReplicationTable.getBatchWriter((AccumuloClient)this.client);
        Mutation m = new Mutation((CharSequence)file1);
        ReplicationSchema.StatusSection.add((Mutation)m, (TableId)tableId1, (Value)ProtobufUtil.toValue((GeneratedMessageV3)stat));
        bw.addMutation(m);
        m = new Mutation((CharSequence)file2);
        ReplicationSchema.StatusSection.add((Mutation)m, (TableId)tableId2, (Value)ProtobufUtil.toValue((GeneratedMessageV3)stat));
        bw.addMutation(m);
        bw.close();
        bw = this.client.createBatchWriter(MetadataTable.NAME);
        m = new Mutation((CharSequence)(MetadataSchema.ReplicationSection.getRowPrefix() + file1));
        m.put(MetadataSchema.ReplicationSection.COLF, new Text(tableId1.canonical()), ProtobufUtil.toValue((GeneratedMessageV3)stat));
        bw.addMutation(m);
        m = new Mutation((CharSequence)(MetadataSchema.ReplicationSection.getRowPrefix() + file2));
        m.put(MetadataSchema.ReplicationSection.COLF, new Text(tableId2.canonical()), ProtobufUtil.toValue((GeneratedMessageV3)stat));
        bw.close();
        AtomicBoolean done = new AtomicBoolean(false);
        AtomicBoolean exception = new AtomicBoolean(false);
        ReplicationOperationsImpl roi = this.getReplicationOperations();
        Thread t = new Thread(() -> {
            try {
                roi.drain("foo");
            }
            catch (Exception e) {
                log.error("Got error", (Throwable)e);
                exception.set(true);
            }
            done.set(true);
        });
        t.start();
        Assertions.assertFalse((boolean)done.get());
        bw = this.client.createBatchWriter(MetadataTable.NAME);
        m = new Mutation((CharSequence)(MetadataSchema.ReplicationSection.getRowPrefix() + file1));
        m.putDelete(MetadataSchema.ReplicationSection.COLF, new Text(tableId1.canonical()));
        bw.addMutation(m);
        bw.flush();
        Assertions.assertFalse((boolean)done.get());
        bw = ReplicationTable.getBatchWriter((AccumuloClient)this.client);
        m = new Mutation((CharSequence)file1);
        m.putDelete(ReplicationSchema.StatusSection.NAME, new Text(tableId1.canonical()));
        bw.addMutation(m);
        bw.flush();
        bw.close();
        try {
            t.join(5000L);
        }
        catch (InterruptedException e) {
            Assertions.fail((String)"ReplicationOperations.drain did not complete");
        }
        Assertions.assertTrue((boolean)done.get(), (String)"Drain never completed");
        Assertions.assertFalse((boolean)exception.get(), (String)"Saw unexpected exception");
    }

    @Test
    public void inprogressReplicationRecordsBlockExecution() throws Exception {
        this.client.tableOperations().create("foo");
        TableId tableId1 = TableId.of((String)((String)this.client.tableOperations().tableIdMap().get("foo")));
        String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
        Replication.Status stat = Replication.Status.newBuilder().setBegin(0L).setEnd(10000L).setInfiniteEnd(false).setClosed(false).build();
        BatchWriter bw = ReplicationTable.getBatchWriter((AccumuloClient)this.client);
        Mutation m = new Mutation((CharSequence)file1);
        ReplicationSchema.StatusSection.add((Mutation)m, (TableId)tableId1, (Value)ProtobufUtil.toValue((GeneratedMessageV3)stat));
        bw.addMutation(m);
        bw.close();
        LogEntry logEntry = new LogEntry(new KeyExtent(tableId1, null, null), System.currentTimeMillis(), file1);
        bw = this.client.createBatchWriter(MetadataTable.NAME);
        m = new Mutation((CharSequence)(MetadataSchema.ReplicationSection.getRowPrefix() + file1));
        m.put(MetadataSchema.ReplicationSection.COLF, new Text(tableId1.canonical()), ProtobufUtil.toValue((GeneratedMessageV3)stat));
        bw.addMutation(m);
        m = new Mutation(logEntry.getRow());
        m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
        bw.addMutation(m);
        bw.close();
        AtomicBoolean done = new AtomicBoolean(false);
        AtomicBoolean exception = new AtomicBoolean(false);
        ReplicationOperationsImpl roi = this.getReplicationOperations();
        Thread t = new Thread(() -> {
            try {
                roi.drain("foo");
            }
            catch (Exception e) {
                log.error("Got error", (Throwable)e);
                exception.set(true);
            }
            done.set(true);
        });
        t.start();
        Assertions.assertFalse((boolean)done.get());
        Replication.Status newStatus = Replication.Status.newBuilder().setBegin(1000L).setEnd(2000L).setInfiniteEnd(false).setClosed(true).build();
        bw = this.client.createBatchWriter(MetadataTable.NAME);
        m = new Mutation((CharSequence)(MetadataSchema.ReplicationSection.getRowPrefix() + file1));
        m.put(MetadataSchema.ReplicationSection.COLF, new Text(tableId1.canonical()), ProtobufUtil.toValue((GeneratedMessageV3)newStatus));
        bw.addMutation(m);
        bw.flush();
        Assertions.assertFalse((boolean)done.get());
        bw = ReplicationTable.getBatchWriter((AccumuloClient)this.client);
        m = new Mutation((CharSequence)file1);
        m.put(ReplicationSchema.StatusSection.NAME, new Text(tableId1.canonical()), ProtobufUtil.toValue((GeneratedMessageV3)newStatus));
        bw.addMutation(m);
        bw.flush();
        try {
            t.join(5000L);
        }
        catch (InterruptedException e) {
            Assertions.fail((String)"ReplicationOperations.drain did not complete");
        }
        Assertions.assertFalse((boolean)done.get(), (String)"Drain somehow finished");
        Assertions.assertFalse((boolean)exception.get(), (String)"Saw unexpected exception");
    }

    @Test
    public void laterCreatedLogsDontBlockExecution() throws Exception {
        this.client.tableOperations().create("foo");
        TableId tableId1 = TableId.of((String)((String)this.client.tableOperations().tableIdMap().get("foo")));
        String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
        Replication.Status stat = Replication.Status.newBuilder().setBegin(0L).setEnd(10000L).setInfiniteEnd(false).setClosed(false).build();
        BatchWriter bw = ReplicationTable.getBatchWriter((AccumuloClient)this.client);
        Mutation m = new Mutation((CharSequence)file1);
        ReplicationSchema.StatusSection.add((Mutation)m, (TableId)tableId1, (Value)ProtobufUtil.toValue((GeneratedMessageV3)stat));
        bw.addMutation(m);
        bw.close();
        bw = this.client.createBatchWriter(MetadataTable.NAME);
        m = new Mutation((CharSequence)(MetadataSchema.ReplicationSection.getRowPrefix() + file1));
        m.put(MetadataSchema.ReplicationSection.COLF, new Text(tableId1.canonical()), ProtobufUtil.toValue((GeneratedMessageV3)stat));
        bw.addMutation(m);
        bw.close();
        log.info("Reading metadata first time");
        try (Scanner scanner = this.client.createScanner(MetadataTable.NAME, Authorizations.EMPTY);){
            for (Map.Entry e : scanner) {
                log.info("{}", e.getKey());
            }
        }
        AtomicBoolean done = new AtomicBoolean(false);
        AtomicBoolean exception = new AtomicBoolean(false);
        ReplicationOperationsImpl roi = this.getReplicationOperations();
        Thread t = new Thread(() -> {
            try {
                roi.drain("foo");
            }
            catch (Exception e) {
                log.error("Got error", (Throwable)e);
                exception.set(true);
            }
            done.set(true);
        });
        t.start();
        Thread.sleep(2000L);
        bw = this.client.createBatchWriter(MetadataTable.NAME);
        m = new Mutation((CharSequence)(MetadataSchema.ReplicationSection.getRowPrefix() + "/accumulo/wals/tserver+port/" + UUID.randomUUID()));
        m.put(MetadataSchema.ReplicationSection.COLF, new Text(tableId1.canonical()), ProtobufUtil.toValue((GeneratedMessageV3)stat));
        bw.addMutation(m);
        m = new Mutation((CharSequence)(MetadataSchema.ReplicationSection.getRowPrefix() + file1));
        m.putDelete(MetadataSchema.ReplicationSection.COLF, new Text(tableId1.canonical()));
        bw.addMutation(m);
        bw.close();
        log.info("Reading metadata second time");
        try (Scanner scanner = this.client.createScanner(MetadataTable.NAME, Authorizations.EMPTY);){
            for (Map.Entry e : scanner) {
                log.info("{}", e.getKey());
            }
        }
        bw = ReplicationTable.getBatchWriter((AccumuloClient)this.client);
        m = new Mutation((CharSequence)file1);
        m.putDelete(ReplicationSchema.StatusSection.NAME, new Text(tableId1.canonical()));
        bw.addMutation(m);
        bw.close();
        try {
            t.join(5000L);
        }
        catch (InterruptedException e) {
            Assertions.fail((String)"ReplicationOperations.drain did not complete");
        }
        Assertions.assertTrue((boolean)done.get(), (String)"Drain didn't finish");
    }
}

