/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.server.raftlog.segmented;

import com.codahale.metrics.Timer;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.server.raftlog.segmented.LogSegment;
import org.apache.ratis.server.raftlog.segmented.LogSegmentStartEnd;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogOutputStream;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.DataBlockingQueue;
import org.apache.ratis.util.DataQueue;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * Exception performing whole class analysis ignored.
 */
class SegmentedRaftLogWorker {
    static final Logger LOG = LoggerFactory.getLogger(SegmentedRaftLogWorker.class);
    // Timeout used for each queue offer (addIOTask) and each queue poll (run).
    static final TimeDuration ONE_SECOND = TimeDuration.valueOf((long)1L, (TimeUnit)TimeUnit.SECONDS);
    // Callbacks handed to RaftLogIndex updates; they log the change prefixed with this worker at INFO / TRACE.
    private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", (Object)this, s);
    private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", (Object)this, s);
    // "<memberId>-<simple class name>"; also returned by toString().
    private final String name;
    // Pending IO tasks; filled by addIOTask and drained by run(). Limits come from RaftServerConfigKeys.Log.
    private final DataBlockingQueue<SegmentedRaftLog.Task> queue;
    // Notified with the new index each time the flushed index is advanced (WriteLogTasks is declared outside this view).
    private final WriteLogTasks writeTasks = new WriteLogTasks();
    // Cleared by close(); the run() loop exits once it observes false.
    private volatile boolean running = true;
    // Single-thread executor on which run() is submitted by start().
    private final ExecutorService workerThreadExecutor;
    private final RaftStorage storage;
    // Output stream of the currently open segment; null when none is open (see freeSegmentedRaftLogOutputStream).
    private volatile SegmentedRaftLogOutputStream out;
    // Run after every flushed-index update; treated as optional (null-checked) by postUpdateFlushedIndex.
    private final Runnable submitUpdateCommitEvent;
    // May be null: flushIfNecessary substitutes an already-completed future in that case.
    private final StateMachine stateMachine;
    // Timers obtained from the SegmentedRaftLogMetrics passed to the constructor.
    private final Timer logFlushTimer;
    private final Timer raftLogSyncTimer;
    private final Timer raftLogQueueingTimer;
    private final Timer raftLogEnqueueingDelayTimer;
    private final SegmentedRaftLogMetrics raftLogMetrics;
    // Direct buffer handed to every SegmentedRaftLogOutputStream created by this worker.
    private final ByteBuffer writeBuffer;
    // Byte array supplied to the output stream via sharedBuffer::get; the reference is set to null by close().
    private final AtomicReference<byte[]> sharedBuffer;
    // Compared against forceSyncNum in shouldFlush(); reassigned by postUpdateFlushedIndex and reset by syncWithSnapshot.
    private int pendingFlushNum = 0;
    private long lastWrittenIndex;
    private final RaftLogIndex flushIndex = new RaftLogIndex((Object)"flushIndex", 0L);
    private final RaftLogIndex safeCacheEvictIndex = new RaftLogIndex((Object)"safeCacheEvictIndex", 0L);
    // A flush is forced once pendingFlushNum reaches this value.
    private final int forceSyncNum;
    // Both sizes are passed to the SegmentedRaftLogOutputStream constructor.
    private final long segmentMaxSize;
    private final long preallocatedSize;
    // Treated as optional (null-checked); closed when the worker hits an unexpected failure.
    private final RaftServer.Division server;
    // lastWrittenIndex - flushIndex as computed by the most recent flush; exposed through a metrics gauge.
    private int flushBatchSize;
    // Flush modes read from the properties; the constructor rejects enabling both.
    private final boolean asyncFlush;
    private final boolean unsafeFlush;
    // Null unless asyncFlush or unsafeFlush is enabled.
    private final ExecutorService flushExecutor;
    private final StateMachineDataPolicy stateMachineDataPolicy;

