/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test.functional;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.AdminUtil;
import org.apache.accumulo.core.fate.ReadOnlyTStore;
import org.apache.accumulo.core.fate.ZooStore;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.test.util.SlowOps;
import org.apache.zookeeper.KeeperException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FateConcurrencyIT
extends AccumuloClusterHarness {
    private static final Logger log = LoggerFactory.getLogger(FateConcurrencyIT.class);
    private static final int NUM_ROWS = 1000;
    private static final long SLOW_SCAN_SLEEP_MS = 250L;
    private AccumuloClient client;
    private ClientContext context;
    private static final ExecutorService pool = Executors.newCachedThreadPool();
    private String secret;
    private long maxWaitMillis;
    private SlowOps slowOps;

    @Override
    protected Duration defaultTimeout() {
        return Duration.ofMinutes(4L);
    }

    @BeforeEach
    public void setup() {
        this.client = (AccumuloClient)Accumulo.newClient().from(FateConcurrencyIT.getClientProps()).build();
        this.context = (ClientContext)this.client;
        this.secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
        this.maxWaitMillis = Math.max(TimeUnit.MINUTES.toMillis(1L), this.defaultTimeout().toMillis() / 2L);
    }

    @AfterEach
    public void closeClient() {
        this.client.close();
    }

    @AfterAll
    public static void cleanup() {
        pool.shutdownNow();
    }

    @Test
    public void changeTableStateTest() throws Exception {
        String tableName = this.getUniqueNames(1)[0];
        SlowOps.setExpectedCompactions(this.client, 1);
        this.slowOps = new SlowOps(this.client, tableName, this.maxWaitMillis);
        Assertions.assertEquals((Object)TableState.ONLINE, (Object)this.getTableState(tableName), (String)"verify table online after created");
        OnLineCallable onlineOp = new OnLineCallable(tableName);
        Future<OnlineOpTiming> task = pool.submit(onlineOp);
        OnlineOpTiming timing1 = task.get();
        log.trace("Online 1 in {} ms", (Object)TimeUnit.NANOSECONDS.toMillis(timing1.runningTime()));
        Assertions.assertEquals((Object)TableState.ONLINE, (Object)this.getTableState(tableName), (String)"verify table is still online");
        this.client.tableOperations().offline(tableName, true);
        Assertions.assertEquals((Object)TableState.OFFLINE, (Object)this.getTableState(tableName), (String)"verify table is offline");
        onlineOp = new OnLineCallable(tableName);
        task = pool.submit(onlineOp);
        OnlineOpTiming timing2 = task.get();
        log.trace("Online 2 in {} ms", (Object)TimeUnit.NANOSECONDS.toMillis(timing2.runningTime()));
        Assertions.assertEquals((Object)TableState.ONLINE, (Object)this.getTableState(tableName), (String)"verify table is back online");
        this.slowOps.startCompactTask();
        onlineOp = new OnLineCallable(tableName);
        task = pool.submit(onlineOp);
        OnlineOpTiming timing3 = task.get();
        Assertions.assertTrue((timing3.runningTime() < TimeUnit.MILLISECONDS.toNanos(250000L) ? 1 : 0) != 0, (String)"online should take less time than expected compaction time");
        Assertions.assertEquals((Object)TableState.ONLINE, (Object)this.getTableState(tableName), (String)"verify table is still online");
        Assertions.assertTrue((boolean)this.findFate(tableName), (String)"Find FATE operation for table");
        this.client.tableOperations().cancelCompaction(tableName);
        log.debug("Success: Timing results for online commands.");
        log.debug("Time for unblocked online {} ms", (Object)TimeUnit.NANOSECONDS.toMillis(timing1.runningTime()));
        log.debug("Time for online when offline {} ms", (Object)TimeUnit.NANOSECONDS.toMillis(timing2.runningTime()));
        log.debug("Time for blocked online {} ms", (Object)TimeUnit.NANOSECONDS.toMillis(timing3.runningTime()));
        this.slowOps.blockWhileCompactionRunning();
    }

    private boolean findFate(String aTableName) {
        log.debug("Look for fate {}", (Object)aTableName);
        for (int retry = 0; retry < 5; ++retry) {
            try {
                boolean found = this.lookupFateInZookeeper(aTableName);
                log.trace("Try {}: Fate in zk for table {} : {}", new Object[]{retry, aTableName, found});
                if (found) {
                    log.debug("Found fate {}", (Object)aTableName);
                    return true;
                }
                Thread.sleep(150L);
                continue;
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
            catch (Exception ex) {
                log.debug("Find fate failed for table name {} with exception, will retry", (Object)aTableName, (Object)ex);
            }
        }
        return false;
    }

    @Test
    public void getFateStatus() {
        TableId tableId;
        SlowOps.setExpectedCompactions(this.client, 1);
        String tableName = this.getUniqueNames(1)[0];
        this.slowOps = new SlowOps(this.client, tableName, this.maxWaitMillis);
        try {
            Assertions.assertEquals((Object)TableState.ONLINE, (Object)this.getTableState(tableName), (String)"verify table online after created");
            tableId = this.context.getTableId(tableName);
            log.trace("tid: {}", (Object)tableId);
        }
        catch (TableNotFoundException ex) {
            throw new IllegalStateException(String.format("Table %s does not exist, failing test", tableName));
        }
        this.slowOps.startCompactTask();
        AdminUtil.FateStatus withLocks = null;
        List noLocks = null;
        AdminUtil admin = new AdminUtil(false);
        for (int maxRetries = 3; maxRetries > 0; --maxRetries) {
            try {
                InstanceId instanceId = this.context.getInstanceID();
                ZooReaderWriter zk = this.context.getZooReader().asWriter(this.secret);
                ZooStore zs = new ZooStore(ZooUtil.getRoot((InstanceId)instanceId) + "/fate", zk);
                ServiceLock.ServiceLockPath lockPath = ServiceLock.path((String)(ZooUtil.getRoot((InstanceId)instanceId) + "/table_locks/" + tableId));
                withLocks = admin.getStatus((ReadOnlyTStore)zs, (ZooReader)zk, lockPath, null, null);
                noLocks = admin.getTransactionStatus((ReadOnlyTStore)zs, null, null);
                break;
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                Assertions.fail((String)"Interrupt received - test failed");
                return;
            }
            catch (KeeperException ex) {
                try {
                    Thread.sleep(1000L);
                    continue;
                }
                catch (InterruptedException intr_ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
        Assertions.assertNotNull(withLocks);
        Assertions.assertNotNull(noLocks);
        Assertions.assertEquals((int)withLocks.getTransactions().size(), (int)noLocks.size());
        int matchCount = 0;
        for (AdminUtil.TransactionStatus tx : withLocks.getTransactions()) {
            if (!this.isCompaction(tx)) continue;
            log.trace("Fate id: {}, status: {}", (Object)tx.getTxid(), (Object)tx.getStatus());
            for (AdminUtil.TransactionStatus tx2 : noLocks) {
                if (!tx2.getTxid().equals(tx.getTxid())) continue;
                ++matchCount;
            }
        }
        Assertions.assertTrue((matchCount > 0 ? 1 : 0) != 0, (String)"Number of fates matches should be > 0");
        try {
            this.client.tableOperations().cancelCompaction(tableName);
            boolean cancelled = this.slowOps.blockWhileCompactionRunning();
            log.debug("Cancel completed successfully: {}", (Object)cancelled);
        }
        catch (AccumuloException | AccumuloSecurityException | TableNotFoundException ex) {
            log.debug("Could not cancel compaction due to exception", ex);
        }
    }

    private boolean lookupFateInZookeeper(String tableName) throws KeeperException {
        AdminUtil admin = new AdminUtil(false);
        try {
            TableId tableId = this.context.getTableId(tableName);
            log.trace("tid: {}", (Object)tableId);
            InstanceId instanceId = this.context.getInstanceID();
            ZooReaderWriter zk = this.context.getZooReader().asWriter(this.secret);
            ZooStore zs = new ZooStore(ZooUtil.getRoot((InstanceId)instanceId) + "/fate", zk);
            ServiceLock.ServiceLockPath lockPath = ServiceLock.path((String)(ZooUtil.getRoot((InstanceId)instanceId) + "/table_locks/" + tableId));
            AdminUtil.FateStatus fateStatus = admin.getStatus((ReadOnlyTStore)zs, (ZooReader)zk, lockPath, null, null);
            log.trace("current fates: {}", (Object)fateStatus.getTransactions().size());
            for (AdminUtil.TransactionStatus tx : fateStatus.getTransactions()) {
                if (!this.isCompaction(tx)) continue;
                return true;
            }
        }
        catch (InterruptedException | TableNotFoundException ex) {
            throw new IllegalStateException(ex);
        }
        return Boolean.FALSE;
    }

    private boolean isCompaction(AdminUtil.TransactionStatus tx) {
        if (tx == null) {
            log.trace("Fate tx is null");
            return false;
        }
        log.trace("Fate id: {}, status: {}", (Object)tx.getTxid(), (Object)tx.getStatus());
        String top = tx.getTop();
        String txName = tx.getTxName();
        return top != null && txName != null && top.contains("CompactionDriver") && tx.getTxName().equals("TABLE_COMPACT");
    }

    private TableState getTableState(String tableName) throws TableNotFoundException {
        TableId tableId = this.context.getTableId(tableName);
        TableState tstate = this.context.getTableState(tableId);
        log.trace("tableName: '{}': tableId {}, current state: {}", new Object[]{tableName, tableId, tstate});
        return tstate;
    }

    @Test
    public void multipleCompactions() {
        int tableCount = 4;
        SlowOps.setExpectedCompactions(this.client, tableCount);
        List<SlowOps> tables = Arrays.stream(this.getUniqueNames(tableCount)).map(tableName -> new SlowOps(this.client, (String)tableName, this.maxWaitMillis)).collect(Collectors.toList());
        tables.forEach(SlowOps::startCompactTask);
        Assertions.assertEquals((long)tableCount, (long)tables.stream().map(SlowOps::getTableName).filter(this::findFate).count());
        tables.forEach(t -> {
            try {
                this.client.tableOperations().cancelCompaction(t.getTableName());
            }
            catch (AccumuloException | AccumuloSecurityException | TableNotFoundException ex) {
                log.debug("Exception throw during multiple table test clean-up", ex);
            }
            boolean cancelled = t.blockWhileCompactionRunning();
            if (!cancelled) {
                log.info("Failed to cancel compaction during multiple compaction test clean-up for {}", (Object)t.getTableName());
            }
        });
    }

    private class OnLineCallable
    implements Callable<OnlineOpTiming> {
        final String tableName;

        OnLineCallable(String tableName) {
            this.tableName = tableName;
        }

        @Override
        public OnlineOpTiming call() throws Exception {
            OnlineOpTiming status = new OnlineOpTiming();
            log.trace("Setting {} online", (Object)this.tableName);
            FateConcurrencyIT.this.client.tableOperations().online(this.tableName, true);
            status.setComplete();
            log.trace("Online completed in {} ms", (Object)TimeUnit.NANOSECONDS.toMillis(status.runningTime()));
            return status;
        }
    }

    private static class OnlineOpTiming {
        private final long started = System.nanoTime();
        private long completed = 0L;

        OnlineOpTiming() {
        }

        void setComplete() {
            this.completed = System.nanoTime();
        }

        long runningTime() {
            return this.completed - this.started;
        }
    }
}

