/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test.replication;

import com.google.protobuf.GeneratedMessageV3;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Disabled(value="Replication ITs are not stable and not currently maintained")
@Deprecated
public class GarbageCollectorCommunicatesWithTServersIT
extends ConfigurableMacBase {
    private static final Logger log = LoggerFactory.getLogger(GarbageCollectorCommunicatesWithTServersIT.class);
    private final int GC_PERIOD_SECONDS = 1;

    @Override
    protected Duration defaultTimeout() {
        return Duration.ofMinutes(2L);
    }

    @Override
    public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
        cfg.setNumTservers(1);
        cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, "15s");
        cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
        cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
        cfg.setProperty(Property.GC_CYCLE_START, "10s");
        cfg.setProperty(Property.REPLICATION_NAME, "manager");
        cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "240s");
        cfg.setProperty(Property.MANAGER_REPLICATION_SCAN_INTERVAL, "240s");
        cfg.setProperty(Property.REPLICATION_DRIVER_DELAY, "240s");
        cfg.setProperty(Property.TSERV_WAL_MAX_SIZE, "1M");
        coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
    }

    private Set<String> getWalsForTable(String tableName) throws Exception {
        ServerContext context = this.getServerContext();
        String tableId = (String)context.tableOperations().tableIdMap().get(tableName);
        Assertions.assertNotNull((Object)tableId, (String)("Could not determine table ID for " + tableName));
        WalStateManager wals = new WalStateManager(context);
        HashSet<String> result = new HashSet<String>();
        for (Map.Entry entry : wals.getAllState().entrySet()) {
            log.debug("Reading WALs: {}={}", entry.getKey(), entry.getValue());
            result.add(((Path)entry.getKey()).toString());
        }
        return result;
    }

    private Set<String> getFilesForTable(String tableName) throws Exception {
        AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(this.getClientProperties()).build();
        TableId tableId = TableId.of((String)((String)client.tableOperations().tableIdMap().get(tableName)));
        Assertions.assertNotNull((Object)tableId, (String)("Could not determine table ID for " + tableName));
        HashSet<String> rfiles = new HashSet<String>();
        try (Scanner s = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY);){
            Range r = MetadataSchema.TabletsSection.getRange((TableId)tableId);
            s.setRange(r);
            s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
            for (Map.Entry entry : s) {
                log.debug("Reading RFiles: {}={}", (Object)((Key)entry.getKey()).toStringNoTruncate(), entry.getValue());
                String cq = ((Key)entry.getKey()).getColumnQualifier().toString();
                String path = new Path(cq).toString();
                log.debug("Normalize path to rfile: {}", (Object)path);
                rfiles.add(path);
            }
        }
        return rfiles;
    }

    private Map<String, Replication.Status> getMetadataStatusForTable(String tableName) throws Exception {
        AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(this.getClientProperties()).build();
        String tableId = (String)client.tableOperations().tableIdMap().get(tableName);
        Assertions.assertNotNull((Object)tableId, (String)("Could not determine table ID for " + tableName));
        HashMap<String, Replication.Status> fileToStatus = new HashMap<String, Replication.Status>();
        try (Scanner s = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY);){
            Range r = MetadataSchema.ReplicationSection.getRange();
            s.setRange(r);
            s.fetchColumn(MetadataSchema.ReplicationSection.COLF, new Text(tableId));
            for (Map.Entry entry : s) {
                Text file = new Text();
                MetadataSchema.ReplicationSection.getFile((Key)((Key)entry.getKey()), (Text)file);
                Replication.Status status = Replication.Status.parseFrom((byte[])((Value)entry.getValue()).get());
                log.info("Got status for {}: {}", (Object)file, (Object)ProtobufUtil.toString((GeneratedMessageV3)status));
                fileToStatus.put(file.toString(), status);
            }
        }
        return fileToStatus;
    }

    @Test
    public void testActiveWalPrecludesClosing() throws Exception {
        String table = this.getUniqueNames(1)[0];
        AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(this.getClientProperties()).build();
        ReplicationTable.setOnline((AccumuloClient)client);
        log.info("Creating {}", (Object)table);
        client.tableOperations().create(table);
        client.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
        log.info("Writing a few mutations to the table");
        try (BatchWriter bw = client.createBatchWriter(table);){
            byte[] empty = new byte[]{};
            for (int i = 0; i < 5; ++i) {
                Mutation m = new Mutation((CharSequence)Integer.toString(i));
                m.put(empty, empty, empty);
                bw.addMutation(m);
            }
        }
        log.info("Checking that metadata only has two WALs recorded for this table (inUse, and opened)");
        Set<String> wals = this.getWalsForTable(table);
        Assertions.assertEquals((int)2, (int)wals.size(), (String)"Expected to only find two WALs for the table");
        client.tableOperations().flush(table, null, null, true);
        client.tableOperations().flush(MetadataTable.NAME, null, null, true);
        log.info("Waiting for replication table to come online");
        log.info("Fetching replication statuses from metadata table");
        Map<String, Replication.Status> fileToStatus = this.getMetadataStatusForTable(table);
        Assertions.assertEquals((int)1, (int)fileToStatus.size(), (String)"Expected to only find one replication status message");
        String walName = fileToStatus.keySet().iterator().next();
        wals.retainAll(fileToStatus.keySet());
        Assertions.assertEquals((int)1, (int)wals.size());
        Replication.Status status = fileToStatus.get(walName);
        Assertions.assertFalse((boolean)status.getClosed(), (String)"Expected Status for file to not be closed");
        Set<String> filesForTable = this.getFilesForTable(table);
        Assertions.assertEquals((int)1, (int)filesForTable.size(), (String)"Expected to only find one rfile for table");
        log.info("Files for table before MajC: {}", filesForTable);
        client.tableOperations().compact(table, null, null, false, true);
        Set<String> filesForTableAfterCompaction = this.getFilesForTable(table);
        log.info("Files for table after MajC: {}", filesForTableAfterCompaction);
        Assertions.assertEquals((int)1, (int)filesForTableAfterCompaction.size(), (String)"Expected to only find one rfile for table");
        Assertions.assertNotEquals(filesForTableAfterCompaction, filesForTable, (String)"Expected the files before and after compaction to differ");
        Path fileToBeDeleted = new Path(filesForTable.iterator().next());
        FileSystem fs = this.getCluster().getFileSystem();
        boolean fileExists = fs.exists(fileToBeDeleted);
        while (fileExists) {
            log.info("File which should get deleted still exists: {}", (Object)fileToBeDeleted);
            Thread.sleep(2000L);
            fileExists = fs.exists(fileToBeDeleted);
        }
        Map<String, Replication.Status> fileToStatusAfterMinc = this.getMetadataStatusForTable(table);
        Assertions.assertEquals((int)1, (int)fileToStatusAfterMinc.size(), (String)("Expected to still find only one replication status message: " + fileToStatusAfterMinc));
        Assertions.assertEquals(fileToStatus, fileToStatusAfterMinc, (String)"Status before and after MinC should be identical");
    }

    @Test
    public void testUnreferencedWalInTserverIsClosed() throws Exception {
        String[] names = this.getUniqueNames(2);
        String table = names[0];
        String otherTable = names[1];
        AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(this.getClientProperties()).build();
        ReplicationTable.setOnline((AccumuloClient)client);
        log.info("Creating {}", (Object)table);
        client.tableOperations().create(table);
        client.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
        log.info("Writing a few mutations to the table");
        byte[] empty = new byte[]{};
        try (BatchWriter bw = client.createBatchWriter(table);){
            for (int i = 0; i < 5; ++i) {
                Mutation m = new Mutation((CharSequence)Integer.toString(i));
                m.put(empty, empty, empty);
                bw.addMutation(m);
            }
            log.info("Flushing mutations to the server");
        }
        log.info("Checking that metadata only has one WAL recorded for this table");
        Set<String> wals = this.getWalsForTable(table);
        Assertions.assertEquals((int)2, (int)wals.size(), (String)"Expected to only find two WAL for the table");
        log.info("Compacting the table which will remove all WALs from the tablets");
        client.tableOperations().flush(table, null, null, true);
        client.tableOperations().flush(MetadataTable.NAME, null, null, true);
        log.info("Fetching replication statuses from metadata table");
        Map<String, Replication.Status> fileToStatus = this.getMetadataStatusForTable(table);
        Assertions.assertEquals((int)1, (int)fileToStatus.size(), (String)"Expected to only find one replication status message");
        String walName = fileToStatus.keySet().iterator().next();
        Assertions.assertTrue((boolean)wals.contains(walName), (String)"Expected log file name from tablet to equal replication entry");
        Replication.Status status = fileToStatus.get(walName);
        Assertions.assertFalse((boolean)status.getClosed(), (String)"Expected Status for file to not be closed");
        Set<String> filesForTable = this.getFilesForTable(table);
        Assertions.assertEquals((int)1, (int)filesForTable.size(), (String)"Expected to only find one rfile for table");
        log.info("Files for table before MajC: {}", filesForTable);
        client.tableOperations().compact(table, null, null, false, true);
        Set<String> filesForTableAfterCompaction = this.getFilesForTable(table);
        log.info("Files for table after MajC: {}", filesForTableAfterCompaction);
        Assertions.assertEquals((int)1, (int)filesForTableAfterCompaction.size(), (String)"Expected to only find one rfile for table");
        Assertions.assertNotEquals(filesForTableAfterCompaction, filesForTable, (String)"Expected the files before and after compaction to differ");
        Path fileToBeDeleted = new Path(filesForTable.iterator().next());
        FileSystem fs = this.getCluster().getFileSystem();
        boolean fileExists = fs.exists(fileToBeDeleted);
        while (fileExists) {
            log.info("File which should get deleted still exists: {}", (Object)fileToBeDeleted);
            Thread.sleep(2000L);
            fileExists = fs.exists(fileToBeDeleted);
        }
        Map<String, Replication.Status> fileToStatusAfterMinc = this.getMetadataStatusForTable(table);
        Assertions.assertEquals((int)1, (int)fileToStatusAfterMinc.size(), (String)("Expected to still find only one replication status message: " + fileToStatusAfterMinc));
        client.tableOperations().create(otherTable);
        try (BatchWriter bw = client.createBatchWriter(otherTable);){
            byte[] bigValue = new byte[512000];
            Arrays.fill(bigValue, (byte)1);
            for (int i = 0; i < 50; ++i) {
                Mutation m = new Mutation((CharSequence)Integer.toString(i));
                m.put(empty, empty, bigValue);
                bw.addMutation(m);
                if (i % 10 != 0) continue;
                bw.flush();
            }
        }
        client.tableOperations().flush(otherTable, null, null, true);
        ClientContext context = (ClientContext)client;
        List tservers = (List)ThriftClientTypes.MANAGER.execute(context, mgr -> mgr.getActiveTservers(TraceUtil.traceInfo(), context.rpcCreds()));
        Assertions.assertEquals((int)1, (int)tservers.size(), (String)"Expected only one active tservers");
        HostAndPort tserver = HostAndPort.fromString((String)((String)tservers.get(0)));
        log.info("Fetching active WALs from {}", (Object)tserver);
        TabletClientService.Client cli = (TabletClientService.Client)ThriftUtil.getClient((ThriftClientTypes)ThriftClientTypes.TABLET_SERVER, (HostAndPort)tserver, (ClientContext)context);
        List activeWalsForTserver = cli.getActiveLogs(TraceUtil.traceInfo(), context.rpcCreds());
        log.info("Active wals: {}", (Object)activeWalsForTserver);
        Assertions.assertEquals((int)1, (int)activeWalsForTserver.size(), (String)"Expected to find only one active WAL");
        String activeWal = new Path((String)activeWalsForTserver.get(0)).toString();
        Assertions.assertNotEquals((Object)"Current active WAL on tserver should not be the original WAL we saw", (Object)walName, (String)activeWal);
        log.info("Ensuring that replication status does get closed after WAL is no longer in use by Tserver");
        while (true) {
            Map<String, Replication.Status> replicationStatuses = this.getMetadataStatusForTable(table);
            log.info("Got replication status messages {}", replicationStatuses);
            Assertions.assertEquals((int)1, (int)replicationStatuses.size(), (String)"Did not expect to find additional status records");
            status = replicationStatuses.values().iterator().next();
            log.info("Current status: {}", (Object)ProtobufUtil.toString((GeneratedMessageV3)status));
            if (status.getClosed()) {
                return;
            }
            log.info("Status is not yet closed, waiting for garbage collector to close it");
            Thread.sleep(2000L);
        }
    }
}