    /**
     * Creates a worker named {@code <memberId>-<simple class name>}.
     *
     * <p>Reads the queue limits, segment sizes, force-sync threshold, buffer sizes and flush
     * modes from {@code properties}, registers the queue and flush-batch gauges on
     * {@code metricRegistry}, and creates the worker executor. No task is processed until
     * {@link #start(long, long, File)} is called.
     *
     * @param memberId                used to build the worker name
     * @param stateMachine            state machine whose data is flushed together with the log
     * @param submitUpdateCommitEvent callback run after each flushed-index update
     * @param server                  division that is closed when the worker fails unexpectedly
     * @param storage                 storage used to resolve segment files
     * @param properties              configuration source
     * @param metricRegistry          metrics holder providing timers and gauges
     * @throws IllegalStateException if both unsafe-flush and async-flush are enabled
     */
    SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent, RaftServer.Division server, RaftStorage storage, RaftProperties properties, SegmentedRaftLogMetrics metricRegistry) {
        this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass());
        LOG.info("new {} for {}", this.name, storage);

        this.submitUpdateCommitEvent = submitUpdateCommitEvent;
        this.stateMachine = stateMachine;
        this.raftLogMetrics = metricRegistry;
        this.storage = storage;
        this.server = server;

        // Task queue bounded both by total bytes and by element count.
        final SizeInBytes byteLimit = RaftServerConfigKeys.Log.queueByteLimit(properties);
        final int elementLimit = RaftServerConfigKeys.Log.queueElementLimit(properties);
        this.queue = new DataBlockingQueue<>(this.name, byteLimit, elementLimit, SegmentedRaftLog.Task::getSerializedSize);

        this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
        this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
        this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties);
        this.flushBatchSize = 0;
        this.stateMachineDataPolicy = new StateMachineDataPolicy(properties, metricRegistry);
        this.workerThreadExecutor = ConcurrentUtils.newSingleThreadExecutor(this.name);

        // Gauges and timers.
        metricRegistry.addDataQueueSizeGauge(this.queue);
        metricRegistry.addLogWorkerQueueSizeGauge(WriteLogTasks.access$000(this.writeTasks));
        metricRegistry.addFlushBatchSizeGauge(() -> () -> this.flushBatchSize);
        this.logFlushTimer = metricRegistry.getFlushTimer();
        this.raftLogSyncTimer = metricRegistry.getRaftLogSyncTimer();
        this.raftLogQueueingTimer = metricRegistry.getRaftLogQueueTimer();
        this.raftLogEnqueueingDelayTimer = metricRegistry.getRaftLogEnqueueDelayTimer();

        // Buffers: a direct write buffer plus a shared byte array of (appender byte limit + 8) bytes.
        final int writeBufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
        this.writeBuffer = ByteBuffer.allocateDirect(writeBufferSize);
        final int entryByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
        this.sharedBuffer = new AtomicReference<>(new byte[entryByteLimit + 8]);

        // Flush modes are mutually exclusive.
        this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
        this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);
        if (this.asyncFlush && this.unsafeFlush) {
            throw new IllegalStateException("Cannot enable both raft.server.log.unsafe-flush.enabled and raft.server.log.async-flush.enabled");
        }
        // A dedicated flush executor is only needed for the two non-blocking flush modes.
        if (this.asyncFlush || this.unsafeFlush) {
            this.flushExecutor = ConcurrentUtils.newSingleThreadExecutor(this.name + "-flush");
        } else {
            this.flushExecutor = null;
        }
    }

    /**
     * Initializes the written/flushed/evict indices, reopens the given open segment (if any)
     * in append mode, and submits the processing loop to the worker executor.
     *
     * @param latestIndex     initial value of both the last written index and the flush index
     * @param evictIndex      initial value of the safe cache evict index
     * @param openSegmentFile file of the currently open segment, or {@code null} if there is none
     * @throws IOException if the output stream for {@code openSegmentFile} cannot be created
     */
    void start(long latestIndex, long evictIndex, File openSegmentFile) throws IOException {
        LOG.trace("{} start(latestIndex={}, openSegmentFile={})", name, latestIndex, openSegmentFile);

        lastWrittenIndex = latestIndex;
        flushIndex.setUnconditionally(latestIndex, infoIndexChange);
        safeCacheEvictIndex.setUnconditionally(evictIndex, infoIndexChange);

        final boolean hasOpenSegment = openSegmentFile != null;
        if (hasOpenSegment) {
            // An open segment handed in by the caller must already exist on disk.
            Preconditions.assertTrue(openSegmentFile.exists());
            allocateSegmentedRaftLogOutputStream(openSegmentFile, true);
        }

        workerThreadExecutor.submit(this::run);
    }

    /**
     * Stops the worker: clears the running flag, drops the shared buffer, shuts down the flush
     * executor (when one exists), waits up to three seconds for the worker executor to stop,
     * and finally closes the current output stream.
     */
    void close() {
        running = false;
        sharedBuffer.set(null);

        if (flushExecutor != null) {
            flushExecutor.shutdown();
        }

        final TimeDuration waitTime = TimeDuration.ONE_SECOND.multiply(3.0);
        ConcurrentUtils.shutdownAndWait(waitTime, workerThreadExecutor,
            timeout -> LOG.warn("{}: shutdown timeout in " + timeout, name));

        IOUtils.cleanup(LOG, out);
        LOG.info("{} close()", name);
    }

    /**
     * Resets the worker to the given snapshot index: discards all queued tasks, moves the last
     * written, flush and safe-cache-evict indices to {@code lastSnapshotIndex}, and clears the
     * pending flush counter.
     *
     * @param lastSnapshotIndex the index every tracked position is reset to
     */
    void syncWithSnapshot(long lastSnapshotIndex) {
        // Anything still queued is dropped.
        queue.clear();

        lastWrittenIndex = lastSnapshotIndex;
        flushIndex.setUnconditionally(lastSnapshotIndex, infoIndexChange);
        safeCacheEvictIndex.setUnconditionally(lastSnapshotIndex, infoIndexChange);

        pendingFlushNum = 0;
    }

    /** Returns the worker name built in the constructor. */
    @Override
    public String toString() {
        return name;
    }

    /**
     * Enqueues {@code task} for the worker thread, retrying the offer every {@link #ONE_SECOND}
     * for as long as the worker is alive.
     *
     * <p>Failure handling:
     * <ul>
     *   <li>An {@link InterruptedException} received after the worker was stopped is expected
     *       during shutdown: it is logged at INFO and the interrupt flag is restored. The server
     *       is NOT closed in this case.</li>
     *   <li>Any other failure is logged at ERROR and the server (if any) is closed.</li>
     * </ul>
     * The enqueue-delay timer is always stopped, including when the offer fails.
     *
     * @param task the IO task to enqueue
     * @return the same {@code task} instance, whether or not it was enqueued
     */
    private SegmentedRaftLog.Task addIOTask(SegmentedRaftLog.Task task) {
        LOG.debug("{} adds IO task {}", (Object)this.name, (Object)task);
        try {
            Timer.Context enqueueTimerContext = this.raftLogEnqueueingDelayTimer.time();
            try {
                while (!this.queue.offer(task, ONE_SECOND)) {
                    Preconditions.assertTrue((boolean)this.isAlive(), (Object)"the worker thread is not alive");
                }
            }
            finally {
                // Stop the timer on the failure path as well so the context is never leaked.
                enqueueTimerContext.stop();
            }
            task.startTimerOnEnqueue(this.raftLogQueueingTimer);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException && !this.running) {
                LOG.info("Got InterruptedException when adding task " + task + ". The SegmentedRaftLogWorker already stopped.");
                // Preserve the interrupt status for the caller.
                Thread.currentThread().interrupt();
            } else {
                LOG.error("Failed to add IO task {}", (Object)task, (Object)e);
                Optional.ofNullable(this.server).ifPresent(RaftServer.Division::close);
            }
        }
        return task;
    }

    /**
     * Tells whether the worker can still accept tasks.
     *
     * @return {@code true} iff the running flag is set and the worker executor has not terminated
     */
    boolean isAlive() {
        if (!running) {
            return false;
        }
        return !workerThreadExecutor.isTerminated();
    }

    /**
     * Processing loop of the worker thread: polls the queue once per {@link #ONE_SECOND} and
     * executes each task until the running flag is cleared or the thread is interrupted.
     *
     * <p>IOException handling for a task:
     * <ul>
     *   <li>If the task ends before {@code lastWrittenIndex}, the exception is ignored (logged
     *       at INFO) and the task is completed normally via {@code done()}.</li>
     *   <li>Otherwise the task is failed and the first such failure is remembered; every later
     *       task is then rejected with that remembered {@link RaftLogIOException} instead of
     *       being executed.</li>
     * </ul>
     * Any other exception while still running is logged at ERROR and closes the server (if any).
     */
    private void run() {
        // First non-ignorable IO failure; once set, subsequent tasks are not executed any more.
        RaftLogIOException logIOException = null;
        while (this.running) {
            try {
                SegmentedRaftLog.Task task = (SegmentedRaftLog.Task)this.queue.poll(ONE_SECOND);
                if (task == null) {
                    continue;
                }
                task.stopTimerOnDequeue();
                try {
                    if (logIOException != null) {
                        throw logIOException;
                    }
                    Timer.Context executionTimeContext = this.raftLogMetrics.getRaftLogTaskExecutionTimer(JavaUtils.getClassSimpleName(task.getClass()).toLowerCase()).time();
                    task.execute();
                    executionTimeContext.stop();
                }
                catch (IOException e) {
                    if (task.getEndIndex() < this.lastWrittenIndex) {
                        // Really ignore it: fall through to task.done() below.
                        LOG.info("Ignore IOException when handling task " + task + " which is smaller than the lastWrittenIndex. There should be a snapshot installed.", (Throwable)e);
                    } else {
                        task.failed(e);
                        if (logIOException == null) {
                            logIOException = new RaftLogIOException("Log already failed at index " + task.getEndIndex() + " for task " + task, (Throwable)e);
                        }
                        continue;
                    }
                }
                task.done();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (this.running) {
                    LOG.warn("{} got interrupted while still running", (Object)Thread.currentThread().getName());
                }
                LOG.info(Thread.currentThread().getName() + " was interrupted, exiting. There are " + this.queue.getNumElements() + " tasks remaining in the queue.");
                return;
            }
            catch (Exception e) {
                if (!this.running) {
                    LOG.info("{} got closed and hit exception", (Object)Thread.currentThread().getName(), (Object)e);
                } else {
                    LOG.error("{} hit exception", (Object)Thread.currentThread().getName(), (Object)e);
                    Optional.ofNullable(this.server).ifPresent(RaftServer.Division::close);
                }
            }
        }
    }

    /**
     * Decides whether the output stream should be flushed now.
     *
     * @return {@code false} when no output stream is open; otherwise {@code true} when the
     *         pending count reached {@code forceSyncNum}, or when something is pending and the
     *         task queue is empty
     */
    private boolean shouldFlush() {
        return out != null
            && (pendingFlushNum >= forceSyncNum
                || (pendingFlushNum > 0 && queue.isEmpty()));
    }

    /**
     * Flushes the log output stream when {@link #shouldFlush()} says so; otherwise does nothing.
     *
     * <p>The state machine data flush is started first (an already-completed future stands in
     * when there is no state machine) and, under the sync policy, awaited immediately. The log
     * itself is then flushed in one of three modes:
     * <ul>
     *   <li>unsafe: start an asynchronous flush and advance the flushed index right away;</li>
     *   <li>async: start an asynchronous flush; the index advances when it completes;</li>
     *   <li>default: flush synchronously, await the state machine flush if that has not been
     *       done yet, then advance the flushed index.</li>
     * </ul>
     * The whole operation is measured by the log flush timer.
     *
     * @throws IOException if flushing the log or the state machine data fails
     */
    private void flushIfNecessary() throws IOException {
        if (!shouldFlush()) {
            return;
        }

        raftLogMetrics.onRaftLogFlush();
        LOG.debug("{}: flush {}", name, out);
        final Timer.Context flushTimerContext = logFlushTimer.time();
        try {
            final CompletableFuture<Void> stateMachineFlush;
            if (stateMachine == null) {
                stateMachineFlush = CompletableFuture.completedFuture(null);
            } else {
                stateMachineFlush = stateMachine.data().flush(lastWrittenIndex);
            }
            if (stateMachineDataPolicy.isSync()) {
                stateMachineDataPolicy.getFromFuture(stateMachineFlush, () -> this + "-flushStateMachineData");
            }

            flushBatchSize = (int)(lastWrittenIndex - flushIndex.get());

            if (unsafeFlush) {
                // The flushed index is advanced without waiting for the flush to complete.
                unsafeFlushOutStream();
                updateFlushedIndexIncreasingly();
                return;
            }
            if (asyncFlush) {
                asyncFlushOutStream(stateMachineFlush);
                return;
            }

            flushOutStream();
            if (!stateMachineDataPolicy.isSync()) {
                IOUtils.getFromFuture(stateMachineFlush, () -> this + "-flushStateMachineData");
            }
            updateFlushedIndexIncreasingly();
        } finally {
            flushTimerContext.stop();
        }
    }

    /**
     * Starts an asynchronous flush of the output stream on the flush executor without waiting
     * for it; the sync timer is stopped once the flush completes, successfully or not.
     *
     * @throws IOException declared for failures while starting the asynchronous flush
     */
    private void unsafeFlushOutStream() throws IOException {
        final Timer.Context syncTimer = raftLogSyncTimer.time();
        out.asyncFlush(flushExecutor)
            .whenComplete((result, error) -> syncTimer.stop());
    }

    /**
     * Starts an asynchronous flush of the output stream and combines it with the given state
     * machine flush. When the combined future completes (normally or exceptionally) the flushed
     * index is advanced to the value {@code lastWrittenIndex} holds at that moment, and the sync
     * timer is stopped.
     *
     * @param stateMachineFlush future of the state machine data flush
     * @throws IOException declared for failures while starting the asynchronous flush
     */
    private void asyncFlushOutStream(CompletableFuture<Void> stateMachineFlush) throws IOException {
        final Timer.Context syncTimer = raftLogSyncTimer.time();
        out.asyncFlush(flushExecutor)
            .thenCombine(stateMachineFlush, (logFlushed, dataFlushed) -> logFlushed)
            .whenComplete((result, error) -> {
                // lastWrittenIndex is intentionally read here, at completion time.
                updateFlushedIndexIncreasingly(lastWrittenIndex);
                syncTimer.stop();
            });
    }

    /**
     * Synchronously flushes the output stream, timing the call with the sync timer.
     *
     * @throws IOException if the flush fails; the timer is stopped regardless
     */
    private void flushOutStream() throws IOException {
        final Timer.Context syncTimer = raftLogSyncTimer.time();
        try {
            out.flush();
        } finally {
            syncTimer.stop();
        }
    }

    /** Advances the flushed index to the current {@code lastWrittenIndex}. */
    private void updateFlushedIndexIncreasingly() {
        final long written = lastWrittenIndex;
        updateFlushedIndexIncreasingly(written);
    }

    /**
     * Advances the flushed index to {@code index}, sets the pending flush count to the number
     * of entries written beyond {@code index}, and notifies the write tasks of the new index.
     *
     * @param index the index that has been flushed
     * @throws ArithmeticException if {@code lastWrittenIndex - index} does not fit in an int
     */
    private void updateFlushedIndexIncreasingly(long index) {
        flushIndex.updateIncreasingly(index, traceIndexChange);

        final int stillPending = Math.toIntExact(lastWrittenIndex - index);
        postUpdateFlushedIndex(stillPending);

        writeTasks.updateIndex(index);
    }

    /**
     * Records the new pending flush count and runs the commit-update callback when one was
     * supplied.
     *
     * @param count the new value of the pending flush counter
     */
    private void postUpdateFlushedIndex(int count) {
        pendingFlushNum = count;
        if (submitUpdateCommitEvent != null) {
            submitUpdateCommitEvent.run();
        }
    }

    /**
     * Queues a task that starts a new log segment.
     *
     * @param startIndex first index of the new segment
     */
    void startLogSegment(long startIndex) {
        LOG.info("{}: Starting segment from index:{}", name, startIndex);
        final SegmentedRaftLog.Task startTask = new StartLogSegment(this, startIndex);
        addIOTask(startTask);
    }

    /**
     * Rolls the log: queues a task finalizing {@code segmentToClose}, followed by a task that
     * starts a new segment at the index right after its end index.
     *
     * @param segmentToClose the segment being finalized
     */
    void rollLogSegment(LogSegment segmentToClose) {
        LOG.info("{}: Rolling segment {} to index:{}", name, segmentToClose.toString(), segmentToClose.getEndIndex());

        final SegmentedRaftLog.Task finalizeTask = new FinalizeLogSegment(this, segmentToClose);
        addIOTask(finalizeTask);

        final SegmentedRaftLog.Task startTask = new StartLogSegment(this, segmentToClose.getEndIndex() + 1L);
        addIOTask(startTask);
    }

    /**
     * Queues a task that writes the given log entry.
     *
     * @param entry the entry to write
     * @return the queued write task
     */
    SegmentedRaftLog.Task writeLogEntry(RaftProtos.LogEntryProto entry) {
        final SegmentedRaftLog.Task writeTask = new WriteLog(this, entry);
        return addIOTask(writeTask);
    }

    /**
     * Queues a task that truncates the given segments.
     *
     * @param ts    the segments affected by the truncation
     * @param index the start index of the truncation
     * @return the queued truncate task
     */
    SegmentedRaftLog.Task truncate(SegmentedRaftLogCache.TruncationSegments ts, long index) {
        LOG.info("{}: Truncating segments {}, start index {}", name, ts, index);
        final SegmentedRaftLog.Task truncateTask = new TruncateLog(this, ts, index);
        return addIOTask(truncateTask);
    }

    /**
     * Queues a task that finalizes the given segment without starting a new one.
     *
     * @param segmentToClose the segment being finalized
     */
    void closeLogSegment(LogSegment segmentToClose) {
        LOG.info("{}: Closing segment {} to index: {}", name, segmentToClose.toString(), segmentToClose.getEndIndex());
        final SegmentedRaftLog.Task finalizeTask = new FinalizeLogSegment(this, segmentToClose);
        addIOTask(finalizeTask);
    }

    /**
     * Queues a task that purges the given segments.
     *
     * @param ts the segments to purge
     * @return the queued purge task
     */
    SegmentedRaftLog.Task purge(SegmentedRaftLogCache.TruncationSegments ts) {
        final SegmentedRaftLog.Task purgeTask = new PurgeLog(this, ts, null);
        return addIOTask(purgeTask);
    }

    /**
     * Resolves the segment file for the given index range within this worker's storage.
     *
     * @param startIndex start index of the segment
     * @param endIndex   end index of the segment, passed through to {@code LogSegmentStartEnd}
     * @return the file of the segment
     */
    File getFile(long startIndex, Long endIndex) {
        final LogSegmentStartEnd range = LogSegmentStartEnd.valueOf(startIndex, endIndex);
        return range.getFile(storage);
    }

    /** Returns the current value of the flush index. */
    long getFlushIndex() {
        final long flushed = flushIndex.get();
        return flushed;
    }

    /** Returns the current value of the safe cache evict index. */
    long getSafeCacheEvictIndex() {
        final long evictable = safeCacheEvictIndex.get();
        return evictable;
    }

    /**
     * Closes the current output stream (close failures are handled by {@code IOUtils.cleanup})
     * and clears the reference. The write buffer is asserted to be at position 0 afterwards.
     */
    private void freeSegmentedRaftLogOutputStream() {
        IOUtils.cleanup(LOG, out);
        out = null;

        final boolean bufferAtStart = writeBuffer.position() == 0;
        Preconditions.assertTrue(bufferAtStart);
    }

    /**
     * Opens a new output stream on {@code file}. Asserts first that no stream is currently open
     * and that the write buffer is at position 0.
     *
     * @param file   the segment file to write to
     * @param append passed through to the output stream constructor
     * @throws IOException if the output stream cannot be created
     */
    private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException {
        final boolean readyForNewStream = out == null && writeBuffer.position() == 0;
        Preconditions.assertTrue(readyForNewStream);

        out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize, preallocatedSize, writeBuffer, sharedBuffer::get);
    }

    // ---------------------------------------------------------------------------------------
    // Synthetic accessors emitted by the decompiler. Each one exposes a private member of the
    // given worker instance (x0) through a static method; presumably they exist for the nested
    // task classes, which are not part of this view. Names and bodies must not be changed.
    // ---------------------------------------------------------------------------------------

    // Reads x0.raftLogMetrics.
    static /* synthetic */ SegmentedRaftLogMetrics access$200(SegmentedRaftLogWorker x0) {
        return x0.raftLogMetrics;
    }

    // Reads x0.storage.
    static /* synthetic */ RaftStorage access$300(SegmentedRaftLogWorker x0) {
        return x0.storage;
    }

    // Reads x0.stateMachine.
    static /* synthetic */ StateMachine access$400(SegmentedRaftLogWorker x0) {
        return x0.stateMachine;
    }

    // Reads x0.server.
    static /* synthetic */ RaftServer.Division access$500(SegmentedRaftLogWorker x0) {
        return x0.server;
    }

    // Reads x0.name.
    static /* synthetic */ String access$600(SegmentedRaftLogWorker x0) {
        return x0.name;
    }

    // Reads x0.writeTasks.
    static /* synthetic */ WriteLogTasks access$700(SegmentedRaftLogWorker x0) {
        return x0.writeTasks;
    }

    // Reads x0.stateMachineDataPolicy.
    static /* synthetic */ StateMachineDataPolicy access$800(SegmentedRaftLogWorker x0) {
        return x0.stateMachineDataPolicy;
    }

    // Reads x0.out (the current output stream, possibly null).
    static /* synthetic */ SegmentedRaftLogOutputStream access$900(SegmentedRaftLogWorker x0) {
        return x0.out;
    }

    // Reads x0.lastWrittenIndex.
    static /* synthetic */ long access$1000(SegmentedRaftLogWorker x0) {
        return x0.lastWrittenIndex;
    }

    // Assigns x1 to x0.lastWrittenIndex and returns the field's value afterwards.
    static /* synthetic */ long access$1002(SegmentedRaftLogWorker x0, long x1) {
        x0.lastWrittenIndex = x1;
        return x0.lastWrittenIndex;
    }

    // Post-increments x0.pendingFlushNum; the value returned is the one BEFORE the increment.
    static /* synthetic */ int access$1108(SegmentedRaftLogWorker x0) {
        return x0.pendingFlushNum++;
    }

    // Invokes x0.flushIfNecessary().
    static /* synthetic */ void access$1200(SegmentedRaftLogWorker x0) throws IOException {
        x0.flushIfNecessary();
    }

    // Invokes x0.freeSegmentedRaftLogOutputStream().
    static /* synthetic */ void access$1300(SegmentedRaftLogWorker x0) {
        x0.freeSegmentedRaftLogOutputStream();
    }

    // Invokes the no-argument x0.updateFlushedIndexIncreasingly().
    static /* synthetic */ void access$1400(SegmentedRaftLogWorker x0) {
        x0.updateFlushedIndexIncreasingly();
    }

    // Reads x0.traceIndexChange.
    static /* synthetic */ Consumer access$1500(SegmentedRaftLogWorker x0) {
        return x0.traceIndexChange;
    }

    // Reads x0.safeCacheEvictIndex.
    static /* synthetic */ RaftLogIndex access$1600(SegmentedRaftLogWorker x0) {
        return x0.safeCacheEvictIndex;
    }

    // Reads x0.pendingFlushNum.
    static /* synthetic */ int access$1100(SegmentedRaftLogWorker x0) {
        return x0.pendingFlushNum;
    }

    // Invokes x0.allocateSegmentedRaftLogOutputStream(x1, x2), i.e. (file, append).
    static /* synthetic */ void access$1700(SegmentedRaftLogWorker x0, File x1, boolean x2) throws IOException {
        x0.allocateSegmentedRaftLogOutputStream(x1, x2);
    }

    // Reads x0.infoIndexChange.
    static /* synthetic */ Consumer access$1800(SegmentedRaftLogWorker x0) {
        return x0.infoIndexChange;
    }

    // Reads x0.flushIndex.
    static /* synthetic */ RaftLogIndex access$1900(SegmentedRaftLogWorker x0) {
        return x0.flushIndex;
    }

    // Invokes x0.postUpdateFlushedIndex(x1), where x1 is the new pending flush count.
    static /* synthetic */ void access$2000(SegmentedRaftLogWorker x0, int x1) {
        x0.postUpdateFlushedIndex(x1);
    }
}

