/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.server.impl;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.FollowerInfo;
import org.apache.ratis.server.impl.LeaderState;
import org.apache.ratis.server.impl.LogAppender;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.DataQueue;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogAppender {
    public static final Logger LOG = LoggerFactory.getLogger(LogAppender.class);
    private final String name;
    private final RaftServerImpl server;
    private final LeaderState leaderState;
    private final RaftLog raftLog;
    private final FollowerInfo follower;
    private final DataQueue<RaftLog.EntryWithData> buffer;
    private final int snapshotChunkMaxSize;
    private final long halfMinTimeoutMs;
    private final AppenderDaemon daemon;

    public LogAppender(RaftServerImpl server, LeaderState leaderState, FollowerInfo f) {
        this.follower = f;
        this.name = this.follower.getName() + "-" + this.getClass().getSimpleName();
        this.server = server;
        this.leaderState = leaderState;
        this.raftLog = server.getState().getLog();
        RaftProperties properties = server.getProxy().getProperties();
        this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax((RaftProperties)properties).getSizeInt();
        this.halfMinTimeoutMs = server.getMinTimeoutMs() / 2;
        SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit((RaftProperties)properties);
        int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit((RaftProperties)properties);
        this.buffer = new DataQueue((Object)this, bufferByteLimit, bufferElementLimit, RaftLog.EntryWithData::getSerializedSize);
        this.daemon = new AppenderDaemon(this);
    }

    public RaftServerImpl getServer() {
        return this.server;
    }

    public RaftLog getRaftLog() {
        return this.raftLog;
    }

    public long getHalfMinTimeoutMs() {
        return this.halfMinTimeoutMs;
    }

    public String toString() {
        return this.name;
    }

    void startAppender() {
        this.daemon.start();
    }

    public boolean isAppenderRunning() {
        return this.daemon.isRunning();
    }

    public void stopAppender() {
        this.daemon.stop();
    }

    public FollowerInfo getFollower() {
        return this.follower;
    }

    protected RaftPeerId getFollowerId() {
        return this.getFollower().getPeer().getId();
    }

    private TermIndex getPrevious(long nextIndex) {
        TermIndex snapshotTermIndex;
        if (nextIndex == 0L) {
            return null;
        }
        long previousIndex = nextIndex - 1L;
        TermIndex previous = this.raftLog.getTermIndex(previousIndex);
        if (previous != null) {
            return previous;
        }
        SnapshotInfo snapshot = this.server.getState().getLatestSnapshot();
        if (snapshot != null && (snapshotTermIndex = snapshot.getTermIndex()).getIndex() == previousIndex) {
            return snapshotTermIndex;
        }
        return null;
    }

    protected RaftProtos.AppendEntriesRequestProto createRequest(long callId) throws RaftLogIOException {
        TermIndex previous = this.getPrevious(this.follower.getNextIndex());
        long snapshotIndex = this.follower.getSnapshotIndex();
        long heartbeatRemainingMs = this.getHeartbeatRemainingTime();
        if (heartbeatRemainingMs <= 0L) {
            return this.leaderState.newAppendEntriesRequestProto(this.getFollowerId(), previous, Collections.emptyList(), !this.follower.isAttendingVote(), callId);
        }
        Preconditions.assertTrue((boolean)this.buffer.isEmpty(), () -> "buffer has " + this.buffer.getNumElements() + " elements.");
        long leaderNext = this.raftLog.getNextIndex();
        long followerNext = this.follower.getNextIndex();
        long halfMs = heartbeatRemainingMs / 2L;
        long next = followerNext;
        while (leaderNext > next && this.getHeartbeatRemainingTime() - halfMs > 0L && this.buffer.offer((Object)this.raftLog.getEntryWithData(next++))) {
        }
        if (this.buffer.isEmpty()) {
            return null;
        }
        List protos = this.buffer.pollList(this.getHeartbeatRemainingTime(), RaftLog.EntryWithData::getEntry, (entry, time, exception) -> LOG.warn("{}: Failed to get {} in {}: {}", new Object[]{this.follower.getName(), entry, time, exception}));
        this.buffer.clear();
        this.assertProtos(protos, followerNext, previous, snapshotIndex);
        return this.leaderState.newAppendEntriesRequestProto(this.getFollowerId(), previous, protos, !this.follower.isAttendingVote(), callId);
    }

    private void assertProtos(List<RaftProtos.LogEntryProto> protos, long nextIndex, TermIndex previous, long snapshotIndex) {
        if (protos.isEmpty()) {
            return;
        }
        long firstIndex = protos.get(0).getIndex();
        Preconditions.assertTrue((firstIndex == nextIndex ? 1 : 0) != 0, () -> this.follower.getName() + ": firstIndex = " + firstIndex + " != nextIndex = " + nextIndex);
        if (firstIndex > 0L && nextIndex != snapshotIndex + 1L) {
            Objects.requireNonNull(previous, () -> this.follower.getName() + ": Previous TermIndex not found for firstIndex = " + firstIndex);
            Preconditions.assertTrue((previous.getIndex() == firstIndex - 1L ? 1 : 0) != 0, () -> this.follower.getName() + ": Previous = " + previous + " but firstIndex = " + firstIndex);
        }
    }

    private RaftProtos.AppendEntriesReplyProto sendAppendEntriesWithRetries() throws InterruptedException, InterruptedIOException, RaftLogIOException {
        int retry = 0;
        RaftProtos.AppendEntriesRequestProto request = null;
        while (this.isAppenderRunning()) {
            try {
                if (request == null || request.getEntriesCount() == 0) {
                    request = this.createRequest(0L);
                }
                if (request == null) {
                    LOG.trace("{} no entries to send now, wait ...", (Object)this);
                    return null;
                }
                if (!this.isAppenderRunning()) {
                    LOG.info("{} is stopped. Skip appendEntries.", (Object)this);
                    return null;
                }
                this.follower.updateLastRpcSendTime();
                RaftProtos.AppendEntriesReplyProto r = this.server.getServerRpc().appendEntries(request);
                this.follower.updateLastRpcResponseTime();
                this.updateCommitIndex(r.getFollowerCommit());
                return r;
            }
            catch (InterruptedIOException | RaftLogIOException e) {
                throw e;
            }
            catch (IOException ioe) {
                if (retry++ % 10 == 0) {
                    LOG.warn("{}: Failed to appendEntries (retry={}): {}", new Object[]{this, retry++, ioe});
                }
                this.handleException((Exception)ioe);
                if (!this.isAppenderRunning()) continue;
                this.leaderState.getSyncInterval().sleep();
            }
        }
        return null;
    }

    protected void updateCommitIndex(long commitIndex) {
        if (this.follower.updateCommitIndex(commitIndex)) {
            this.leaderState.commitIndexChanged();
        }
    }

    protected RaftProtos.InstallSnapshotRequestProto createInstallSnapshotNotificationRequest(TermIndex firstLogStartTermIndex) {
        return this.server.createInstallSnapshotRequest(this.getFollowerId(), firstLogStartTermIndex);
    }

    private RaftProtos.FileChunkProto readFileChunk(FileInfo fileInfo, FileInputStream in, byte[] buf, int length, long offset, int chunkIndex) throws IOException {
        RaftProtos.FileChunkProto.Builder builder = RaftProtos.FileChunkProto.newBuilder().setOffset(offset).setChunkIndex(chunkIndex);
        IOUtils.readFully((InputStream)in, (byte[])buf, (int)0, (int)length);
        Path relativePath = this.server.getState().getStorage().getStorageDir().relativizeToRoot(fileInfo.getPath());
        builder.setFilename(relativePath.toString());
        builder.setDone(offset + (long)length == fileInfo.getFileSize());
        builder.setFileDigest(ByteString.copyFrom((byte[])fileInfo.getFileDigest().getDigest()));
        builder.setData(ByteString.copyFrom((byte[])buf, (int)0, (int)length));
        return builder.build();
    }

    private RaftProtos.InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
        String requestId = UUID.randomUUID().toString();
        RaftProtos.InstallSnapshotReplyProto reply = null;
        try {
            for (RaftProtos.InstallSnapshotRequestProto request : new SnapshotRequestIter(this, snapshot, requestId)) {
                this.follower.updateLastRpcSendTime();
                reply = this.server.getServerRpc().installSnapshot(request);
                this.follower.updateLastRpcResponseTime();
                if (reply.getServerReply().getSuccess()) continue;
                return reply;
            }
        }
        catch (InterruptedIOException iioe) {
            throw iioe;
        }
        catch (Exception ioe) {
            LOG.warn("{}: Failed to installSnapshot {}: {}", new Object[]{this, snapshot, ioe});
            this.handleException(ioe);
            return null;
        }
        if (reply != null) {
            this.follower.setSnapshotIndex(snapshot.getTermIndex().getIndex());
            LOG.info("{}: installSnapshot {} successfully", (Object)this, (Object)snapshot);
            this.server.getRaftServerMetrics().getCounter("numInstallSnapshot").inc();
        }
        return reply;
    }

    protected SnapshotInfo shouldInstallSnapshot() {
        long logStartIndex = this.raftLog.getStartIndex();
        if (this.follower.getNextIndex() < this.raftLog.getNextIndex()) {
            SnapshotInfo snapshot = this.server.getState().getLatestSnapshot();
            if (this.follower.getNextIndex() < logStartIndex || logStartIndex == -1L && snapshot != null) {
                return snapshot;
            }
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void runAppenderImpl() throws InterruptedException, IOException {
        while (this.isAppenderRunning()) {
            long waitTime;
            if (this.shouldSendRequest()) {
                RaftProtos.AppendEntriesReplyProto r;
                SnapshotInfo snapshot = this.shouldInstallSnapshot();
                if (snapshot != null) {
                    LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower", new Object[]{this, this.follower.getNextIndex(), this.raftLog.getStartIndex(), snapshot});
                    r = this.installSnapshot(snapshot);
                    if (r != null && r.getResult() == RaftProtos.InstallSnapshotResult.NOT_LEADER) {
                        this.checkResponseTerm(r.getTerm());
                    }
                } else {
                    r = this.sendAppendEntriesWithRetries();
                    if (r != null) {
                        this.handleReply(r);
                    }
                }
            }
            if (this.isAppenderRunning() && !this.shouldAppendEntries(this.follower.getNextIndex()) && (waitTime = this.getHeartbeatRemainingTime()) > 0L) {
                LogAppender logAppender = this;
                synchronized (logAppender) {
                    this.wait(waitTime);
                }
            }
            this.checkSlowness();
        }
    }

    private void handleReply(RaftProtos.AppendEntriesReplyProto reply) throws IllegalArgumentException {
        if (reply != null) {
            switch (1.$SwitchMap$org$apache$ratis$proto$RaftProtos$AppendEntriesReplyProto$AppendResult[reply.getResult().ordinal()]) {
                case 1: {
                    long oldNextIndex = this.follower.getNextIndex();
                    long nextIndex = reply.getNextIndex();
                    if (nextIndex < oldNextIndex) {
                        throw new IllegalStateException("nextIndex=" + nextIndex + " < oldNextIndex=" + oldNextIndex + ", reply=" + ServerProtoUtils.toString((RaftProtos.AppendEntriesReplyProto)reply));
                    }
                    if (nextIndex <= oldNextIndex) break;
                    this.follower.updateMatchIndex(nextIndex - 1L);
                    this.follower.increaseNextIndex(nextIndex);
                    this.submitEventOnSuccessAppend();
                    break;
                }
                case 2: {
                    this.checkResponseTerm(reply.getTerm());
                    break;
                }
                case 3: {
                    this.follower.decreaseNextIndex(reply.getNextIndex());
                    break;
                }
                case 4: {
                    LOG.warn("{}: received {}", (Object)this, (Object)reply.getResult());
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unable to process result " + reply.getResult());
                }
            }
        }
    }

    private void handleException(Exception e) {
        LOG.trace("TRACE", (Throwable)e);
        this.server.getServerRpc().handleException(this.follower.getPeer().getId(), e, false);
    }

    protected void submitEventOnSuccessAppend() {
        if (this.follower.isAttendingVote()) {
            this.leaderState.submitUpdateCommitEvent();
        } else {
            this.leaderState.submitCheckStagingEvent();
        }
    }

    protected void checkSlowness() {
        if (this.follower.isSlow()) {
            this.server.getStateMachine().notifySlowness(this.server.getRoleInfoProto());
        }
        this.leaderState.recordFollowerHeartbeatElapsedTime(this.follower.getPeer(), this.follower.getLastRpcResponseTime().elapsedTime().getDuration());
    }

    public synchronized void notifyAppend() {
        this.notify();
    }

    protected boolean shouldSendRequest() {
        return this.shouldAppendEntries(this.follower.getNextIndex()) || this.shouldHeartbeat();
    }

    private boolean shouldAppendEntries(long followerIndex) {
        return followerIndex < this.raftLog.getNextIndex();
    }

    protected boolean shouldHeartbeat() {
        return this.getHeartbeatRemainingTime() <= 0L;
    }

    protected long getHeartbeatRemainingTime() {
        return this.halfMinTimeoutMs - this.follower.getLastRpcTime().elapsedTimeMs();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean checkResponseTerm(long responseTerm) {
        RaftServerImpl raftServerImpl = this.server;
        synchronized (raftServerImpl) {
            if (this.isAppenderRunning() && this.follower.isAttendingVote() && responseTerm > this.leaderState.getCurrentTerm()) {
                this.leaderState.submitStepDownEvent(responseTerm);
                return true;
            }
        }
        return false;
    }

    static /* synthetic */ LeaderState access$000(LogAppender x0) {
        return x0.leaderState;
    }

    static /* synthetic */ int access$100(LogAppender x0) {
        return x0.snapshotChunkMaxSize;
    }

    static /* synthetic */ RaftProtos.FileChunkProto access$1100(LogAppender x0, FileInfo x1, FileInputStream x2, byte[] x3, int x4, long x5, int x6) throws IOException {
        return x0.readFileChunk(x1, x2, x3, x4, x5, x6);
    }

    static /* synthetic */ FollowerInfo access$1200(LogAppender x0) {
        return x0.follower;
    }

    static /* synthetic */ RaftServerImpl access$1600(LogAppender x0) {
        return x0.server;
    }
}

