/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.raft.roles;

import com.google.common.base.Throwables;
import io.atomix.raft.RaftError;
import io.atomix.raft.RaftException;
import io.atomix.raft.RaftServer;
import io.atomix.raft.cluster.RaftMember;
import io.atomix.raft.cluster.impl.DefaultRaftMember;
import io.atomix.raft.cluster.impl.RaftMemberContext;
import io.atomix.raft.impl.RaftContext;
import io.atomix.raft.protocol.AppendRequest;
import io.atomix.raft.protocol.AppendResponse;
import io.atomix.raft.protocol.ConfigureRequest;
import io.atomix.raft.protocol.ConfigureResponse;
import io.atomix.raft.protocol.PollRequest;
import io.atomix.raft.protocol.PollResponse;
import io.atomix.raft.protocol.RaftResponse;
import io.atomix.raft.protocol.ReconfigureRequest;
import io.atomix.raft.protocol.ReconfigureResponse;
import io.atomix.raft.protocol.TransferRequest;
import io.atomix.raft.protocol.TransferResponse;
import io.atomix.raft.protocol.VoteRequest;
import io.atomix.raft.protocol.VoteResponse;
import io.atomix.raft.roles.ActiveRole;
import io.atomix.raft.roles.LeaderAppender;
import io.atomix.raft.roles.RaftRole;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.raft.storage.log.entry.ApplicationEntry;
import io.atomix.raft.storage.log.entry.ConfigurationEntry;
import io.atomix.raft.storage.log.entry.InitialEntry;
import io.atomix.raft.storage.log.entry.RaftLogEntry;
import io.atomix.raft.storage.system.Configuration;
import io.atomix.raft.zeebe.ValidationResult;
import io.atomix.raft.zeebe.ZeebeLogAppender;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.Scheduled;
import io.camunda.zeebe.journal.JournalException;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

