/*
 * Decompiled with CFR 0.152.
 */
package com.sleepycat.je.rep.impl.node;

import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Durability;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.JEVersion;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.dbi.DbConfigManager;
import com.sleepycat.je.dbi.EnvironmentFailureReason;
import com.sleepycat.je.dbi.EnvironmentImpl;
import com.sleepycat.je.log.ChecksumException;
import com.sleepycat.je.rep.ReplicationSecurityException;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.RepNodeImpl;
import com.sleepycat.je.rep.impl.RepParams;
import com.sleepycat.je.rep.impl.node.FeederManager;
import com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition;
import com.sleepycat.je.rep.impl.node.MasterTransfer;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.impl.node.RepNode;
import com.sleepycat.je.rep.impl.node.cbvlsn.LocalCBVLSNUpdater;
import com.sleepycat.je.rep.net.DataChannel;
import com.sleepycat.je.rep.stream.ArbiterFeederSource;
import com.sleepycat.je.rep.stream.BaseProtocol;
import com.sleepycat.je.rep.stream.FeederFilter;
import com.sleepycat.je.rep.stream.FeederReplicaHandshake;
import com.sleepycat.je.rep.stream.FeederReplicaSyncup;
import com.sleepycat.je.rep.stream.FeederSource;
import com.sleepycat.je.rep.stream.FeederTxns;
import com.sleepycat.je.rep.stream.MasterFeederSource;
import com.sleepycat.je.rep.stream.MasterStatus;
import com.sleepycat.je.rep.stream.OutputWireRecord;
import com.sleepycat.je.rep.stream.Protocol;
import com.sleepycat.je.rep.subscription.StreamAuthenticator;
import com.sleepycat.je.rep.txn.MasterTxn;
import com.sleepycat.je.rep.utilint.BinaryProtocol;
import com.sleepycat.je.rep.utilint.NamedChannel;
import com.sleepycat.je.rep.utilint.NamedChannelWithTimeout;
import com.sleepycat.je.rep.utilint.RepUtils;
import com.sleepycat.je.rep.vlsn.VLSNIndex;
import com.sleepycat.je.rep.vlsn.VLSNRange;
import com.sleepycat.je.utilint.AtomicLongComponent;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.LongAvgRate;
import com.sleepycat.je.utilint.LongAvgRateStat;
import com.sleepycat.je.utilint.LongDiffStat;
import com.sleepycat.je.utilint.LongMaxZeroStat;
import com.sleepycat.je.utilint.StatGroup;
import com.sleepycat.je.utilint.StoppableThread;
import com.sleepycat.je.utilint.StringStat;
import com.sleepycat.je.utilint.TestHook;
import com.sleepycat.je.utilint.TestHookExecute;
import com.sleepycat.je.utilint.VLSN;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class Feeder {
    /** Heartbeat interval in ms; assigned from the RepNode's heartbeat interval in the main constructor. */
    private int heartbeatMs;
    /** The manager that created this feeder; null only for the no-argument (test) constructor. */
    private final FeederManager feederManager;
    private final RepNode repNode;
    private final RepImpl repImpl;
    /** Channel to the replica, wrapped with a timeout by configureChannel(). */
    private final NamedChannelWithTimeout feederReplicaChannel;
    /** Thread that performs the handshake/syncup and then runs the response loop. */
    private final InputThread inputThread;
    /** Thread that writes log entries and heartbeats to the replica. */
    private final OutputThread outputThread;
    /** Optional filter applied to each record before it is written; null means no filtering. */
    private FeederFilter feederFilter;
    /** Authenticator obtained from the RepNode; null means no security checks are performed. */
    private final StreamAuthenticator authenticator;
    /** Minimum time in ms between two periodic security checks (see doSecurityCheck()). */
    private final long securityChkIntvMs;
    /** Set to true by initArbiterFeederSource(). */
    private boolean isArbiterFeeder = false;
    /** Source of the records written by the output thread; null until one of the init*FeederSource methods runs. */
    private FeederSource feederSource;
    /** Protocol version negotiated by the handshake; used by the output thread to create its own Protocol. */
    private int protocolVersion;
    /** VLSN that the output thread requests next from the feeder source. */
    private volatile VLSN feederVLSN = VLSN.NULL_VLSN;
    /** Highest commit VLSN noted for the replica so far (see initMasterFeederSource() and deemAcked()). */
    private volatile VLSN replicaTxnEndVLSN = VLSN.NULL_VLSN;
    private volatile long lastResponseTime = 0L;
    /** Master transfer operation currently tracking this feeder, or null if none. */
    private volatile MasterTransfer masterXfr;
    /** True once the replica has been observed to have caught up (set in initMasterFeederSource() or deemAcked()). */
    private volatile boolean caughtUp = false;
    private final MasterStatus masterStatus;
    /** Flipped to true exactly once, by the first call to shutdown(Exception). */
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final Logger logger;
    /** Name/id pair of the node hosting this feeder. */
    private final NameIdPair nameIdPair;
    /** Name/id pair of the replica; NameIdPair.NULL until the handshake completes. */
    private volatile NameIdPair replicaNameIdPair = NameIdPair.NULL;
    /* The next three values are supplied by the handshake in the input thread. */
    private volatile int streamLogVersion = 0;
    private volatile JEVersion replicaJEVersion = null;
    private volatile RepNodeImpl replicaNode = null;
    /* The next three values are recorded by the output thread each time a heartbeat is sent. */
    private volatile long lastHeartbeatTime;
    private volatile long lastHeartbeatCommitVLSN;
    private volatile long lastHeartbeatCommitTimestamp;
    /** VLSN rate statistic obtained from the environment's FeederTxns. */
    private final LongAvgRateStat vlsnRate;
    /** Test hook invoked (under assert) for every message written by the output thread. */
    private volatile TestHook<BinaryProtocol.Message> writeMessageHook;
    /** Hook value copied into writeMessageHook when a Feeder is constructed. */
    private static volatile TestHook<BinaryProtocol.Message> initialWriteMessageHook;

    /**
     * Prepares a newly accepted replica channel for use by the feeder: puts
     * it into blocking mode, applies the configured TCP_NODELAY setting and
     * wraps it with the pre-heartbeat timeout.
     *
     * @param channel the channel connected to the replica
     * @return the channel wrapped as a {@link NamedChannelWithTimeout}
     * @throws IOException if the channel cannot be configured; the failure is
     *         logged at warning level and then rethrown unchanged
     */
    private NamedChannelWithTimeout configureChannel(DataChannel channel) throws IOException {
        try {
            channel.configureBlocking(true);
            final String remoteEndpoint = remoteEndpointOf(channel);
            LoggerUtils.info(logger, repImpl, "Feeder accepted connection from " + remoteEndpoint);
            final int timeoutMs = repNode.getConfigManager().getDuration(RepParams.PRE_HEARTBEAT_TIMEOUT);
            final boolean tcpNoDelay = repNode.getConfigManager().getBoolean(RepParams.FEEDER_TCP_NO_DELAY);
            channel.socket().setTcpNoDelay(tcpNoDelay);
            return new NamedChannelWithTimeout(repNode, channel, timeoutMs);
        }
        catch (IOException ioe) {
            LoggerUtils.warning(logger, repImpl, "IO exception while configuring channel Exception:" + ioe.getMessage());
            throw ioe;
        }
    }

    /**
     * Returns the textual remote address of the channel, or "unknown" (after
     * logging the reason) when the address cannot be determined.
     */
    private String remoteEndpointOf(DataChannel channel) {
        try {
            return channel.getRemoteAddress().toString();
        }
        catch (IOException ioe) {
            LoggerUtils.info(logger, repImpl, "Could not determine remote address. " + ioe.getMessage());
            return "unknown";
        }
    }

    /**
     * Creates the feeder for a newly accepted replica connection. The
     * channel is configured and both I/O threads are created, but no thread
     * is started here; see {@link #startFeederThreads()}.
     *
     * @param feederManager the manager owning this feeder
     * @param dataChannel the channel connected to the replica
     * @throws DatabaseException declared for callers; not thrown directly by
     *         the code in this constructor
     * @throws IOException if the channel cannot be configured
     */
    Feeder(FeederManager feederManager, DataChannel dataChannel) throws DatabaseException, IOException {
        this.feederManager = feederManager;
        this.repNode = feederManager.repNode();
        this.repImpl = this.repNode.getRepImpl();
        this.masterStatus = this.repNode.getMasterStatus();
        this.nameIdPair = this.repNode.getNameIdPair();
        this.feederSource = null;
        this.logger = LoggerUtils.getLogger(this.getClass());
        this.feederReplicaChannel = this.configureChannel(dataChannel);
        /*
         * The heartbeat interval must be known before the threads are
         * constructed: the OutputThread constructor reads heartbeatMs to cap
         * its batch window. Assigning it afterwards made that constructor
         * see the default value 0, which forced the batch window to zero
         * regardless of the configured FEEDER_BATCH_NS.
         */
        this.heartbeatMs = feederManager.repNode().getHeartbeatInterval();
        this.inputThread = new InputThread();
        this.outputThread = new OutputThread();
        this.vlsnRate = this.repImpl.getFeederTxns().getVLSNRate();
        this.writeMessageHook = initialWriteMessageHook;
        this.feederFilter = null;
        this.authenticator = feederManager.repNode().getAuthenticator();
        this.securityChkIntvMs = feederManager.repNode().getSecurityCheckInterval();
    }

    /**
     * Starts the input thread. The output thread is not started here; the
     * input thread starts it from its own run() method.
     */
    void startFeederThreads() {
        inputThread.start();
    }

    /**
     * Creates a feeder with no manager, node, channel or threads. The logger
     * uses the fixed prefix "TestFeeder" and the feeder is marked as already
     * shut down.
     */
    public Feeder() {
        /* No collaborators: everything that would reference the node is null. */
        feederManager = null;
        repNode = null;
        repImpl = null;
        masterStatus = null;
        feederSource = null;
        feederReplicaChannel = null;
        nameIdPair = NameIdPair.NULL;
        logger = LoggerUtils.getLoggerFixedPrefix(getClass(), "TestFeeder");

        /* No I/O threads either; the feeder starts out shut down. */
        inputThread = null;
        outputThread = null;
        shutdown.set(true);

        vlsnRate = null;
        writeMessageHook = initialWriteMessageHook;
        feederFilter = null;
        authenticator = null;
        securityChkIntvMs = 0L;
    }

    /**
     * Sets up streaming from the master's log starting at {@code startVLSN}.
     * The replica's txn end VLSN is initialised to the VLSN preceding the
     * start point, and the feeder is marked caught up when that VLSN is not
     * behind the node's current txn end VLSN.
     *
     * @param startVLSN the first VLSN to be sent to the replica
     * @throws IOException declared for callers; presumably raised while
     *         creating the feeder source
     */
    public void initMasterFeederSource(VLSN startVLSN) throws IOException {
        replicaTxnEndVLSN = startVLSN.getPrev();
        final boolean replicaIsCurrent = replicaTxnEndVLSN.compareTo(repNode.getCurrentTxnEndVLSN()) >= 0;
        if (replicaIsCurrent) {
            caughtUp = true;
        }
        feederVLSN = startVLSN;
        feederSource = new MasterFeederSource(
            repNode.getRepImpl(),
            repNode.getVLSNIndex(),
            replicaNameIdPair,
            startVLSN);
    }

    /**
     * Sets up this feeder to serve an arbiter: installs an
     * {@link ArbiterFeederSource}, resets the feeder VLSN to the null VLSN
     * and flags the feeder as an arbiter feeder.
     */
    private void initArbiterFeederSource() {
        feederSource = new ArbiterFeederSource(repNode.getRepImpl());
        feederVLSN = VLSN.NULL_VLSN;
        isArbiterFeeder = true;
    }

    /**
     * Returns the statistics of the output thread's protocol.
     *
     * @param config the stats configuration passed on to the protocol
     * @return the protocol statistics, or an empty "BinaryProtocol" group if
     *         the output thread has not created its protocol yet
     */
    public StatGroup getProtocolStats(StatsConfig config) {
        final Protocol outputProtocol = outputThread.protocol;
        if (outputProtocol == null) {
            return new StatGroup("BinaryProtocol", "Network traffic due to the replication stream.");
        }
        return outputProtocol.getStats(config);
    }

    /**
     * Resets the statistics of the output thread's protocol; a no-op while
     * that protocol has not been created.
     */
    void resetStats() {
        final Protocol outputProtocol = outputThread.protocol;
        if (outputProtocol == null) {
            return;
        }
        outputProtocol.resetStats();
    }

    /**
     * Registers the master transfer operation that should be told about this
     * feeder's progress. If the replica is already caught up, progress is
     * reported immediately.
     *
     * @param mt the master transfer to notify
     */
    void setMasterTransfer(MasterTransfer mt) {
        masterXfr = mt;
        if (!caughtUp) {
            return;
        }
        adviseMasterTransferProgress();
    }

    /**
     * Reports the replica's current txn end VLSN to the registered master
     * transfer, if there is one.
     */
    void adviseMasterTransferProgress() {
        /* Read the volatile field once so the null check and the call agree. */
        final MasterTransfer transfer = masterXfr;
        if (transfer == null) {
            return;
        }
        transfer.noteProgress(new MasterTransfer.VLSNProgress(replicaTxnEndVLSN, replicaNameIdPair.getName()));
    }

    /**
     * @return the node this feeder obtained from its feeder manager, or null
     *         for a feeder created with the no-argument constructor
     */
    public RepNode getRepNode() {
        return repNode;
    }

    /**
     * @return the replica's name/id pair; {@link NameIdPair#NULL} until the
     *         handshake has supplied it
     */
    public NameIdPair getReplicaNameIdPair() {
        return replicaNameIdPair;
    }

    /**
     * Installs the filter that the output thread applies to each record
     * before writing it.
     *
     * @param filter the filter to use, or null for no filtering
     */
    public void setFeederFilter(FeederFilter filter) {
        feederFilter = filter;
    }

    /** @return the latest txn end VLSN recorded for the replica */
    public VLSN getReplicaTxnEndVLSN() {
        return replicaTxnEndVLSN;
    }

    /** @return the VLSN the output thread will request next from the feeder source */
    public VLSN getFeederVLSN() {
        return feederVLSN;
    }

    /** @return the timeout-wrapped channel connecting this feeder to the replica */
    public Channel getChannel() {
        return feederReplicaChannel;
    }

    /** @return the replica's JE version as reported by the handshake, or null before the handshake */
    public JEVersion getReplicaJEVersion() {
        return replicaJEVersion;
    }

    /** @return the replica node as reported by the handshake, or null before the handshake */
    public RepNodeImpl getReplicaNode() {
        return replicaNode;
    }

    /**
     * Shuts down this feeder. Only the first call does any work; later calls
     * return immediately.
     *
     * <p>The shutdown abandons any master transfer tracking this feeder,
     * removes the feeder from its manager, shuts down the feeder source,
     * folds the protocol statistics of both threads into the manager's
     * statistics, removes the per-replica stats and finally stops both I/O
     * threads.
     *
     * @param shutdownException the exception that caused the shutdown, or
     *        null; its message is only included in the log output
     */
    public void shutdown(Exception shutdownException) {
        if (!this.shutdown.compareAndSet(false, true)) {
            /* Some other caller already performed the shutdown. */
            return;
        }
        MasterTransfer mt = this.masterXfr;
        String replicaName = this.replicaNameIdPair.getName();
        if (mt != null) {
            mt.giveUp(replicaName);
        }
        this.feederManager.removeFeeder(this);
        if (this.feederSource != null) {
            this.feederSource.shutdown(this.repImpl);
        }

        /* Combine the input and output protocol statistics. */
        StatGroup pstats = this.inputThread.protocol != null
            ? this.inputThread.protocol.getStats(StatsConfig.DEFAULT)
            : new StatGroup("BinaryProtocol", "Network traffic due to the replication stream.");
        if (this.outputThread.protocol != null) {
            pstats.addAll(this.outputThread.protocol.getStats(StatsConfig.DEFAULT));
        }
        this.feederManager.incStats(pstats);

        /* Drop the per-replica statistics kept by the manager. */
        this.feederManager.getReplicaDelayMap().removeStat(replicaName);
        this.feederManager.getReplicaLastCommitTimestampMap().removeStat(replicaName);
        this.feederManager.getReplicaLastCommitVLSNMap().removeStat(replicaName);
        this.feederManager.getReplicaVLSNLagMap().removeStat(replicaName);
        this.feederManager.getReplicaVLSNRateMap().removeStat(replicaName);
        LoggerUtils.info(this.logger, this.repImpl, "Shutting down feeder for replica " + replicaName + (shutdownException == null ? "" : " Reason: " + shutdownException.getMessage()) + RepUtils.writeTimesString(pstats));

        if (this.repNode.getReplicaCloseCatchupMs() >= 0L) {
            /* A replica close catchup period is configured: wait for the input thread to exit. */
            try {
                this.inputThread.join();
            }
            catch (InterruptedException e) {
                /*
                 * Report the thread that was actually being joined. The
                 * interrupt status is intentionally left as it was so the
                 * thread shutdown calls below behave as before --
                 * NOTE(review): confirm whether re-interrupting is wanted.
                 */
                LoggerUtils.warning(this.logger, this.repImpl, "Interrupted while waiting to join thread:" + this.inputThread);
            }
        }
        this.outputThread.shutdownThread(this.logger);
        this.inputThread.shutdownThread(this.logger);
        this.feederManager.decrementManagedFeederCount();
        LoggerUtils.finest(this.logger, this.repImpl, this.feederReplicaChannel + " isOpen=" + this.feederReplicaChannel.getChannel().isOpen());
    }

    /** @return true once {@link #shutdown(Exception)} has been invoked (or for a test feeder) */
    public boolean isShutdown() {
        final boolean alreadyShutDown = shutdown.get();
        return alreadyShutDown;
    }

    /**
     * @return the feeder source if it is an {@link ArbiterFeederSource},
     *         otherwise (including when no source is set) null
     */
    public ArbiterFeederSource getArbiterFeederSource() {
        /* instanceof is false for null, so no separate null test is needed. */
        return feederSource instanceof ArbiterFeederSource
            ? (ArbiterFeederSource)feederSource
            : null;
    }

    /** @return the stream authenticator obtained from the node, or null if there is none */
    public StreamAuthenticator getAuthenticator() {
        return authenticator;
    }

    /**
     * Records an acknowledgment from this feeder's replica for the given
     * transaction. When the transaction is known and has a commit VLSN, the
     * replica's txn end VLSN is advanced if the commit is newer, the DTVLSN
     * is updated once no acks remain pending, the feeder is marked caught up
     * and any master transfer is told about the progress.
     *
     * @param txnId the id of the acknowledged transaction
     */
    private void deemAcked(long txnId) {
        final FeederTxns.TxnInfo ackedTxn = repNode.getFeederTxns().noteReplicaAck(replicaNode, txnId);
        final VLSN commitVLSN = (ackedTxn == null) ? null : ackedTxn.getCommitVLSN();
        if (commitVLSN == null) {
            /* Unknown transaction, or one without a commit VLSN: nothing to record. */
            return;
        }
        final boolean advancesReplica = commitVLSN.compareTo(replicaTxnEndVLSN) > 0;
        if (advancesReplica) {
            replicaTxnEndVLSN = commitVLSN;
            if (ackedTxn.getPendingAcks() == 0) {
                repNode.updateDTVLSN(replicaTxnEndVLSN.getSequence());
            }
        }
        caughtUp = true;
        adviseMasterTransferProgress();
    }

    /**
     * Returns a one-line description of the feeder's position: the feeder
     * VLSN, the replica's txn end VLSN and, for a replica whose type is not
     * electable, the node type.
     */
    public String dumpState() {
        final StringBuilder state = new StringBuilder();
        state.append("feederVLSN=").append((Object)feederVLSN);
        state.append(" replicaTxnEndVLSN=").append((Object)replicaTxnEndVLSN);
        if (replicaNode != null && !replicaNode.getType().isElectable()) {
            state.append(" nodeType=").append((Object)replicaNode.getType());
        }
        return state.toString();
    }

    /**
     * Replaces the test hook that the output thread invokes (under assert)
     * for each message it writes.
     *
     * @param writeMessageHook the hook to install, or null to remove it
     */
    public void setWriteMessageHook(TestHook<BinaryProtocol.Message> writeMessageHook) {
        /* The parameter shadows the field, so the qualifier is required. */
        this.writeMessageHook = writeMessageHook;
    }

    /** @return the currently installed write-message test hook, or null */
    public TestHook<BinaryProtocol.Message> getWriteMessageHook() {
        return writeMessageHook;
    }

    /**
     * Sets the hook that every subsequently constructed feeder copies into
     * its own write-message hook.
     *
     * @param initialWriteMessageHook the hook for new feeders, or null
     */
    public static void setInitialWriteMessageHook(TestHook<BinaryProtocol.Message> initialWriteMessageHook) {
        /* Qualified with the class name because the parameter shadows the static field. */
        Feeder.initialWriteMessageHook = initialWriteMessageHook;
    }

    /**
     * Tells whether the stream to this replica has to be security checked.
     *
     * @return true only when an authenticator is present and the channel is
     *         trust capable but not trusted
     */
    public boolean needSecurityChecks() {
        if (authenticator == null) {
            return false;
        }
        final DataChannel channel = feederReplicaChannel.getChannel();
        if (!channel.isTrustCapable()) {
            return false;
        }
        return !channel.isTrusted();
    }

    /**
     * Performs the periodic access check. Nothing happens when no security
     * checks are needed or when the last check is more recent than the
     * configured check interval.
     *
     * @throws ReplicationSecurityException if the access check fails
     */
    private void doSecurityCheck() throws ReplicationSecurityException {
        if (!needSecurityChecks()) {
            return;
        }
        final long now = System.currentTimeMillis();
        final long sinceLastCheckMs = now - authenticator.getLastCheckTimeMs();
        if (sinceLastCheckMs < securityChkIntvMs) {
            return;
        }
        Feeder.checkAccess(authenticator, feederReplicaChannel.getNameIdPair().getName());
    }

    /**
     * Handles a re-authentication message: installs the new token on the
     * authenticator and immediately re-checks access. The message is ignored
     * unless the replica's node type is external and an authenticator exists.
     *
     * @param msg the message, expected to be a {@code Protocol.ReAuthenticate}
     */
    private void processReauthenticate(BinaryProtocol.Message msg) {
        final boolean ignore = !getReplicaNode().getType().isExternal() || authenticator == null;
        if (ignore) {
            return;
        }
        final Protocol.ReAuthenticate request = (Protocol.ReAuthenticate)msg;
        authenticator.setToken(request.getTokenBytes());
        Feeder.checkAccess(authenticator, feederReplicaChannel.getNameIdPair().getName());
    }

    /**
     * Asks the authenticator whether the replica may stream the requested
     * data.
     *
     * @param authenticator the authenticator to consult
     * @param replicaName the replica's name, used in the error message and
     *        passed to the exception
     * @throws ReplicationSecurityException if the authenticator denies access
     */
    private static void checkAccess(StreamAuthenticator authenticator, String replicaName) {
        if (!authenticator.checkAccess()) {
            /* A space is needed after the name, otherwise it runs into "fails". */
            String err = "replica " + replicaName + " fails the security check to stream requested data.";
            throw new ReplicationSecurityException(err, replicaName, null);
        }
    }

    /**
     * Exception carrying a flag that says whether the replica should be
     * failed as a consequence.
     */
    public static class ExitException extends Exception {
        /** Whether the replica is to be failed; fixed at construction. */
        final boolean failReplica;

        /**
         * Creates an exception with the given message; the replica is always
         * flagged to be failed.
         *
         * @param message the detail message
         */
        public ExitException(String message) {
            super(message);
            failReplica = true;
        }

        /**
         * Creates an exception wrapping a cause.
         *
         * @param cause the underlying cause
         * @param failReplica whether the replica is to be failed
         */
        public ExitException(Throwable cause, boolean failReplica) {
            super(cause);
            this.failReplica = failReplica;
        }

        /** @return the flag supplied (or implied) at construction */
        public boolean failReplica() {
            return failReplica;
        }
    }

    /**
     * Handler for exceptions that escape a feeder I/O thread: logs the
     * failure, records it with the feeder manager as the node's shutdown
     * exception and interrupts the node.
     */
    private class IOThreadsHandler implements Thread.UncaughtExceptionHandler {
        private IOThreadsHandler() {
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            final String logText = "Uncaught exception in feeder thread " + t + e.getMessage() + LoggerUtils.getStackTrace(e);
            LoggerUtils.severe(logger, repImpl, logText);
            feederManager.setRepNodeShutdownException(
                EnvironmentFailureException.promote(
                    repNode.getRepImpl(),
                    EnvironmentFailureReason.UNCAUGHT_EXCEPTION,
                    "Uncaught exception in feeder thread:" + t,
                    e));
            repNode.interrupt();
        }
    }

    private class OutputThread
    extends StoppableThread {
        /** Protocol used for writing; created at the start of run(), null before that. */
        Protocol protocol;
        /** Running sum of the per-transaction message transfer times, in ms. */
        private long totalTransferDelay;
        /** Wall-clock time at which checkShutdown() first noticed the shutdown; 0 until then. */
        private long shutdownRequestStart;
        /** Value of RepParams.COMMIT_TO_NETWORK; when true commits are deemed acked as they are written. */
        private final boolean commitToNetwork;
        /** Transfer times above this many ms are logged by createMessage(). */
        private final int transferLoggingThresholdMs;
        /** Extra time in ns that a batch may keep growing after it contains a commit needing an ack. */
        private final int batchNs;
        /** Direct buffer in which the messages of one batch are accumulated. */
        private final ByteBuffer batchBuff;
        /* Maximum replica lag statistic and the name of the replica that set it. */
        private final LongMaxZeroStat nMaxReplicaLag;
        private final StringStat nMaxReplicaLagName;
        private final VLSNIndex vlsnIndex;
        /* Timestamp and VLSN of the last commit seen by writeAvailableEntries(). */
        private long lastCommitTimestamp;
        private long lastCommitVLSN;
        /** Test-only delay in ms applied after each pass of the write loop; 0 or less disables it. */
        final int testDelayMs;

        /**
         * Creates the output thread and reads its configuration: whether
         * commits are acknowledged at the network, the transfer logging
         * threshold, the batch window and the batch buffer size.
         */
        OutputThread() {
            super(Feeder.this.repImpl, new IOThreadsHandler(), "Feeder Output");
            this.protocol = null;
            this.totalTransferDelay = 0L;
            this.shutdownRequestStart = 0L;
            DbConfigManager configManager = Feeder.this.repNode.getConfigManager();
            this.commitToNetwork = configManager.getBoolean(RepParams.COMMIT_TO_NETWORK);
            this.transferLoggingThresholdMs = configManager.getDuration(RepParams.TRANSFER_LOGGING_THRESHOLD);
            /*
             * The batch window may not exceed one heartbeat interval. The
             * interval is converted to nanoseconds in long arithmetic: as an
             * int product it overflows for any heartbeat above 2147 ms and
             * would yield a wrong, possibly negative, window. The minimum
             * never exceeds the configured int, so the narrowing cast is
             * safe for non-negative heartbeat intervals.
             */
            long heartbeatNs = (long)Feeder.this.heartbeatMs * 1000000L;
            this.batchNs = (int)Math.min((long)configManager.getInt(RepParams.FEEDER_BATCH_NS), heartbeatNs);
            int batchBuffSize = configManager.getInt(RepParams.FEEDER_BATCH_BUFF_KB) * 1024;
            this.batchBuff = ByteBuffer.allocateDirect(batchBuffSize);
            if (Feeder.this.feederManager != null) {
                this.nMaxReplicaLag = Feeder.this.feederManager.getnMaxReplicaLag();
                this.nMaxReplicaLagName = Feeder.this.feederManager.getnMaxReplicaLagName();
            } else {
                /* No manager: use standalone placeholder statistics. */
                StatGroup stats = new StatGroup("FeederManager", "A feeder is a replication stream connection between a master and replica nodes.");
                this.nMaxReplicaLag = new LongMaxZeroStat(stats, FeederManagerStatDefinition.N_MAX_REPLICA_LAG);
                this.nMaxReplicaLagName = new StringStat(stats, FeederManagerStatDefinition.N_MAX_REPLICA_LAG_NAME);
            }
            /* NOTE(review): unlike the branch above, this dereferences feederManager unconditionally. */
            this.testDelayMs = Feeder.this.feederManager.getTestDelayMs();
            if (this.testDelayMs > 0) {
                LoggerUtils.info(Feeder.this.logger, Feeder.this.repImpl, "Test delay of:" + this.testDelayMs + "ms. after each message sent");
            }
            this.vlsnIndex = Feeder.this.repNode.getVLSNIndex();
        }

        /**
         * Decides whether the output loop must stop.
         *
         * <p>Without a pending shutdown the answer is false. With a negative
         * replica close catchup time the answer is true straight away.
         * Otherwise the loop keeps running until the catchup time has
         * elapsed or, for a non-arbiter feeder, the feeder VLSN has passed
         * the node's current txn end VLSN; at that point a shutdown request
         * is written to the replica and true is returned.
         *
         * @return true if the output thread should exit its loop
         * @throws IOException if the shutdown request cannot be written
         */
        private boolean checkShutdown() throws IOException {
            if (!Feeder.this.shutdown.get()) {
                return false;
            }
            if (Feeder.this.repNode.getReplicaCloseCatchupMs() < 0L) {
                return true;
            }
            if (this.shutdownRequestStart == 0L) {
                /* First time the shutdown is noticed: start the catchup clock. */
                this.shutdownRequestStart = System.currentTimeMillis();
            }
            final boolean timedOut = System.currentTimeMillis() - this.shutdownRequestStart > Feeder.this.repNode.getReplicaCloseCatchupMs();
            final boolean stillCatchingUp = !timedOut && !Feeder.this.isArbiterFeeder && Feeder.this.feederVLSN.compareTo(Feeder.this.repNode.getCurrentTxnEndVLSN()) <= 0;
            if (stillCatchingUp) {
                return false;
            }
            Protocol protocol = this.protocol;
            protocol.getClass();
            this.writeMessage(new BaseProtocol.ShutdownRequest((BaseProtocol)protocol, this.shutdownRequestStart), Feeder.this.feederReplicaChannel);
            final long elapsedMs = System.currentTimeMillis() - this.shutdownRequestStart;
            LoggerUtils.info(Feeder.this.logger, Feeder.this.repImpl, String.format("Shutdown message sent to: %s. Feeder vlsn: %,d. Shutdown elapsed time: %,dms", Feeder.this.replicaNameIdPair, Feeder.this.feederVLSN.getSequence(), elapsedMs));
            return true;
        }

        /**
         * Writes a single message to the channel, first running the write
         * message test hook when assertions are enabled.
         *
         * @param message the message to write
         * @param namedChannel the channel to write to
         * @throws IOException if the write fails
         */
        private void writeMessage(BinaryProtocol.Message message, NamedChannel namedChannel) throws IOException {
            assert TestHookExecute.doHookIfSet(Feeder.this.writeMessageHook, message);
            final Protocol writer = this.protocol;
            writer.write(message, namedChannel);
        }

        /*
         * Main loop of the output thread: creates the protocol, sends an
         * initial heartbeat and then, until checkShutdown() says otherwise,
         * repeatedly performs the security check, writes the available log
         * entries, verifies the master status and sends a heartbeat when due.
         *
         * Decompiler note (kept from CFR):
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            this.protocol = Protocol.get(Feeder.this.repNode, Feeder.this.protocolVersion, Feeder.this.protocolVersion, Feeder.this.streamLogVersion);
            Thread.currentThread().setName("Feeder Output for " + Feeder.this.getReplicaNameIdPair().getName());
            VLSNRange range = this.vlsnIndex.getRange();
            LoggerUtils.info(Feeder.this.logger, Feeder.this.repImpl, String.format("Feeder output thread for replica %s started at VLSN %,d master at %,d (DTVLSN:%,d) VLSN delta=%,d socket=%s", Feeder.this.replicaNameIdPair.getName(), Feeder.this.feederVLSN.getSequence(), range.getLast().getSequence(), Feeder.this.repNode.getAnyDTVLSN(), range.getLast().getSequence() - Feeder.this.feederVLSN.getSequence(), Feeder.this.feederReplicaChannel));
            /* Set only by the Error handler; makes the finally block rethrow and skip the feeder shutdown. */
            Error feederOutputError = null;
            /* Whatever terminated the loop; handed to Feeder.shutdown() for logging. */
            Exception shutdownException = null;
            try {
                this.sendHeartbeat();
                /* The channel was created with the pre-heartbeat timeout; switch to the regular feeder timeout. */
                int timeoutMs = Feeder.this.repNode.getConfigManager().getDuration(RepParams.FEEDER_TIMEOUT);
                Feeder.this.feederReplicaChannel.setTimeoutMs(timeoutMs);
                while (!this.checkShutdown()) {
                    /* Once the feeder VLSN has reached the current txn end VLSN, arbitration is ended. */
                    if (Feeder.this.feederVLSN.compareTo(Feeder.this.repNode.getCurrentTxnEndVLSN()) >= 0) {
                        Feeder.this.repNode.getArbiter().endArbitration();
                    }
                    Feeder.this.doSecurityCheck();
                    this.writeAvailableEntries();
                    Feeder.this.masterStatus.assertSync();
                    this.sendHeartbeat();
                    if (this.testDelayMs <= 0) continue;
                    Thread.sleep(this.testDelayMs);
                }
            }
            catch (IOException e) {
                shutdownException = e;
            }
            catch (MasterStatus.MasterSyncException e) {
                shutdownException = e;
            }
            catch (InterruptedException e) {
                shutdownException = e;
            }
            catch (ReplicationSecurityException ure) {
                shutdownException = ure;
                LoggerUtils.warning(Feeder.this.logger, Feeder.this.repImpl, "Unauthorized replication stream consumer " + ure.getConsumer() + ", exception: " + ure.getMessage());
            }
            catch (RuntimeException e) {
                /* Logged and rethrown; the finally block below still shuts the feeder down first. */
                shutdownException = e;
                LoggerUtils.severe(Feeder.this.logger, Feeder.this.repImpl, "Unexpected exception: " + e.getMessage() + LoggerUtils.getStackTrace(e));
                throw e;
            }
            catch (Error e) {
                /* The environment is invalidated; the error itself is rethrown from the finally block. */
                feederOutputError = e;
                Feeder.this.repNode.getRepImpl().invalidate(e);
            }
            finally {
                if (feederOutputError != null) {
                    /* Propagate the Error without running the feeder shutdown and cleanup below. */
                    throw feederOutputError;
                }
                LoggerUtils.info(Feeder.this.logger, Feeder.this.repImpl, "Feeder output for " + Feeder.this.replicaNameIdPair.getName() + " shutdown. feeder VLSN: " + Feeder.this.feederVLSN + " currentTxnEndVLSN: " + Feeder.this.repNode.getCurrentTxnEndVLSN());
                Feeder.this.shutdown(shutdownException);
                this.cleanup();
            }
        }

        /**
         * Buffers log entries for the replica, starting at the feeder VLSN,
         * and flushes them as one batch.
         *
         * <p>The batch ends when the feeder source returns no record, when a
         * test delay is configured, when the feeder VLSN moves past the
         * latest allocated VLSN, or when the batch deadline passes. The
         * deadline starts one heartbeat interval ahead and is pulled in to
         * {@code batchNs} from now as soon as a commit that needs an ack
         * joins the batch. Records rejected by the feeder filter are skipped
         * but still advance the feeder VLSN.
         */
        private void writeAvailableEntries() throws DatabaseException, InterruptedException, IOException, MasterStatus.MasterSyncException {
            long deadlineNs = System.nanoTime() + (long)Feeder.this.heartbeatMs * 1000000L;
            boolean ackRequested = false;
            int buffered = 0;
            this.batchBuff.clear();
            do {
                OutputWireRecord wireRecord = Feeder.this.feederSource.getWireRecord(Feeder.this.feederVLSN, Feeder.this.heartbeatMs);
                Feeder.this.masterStatus.assertSync();
                if (wireRecord == null) {
                    /* Nothing available: remember the master's last commit and end the batch. */
                    this.lastCommitTimestamp = Feeder.this.repNode.getFeederTxns().getLastCommitTimestamp().get();
                    this.lastCommitVLSN = Feeder.this.repNode.getFeederTxns().getLastCommitVLSN().get();
                    break;
                }
                if (Feeder.this.feederFilter != null) {
                    wireRecord = Feeder.this.feederFilter.execute(wireRecord, Feeder.this.repImpl);
                    if (wireRecord == null) {
                        /* Filtered out: skip it but keep the stream position moving. */
                        Feeder.this.feederVLSN = Feeder.this.feederVLSN.getNext();
                        continue;
                    }
                }
                final long txnId = wireRecord.getCommitTxnId();
                final long commitTimestamp = wireRecord.getCommitTimeStamp();
                if (commitTimestamp != 0L) {
                    this.lastCommitTimestamp = commitTimestamp;
                    this.lastCommitVLSN = wireRecord.getVLSN().getSequence();
                }
                if (this.commitToNetwork && txnId != 0L) {
                    Feeder.this.deemAcked(txnId);
                }
                if (Feeder.this.isArbiterFeeder) {
                    Feeder.this.feederVLSN = wireRecord.getVLSN();
                }
                this.validate(wireRecord);
                final BinaryProtocol.Message message = this.createMessage(txnId, wireRecord);
                if (!ackRequested && txnId != 0L && ((BaseProtocol.Commit)message).getNeedsAck()) {
                    /* First commit needing an ack: tighten the deadline, never extend it. */
                    ackRequested = true;
                    deadlineNs = Math.min(System.nanoTime() + (long)this.batchNs, deadlineNs);
                }
                assert TestHookExecute.doHookIfSet(Feeder.this.writeMessageHook, message);
                buffered = this.protocol.bufferWrite(Feeder.this.feederReplicaChannel, this.batchBuff, buffered + 1, message);
                Feeder.this.feederVLSN = Feeder.this.feederVLSN.getNext();
            } while (this.testDelayMs == 0 && this.vlsnIndex.getLatestAllocatedVal() >= Feeder.this.feederVLSN.getSequence() && System.nanoTime() - deadlineNs < 0L);
            if (this.batchBuff.position() != 0) {
                this.protocol.flushBufferedWrites(Feeder.this.feederReplicaChannel, this.batchBuff, buffered);
            }
        }

        /**
         * Sends a heartbeat carrying the node's current txn end VLSN, but
         * only when more than one heartbeat interval has passed since the
         * previous one. For non-arbiter feeders it also records the commit
         * timestamp/VLSN associated with the heartbeat and updates the
         * maximum replica lag statistic.
         *
         * @throws IOException if the heartbeat cannot be written
         */
        private void sendHeartbeat() throws IOException {
            final long now = System.currentTimeMillis();
            final long sinceLastMs = now - Feeder.this.lastHeartbeatTime;
            if (sinceLastMs <= (long)Feeder.this.heartbeatMs) {
                return;
            }
            final VLSN txnEndVLSN = Feeder.this.repNode.getCurrentTxnEndVLSN();
            Protocol protocol = this.protocol;
            protocol.getClass();
            this.writeMessage(new BaseProtocol.Heartbeat(protocol, now, txnEndVLSN.getSequence()), Feeder.this.feederReplicaChannel);
            Feeder.this.lastHeartbeatTime = now;
            if (Feeder.this.isArbiterFeeder) {
                return;
            }
            /* Without a known commit both heartbeat commit values are reset to zero. */
            final boolean commitKnown = this.lastCommitTimestamp != 0L;
            Feeder.this.lastHeartbeatCommitTimestamp = commitKnown ? this.lastCommitTimestamp : 0L;
            Feeder.this.lastHeartbeatCommitVLSN = commitKnown ? this.lastCommitVLSN : 0L;
            final long lag = txnEndVLSN.getSequence() - Feeder.this.feederVLSN.getSequence();
            final boolean newMaximum = this.nMaxReplicaLag.setMax(lag);
            if (newMaximum) {
                this.nMaxReplicaLagName.set(Feeder.this.replicaNameIdPair.getName());
            }
        }

        /**
         * Shuts down the channel to the replica as the soft shutdown step.
         *
         * @return the node's thread wait interval
         */
        @Override
        protected int initiateSoftShutdown() {
            RepUtils.shutdownChannel(Feeder.this.feederReplicaChannel);
            final int waitInterval = Feeder.this.repNode.getThreadWaitInterval();
            return waitInterval;
        }

        /**
         * Wraps a wire record in the protocol message to be sent.
         *
         * <p>A record with txn id 0 becomes a plain entry message. Any other
         * record becomes a commit message. If the transaction is still
         * awaiting acks, its transfer time is stamped, accumulated and, when
         * above the logging threshold, logged; the ack requirement and
         * replica sync policy are then taken from the node's durability
         * quorum and the transaction. Otherwise no ack is requested and
         * NO_SYNC is used.
         *
         * @param txnId the commit txn id of the record, or 0
         * @param wireRecord the record to send
         * @return the message wrapping {@code wireRecord}
         */
        private BinaryProtocol.Message createMessage(long txnId, OutputWireRecord wireRecord) throws DatabaseException {
            if (txnId == 0L) {
                Protocol protocol = this.protocol;
                protocol.getClass();
                return new BaseProtocol.Entry((BaseProtocol)protocol, wireRecord);
            }
            final MasterTxn ackTxn = Feeder.this.repNode.getFeederTxns().getAckTxn(txnId);
            boolean needsAck = false;
            Durability.SyncPolicy replicaSync = Durability.SyncPolicy.NO_SYNC;
            if (ackTxn != null) {
                ackTxn.stampRepWriteTime();
                final long transferMs = ackTxn.messageTransferMs();
                this.totalTransferDelay += transferMs;
                if (transferMs > (long)this.transferLoggingThresholdMs) {
                    LoggerUtils.info(Feeder.this.logger, Feeder.this.repImpl, String.format("Feeder for: %s, Txn: %,d  log to rep stream time %,dms. Total transfer time: %,dms.", Feeder.this.replicaNameIdPair.getName(), txnId, transferMs, this.totalTransferDelay));
                }
                /* Commits acknowledged at the network never ask the replica for an ack. */
                needsAck = !this.commitToNetwork && Feeder.this.repNode.getDurabilityQuorum().replicaAcksQualify(Feeder.this.replicaNode);
                replicaSync = ackTxn.getCommitDurability().getReplicaSync();
            }
            Protocol protocol = this.protocol;
            protocol.getClass();
            return new BaseProtocol.Commit((BaseProtocol)protocol, needsAck, replicaSync, wireRecord);
        }

        /**
         * Checks that the record carries the VLSN the feeder expects to send
         * next. When assertions are enabled and the environment is not a
         * converted one, the record's negative sequences are verified too.
         *
         * @param record the record about to be sent
         */
        private void validate(OutputWireRecord record) {
            final boolean vlsnMatches = record.getVLSN().equals(Feeder.this.feederVLSN);
            if (!vlsnMatches) {
                throw EnvironmentFailureException.unexpectedState("Expected VLSN:" + Feeder.this.feederVLSN + " log entry VLSN:" + record.getVLSN());
            }
            if (Feeder.this.repImpl.isRepConverted()) {
                return;
            }
            assert record.verifyNegativeSequences("node=" + Feeder.this.nameIdPair);
        }

        /** @return the enclosing feeder's logger */
        @Override
        protected Logger getLogger() {
            final Logger feederLogger = Feeder.this.logger;
            return feederLogger;
        }
    }

    /**
     * Thread that reads the messages arriving on the feeder/replica channel.
     *
     * <p>{@link #run()} performs the handshake, then either initializes the
     * arbiter feeder source or runs the syncup, creates the per-replica
     * statistics, starts the feeder's output thread, activates the feeder and
     * finally sits in the response loop until shutdown is requested or the
     * replica confirms a shutdown.
     */
    private class InputThread extends StoppableThread {
        /* Assigned from the handshake result in run(); null until then. */
        Protocol protocol;
        /* Only created when the replica's node type reports isDataNode(). */
        private LocalCBVLSNUpdater replicaCBVLSN;
        /* Per-replica statistics, created in run() once handshake/syncup are done. */
        private volatile LongDiffStat replicaDelay;
        private volatile AtomicLongComponent replicaLastCommitTimestamp;
        private volatile AtomicLongComponent replicaLastCommitVLSN;
        private volatile LongDiffStat replicaVLSNLag;
        private volatile LongAvgRate replicaVLSNRate;

        InputThread() {
            super(Feeder.this.repImpl, new IOThreadsHandler(), "Feeder Input");
            this.protocol = null;
        }

        /**
         * Sets the feeder up for its replica and then processes responses.
         *
         * <p>Every caught exception other than an {@code Error} is remembered
         * and handed to {@code Feeder.shutdown(Exception)} from the
         * {@code finally} clause, followed by {@code cleanup()}. An
         * {@code Error} invalidates the environment and is rethrown from the
         * {@code finally} clause instead, so shutdown and cleanup are not run
         * on that path.
         */
        @Override
        public void run() {
            Error fatalError = null;
            Exception shutdownCause = null;
            try {
                final FeederReplicaHandshake handshake = new FeederReplicaHandshake(Feeder.this.repNode, Feeder.this, Feeder.this.feederReplicaChannel);
                this.protocol = handshake.execute();

                /* Publish what the handshake learned about the replica. */
                Feeder.this.protocolVersion = this.protocol.getVersion();
                Feeder.this.replicaNameIdPair = handshake.getReplicaNameIdPair();
                Feeder.this.streamLogVersion = handshake.getStreamLogVersion();
                Feeder.this.replicaJEVersion = handshake.getReplicaJEVersion();
                Feeder.this.replicaNode = handshake.getReplicaNode();
                Thread.currentThread().setName("Feeder Input for " + Feeder.this.replicaNameIdPair.getName());

                if (Feeder.this.replicaNode.getType().isArbiter()) {
                    Feeder.this.initArbiterFeederSource();
                } else {
                    final FeederReplicaSyncup replicaSyncup = new FeederReplicaSyncup(Feeder.this, Feeder.this.feederReplicaChannel, this.protocol);
                    if (Feeder.this.replicaNode.getType().isDataNode()) {
                        this.replicaCBVLSN = new LocalCBVLSNUpdater(Feeder.this.replicaNameIdPair, Feeder.this.replicaNode.getType(), Feeder.this.repNode);
                    }
                    replicaSyncup.execute();
                }

                this.createReplicaStats();

                Feeder.this.outputThread.start();
                Feeder.this.lastResponseTime = System.currentTimeMillis();
                Feeder.this.masterStatus.assertSync();
                Feeder.this.feederManager.activateFeeder(Feeder.this);
                this.runResponseLoop();
            }
            catch (ReplicationSecurityException securityFailure) {
                shutdownCause = securityFailure;
                LoggerUtils.warning(Feeder.this.logger, Feeder.this.repImpl, securityFailure.getMessage());
            }
            catch (FeederReplicaSyncup.NetworkRestoreException restoreNeeded) {
                shutdownCause = restoreNeeded;
                LoggerUtils.info(Feeder.this.logger, Feeder.this.repImpl, restoreNeeded.getMessage());
            }
            catch (IOException | MasterStatus.MasterSyncException | InterruptedException quietExit) {
                /* These only record the cause; nothing is logged here. */
                shutdownCause = quietExit;
            }
            catch (ExitException exit) {
                shutdownCause = exit;
                LoggerUtils.warning(Feeder.this.logger, Feeder.this.repImpl, "Exiting feeder loop: " + exit.getMessage());
            }
            catch (Error error) {
                fatalError = error;
                Feeder.this.repNode.getRepImpl().invalidate(error);
            }
            catch (ChecksumException badChecksum) {
                shutdownCause = badChecksum;
                throw new EnvironmentFailureException((EnvironmentImpl)Feeder.this.repNode.getRepImpl(), EnvironmentFailureReason.LOG_CHECKSUM, (Throwable)badChecksum);
            }
            catch (RuntimeException unexpected) {
                shutdownCause = unexpected;
                LoggerUtils.severe(Feeder.this.logger, Feeder.this.repImpl, "Unexpected exception: " + unexpected.getMessage() + LoggerUtils.getStackTrace(unexpected));
                throw unexpected;
            }
            finally {
                /* An Error propagates as-is; shutdown and cleanup are skipped. */
                if (fatalError != null) {
                    throw fatalError;
                }
                Feeder.this.shutdown(shutdownCause);
                this.cleanup();
            }
        }

        /**
         * Creates the five per-replica statistics, each registered with the
         * feeder manager under the replica's name.
         */
        private void createReplicaStats() {
            final String replicaName = Feeder.this.replicaNameIdPair.getName();
            this.replicaDelay = Feeder.this.feederManager.getReplicaDelayMap().createStat(replicaName, Feeder.this.repNode.getFeederTxns().getLastCommitTimestamp());
            this.replicaLastCommitTimestamp = Feeder.this.feederManager.getReplicaLastCommitTimestampMap().createStat(replicaName);
            this.replicaLastCommitVLSN = Feeder.this.feederManager.getReplicaLastCommitVLSNMap().createStat(replicaName);
            this.replicaVLSNLag = Feeder.this.feederManager.getReplicaVLSNLagMap().createStat(replicaName, Feeder.this.repNode.getFeederTxns().getLastCommitVLSN());
            this.replicaVLSNRate = Feeder.this.feederManager.getReplicaVLSNRateMap().createStat(replicaName);
        }

        /**
         * Reads messages from the replica and dispatches on their op code
         * until {@link #checkShutdown()} reports true or a shutdown response
         * is received. Any op other than heartbeat response, ack, group ack,
         * shutdown response or reauthenticate is reported as an unexpected
         * state.
         *
         * @throws IOException declared by the original signature; the channel
         *         read is the visible candidate source
         * @throws MasterStatus.MasterSyncException declared by the original
         *         signature; {@code masterStatus.assertSync()} is the visible
         *         candidate source
         */
        private void runResponseLoop() throws IOException, MasterStatus.MasterSyncException {
            while (!this.checkShutdown()) {
                final BinaryProtocol.Message message = this.protocol.read(Feeder.this.feederReplicaChannel);
                /* Re-check after the read and leave without touching any state. */
                if (this.checkShutdown()) {
                    return;
                }
                Feeder.this.masterStatus.assertSync();
                Feeder.this.lastResponseTime = System.currentTimeMillis();

                if (message.getOp() == Protocol.HEARTBEAT_RESPONSE) {
                    this.processHeartbeatResponse(message);
                } else if (message.getOp() == Protocol.ACK) {
                    this.noteAck("Ack for: ", ((BaseProtocol.Ack)message).getTxnId());
                } else if (message.getOp() == Protocol.GROUP_ACK) {
                    final long[] ackedTxnIds = ((BaseProtocol.GroupAck)message).getTxnIds();
                    for (final long ackedTxnId : ackedTxnIds) {
                        this.noteAck("Group Ack for: ", ackedTxnId);
                    }
                } else if (message.getOp() == Protocol.SHUTDOWN_RESPONSE) {
                    LoggerUtils.info(Feeder.this.logger, Feeder.this.repImpl, "Shutdown confirmed by replica " + Feeder.this.replicaNameIdPair.getName());
                    return;
                } else if (message.getOp() == Protocol.REAUTHENTICATE) {
                    Feeder.this.processReauthenticate(message);
                } else {
                    throw EnvironmentFailureException.unexpectedState("Unexpected message: " + message);
                }
            }
        }

        /**
         * Logs one acknowledged transaction id at FINE level (when enabled)
         * and passes it to {@code Feeder.deemAcked}.
         *
         * @param logPrefix text placed in front of the transaction id in the log line
         * @param txnId the acknowledged transaction id
         */
        private void noteAck(String logPrefix, long txnId) {
            if (Feeder.this.logger.isLoggable(Level.FINE)) {
                LoggerUtils.fine(Feeder.this.logger, Feeder.this.repImpl, logPrefix + txnId);
            }
            Feeder.this.deemAcked(txnId);
        }

        /**
         * Applies a heartbeat response: updates the replica's CBVLSN tracker
         * (if one exists), records the replica's transaction end VLSN, and
         * refreshes the per-replica lag and delay statistics.
         *
         * <p>Nothing is done for arbiters. Nothing past the CBVLSN update is
         * done when the response carries no transaction end VLSN.
         *
         * @param response the message to process; cast to
         *        {@code BaseProtocol.HeartbeatResponse}
         */
        private void processHeartbeatResponse(BinaryProtocol.Message response) {
            final BaseProtocol.HeartbeatResponse heartbeat = (BaseProtocol.HeartbeatResponse)response;
            if (Feeder.this.replicaNode.getType().isArbiter()) {
                return;
            }
            if (this.replicaCBVLSN != null) {
                this.replicaCBVLSN.updateForReplica(heartbeat);
            }

            final VLSN replicaEndVLSN = heartbeat.getTxnEndVLSN();
            if (replicaEndVLSN == null) {
                return;
            }
            Feeder.this.replicaTxnEndVLSN = replicaEndVLSN;
            final long replicaEndSeq = replicaEndVLSN.getSequence();
            Feeder.this.feederManager.updateDTVLSN(replicaEndSeq);
            if (replicaEndVLSN.compareTo(Feeder.this.repNode.getCurrentTxnEndVLSN()) >= 0) {
                Feeder.this.caughtUp = true;
                Feeder.this.adviseMasterTransferProgress();
            }

            /* Without both heartbeat commit values there is nothing to report. */
            final long heartbeatCommitVLSN = Feeder.this.lastHeartbeatCommitVLSN;
            final long heartbeatCommitTimestamp = Feeder.this.lastHeartbeatCommitTimestamp;
            if (heartbeatCommitVLSN == 0L || heartbeatCommitTimestamp == 0L) {
                return;
            }

            /* Report the smaller of the heartbeat commit VLSN and the replica's VLSN. */
            final long reportedVLSN = Math.min(heartbeatCommitVLSN, replicaEndSeq);
            this.replicaLastCommitVLSN.set(reportedVLSN);
            this.replicaVLSNLag.set(reportedVLSN, Feeder.this.lastResponseTime);
            this.replicaVLSNRate.add(reportedVLSN, Feeder.this.lastResponseTime);

            long reportedTimestamp = heartbeatCommitTimestamp;
            if (heartbeatCommitVLSN > replicaEndSeq) {
                /*
                 * The replica is behind the heartbeat commit: move the
                 * timestamp back by the VLSN gap divided by the rate, scaled
                 * by 60000 (milliseconds in a minute, presumably because the
                 * rate is per minute -- confirm against vlsnRate's definition).
                 */
                final long vlsnsPerMinute = Feeder.this.vlsnRate.get();
                if (vlsnsPerMinute <= 0L) {
                    return;
                }
                final long vlsnsBehind = heartbeatCommitVLSN - replicaEndSeq;
                final long estimatedLagMillis = (long)(60000.0 * ((double)vlsnsBehind / (double)vlsnsPerMinute));
                reportedTimestamp = heartbeatCommitTimestamp - estimatedLagMillis;
            }
            this.replicaLastCommitTimestamp.set(reportedTimestamp);
            this.replicaDelay.set(reportedTimestamp, Feeder.this.lastResponseTime);
        }

        /**
         * Tells the response loop whether to stop.
         *
         * @return true only when the feeder's shutdown flag is set and the
         *         node's replica close catchup interval is negative
         */
        private boolean checkShutdown() {
            if (!Feeder.this.shutdown.get()) {
                return false;
            }
            return Feeder.this.repNode.getReplicaCloseCatchupMs() < 0L;
        }

        /**
         * Shuts down the feeder/replica channel.
         *
         * @return the node's thread wait interval
         */
        @Override
        protected int initiateSoftShutdown() {
            RepUtils.shutdownChannel(Feeder.this.feederReplicaChannel);
            final int waitInterval = Feeder.this.repNode.getThreadWaitInterval();
            return waitInterval;
        }

        /**
         * Returns the logger held by the enclosing {@code Feeder} instance.
         */
        @Override
        protected Logger getLogger() {
            return Feeder.this.logger;
        }
    }
}

