/*
 * Decompiled with CFR 0.152.
 */
package com.sleepycat.je.rep.impl.node;

import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DiskLimitException;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.ReplicaConsistencyPolicy;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.dbi.DbConfigManager;
import com.sleepycat.je.dbi.EnvironmentImpl;
import com.sleepycat.je.rep.CommitPointConsistencyPolicy;
import com.sleepycat.je.rep.GroupShutdownException;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.MasterStateException;
import com.sleepycat.je.rep.NodeType;
import com.sleepycat.je.rep.ReplicaConsistencyException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.RestartRequiredException;
import com.sleepycat.je.rep.TimeConsistencyPolicy;
import com.sleepycat.je.rep.impl.RepGroupImpl;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.RepParams;
import com.sleepycat.je.rep.impl.node.DbCache;
import com.sleepycat.je.rep.impl.node.MasterTransfer;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.impl.node.RepNode;
import com.sleepycat.je.rep.impl.node.Replay;
import com.sleepycat.je.rep.impl.node.ReplicaOutputThread;
import com.sleepycat.je.rep.impl.node.ReplicaStatDefinition;
import com.sleepycat.je.rep.net.DataChannel;
import com.sleepycat.je.rep.stream.BaseProtocol;
import com.sleepycat.je.rep.stream.MasterStatus;
import com.sleepycat.je.rep.stream.Protocol;
import com.sleepycat.je.rep.stream.ReplicaFeederHandshake;
import com.sleepycat.je.rep.stream.ReplicaFeederHandshakeConfig;
import com.sleepycat.je.rep.stream.ReplicaFeederSyncup;
import com.sleepycat.je.rep.txn.MasterTxn;
import com.sleepycat.je.rep.txn.ReplayTxn;
import com.sleepycat.je.rep.utilint.BinaryProtocol;
import com.sleepycat.je.rep.utilint.NamedChannel;
import com.sleepycat.je.rep.utilint.NamedChannelWithTimeout;
import com.sleepycat.je.rep.utilint.RepUtils;
import com.sleepycat.je.rep.utilint.ServiceDispatcher;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.LongStat;
import com.sleepycat.je.utilint.StatGroup;
import com.sleepycat.je.utilint.StoppableThread;
import com.sleepycat.je.utilint.TestHook;
import com.sleepycat.je.utilint.TestHookExecute;
import com.sleepycat.je.utilint.TracerFormatter;
import com.sleepycat.je.utilint.VLSN;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.ClosedByInterruptException;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Replica {
    // Node that owns this replica; source of configuration, master status and shutdown state.
    private final RepNode repNode;
    // Cached result of repNode.getRepImpl(), taken in the constructor.
    private final RepImpl repImpl;
    // Replay component supplied to the constructor.
    private final Replay replay;
    // Exception recorded by the replica loop's catch clauses; passed to
    // repNode.resetReadyLatch() when the outer loop exits.
    private Exception shutdownException = null;
    // Channel to the feeder; (re)created by createReplicaFeederChannel() on each connection attempt.
    private NamedChannelWithTimeout replicaFeederChannel = null;
    // Latches used by readers waiting for this replica to reach a VLSN or a permissible lag.
    private final ConsistencyTracker consistencyTracker;
    // Matchpoint VLSN reported by the most recent syncup (assigned in initReplicaLoop()).
    private volatile VLSN txnEndVLSN;
    // Initialised from RepParams.TEST_REPLICA_DELAY; in the visible part of this file it is only logged.
    private int testDelayMs = 0;
    // Set by setDontProcessStream(); not read in the visible part of this file.
    private boolean dontProcessStream = false;
    // Retry parameters. The literals 2, 10 and 100 passed to ConnectRetryException in
    // createReplicaFeederChannel() match these values (presumably inlined by the compiler).
    private static final int NETWORK_RETRIES = 2;
    private static final int SERVICE_UNAVAILABLE_RETRIES = 10;
    private static final int CONNECT_RETRY_SLEEP_MS = 100;
    // Protocol negotiated by the feeder handshake; reset to null by loopExitCleanup().
    private Protocol protocol = null;
    // Accumulates the stats of protocol instances that have already been retired.
    private final StatGroup aggProtoStats;
    // Non-null while an election is pending before hard recovery; consumed by initReplicaLoop().
    private HardRecoveryElectionException hardRecoveryElectionException;
    // Test hook; only stored and returned in the visible part of this file.
    private ReplicaFeederSyncup.TestHook<Object> replicaFeederSyncupHook;
    // Per-instance copy of initialReplayHook, captured at construction time.
    private final TestHook<BinaryProtocol.Message> replayHook;
    // Static hook that seeds replayHook for instances created after it is set.
    private static TestHook<BinaryProtocol.Message> initialReplayHook;
    // Cache of database handles, sized from the REPLAY_MAX_OPEN_DB_HANDLES / REPLAY_DB_HANDLE_TIMEOUT parameters.
    private final DbCache dbCache;
    // Bounded queue that the network read loop fills; capacity comes from REPLICA_MESSAGE_QUEUE_SIZE.
    private final BlockingQueue<BinaryProtocol.Message> replayQueue;
    // Output thread for the current connection; null when no stream is being processed.
    private volatile ReplicaOutputThread replicaOutputThread;
    private final Logger logger;
    // Incremented each time an offer to replayQueue times out and is retried.
    private final LongStat nMessageQueueOverflows;

    /**
     * Creates the replica-side state for a node.
     *
     * <p>All sizing parameters (db handle cache, replay queue capacity, test
     * delay) are read from the node's configuration manager.
     *
     * @param repNode the node this replica belongs to
     * @param replay  the replay component used to apply the replication stream
     */
    Replica(RepNode repNode, Replay replay) {
        this.repNode = repNode;
        this.repImpl = repNode.getRepImpl();
        this.replay = replay;
        this.logger = LoggerUtils.getLogger(this.getClass());

        final DbConfigManager configManager = repNode.getConfigManager();
        final int maxOpenDbHandles = configManager.getInt(RepParams.REPLAY_MAX_OPEN_DB_HANDLES);
        final int dbHandleTimeout = configManager.getDuration(RepParams.REPLAY_DB_HANDLE_TIMEOUT);
        this.dbCache = new DbCache(this.repImpl.getDbTree(), maxOpenDbHandles, dbHandleTimeout);

        // The tracker reads repNode through the enclosing instance, so it is
        // created only after repNode has been assigned.
        this.consistencyTracker = new ConsistencyTracker();

        this.aggProtoStats = new StatGroup("BinaryProtocol", "Network traffic due to the replication stream.");
        this.nMessageQueueOverflows = replay.getMessageQueueOverflows();
        this.testDelayMs = configManager.getInt(RepParams.TEST_REPLICA_DELAY);
        this.replayHook = initialReplayHook;

        final int queueCapacity = configManager.getInt(RepParams.REPLICA_MESSAGE_QUEUE_SIZE);
        this.replayQueue = new ArrayBlockingQueue<BinaryProtocol.Message>(queueCapacity);
    }

    /**
     * Releases waiters and tears down the feeder connection as part of a node
     * shutdown.
     *
     * <p>The consistency latches are always tripped. The feeder channel is
     * shut down and the VLSN freeze latch cleared only when the caller is not
     * the rep node thread itself.
     *
     * @throws EnvironmentFailureException if the rep node has not been shut down
     */
    public void shutdown() {
        if (!repNode.isShutdown()) {
            throw EnvironmentFailureException.unexpectedState("Rep node must have initiated the shutdown.");
        }
        consistencyTracker.shutdown();

        final boolean calledFromNodeThread = Thread.currentThread() == repNode;
        if (!calledFromNodeThread) {
            RepUtils.shutdownChannel(replicaFeederChannel);
            repNode.getVLSNFreezeLatch().clearLatch();
        }
    }

    /**
     * Overrides the test delay that was read from the configuration at
     * construction time.
     *
     * @param testDelayMs the delay in milliseconds
     */
    public void setTestDelayMs(int testDelayMs) {
        this.testDelayMs = testDelayMs;
    }

    /**
     * @return the currently configured test delay, in milliseconds
     */
    public int getTestDelayMs() {
        return testDelayMs;
    }

    /**
     * Raises the "don't process stream" flag. The flag is one-way: nothing in
     * the visible part of this class clears it again.
     */
    public void setDontProcessStream() {
        dontProcessStream = true;
    }

    /**
     * @return the matchpoint VLSN recorded by the most recent syncup, or
     *         {@code null} if no syncup has completed yet
     */
    public VLSN getTxnEndVLSN() {
        return txnEndVLSN;
    }

    /**
     * @return the replay component passed to the constructor
     */
    public Replay replay() {
        return replay;
    }

    /**
     * @return the database handle cache owned by this replica
     */
    public DbCache getDbCache() {
        return dbCache;
    }

    /**
     * @return the tracker that readers use to wait for VLSN or lag consistency
     */
    public ConsistencyTracker getConsistencyTracker() {
        return consistencyTracker;
    }

    /**
     * Returns the raw data channel underneath the feeder connection.
     *
     * <p>The channel wrapper is {@code null} until
     * {@code createReplicaFeederChannel()} has run, in which case this call
     * fails with a {@link NullPointerException}.
     *
     * @return the data channel connected to the feeder
     */
    DataChannel getReplicaFeederChannel() {
        return replicaFeederChannel.getChannel();
    }

    /**
     * @return the protocol negotiated for the current connection, or
     *         {@code null} when no connection is established
     */
    Protocol getProtocol() {
        return protocol;
    }

    /**
     * Convenience accessor that forwards to the consistency tracker.
     *
     * @return the master's transaction-end VLSN as last seen by the tracker
     */
    public long getMasterTxnEndVLSN() {
        return consistencyTracker.getMasterTxnEndVLSN();
    }

    /**
     * @return the output thread serving the current connection, or
     *         {@code null} when the stream is not being processed
     */
    public ReplicaOutputThread getReplicaOutputThread() {
        return replicaOutputThread;
    }

    /**
     * Outer replica loop: keeps running the inner loop while it fails with
     * retryable exceptions and the node stays in sync with its master.
     *
     * <p>The loop ends when the inner loop returns normally, when the master
     * status is no longer in sync, when the retry budget carried by the
     * exception is used up, or when a disk limit violation is reported.
     * The ready latch is reset on exit unless a hard recovery election is
     * pending.
     *
     * @throws InterruptedException if interrupted while sleeping between retries
     *         or inside the inner loop
     */
    void runReplicaLoop() throws InterruptedException, DatabaseException, GroupShutdownException {
        Class<?> lastRetryClass = null;
        int attempts = 0;
        try {
            while (true) {
                try {
                    this.runReplicaLoopInternal();
                    // Inner loop returned without asking for a retry.
                    break;
                }
                catch (RetryException retry) {
                    if (!this.repNode.getMasterStatus().inSync()) {
                        LoggerUtils.fine(this.logger, this.repImpl, "Retry terminated, out of sync.");
                        break;
                    }
                    final boolean sameFailureAsBefore = retry.getClass() == lastRetryClass || retry.retries == 0;
                    if (!sameFailureAsBefore) {
                        // A different kind of failure starts a fresh retry budget.
                        attempts = 0;
                        lastRetryClass = retry.getClass();
                    } else if (++attempts >= retry.retries) {
                        LoggerUtils.info(this.logger, this.repImpl, "Failed to recover from exception: " + retry.getMessage() + ", despite " + retry.retries + " retries.\n" + LoggerUtils.getStackTrace(retry));
                        break;
                    }
                    // Log only every tenth attempt to keep the log readable.
                    if (attempts % 10 == 0) {
                        LoggerUtils.info(this.logger, this.repImpl, "Retry #: " + attempts + "/" + retry.retries + " Will retry replica loop after " + retry.retrySleepMs + "ms. ");
                    }
                    Thread.sleep(retry.retrySleepMs);
                    if (!this.repNode.getMasterStatus().inSync()) {
                        break;
                    }
                }
                catch (DiskLimitException ignored) {
                    // Deliberately not propagated: the loop simply ends here.
                    break;
                }
            }
        }
        finally {
            if (this.hardRecoveryElectionException == null) {
                this.repNode.resetReadyLatch(this.shutdownException);
            }
        }
    }

    /**
     * Runs one connect / syncup / stream-processing cycle and translates its
     * failures into the outcomes understood by {@code runReplicaLoop()}.
     *
     * <ul>
     * <li>Restart, group shutdown and unexpected runtime exceptions are
     *     recorded in {@code shutdownException} and rethrown.</li>
     * <li>Retry and disk limit exceptions are rethrown untouched.</li>
     * <li>I/O failures and master sync changes are logged and end the cycle
     *     normally.</li>
     * <li>A hard recovery election request is remembered for the next
     *     cycle.</li>
     * <li>Any other checked exception is recorded and wrapped as an
     *     unexpected environment failure.</li>
     * </ul>
     *
     * {@code loopExitCleanup()} always runs before this method exits.
     *
     * @throws InterruptedException if the channel was closed by an interrupt
     *         that was not part of a node shutdown
     */
    private void runReplicaLoopInternal() throws RestartRequiredException, InterruptedException, RetryException, InsufficientLogException {
        this.shutdownException = null;
        LoggerUtils.info(this.logger, this.repImpl, "Replica loop started with master: " + this.repNode.getMasterStatus().getNodeMasterNameId());
        if (this.testDelayMs > 0) {
            LoggerUtils.info(this.logger, this.repImpl, "Test delay of: " + this.testDelayMs + "ms. after each message sent");
        }
        try {
            this.initReplicaLoop();
            this.doRunReplicaLoopInternalWork();
        }
        catch (RestartRequiredException restartRequired) {
            this.shutdownException = restartRequired;
            throw restartRequired;
        }
        catch (ClosedByInterruptException interrupted) {
            if (!this.repNode.isShutdown()) {
                LoggerUtils.warning(this.logger, this.repImpl, "Replica loop unexpected interrupt.");
                throw new InterruptedException(interrupted.getMessage());
            }
            // An interrupt during shutdown is the expected way to stop the loop.
            LoggerUtils.info(this.logger, this.repImpl, "Replica loop interrupted for shutdown.");
        }
        catch (IOException ioFailure) {
            // The stack trace is only worth the noise at FINE level.
            final String traceSuffix = this.logger.isLoggable(Level.FINE) ? "\n" + LoggerUtils.getStackTrace(ioFailure) : "";
            LoggerUtils.info(this.logger, this.repImpl, "Replica IO exception: " + ioFailure.getClass().getName() + " Message:" + ioFailure.getMessage() + traceSuffix);
        }
        catch (DiskLimitException | RetryException passThrough) {
            throw passThrough;
        }
        catch (GroupShutdownException groupShutdown) {
            this.shutdownException = groupShutdown;
            throw groupShutdown;
        }
        catch (RuntimeException unexpected) {
            this.shutdownException = unexpected;
            LoggerUtils.severe(this.logger, this.repImpl, "Replica unexpected exception " + unexpected + " " + LoggerUtils.getStackTrace(unexpected));
            throw unexpected;
        }
        catch (MasterStatus.MasterSyncException outOfSync) {
            LoggerUtils.info(this.logger, this.repImpl, outOfSync.getMessage());
        }
        catch (HardRecoveryElectionException electionNeeded) {
            this.hardRecoveryElectionException = electionNeeded;
            LoggerUtils.info(this.logger, this.repImpl, electionNeeded.getMessage());
        }
        catch (Exception unexpectedChecked) {
            this.shutdownException = unexpectedChecked;
            LoggerUtils.severe(this.logger, this.repImpl, "Replica unexpected exception " + unexpectedChecked + " " + LoggerUtils.getStackTrace(unexpectedChecked));
            throw EnvironmentFailureException.unexpectedException(unexpectedChecked);
        }
        finally {
            this.loopExitCleanup();
        }
    }

    /**
     * Reads messages from the feeder and hands them to the replay thread
     * through the bounded replay queue until the node shuts down, the stream
     * ends, or an error occurs.
     *
     * <p>Before reading, the channel timeout is switched to
     * {@code REPLICA_TIMEOUT}, the queue and replay state are reset, and fresh
     * output and replay threads are started.
     *
     * <p>Cleanup is performed exactly once, in the {@code finally} clause.
     * The decompiled form repeated the cleanup inside the
     * {@code IOException} handler; that first pass set
     * {@code replicaOutputThread} to {@code null}, so the second pass in
     * {@code finally} dereferenced a null field and every I/O failure surfaced
     * as a {@link NullPointerException}. The handler now only requests a soft
     * exit so that already queued messages are drained first.
     *
     * @throws Exception the exception recorded by the replay thread, if any;
     *         otherwise the exception recorded by the output thread, if any;
     *         also anything thrown by the disk limit check or the protocol
     *         read other than {@link IOException}
     */
    protected void doRunReplicaLoopInternalWork() throws Exception {
        int timeoutMs = this.repNode.getConfigManager().getDuration(RepParams.REPLICA_TIMEOUT);
        this.replicaFeederChannel.setTimeoutMs(timeoutMs);
        this.replayQueue.clear();
        this.repImpl.getReplay().reset();
        this.replicaOutputThread = new ReplicaOutputThread(this.repImpl);
        this.replicaOutputThread.start();
        ReplayThread replayThread = new ReplayThread();
        replayThread.start();
        try {
            while (true) {
                BinaryProtocol.Message message = this.protocol.read(this.replicaFeederChannel);
                if (this.repNode.isShutdownOrInvalid() || message == null) {
                    return;
                }
                this.repNode.getRepImpl().checkDiskLimitViolation();
                if (message.getOp() == Protocol.HEARTBEAT) {
                    this.replay.noteEnqueueHeartbeatRequest((BaseProtocol.Heartbeat)message);
                }
                // Offer with a one second timeout so a dead replay thread is noticed.
                while (!this.replayQueue.offer(message, 1000000000L, TimeUnit.NANOSECONDS)) {
                    if (!replayThread.isAlive()) {
                        return;
                    }
                    this.nMessageQueueOverflows.increment();
                }
            }
        }
        catch (IOException ioe) {
            // Let the replay thread drain what is already queued; the finally
            // clause below waits for it and reports any exception it recorded.
            replayThread.exitRequest = ReplayExitType.SOFT;
        }
        finally {
            if (replayThread.exitRequest == ReplayExitType.SOFT) {
                replayThread.join();
            }
            try {
                if (replayThread.exception != null) {
                    throw replayThread.exception;
                }
                if (this.replicaOutputThread.getException() != null) {
                    throw this.replicaOutputThread.getException();
                }
            }
            finally {
                // Make sure both helper threads are gone whatever happened above.
                replayThread.exitRequest = ReplayExitType.IMMEDIATE;
                replayThread.join();
                this.replicaOutputThread.shutdownThread(this.logger);
                this.replicaOutputThread = null;
            }
        }
    }

    /**
     * Handles a shutdown request received from the master.
     *
     * <p>Steps, in order: queue the shutdown acknowledgement, remove the
     * channel timeout (set to {@code Integer.MAX_VALUE}), ask the daemons to
     * stop, force a checkpoint, then shut the daemons down. A checkpoint
     * failure is logged and does not stop the shutdown.
     *
     * @param shutdown the request received from the master
     * @return the exception the caller uses to signal the group shutdown; it
     *         is returned, not thrown, by this method
     * @throws IOException declared for the acknowledgement / channel calls
     */
    private GroupShutdownException processShutdown(BaseProtocol.ShutdownRequest shutdown) throws IOException {
        this.replay.queueAck(ReplicaOutputThread.SHUTDOWN_ACK);
        this.replicaFeederChannel.setTimeoutMs(Integer.MAX_VALUE);

        final RepImpl nodeImpl = this.repNode.getRepImpl();
        nodeImpl.requestShutdownDaemons();

        LoggerUtils.info(this.logger, this.repImpl, "Checkpoint initiated.");
        final CheckpointConfig checkpointConfig = new CheckpointConfig();
        checkpointConfig.setForce(true);
        checkpointConfig.setMinimizeRecoveryTime(true);
        try {
            nodeImpl.invokeCheckpoint(checkpointConfig, "Group Shutdown");
            LoggerUtils.info(this.logger, this.repImpl, "Checkpoint completed.");
        }
        catch (Exception checkpointFailure) {
            // Best effort: the shutdown proceeds even without the checkpoint.
            LoggerUtils.info(this.logger, this.repImpl, "Checkpoint failed: " + checkpointFailure);
        }

        nodeImpl.shutdownDaemons();
        return new GroupShutdownException(this.logger, this.repNode, shutdown.getShutdownTimeMs());
    }

    /**
     * Establishes the connection to the feeder and brings the replica to the
     * point where stream processing can start.
     *
     * <p>Sequence: open the channel, run the handshake, notify the node,
     * initialise the global CBVLSN, run the syncup, seed the consistency
     * tracker from the matchpoint, consume the first heartbeat and finally
     * count down the node's ready latch.
     *
     * <p>If a hard recovery election was requested by the previous cycle, the
     * syncup is told that no further election is needed; the pending request
     * is cleared either way.
     */
    private void initReplicaLoop() throws IOException, ConnectRetryException, DatabaseException, BinaryProtocol.ProtocolException, InterruptedException, HardRecoveryElectionException {
        this.createReplicaFeederChannel();
        ReplicaFeederHandshake feederHandshake = new ReplicaFeederHandshake(new RepFeederHandshakeConfig());
        this.protocol = feederHandshake.execute();
        this.repNode.notifyReplicaConnected();
        this.repNode.globalCBVLSN.init(this.repNode, feederHandshake.getFeederMinJEVersion());

        // An election has already been held if the previous cycle asked for one.
        final boolean electionAlreadyHeld = this.hardRecoveryElectionException != null;
        if (electionAlreadyHeld) {
            LoggerUtils.info(this.logger, this.repImpl, "Replica syncup after election to verify master:" + this.hardRecoveryElectionException.getMaster() + " elected master:" + this.repNode.getMasterStatus().getNodeMasterNameId());
        }
        final boolean hardRecoveryNeedsElection = !electionAlreadyHeld;
        this.hardRecoveryElectionException = null;

        ReplicaFeederSyncup feederSyncup = new ReplicaFeederSyncup(this.repNode, this.replay, this.replicaFeederChannel, this.protocol, hardRecoveryNeedsElection);
        feederSyncup.execute(this.repNode.getCBVLSNTracker());
        this.txnEndVLSN = feederSyncup.getMatchedVLSN();
        long matchpointTime = feederSyncup.getMatchedVLSNTime();
        this.consistencyTracker.reinit(this.txnEndVLSN.getSequence(), matchpointTime);

        BaseProtocol.Heartbeat firstHeartbeat = this.protocol.read(this.replicaFeederChannel.getChannel(), BaseProtocol.Heartbeat.class);
        this.processHeartbeat(firstHeartbeat);

        long vlsnDelta = this.consistencyTracker.getMasterTxnEndVLSN() - this.consistencyTracker.lastReplayedVLSN.getSequence();
        LoggerUtils.info(this.logger, this.repImpl, String.format("Replica initialization completed. Replica VLSN: %s  Heartbeat master commit VLSN: %,d  DTVLSN:%,d Replica VLSN delta: %,d", this.consistencyTracker.lastReplayedVLSN, this.consistencyTracker.getMasterTxnEndVLSN(), this.repNode.getAnyDTVLSN(), vlsnDelta));
        this.repNode.getReadyLatch().countDown();
    }

    /**
     * Acknowledges a heartbeat and feeds it to the consistency tracker.
     *
     * <p>The kind of acknowledgement queued depends on whether the replay
     * component asks for the response to be tracked.
     *
     * @param heartbeat the heartbeat received from the master
     */
    private void processHeartbeat(BaseProtocol.Heartbeat heartbeat) throws IOException {
        if (this.replay.noteDequeueHeartbeatRequest(heartbeat)) {
            this.replay.queueAck(ReplicaOutputThread.HEARTBEAT_ACK_TIMED);
        } else {
            this.replay.queueAck(ReplicaOutputThread.HEARTBEAT_ACK);
        }
        this.consistencyTracker.trackHeartbeat(heartbeat);
    }

    /**
     * Cleanup performed every time the inner replica loop exits.
     *
     * <p>Logs the reason for the exit, clears the db handle cache, shuts the
     * feeder channel, logs consistency stats, folds the protocol's stats into
     * the aggregate group and drops the protocol. Nodes whose type has a
     * transient id get their name/id pair reverted.
     */
    private void loopExitCleanup() {
        final Exception cause = this.shutdownException;
        if (cause == null) {
            LoggerUtils.info(this.logger, this.repImpl, "Exiting inner Replica loop.");
        } else if (cause instanceof RetryException) {
            LoggerUtils.info(this.logger, this.repImpl, "Retrying connection to feeder. Message: " + cause.getMessage());
        } else if (cause instanceof GroupShutdownException) {
            LoggerUtils.info(this.logger, this.repImpl, "Exiting inner Replica loop. Master requested shutdown.");
        } else {
            LoggerUtils.warning(this.logger, this.repImpl, "Exiting inner Replica loop with exception " + cause + "\n" + LoggerUtils.getStackTrace(cause));
        }

        this.clearDbTreeCache();
        RepUtils.shutdownChannel(this.replicaFeederChannel);
        if (this.consistencyTracker != null) {
            this.consistencyTracker.logStats();
        }

        // Preserve the retiring protocol's counters before letting go of it.
        if (this.protocol != null) {
            this.aggProtoStats.addAll(this.protocol.getStats(StatsConfig.DEFAULT));
        }
        this.protocol = null;

        if (this.repNode.getNodeType().hasTransientId()) {
            this.repNode.getNameIdPair().revertToNull();
        }
    }

    /**
     * Empties the database handle cache.
     */
    void clearDbTreeCache() {
        dbCache.clear();
    }

    /**
     * Cleanup run when this node moves to the master state.
     *
     * <p>Discards any pending hard recovery election, aborts old replay
     * transactions and releases all consistency waiters with a
     * {@link MasterStateException}.
     *
     * @throws EnvironmentFailureException if a master transfer is still active
     */
    void masterTransitionCleanup() throws DatabaseException {
        final MasterTransfer ongoingTransfer = this.repNode.getActiveTransfer();
        if (ongoingTransfer != null) {
            String startedAt = new TracerFormatter().getDate(ongoingTransfer.getStartTime());
            throw EnvironmentFailureException.unexpectedState(this.repImpl, "Master state transition while there is an ongoing master transfer initiated at:" + startedAt);
        }

        this.hardRecoveryElectionException = null;
        this.replay.abortOldTxns();

        MasterStateException nowMaster = new MasterStateException(this.repNode.getRepImpl().getStateChangeEvent());
        this.consistencyTracker.forceTripLatches(nowMaster);
    }

    /**
     * Cleanup run when this node moves from the master to the replica state.
     *
     * <p>All existing master transactions are frozen first, then transaction
     * completion is unblocked, and only afterwards is each transaction
     * converted to a replay transaction. Conversions that yield a replay
     * transaction are registered with the transaction manager. Finally any
     * active master transfer is cleared.
     *
     * @throws EnvironmentFailureException if the environment is still in the
     *         MASTER state
     */
    void replicaTransitionCleanup() {
        if (this.repImpl.getState() == ReplicatedEnvironment.State.MASTER) {
            throw EnvironmentFailureException.unexpectedState(this.repImpl, "Should not be in MASTER state when converting from master to replica state");
        }

        final Set<MasterTxn> leftoverTxns = this.repImpl.getExistingMasterTxns();
        LoggerUtils.info(this.logger, this.repImpl, "Transitioning node to replica state, " + leftoverTxns.size() + " txns to clean up");

        // Freeze every transaction before any of them is allowed to complete.
        for (MasterTxn txn : leftoverTxns) {
            txn.freeze();
        }
        this.repImpl.unblockTxnCompletion();

        for (MasterTxn txn : leftoverTxns) {
            ReplayTxn converted = txn.convertToReplayTxnAndClose(this.logger, this.repImpl.getReplay());
            if (converted != null) {
                this.repImpl.getTxnManager().registerTxn(converted);
                LoggerUtils.info(this.logger, this.repImpl, "state for replay transaction " + converted.getId() + " = " + converted.getState());
            } else {
                LoggerUtils.info(this.logger, this.repImpl, "Master Txn " + txn.getId() + " has no locks, nothing to transfer");
            }
        }
        this.repNode.clearActiveTransfer();
    }

    /**
     * Connects to the current master and performs the "Feeder" service
     * handshake, storing the resulting channel in
     * {@code replicaFeederChannel} with the {@code PRE_HEARTBEAT_TIMEOUT}.
     *
     * @throws ConnectRetryException if the connection is refused (2 retries,
     *         100 ms apart) or the feeder service is not yet known to the
     *         master (10 retries, 100 ms apart)
     * @throws EnvironmentFailureException for any other service handshake
     *         failure
     * @throws IOException for other I/O failures while connecting
     */
    private void createReplicaFeederChannel() throws IOException, ConnectRetryException {
        final DbConfigManager nodeConfig = this.repNode.getConfigManager();
        final int preHeartbeatTimeoutMs = nodeConfig.getDuration(RepParams.PRE_HEARTBEAT_TIMEOUT);
        try {
            final DataChannel feederChannel = this.repImpl.getChannelFactory().connect(this.repNode.getMasterStatus().getNodeMaster(), this.repImpl.getHostAddress(), this.repImpl.getFeederConnectOptions().setBindAnyLocalAddr(this.repImpl.getConfigManager().getBoolean(RepParams.BIND_INADDR_ANY)));
            // Publish the channel before the handshake so that cleanup can close it.
            this.replicaFeederChannel = new NamedChannelWithTimeout(this.repNode, feederChannel, preHeartbeatTimeoutMs);
            ServiceDispatcher.doServiceHandshake(feederChannel, "Feeder");
        }
        catch (ConnectException refused) {
            throw new ConnectRetryException(refused.getMessage(), 2, 100);
        }
        catch (ServiceDispatcher.ServiceConnectFailedException handshakeFailure) {
            if (handshakeFailure.getResponse() != ServiceDispatcher.Response.UNKNOWN_SERVICE) {
                throw EnvironmentFailureException.unexpectedException(handshakeFailure);
            }
            throw new ConnectRetryException(handshakeFailure.getMessage(), 10, 100);
        }
    }

    /**
     * @param config controls how the stats are collected
     * @return the stats reported by the replay component
     */
    public StatGroup getReplayStats(StatsConfig config) {
        return replay.getStats(config);
    }

    /**
     * Returns the protocol stats accumulated from retired protocol instances
     * combined with those of the live protocol, if there is one.
     *
     * @param config controls whether the counters are cleared while reading
     * @return a stat group independent of the internal aggregate
     */
    public StatGroup getProtocolStats(StatsConfig config) {
        final StatGroup combined = this.aggProtoStats.cloneGroup(config.getClear());
        // Read the field once: it may be set to null when the loop exits.
        final Protocol liveProtocol = this.protocol;
        if (liveProtocol == null) {
            return combined;
        }
        combined.addAll(liveProtocol.getStats(config));
        return combined;
    }

    /**
     * @param config controls whether the counters are cleared while reading
     * @return the stats kept by the consistency tracker
     */
    public StatGroup getTrackerStats(StatsConfig config) {
        return consistencyTracker.getStats(config);
    }

    /**
     * Resets the replay, protocol and consistency tracker statistics.
     *
     * <p>The protocol field is read once into a local, as
     * {@code getProtocolStats()} does. {@code loopExitCleanup()} sets the
     * field to {@code null} when the replica loop exits, so checking the
     * field and then dereferencing it again could fail with a
     * {@link NullPointerException} if the loop exited in between.
     */
    public void resetStats() {
        this.replay.resetStats();
        this.aggProtoStats.clear();
        final Protocol prot = this.protocol;
        if (prot != null) {
            prot.resetStats();
        }
        this.consistencyTracker.resetStats();
    }

    /**
     * Installs the hook that replicas created from now on copy into their
     * own replay hook. Existing instances are not affected, since they
     * captured the value in their constructor.
     *
     * @param hook the hook to install; may be {@code null}
     */
    public static void setInitialReplayHook(TestHook<BinaryProtocol.Message> hook) {
        Replica.initialReplayHook = hook;
    }

    /**
     * @param syncupHook the test hook to store for the feeder syncup
     */
    public void setReplicaFeederSyncupHook(ReplicaFeederSyncup.TestHook<Object> syncupHook) {
        replicaFeederSyncupHook = syncupHook;
    }

    /**
     * @return the stored feeder syncup test hook, or {@code null} if none was set
     */
    public ReplicaFeederSyncup.TestHook<Object> getReplicaFeederSyncupHook() {
        return replicaFeederSyncupHook;
    }

    /**
     * Signals that an election must be held before hard recovery, so that the
     * master can be verified. Carries the master's identity, the last
     * transaction end VLSN and the matchpoint VLSN for the message.
     */
    public static class HardRecoveryElectionException
    extends Exception {
        final NameIdPair masterNameIdPair;
        final VLSN lastTxnEnd;
        final VLSN matchpointVLSN;

        /**
         * @param masterNameIdPair the master that needs to be verified
         * @param lastTxnEnd       the last transaction end VLSN
         * @param matchpointVLSN   the matchpoint VLSN
         */
        public HardRecoveryElectionException(NameIdPair masterNameIdPair, VLSN lastTxnEnd, VLSN matchpointVLSN) {
            this.masterNameIdPair = masterNameIdPair;
            this.lastTxnEnd = lastTxnEnd;
            this.matchpointVLSN = matchpointVLSN;
        }

        /**
         * @return the master named when the exception was created
         */
        public NameIdPair getMaster() {
            return masterNameIdPair;
        }

        /**
         * Builds the message from the three stored values; no message is
         * passed to the superclass.
         */
        @Override
        public String getMessage() {
            StringBuilder text = new StringBuilder("Need election preceding hard recovery to verify master:");
            text.append(masterNameIdPair);
            text.append(" last txn end:").append(lastTxnEnd);
            text.append(" matchpoint VLSN:").append(matchpointVLSN);
            return text.toString();
        }
    }

    /**
     * Retryable failure raised while establishing the feeder connection.
     */
    static class ConnectRetryException
    extends RetryException {
        /**
         * @param message      description of the underlying failure
         * @param retries      how many times the connection may be retried
         * @param retrySleepMs pause between retries, in milliseconds
         */
        ConnectRetryException(String message, int retries, int retrySleepMs) {
            super(message, retries, retrySleepMs);
        }
    }

    /**
     * Base class for failures the replica loop may retry. Each instance
     * carries its own retry budget and the pause between attempts.
     */
    static abstract class RetryException
    extends Exception {
        final int retries;
        final int retrySleepMs;

        /**
         * @param message      description of the underlying failure
         * @param retries      number of retries allowed for this failure
         * @param retrySleepMs pause between retries, in milliseconds
         */
        RetryException(String message, int retries, int retrySleepMs) {
            super(message);
            this.retries = retries;
            this.retrySleepMs = retrySleepMs;
        }

        /**
         * Wraps the original message with the retry count and interval.
         */
        @Override
        public String getMessage() {
            StringBuilder text = new StringBuilder("Failed after retries: ");
            text.append(retries);
            text.append(" with retry interval: ").append(retrySleepMs);
            text.append("ms. (error: ").append(super.getMessage()).append(")");
            return text.toString();
        }
    }

    /**
     * A set of latches keyed by a numeric threshold. Waiters obtain a latch
     * for their threshold with {@link #getOrCreate(Long)}; {@link #trip}
     * releases every latch whose key satisfies {@link #tripPredicate} for the
     * supplied value.
     */
    private abstract class OrderedLatches {
        final EnvironmentImpl envImpl;
        final SortedMap<Long, RepUtils.ExceptionAwareCountDownLatch> latchMap = new TreeMap<Long, RepUtils.ExceptionAwareCountDownLatch>();

        /**
         * Decides whether the latch registered under {@code var1} should be
         * released for the trip value {@code var3}.
         *
         * @param var1 the key the latch was registered under
         * @param var3 the value passed to {@link #trip}
         * @return {@code true} if the latch must be released
         */
        abstract boolean tripPredicate(long var1, long var3);

        /**
         * @param envImpl environment handed to every latch created here
         */
        OrderedLatches(EnvironmentImpl envImpl) {
            this.envImpl = envImpl;
        }

        /**
         * Returns the latch registered under {@code key}, creating a
         * single-count latch on first use.
         *
         * @param key the threshold the caller is waiting for
         * @return the shared latch for that threshold
         */
        synchronized RepUtils.ExceptionAwareCountDownLatch getOrCreate(Long key) {
            RepUtils.ExceptionAwareCountDownLatch latch = this.latchMap.get(key);
            if (latch == null) {
                latch = new RepUtils.ExceptionAwareCountDownLatch(this.envImpl, 1);
                this.latchMap.put(key, latch);
            }
            return latch;
        }

        /**
         * Removes and releases every latch whose key satisfies the predicate
         * for {@code tripValue}.
         *
         * <p>All keys are examined. Stopping at the first key that fails the
         * predicate is only correct when the predicate holds for a prefix of
         * the ascending keys. That is not the case for a predicate such as
         * {@code tripValue <= key}, where a small key failing says nothing
         * about the larger keys after it; those waiters would stay blocked.
         *
         * @param tripValue the value the keys are compared against
         * @param exception passed to each released latch; may be {@code null}
         */
        synchronized void trip(long tripValue, DatabaseException exception) {
            if (this.latchMap.isEmpty()) {
                return;
            }
            // Snapshot the keys so entries can be removed while scanning.
            Long[] keys = this.latchMap.keySet().toArray(new Long[0]);
            for (Long key : keys) {
                if (!this.tripPredicate(key, tripValue)) {
                    continue;
                }
                RepUtils.ExceptionAwareCountDownLatch latch = this.latchMap.remove(key);
                latch.releaseAwait(exception);
            }
        }
    }

    public class ConsistencyTracker {
        private final long NULL_VLSN_SEQUENCE = -1L;
        private long lastReplayedTxnVLSN = -1L;
        private VLSN lastReplayedVLSN = VLSN.NULL_VLSN;
        private long masterTxnEndTime = 0L;
        private volatile long masterTxnEndVLSN;
        private volatile long masterNow = 0L;
        private final StatGroup stats = new StatGroup("ConsistencyTracker", "Statistics on the delays experienced by read requests at the replica in order to conform to the specified ReplicaConsistencyPolicy.");
        private final LongStat nLagConsistencyWaits = new LongStat(this.stats, ReplicaStatDefinition.N_LAG_CONSISTENCY_WAITS);
        private final LongStat nLagConsistencyWaitMs = new LongStat(this.stats, ReplicaStatDefinition.N_LAG_CONSISTENCY_WAIT_MS);
        private final LongStat nVLSNConsistencyWaits = new LongStat(this.stats, ReplicaStatDefinition.N_VLSN_CONSISTENCY_WAITS);
        private final LongStat nVLSNConsistencyWaitMs = new LongStat(this.stats, ReplicaStatDefinition.N_VLSN_CONSISTENCY_WAIT_MS);
        private final OrderedLatches vlsnLatches;
        private final OrderedLatches lagLatches;

        public ConsistencyTracker() {
            this.vlsnLatches = new OrderedLatches((EnvironmentImpl)Replica.this.repNode.getRepImpl()){

                @Override
                boolean tripPredicate(long keyVLSN, long tripVLSN) {
                    return keyVLSN <= tripVLSN;
                }
            };
            this.lagLatches = new OrderedLatches((EnvironmentImpl)Replica.this.repNode.getRepImpl()){

                @Override
                boolean tripPredicate(long keyLag, long currentLag) {
                    return currentLag <= keyLag;
                }
            };
        }

        void reinit(long matchedTxnVLSN, long matchedTxnEndTime) {
            this.lastReplayedVLSN = new VLSN(matchedTxnVLSN);
            this.lastReplayedTxnVLSN = matchedTxnVLSN;
            this.masterTxnEndTime = matchedTxnEndTime;
        }

        public long getMasterTxnEndVLSN() {
            return this.masterTxnEndVLSN;
        }

        void close() {
            this.logStats();
        }

        void logStats() {
            if (Replica.this.logger.isLoggable(Level.INFO)) {
                LoggerUtils.info(Replica.this.logger, Replica.this.repImpl, "Replica stats - Lag waits: " + this.nLagConsistencyWaits.get() + " Lag wait time: " + this.nLagConsistencyWaitMs.get() + "ms.  VLSN waits: " + this.nVLSNConsistencyWaits.get() + " Lag wait time: " + this.nVLSNConsistencyWaitMs.get() + "ms.");
            }
        }

        private long currentLag() {
            if (this.masterNow == 0L) {
                return Integer.MAX_VALUE;
            }
            long lag = this.lastReplayedTxnVLSN < this.masterTxnEndVLSN ? System.currentTimeMillis() - this.masterTxnEndTime : (this.lastReplayedTxnVLSN == this.masterTxnEndVLSN ? System.currentTimeMillis() - this.masterNow : System.currentTimeMillis() - this.masterNow);
            return lag;
        }

        synchronized void forceTripLatches(DatabaseException exception) {
            assert (exception != null);
            this.vlsnLatches.trip(Long.MAX_VALUE, exception);
            this.lagLatches.trip(0L, exception);
        }

        synchronized void trackTxnEnd() {
            Replay.TxnInfo lastReplayedTxn = Replica.this.replay.getLastReplayedTxn();
            this.lastReplayedTxnVLSN = lastReplayedTxn.getTxnVLSN().getSequence();
            this.masterTxnEndTime = lastReplayedTxn.getMasterTxnEndTime();
            if (this.lastReplayedTxnVLSN > this.masterTxnEndVLSN && this.masterTxnEndTime >= this.masterNow) {
                this.masterTxnEndVLSN = this.lastReplayedTxnVLSN;
                this.masterNow = this.masterTxnEndTime;
            }
            this.vlsnLatches.trip(this.lastReplayedTxnVLSN, null);
            this.lagLatches.trip(this.currentLag(), null);
        }

        synchronized void trackVLSN() {
            this.lastReplayedVLSN = Replica.this.replay.getLastReplayedVLSN();
            this.vlsnLatches.trip(this.lastReplayedVLSN.getSequence(), null);
        }

        synchronized void trackHeartbeat(BaseProtocol.Heartbeat heartbeat) {
            this.masterTxnEndVLSN = heartbeat.getCurrentTxnEndVLSN();
            this.masterNow = heartbeat.getMasterNow();
            this.lagLatches.trip(this.currentLag(), null);
        }

        public void lagAwait(TimeConsistencyPolicy consistencyPolicy) throws InterruptedException, ReplicaConsistencyException, DatabaseException {
            long lag;
            long currentLag = this.currentLag();
            if (currentLag <= (lag = consistencyPolicy.getPermissibleLag(TimeUnit.MILLISECONDS))) {
                return;
            }
            long waitStart = System.currentTimeMillis();
            RepUtils.ExceptionAwareCountDownLatch waitLagLatch = this.lagLatches.getOrCreate(lag);
            this.await(waitLagLatch, consistencyPolicy);
            this.nLagConsistencyWaits.increment();
            this.nLagConsistencyWaitMs.add(System.currentTimeMillis() - waitStart);
        }

        /**
         * Blocks the caller until the replica has replayed up to the given
         * VLSN sequence.
         *
         * For a {@link CommitPointConsistencyPolicy} the target is compared
         * against the last replayed transaction-end VLSN; for any other policy
         * it is compared against the sequence of the last replayed VLSN. The
         * comparison and the latch lookup happen under this tracker's monitor;
         * the wait itself happens outside it.
         *
         * Returns immediately, without touching the wait statistics, when the
         * target has already been reached. After a successful wait the
         * VLSN-wait counter is incremented and the elapsed wall-clock
         * milliseconds are added to the VLSN-wait time statistic.
         *
         * @param vlsn the VLSN sequence the caller needs replayed
         * @param consistencyPolicy selects the comparison value and supplies
         *        the timeout
         * @throws InterruptedException propagated from the latch wait
         * @throws ReplicaConsistencyException if the latch wait reports failure
         * @throws DatabaseException propagated from the latch wait
         */
        public void awaitVLSN(long vlsn, ReplicaConsistencyPolicy consistencyPolicy) throws InterruptedException, ReplicaConsistencyException, DatabaseException {
            final long waitStart = System.currentTimeMillis();
            final RepUtils.ExceptionAwareCountDownLatch vlsnLatch;
            synchronized (this) {
                final long replayedSoFar;
                if (consistencyPolicy instanceof CommitPointConsistencyPolicy) {
                    replayedSoFar = this.lastReplayedTxnVLSN;
                } else {
                    replayedSoFar = this.lastReplayedVLSN.getSequence();
                }
                if (vlsn <= replayedSoFar) {
                    return;
                }
                vlsnLatch = this.vlsnLatches.getOrCreate(vlsn);
            }
            this.await(vlsnLatch, consistencyPolicy);
            // Stats are updated only after a successful wait.
            this.nVLSNConsistencyWaits.increment();
            this.nVLSNConsistencyWaitMs.add(System.currentTimeMillis() - waitStart);
        }

        /**
         * Waits on a consistency latch for at most the policy's timeout.
         *
         * When {@code awaitOrException} returns {@code false} (presumably
         * because the timeout elapsed; confirm against
         * {@code ExceptionAwareCountDownLatch}), a
         * {@link ReplicaConsistencyException} is thrown carrying the policy,
         * this node's name and whether the node's state is currently inactive.
         *
         * @param consistencyLatch the latch to wait on
         * @param consistencyPolicy supplies the timeout and is reported in the
         *        exception
         * @throws ReplicaConsistencyException if the wait returns {@code false}
         * @throws DatabaseException propagated from the latch wait
         * @throws InterruptedException propagated from the latch wait
         */
        private void await(RepUtils.ExceptionAwareCountDownLatch consistencyLatch, ReplicaConsistencyPolicy consistencyPolicy) throws ReplicaConsistencyException, DatabaseException, InterruptedException {
            if (consistencyLatch.awaitOrException(consistencyPolicy.getTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)) {
                return;
            }
            final RepImpl nodeImpl = Replica.this.repNode.getRepImpl();
            final boolean nodeInactive = !nodeImpl.getState().isActive();
            final String nodeName = nodeImpl.getNameIdPair().getName();
            throw new ReplicaConsistencyException(consistencyPolicy, nodeName, nodeInactive);
        }

        /**
         * Returns a clone of this tracker's statistics group.
         *
         * @param config its {@code getClear()} flag is forwarded to
         *        {@code cloneGroup}
         * @return the cloned statistics group
         */
        private StatGroup getStats(StatsConfig config) {
            final boolean clearFlag = config.getClear();
            return this.stats.cloneGroup(clearFlag);
        }

        /**
         * Clears this tracker's statistics group.
         */
        private void resetStats() {
            stats.clear();
        }

        /**
         * Force-trips the tracker's latches with an exception describing the
         * node shutdown.
         *
         * If the exception saved by the rep node is already an
         * {@link EnvironmentFailureException} it is used as is; otherwise it
         * is wrapped in one whose message names this node.
         */
        public void shutdown() {
            final Exception saved = Replica.this.repNode.getSavedShutdownException();
            final EnvironmentFailureException tripCause;
            if (saved instanceof EnvironmentFailureException) {
                tripCause = (EnvironmentFailureException) saved;
            } else {
                tripCause = EnvironmentFailureException.unexpectedException("Node: " + Replica.this.repNode.getNameIdPair() + " was shut down.", saved);
            }
            this.forceTripLatches(tripCause);
        }
    }

    /**
     * {@link ReplicaFeederHandshakeConfig} implementation backed by the
     * enclosing replica: every accessor delegates to the replica's rep node,
     * except {@link #getNamedChannel()}, which returns the replica's feeder
     * channel.
     */
    private class RepFeederHandshakeConfig
    implements ReplicaFeederHandshakeConfig {
        /** Instantiable only from within the enclosing class. */
        private RepFeederHandshakeConfig() {
        }

        /** @return the rep node's {@code RepImpl} */
        @Override
        public RepImpl getRepImpl() {
            final RepImpl impl = Replica.this.repNode.getRepImpl();
            return impl;
        }

        /** @return the rep node's name/id pair */
        @Override
        public NameIdPair getNameIdPair() {
            final NameIdPair pair = Replica.this.repNode.getNameIdPair();
            return pair;
        }

        /** @return the rep node's clock */
        @Override
        public RepUtils.Clock getClock() {
            final RepUtils.Clock clock = Replica.this.repNode.getClock();
            return clock;
        }

        /** @return the rep node's node type */
        @Override
        public NodeType getNodeType() {
            final NodeType type = Replica.this.repNode.getNodeType();
            return type;
        }

        /** @return the rep node's group */
        @Override
        public RepGroupImpl getGroup() {
            final RepGroupImpl group = Replica.this.repNode.getGroup();
            return group;
        }

        /** @return the replica's current feeder channel */
        @Override
        public NamedChannel getNamedChannel() {
            final NamedChannel channel = Replica.this.replicaFeederChannel;
            return channel;
        }
    }

    /**
     * Thread that takes messages off the replica's replay queue and applies
     * them: shutdown requests, heartbeats and log entries.
     *
     * The thread ends either when an exit is requested through
     * {@link #exitRequest}, when the rep node reports that it is shut down or
     * invalid, or when any exception escapes the processing loop.
     */
    class ReplayThread
    extends StoppableThread {
        /** The exception that terminated {@link #run()}, if any. */
        private volatile Exception exception;
        /** Requested exit mode; {@code null} while no exit is requested. */
        volatile ReplayExitType exitRequest;
        /** Poll interval (one second, in nanoseconds) offered to the replay object. */
        private static final long QUEUE_POLL_INTERVAL_NS = 1000000000L;

        protected ReplayThread() {
            super(Replica.this.repImpl, "ReplayThread");
            this.exitRequest = null;
        }

        /**
         * Requests an immediate exit of the replay loop.
         *
         * @return always 0 (presumably the time to wait for the soft shutdown;
         *         confirm against {@code StoppableThread})
         */
        @Override
        protected int initiateSoftShutdown() {
            this.exitRequest = ReplayExitType.IMMEDIATE;
            return 0;
        }

        /**
         * Runs the replay loop until an exit condition is met or an exception
         * is thrown. On an exception the replay queue is cleared, the feeder
         * channel is shut down and the exception is retained in
         * {@code exception}.
         */
        @Override
        public void run() {
            LoggerUtils.info(Replica.this.logger, Replica.this.repImpl, "Replay thread started. Message queue size:" + Replica.this.replayQueue.remainingCapacity());
            final int cacheClearPeriod = Replica.this.repNode.getDbTreeCacheClearingOpCount();
            long processedOps = 0L;
            try {
                for (;;) {
                    final long pollNs = Replica.this.replay.getPollIntervalNs(QUEUE_POLL_INTERVAL_NS);
                    final BinaryProtocol.Message msg = Replica.this.replayQueue.poll(pollNs, TimeUnit.NANOSECONDS);

                    // IMMEDIATE exits at once; SOFT exits only once a poll
                    // comes back empty.
                    final boolean mustExit = this.exitRequest == ReplayExitType.IMMEDIATE
                        || (this.exitRequest == ReplayExitType.SOFT && msg == null)
                        || Replica.this.repNode.isShutdownOrInvalid();
                    if (mustExit) {
                        if (this.exitRequest == ReplayExitType.SOFT) {
                            Replica.this.replay.flushPendingAcks(Long.MAX_VALUE);
                        }
                        return;
                    }

                    final long startNs = System.nanoTime();
                    Replica.this.replay.flushPendingAcks(startNs);
                    Replica.this.repNode.getMasterStatus().assertSync();
                    if (msg == null) {
                        // Poll timed out: nothing to replay this round.
                        continue;
                    }
                    assert (TestHookExecute.doHookIfSet(Replica.this.replayHook, msg));

                    final BinaryProtocol.MessageOp op = msg.getOp();
                    if (op == Protocol.SHUTDOWN_REQUEST) {
                        throw Replica.this.processShutdown((BaseProtocol.ShutdownRequest)msg);
                    }
                    if (op == Protocol.HEARTBEAT) {
                        Replica.this.processHeartbeat((BaseProtocol.Heartbeat)msg);
                        Replica.this.dbCache.tick();
                    } else {
                        if (Replica.this.dontProcessStream) {
                            continue;
                        }
                        final BaseProtocol.Entry entry = (BaseProtocol.Entry)msg;
                        Replica.this.replay.replayEntry(startNs, entry);
                        if (entry.isTxnEnd()) {
                            Replica.this.txnEndVLSN = Replica.this.replay.getLastReplayedVLSN();
                            Replica.this.consistencyTracker.trackTxnEnd();
                        }
                        Replica.this.consistencyTracker.trackVLSN();
                    }

                    if (Replica.this.testDelayMs > 0) {
                        Thread.sleep(Replica.this.testDelayMs);
                    }
                    // Clear the db tree cache on every cacheClearPeriod-th
                    // processed message, starting with the first.
                    if (processedOps++ % cacheClearPeriod == 0L) {
                        Replica.this.clearDbTreeCache();
                    }
                }
            }
            catch (Exception e) {
                this.exception = e;
                Replica.this.replayQueue.clear();
                LoggerUtils.info(Replica.this.logger, Replica.this.repImpl, "closing replicaFeederChannel = " + Replica.this.replicaFeederChannel);
                RepUtils.shutdownChannel(Replica.this.replicaFeederChannel);
                LoggerUtils.info(Replica.this.logger, Replica.this.repImpl, "Replay thread exiting with exception:" + e.getMessage());
            }
        }

        /** @return the enclosing replica's logger */
        @Override
        protected Logger getLogger() {
            return Replica.this.logger;
        }
    }

    private static enum ReplayExitType {
        IMMEDIATE,
        SOFT;

    }
}