public final class LeaderRole
extends ActiveRole
implements ZeebeLogAppender {
    private static final int MAX_APPEND_ATTEMPTS = 5;
    private final LeaderAppender appender = new LeaderAppender(this);
    private Scheduled appendTimer;
    private long configuring;
    private CompletableFuture<Void> commitInitialEntriesFuture;
    private ApplicationEntry lastZbEntry = null;

    public LeaderRole(RaftContext context) {
        super(context);
    }

    @Override
    public synchronized CompletableFuture<RaftRole> start() {
        this.raft.getRaftRoleMetrics().setElectionLatency(System.currentTimeMillis() - this.raft.getLastHeartbeat());
        this.takeLeadership();
        this.appendInitialEntries().join();
        this.commitInitialEntriesFuture = this.commitInitialEntries();
        this.lastZbEntry = this.findLastZeebeEntry();
        return ((CompletableFuture)super.start().thenRun(this::startTimers)).thenApply(v -> this);
    }

    @Override
    public synchronized CompletableFuture<Void> stop() {
        this.raft.resetLastHeartbeat();
        return ((CompletableFuture)((CompletableFuture)super.stop().thenRun(this.appender::close)).thenRun(this::cancelTimers)).thenRun(this::stepDown);
    }

    @Override
    protected PersistedSnapshotListener createSnapshotListener() {
        return null;
    }

    @Override
    public RaftServer.Role role() {
        return RaftServer.Role.LEADER;
    }

    @Override
    public CompletableFuture<ReconfigureResponse> onReconfigure(ReconfigureRequest request) {
        this.raft.checkThread();
        this.logRequest(request);
        if (this.configuring() || this.initializing()) {
            return CompletableFuture.completedFuture(this.logResponse(((ReconfigureResponse.Builder)ReconfigureResponse.builder().withStatus(RaftResponse.Status.ERROR)).build()));
        }
        DefaultRaftMember existingMember = this.raft.getCluster().getMember(request.member().memberId());
        if (existingMember == null) {
            return CompletableFuture.completedFuture(this.logResponse(((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)ReconfigureResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.UNKNOWN_SESSION)).build()));
        }
        if (request.index() > 0L && request.index() < this.raft.getCluster().getConfiguration().index() || request.term() != this.raft.getCluster().getConfiguration().term()) {
            return CompletableFuture.completedFuture(this.logResponse(((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)ReconfigureResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.CONFIGURATION_ERROR)).build()));
        }
        if (existingMember.getType() == request.member().getType()) {
            Configuration configuration = this.raft.getCluster().getConfiguration();
            return CompletableFuture.completedFuture(this.logResponse(((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)ReconfigureResponse.builder().withStatus(RaftResponse.Status.OK)).withIndex(configuration.index())).withTerm(this.raft.getCluster().getConfiguration().term())).withTime(this.raft.getCluster().getConfiguration().time())).withMembers(configuration.members())).build()));
        }
        existingMember.update(request.member().getType(), Instant.now());
        Collection<RaftMember> members = this.raft.getCluster().getMembers();
        CompletableFuture<ReconfigureResponse> future = new CompletableFuture<ReconfigureResponse>();
        this.configure(members).whenComplete((index, error) -> {
            if (error == null) {
                future.complete(this.logResponse(((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)ReconfigureResponse.builder().withStatus(RaftResponse.Status.OK)).withIndex((long)index)).withTerm(this.raft.getCluster().getConfiguration().term())).withTime(this.raft.getCluster().getConfiguration().time())).withMembers(members)).build()));
            } else {
                future.complete(this.logResponse(((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)ReconfigureResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.PROTOCOL_ERROR)).build()));
            }
        });
        return future;
    }

    private ApplicationEntry findLastZeebeEntry() {
        IndexedRaftLogEntry lastEntry;
        RaftLogReader reader = this.raft.getLogReader();
        reader.seekToAsqn(Long.MAX_VALUE);
        if (reader.hasNext() && (lastEntry = (IndexedRaftLogEntry)reader.next()) != null && lastEntry.isApplicationEntry()) {
            return lastEntry.getApplicationEntry();
        }
        return null;
    }

    private void cancelTimers() {
        if (this.appendTimer != null) {
            this.log.trace("Cancelling append timer");
            this.appendTimer.cancel();
        }
    }

    private void stepDown() {
        if (this.raft.getLeader() != null && this.raft.getLeader().equals(this.raft.getCluster().getLocalMember())) {
            this.raft.setLeader(null);
        }
    }

    private void takeLeadership() {
        this.raft.setLeader(this.raft.getCluster().getLocalMember().memberId());
        this.raft.getCluster().getRemoteMemberStates().forEach(m -> m.resetState(this.raft.getLog()));
    }

    private CompletableFuture<Void> appendInitialEntries() {
        long term = this.raft.getTerm();
        return this.append(new RaftLogEntry(term, new InitialEntry())).thenApply(index -> null);
    }

    private CompletableFuture<Void> commitInitialEntries() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.appender.appendEntries(this.appender.getIndex()).whenComplete((resultIndex, error) -> {
            this.raft.checkThread();
            if (this.isRunning()) {
                if (error == null) {
                    future.complete(null);
                } else {
                    this.log.info("Failed to commit the initial entry, stepping down");
                    this.raft.setLeader(null);
                    this.raft.transition(RaftServer.Role.FOLLOWER);
                }
            }
        });
        return future;
    }

    private void startTimers() {
        this.log.trace("Starting append timer on fix rate of {}", (Object)this.raft.getHeartbeatInterval());
        this.appendTimer = this.raft.getThreadContext().schedule(Duration.ZERO, this.raft.getHeartbeatInterval(), this::appendMembers);
    }

    private void appendMembers() {
        this.raft.checkThread();
        if (this.isRunning()) {
            this.appender.appendEntries();
        }
    }

    private boolean configuring() {
        return this.configuring > 0L;
    }

    private boolean initializing() {
        return this.appender.getIndex() == 0L || this.raft.getCommitIndex() < this.appender.getIndex();
    }

    protected CompletableFuture<Long> configure(Collection<RaftMember> members) {
        this.raft.checkThread();
        long term = this.raft.getTerm();
        ConfigurationEntry configurationEntry = new ConfigurationEntry(System.currentTimeMillis(), members);
        return this.append(new RaftLogEntry(term, configurationEntry)).thenCompose(entry -> {
            this.configuring = entry.index();
            this.raft.getCluster().configure(new Configuration(entry.index(), entry.term(), configurationEntry.timestamp(), configurationEntry.members()));
            return this.appender.appendEntries(entry.index()).whenComplete((commitIndex, commitError) -> {
                this.raft.checkThread();
                this.configuring = 0L;
            });
        });
    }

    @Override
    public CompletableFuture<ConfigureResponse> onConfigure(ConfigureRequest request) {
        if (this.updateTermAndLeader(request.term(), request.leader())) {
            this.raft.transition(RaftServer.Role.FOLLOWER);
        }
        return super.onConfigure(request);
    }

    @Override
    public CompletableFuture<TransferResponse> onTransfer(TransferRequest request) {
        this.logRequest(request);
        RaftMemberContext member = this.raft.getCluster().getMemberState(request.member());
        if (member == null) {
            return CompletableFuture.completedFuture(this.logResponse(((TransferResponse.Builder)((TransferResponse.Builder)TransferResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.ILLEGAL_MEMBER_STATE)).build()));
        }
        CompletableFuture<TransferResponse> future = new CompletableFuture<TransferResponse>();
        this.appender.appendEntries(this.raft.getLog().getLastIndex()).whenComplete((result, error) -> {
            if (this.isRunning()) {
                if (error == null) {
                    this.log.info("Transferring leadership to {}", (Object)request.member());
                    this.raft.transition(RaftServer.Role.FOLLOWER);
                    future.complete(this.logResponse(((TransferResponse.Builder)TransferResponse.builder().withStatus(RaftResponse.Status.OK)).build()));
                } else if (error instanceof CompletionException && error.getCause() instanceof RaftException) {
                    future.complete(this.logResponse(((TransferResponse.Builder)((TransferResponse.Builder)TransferResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(((RaftException)error.getCause()).getType(), error.getMessage())).build()));
                } else if (error instanceof RaftException) {
                    future.complete(this.logResponse(((TransferResponse.Builder)((TransferResponse.Builder)TransferResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(((RaftException)error).getType(), error.getMessage())).build()));
                } else {
                    future.complete(this.logResponse(((TransferResponse.Builder)((TransferResponse.Builder)TransferResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.PROTOCOL_ERROR, error.getMessage())).build()));
                }
            } else {
                future.complete(this.logResponse(((TransferResponse.Builder)((TransferResponse.Builder)TransferResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.ILLEGAL_MEMBER_STATE)).build()));
            }
        });
        return future;
    }

    @Override
    public CompletableFuture<AppendResponse> onAppend(AppendRequest request) {
        this.raft.checkThread();
        if (this.updateTermAndLeader(request.term(), request.leader())) {
            CompletableFuture<AppendResponse> future = super.onAppend(request);
            this.raft.transition(RaftServer.Role.FOLLOWER);
            return future;
        }
        if (request.term() < this.raft.getTerm()) {
            this.logRequest(request);
            return CompletableFuture.completedFuture(this.logResponse(((AppendResponse.Builder)AppendResponse.builder().withStatus(RaftResponse.Status.OK)).withTerm(this.raft.getTerm()).withSucceeded(false).withLastLogIndex(this.raft.getLog().getLastIndex()).withLastSnapshotIndex(this.raft.getCurrentSnapshotIndex()).build()));
        }
        this.raft.setLeader(request.leader());
        this.raft.transition(RaftServer.Role.FOLLOWER);
        return super.onAppend(request);
    }

    @Override
    public CompletableFuture<PollResponse> onPoll(PollRequest request) {
        this.logRequest(request);
        RaftMemberContext member = this.raft.getCluster().getMemberState(request.candidate());
        if (member != null) {
            member.resetFailureCount();
        }
        return CompletableFuture.completedFuture(this.logResponse(((PollResponse.Builder)PollResponse.builder().withStatus(RaftResponse.Status.OK)).withTerm(this.raft.getTerm()).withAccepted(false).build()));
    }

    @Override
    public CompletableFuture<VoteResponse> onVote(VoteRequest request) {
        if (this.updateTermAndLeader(request.term(), null)) {
            this.log.info("Received greater term from {}", (Object)request.candidate());
            this.raft.transition(RaftServer.Role.FOLLOWER);
            return super.onVote(request);
        }
        this.logRequest(request);
        return CompletableFuture.completedFuture(this.logResponse(((VoteResponse.Builder)VoteResponse.builder().withStatus(RaftResponse.Status.OK)).withTerm(this.raft.getTerm()).withVoted(false).build()));
    }

    private CompletableFuture<IndexedRaftLogEntry> append(RaftLogEntry entry) {
        CompletableFuture resultingFuture = null;
        int retries = 0;
        do {
            try {
                resultingFuture = this.tryToAppend(entry);
            }
            catch (JournalException storageException) {
                if (++retries > 5) {
                    this.log.info("Failed to append after {} retries, stepping down", (Object)retries, (Object)storageException);
                    this.raft.transition(RaftServer.Role.FOLLOWER);
                    resultingFuture = Futures.exceptionalFuture((Throwable)storageException);
                }
                this.log.error("Error on appending entry {}, retry.", (Object)entry, (Object)storageException);
            }
            catch (Exception e) {
                this.log.error("Unexpected exception on appending entry {}.", (Object)entry, (Object)e);
                resultingFuture = Futures.exceptionalFuture((Throwable)e);
            }
        } while (resultingFuture == null);
        return resultingFuture;
    }

    private CompletableFuture<IndexedRaftLogEntry> tryToAppend(RaftLogEntry entry) {
        CompletableFuture<IndexedRaftLogEntry> resultingFuture = null;
        try {
            IndexedRaftLogEntry indexedEntry = this.raft.getLog().append(entry);
            this.raft.getReplicationMetrics().setAppendIndex(indexedEntry.index());
            this.log.trace("Appended {}", (Object)indexedEntry);
            resultingFuture = CompletableFuture.completedFuture(indexedEntry);
        }
        catch (JournalException.OutOfDiskSpace e) {
            this.log.warn("Out of disk space, stepping down", (Throwable)e);
            this.raft.transition(RaftServer.Role.FOLLOWER);
            resultingFuture = Futures.exceptionalFuture((Throwable)e);
        }
        return resultingFuture;
    }

    @Override
    public void appendEntry(long lowestPosition, long highestPosition, ByteBuffer data, ZeebeLogAppender.AppendListener appendListener) {
        this.raft.getThreadContext().execute(() -> this.safeAppendEntry(lowestPosition, highestPosition, data, appendListener));
    }

    private void safeAppendEntry(long lowestPosition, long highestPosition, ByteBuffer data, ZeebeLogAppender.AppendListener appendListener) {
        this.raft.checkThread();
        ApplicationEntry entry = new ApplicationEntry(lowestPosition, highestPosition, data);
        if (!this.isRunning()) {
            appendListener.onWriteError(new RaftException.NoLeader("LeaderRole is closed and cannot be used as appender", new Object[0]));
            return;
        }
        ValidationResult result = this.raft.getEntryValidator().validateEntry(this.lastZbEntry, entry);
        if (result.failed()) {
            appendListener.onWriteError(new IllegalStateException(result.getErrorMessage()));
            this.raft.transition(RaftServer.Role.FOLLOWER);
        } else {
            this.append(new RaftLogEntry(this.raft.getTerm(), entry)).whenComplete((indexed, error) -> {
                if (error != null) {
                    appendListener.onWriteError(Throwables.getRootCause((Throwable)error));
                    if (!(error instanceof JournalException)) {
                        this.log.info("Unexpected error occurred while appending to local log, stepping down");
                        this.raft.transition(RaftServer.Role.FOLLOWER);
                    }
                } else {
                    if (indexed.isApplicationEntry()) {
                        this.lastZbEntry = indexed.getApplicationEntry();
                    }
                    appendListener.onWrite((IndexedRaftLogEntry)indexed);
                    this.replicate((IndexedRaftLogEntry)indexed, appendListener);
                }
            });
        }
    }

    private void replicate(IndexedRaftLogEntry indexed, ZeebeLogAppender.AppendListener appendListener) {
        this.raft.checkThread();
        this.appender.appendEntries(indexed.index()).whenCompleteAsync((commitIndex, commitError) -> {
            if (!this.isRunning()) {
                return;
            }
            if (commitError == null) {
                appendListener.onCommit(indexed);
                this.raft.notifyCommitListeners(indexed);
            } else {
                appendListener.onCommitError(indexed, (Throwable)commitError);
                this.log.error("Failed to replicate entry: {}", (Object)indexed, commitError);
            }
        }, (Executor)this.raft.getThreadContext());
    }

    public synchronized void onInitialEntriesCommitted(Runnable runnable) {
        this.commitInitialEntriesFuture.whenComplete((v, error) -> {
            if (error == null) {
                runnable.run();
            }
        });
    }
}

