/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.zookeeper.server.quorum;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.shade.org.apache.zookeeper.common.Time;
import org.apache.pulsar.shade.org.apache.zookeeper.server.ExitCode;
import org.apache.pulsar.shade.org.apache.zookeeper.server.Request;
import org.apache.pulsar.shade.org.apache.zookeeper.server.RequestProcessor;
import org.apache.pulsar.shade.org.apache.zookeeper.server.ServerMetrics;
import org.apache.pulsar.shade.org.apache.zookeeper.server.WorkerService;
import org.apache.pulsar.shade.org.apache.zookeeper.server.ZooKeeperCriticalThread;
import org.apache.pulsar.shade.org.apache.zookeeper.server.ZooKeeperServerListener;
import org.apache.pulsar.shade.org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Critical thread that sits between an upstream request processor and
 * {@code nextProcessor}, pairing locally queued requests with the commits
 * delivered through {@link #commit(Request)}.
 *
 * <p>Requests arrive through {@link #processRequest(Request)}. A request for
 * which {@link #needCommit(Request)} is {@code false}, and whose session has no
 * pending queue, is handed to the worker pool straight away. Every other request
 * is parked in a per-session queue until the main loop has consumed the matching
 * committed request. Committed requests are forwarded to {@code nextProcessor}
 * from the main loop thread itself, and only after the worker pool has reported
 * that no request is in flight.
 *
 * <p>{@code pendingRequests} is a plain {@link HashMap}; within this class it is
 * only touched from {@link #run()}.
 */
public class CommitProcessor
extends ZooKeeperCriticalThread
implements RequestProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(CommitProcessor.class);

    /** System property: worker thread count (defaults to the number of available processors). */
    public static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS = "zookeeper.commitProcessor.numWorkerThreads";
    /** System property: time in milliseconds to wait for the worker pool on shutdown (defaults to 5000). */
    public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT = "zookeeper.commitProcessor.shutdownTimeout";
    /** System property: read batch limit (defaults to -1, which disables the limit). */
    public static final String ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE = "zookeeper.commitProcessor.maxReadBatchSize";
    /** System property: maximum commits handled per main-loop iteration (defaults to 1, must be positive). */
    public static final String ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE = "zookeeper.commitProcessor.maxCommitBatchSize";

    /** Every request submitted through {@link #processRequest(Request)}, in arrival order. */
    protected LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<>();
    /** The subset of {@link #queuedRequests} for which {@link #needCommit(Request)} was true at submission. */
    protected final LinkedBlockingQueue<Request> queuedWriteRequests = new LinkedBlockingQueue<>();
    /** Count of submitted requests that did not need a commit and have not been dispatched yet. */
    private final AtomicInteger numReadQueuedRequests = new AtomicInteger(0);
    /** Count of submitted requests that needed a commit and have not been matched yet. */
    private final AtomicInteger numWriteQueuedRequests = new AtomicInteger(0);
    /** Requests delivered through {@link #commit(Request)} and not yet processed. */
    protected final LinkedBlockingQueue<Request> committedRequests = new LinkedBlockingQueue<>();
    /** Per-session queues of requests that are held back, keyed by session id. */
    protected final Map<Long, Deque<Request>> pendingRequests = new HashMap<Long, Deque<Request>>(10000);
    /** Number of requests handed to the worker pool whose work has not finished. */
    protected final AtomicInteger numRequestsProcessing = new AtomicInteger(0);
    RequestProcessor nextProcessor;
    /** Terminates the outer loop of {@link #run()}; cleared by {@link #start()}, set by {@code halt()}. */
    protected volatile boolean stoppedMainLoop = true;
    /** Makes the processor ignore new work and abandon waits; cleared by {@link #start()}, set by {@code halt()}. */
    protected volatile boolean stopped = true;
    private long workerShutdownTimeoutMS;
    protected WorkerService workerPool;
    /** Monitor used to wait for, and signal, {@link #numRequestsProcessing} dropping to zero. */
    private final Object emptyPoolSync = new Object();
    private static volatile int maxReadBatchSize;
    private static volatile int maxCommitBatchSize;
    /** Value returned by {@link #needCommit(Request)} for request type 9. */
    boolean matchSyncs;

    /**
     * Creates the processor; the thread is not started until {@link #start()} is called.
     *
     * @param nextProcessor downstream processor that receives every released request
     * @param id suffix appended to {@code "CommitProcessor:"} to form the thread name
     * @param matchSyncs whether requests of type 9 must wait for a commit
     * @param listener passed through to the {@code ZooKeeperCriticalThread} constructor
     */
    public CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs, ZooKeeperServerListener listener) {
        super("CommitProcessor:" + id, listener);
        this.nextProcessor = nextProcessor;
        this.matchSyncs = matchSyncs;
    }

    /** Returns whether any request handed to the worker pool is still in flight. */
    private boolean isProcessingRequest() {
        return numRequestsProcessing.get() != 0;
    }

    /**
     * Decides whether a request must be held until its commit arrives.
     *
     * @param request the request to classify
     * @return {@code false} for throttled requests and unlisted types; otherwise
     *         the per-type answer below
     */
    protected boolean needCommit(Request request) {
        if (request.isThrottled()) {
            return false;
        }
        // NOTE(review): the numeric cases are inlined constants in this decompiled
        // source; they presumably correspond to ZooDefs.OpCode (1=create, 2=delete,
        // 5=setData, 7=setACL, 13=check, 14=multi, 15=create2, 16=reconfig,
        // 19=createContainer, 20=deleteContainer, 21=createTTL, 9=sync,
        // -11=closeSession, -10=createSession) — confirm against ZooDefs.
        switch (request.type) {
            case 1:
            case 2:
            case 5:
            case 7:
            case 13:
            case 14:
            case 15:
            case 16:
            case 19:
            case 20:
            case 21:
                return true;
            case 9:
                return matchSyncs;
            case -11:
            case -10:
                return !request.isLocalSession();
            default:
                return false;
        }
    }

    /**
     * Main loop. Each iteration:
     * <ol>
     *   <li>blocks until a request is queued, a commit is waiting, or the processor is stopped;</li>
     *   <li>drains queued requests, parking those that must wait in their session queue and
     *       dispatching the rest to the worker pool;</li>
     *   <li>if a commit is waiting, waits for the worker pool to go idle, processes up to
     *       {@code maxCommitBatchSize} commits inline, then releases the reads that were queued
     *       behind those writes.</li>
     * </ol>
     * Any {@link Throwable} is passed to {@code handleException}.
     */
    @Override
    public void run() {
        try {
            do {
                int requestsToProcess;
                boolean commitIsWaiting;
                synchronized (this) {
                    commitIsWaiting = !committedRequests.isEmpty();
                    requestsToProcess = queuedRequests.size();
                    // Sleep until wakeup() reports new work or halt() sets stopped.
                    while (!stopped && requestsToProcess == 0 && !commitIsWaiting) {
                        wait();
                        commitIsWaiting = !committedRequests.isEmpty();
                        requestsToProcess = queuedRequests.size();
                    }
                }

                ServerMetrics.getMetrics().READS_QUEUED_IN_COMMIT_PROCESSOR.add(numReadQueuedRequests.get());
                ServerMetrics.getMetrics().WRITES_QUEUED_IN_COMMIT_PROCESSOR.add(numWriteQueuedRequests.get());
                ServerMetrics.getMetrics().COMMITS_QUEUED_IN_COMMIT_PROCESSOR.add(committedRequests.size());

                long time = Time.currentElapsedTime();

                // Phase 1: consume at most the number of requests that were queued when we woke up.
                int readsProcessed = 0;
                Request queued;
                while (!stopped
                        && requestsToProcess > 0
                        && (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize)
                        && (queued = queuedRequests.poll()) != null) {
                    requestsToProcess--;
                    if (needCommit(queued) || pendingRequests.containsKey(queued.sessionId)) {
                        // Either this request waits for a commit, or an earlier request of the
                        // same session is already waiting and this one must stay behind it.
                        Deque<Request> requests = pendingRequests.computeIfAbsent(queued.sessionId, sid -> new ArrayDeque<Request>());
                        requests.addLast(queued);
                        ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(requests.size());
                    } else {
                        readsProcessed++;
                        numReadQueuedRequests.decrementAndGet();
                        sendToNextProcessor(queued);
                    }
                    // With no read batch limit, stop draining as soon as a commit is available
                    // and some session is blocked.
                    if (maxReadBatchSize < 0 && !pendingRequests.isEmpty() && !committedRequests.isEmpty()) {
                        commitIsWaiting = true;
                        break;
                    }
                }
                ServerMetrics.getMetrics().READS_ISSUED_IN_COMMIT_PROC.add(readsProcessed);

                if (!commitIsWaiting) {
                    commitIsWaiting = !committedRequests.isEmpty();
                }

                // Phase 2: process commits once nothing is in flight in the worker pool.
                if (commitIsWaiting && !stopped) {
                    waitForEmptyPool();
                    if (stopped) {
                        return;
                    }

                    int commitsToProcess = maxCommitBatchSize;
                    HashSet<Long> queuesToDrain = new HashSet<Long>();
                    long startWriteTime = Time.currentElapsedTime();
                    int commitsProcessed = 0;
                    while (commitIsWaiting && !stopped && commitsToProcess > 0) {
                        Request request = committedRequests.peek();
                        if (request.isThrottled()) {
                            LOG.error("Throttled request in committed pool: {}. Exiting.", request);
                            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
                        }

                        // The commit belongs to a locally submitted write when it matches the
                        // oldest queued write on both session id and cxid.
                        if (!queuedWriteRequests.isEmpty()
                                && queuedWriteRequests.peek().sessionId == request.sessionId
                                && queuedWriteRequests.peek().cxid == request.cxid) {
                            Deque<Request> sessionQueue = pendingRequests.get(request.sessionId);
                            ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
                            if (sessionQueue == null || sessionQueue.isEmpty() || !needCommit(sessionQueue.peek())) {
                                // The matching write is not at the head of its session queue yet;
                                // leave the commit in place and retry on a later iteration.
                                break;
                            }
                            ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());

                            // Forward the local request object, carrying over the commit's data.
                            Request topPending = sessionQueue.poll();
                            topPending.setHdr(request.getHdr());
                            topPending.setTxn(request.getTxn());
                            topPending.setTxnDigest(request.getTxnDigest());
                            topPending.zxid = request.zxid;
                            topPending.commitRecvTime = request.commitRecvTime;
                            request = topPending;
                            if (request.isThrottled()) {
                                LOG.error("Throttled request in committed & pending pool: {}. Exiting.", request);
                                ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
                            }
                            numWriteQueuedRequests.decrementAndGet();
                            queuedWriteRequests.poll();
                            queuesToDrain.add(request.sessionId);
                        }

                        committedRequests.remove();
                        commitsToProcess--;
                        commitsProcessed++;
                        processWrite(request);
                        commitIsWaiting = !committedRequests.isEmpty();
                    }
                    ServerMetrics.getMetrics().WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR.add(Time.currentElapsedTime() - startWriteTime);
                    ServerMetrics.getMetrics().WRITES_ISSUED_IN_COMMIT_PROC.add(commitsProcessed);

                    // Phase 3: release the reads that were queued behind the writes just processed.
                    readsProcessed = 0;
                    for (Long sessionId : queuesToDrain) {
                        Deque<Request> sessionQueue = pendingRequests.get(sessionId);
                        int readsAfterWrite = 0;
                        while (!stopped && !sessionQueue.isEmpty() && !needCommit(sessionQueue.peek())) {
                            numReadQueuedRequests.decrementAndGet();
                            sendToNextProcessor(sessionQueue.poll());
                            readsAfterWrite++;
                        }
                        ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(readsAfterWrite);
                        readsProcessed += readsAfterWrite;
                        if (sessionQueue.isEmpty()) {
                            pendingRequests.remove(sessionId);
                        }
                    }
                    ServerMetrics.getMetrics().SESSION_QUEUES_DRAINED.add(queuesToDrain.size());
                    ServerMetrics.getMetrics().READ_ISSUED_FROM_SESSION_QUEUE.add(readsProcessed);
                }

                ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - time);
                endOfIteration();
            } while (!stoppedMainLoop);
        } catch (Throwable e) {
            handleException(getName(), e);
        }
        LOG.info("CommitProcessor exited loop!");
    }

    /** Hook invoked at the end of every main-loop iteration; does nothing by default. */
    protected void endOfIteration() {
    }

    /**
     * Blocks until no request handed to the worker pool is still in flight, or
     * until the processor is stopped, and records how long the wait took.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    protected void waitForEmptyPool() throws InterruptedException {
        int numRequestsInProcess = numRequestsProcessing.get();
        if (numRequestsInProcess != 0) {
            ServerMetrics.getMetrics().CONCURRENT_REQUEST_PROCESSING_IN_COMMIT_PROCESSOR.add(numRequestsInProcess);
        }

        long startWaitTime = Time.currentElapsedTime();
        synchronized (emptyPoolSync) {
            while (!stopped && isProcessingRequest()) {
                emptyPoolSync.wait();
            }
        }
        ServerMetrics.getMetrics().TIME_WAITING_EMPTY_POOL_IN_COMMIT_PROCESSOR_READ.add(Time.currentElapsedTime() - startWaitTime);
    }

    /**
     * Reads the configuration system properties, creates the worker pool if none
     * has been set, clears the stop flags and starts the thread.
     *
     * @throws IllegalArgumentException if the configured commit batch size is not positive
     */
    @Override
    public void start() {
        int numCores = Runtime.getRuntime().availableProcessors();
        int numWorkerThreads = Integer.getInteger(ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, numCores);
        workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000L);

        initBatchSizes();

        LOG.info("Configuring CommitProcessor with {} worker threads.", numWorkerThreads > 0 ? Integer.valueOf(numWorkerThreads) : "no");
        if (workerPool == null) {
            workerPool = new WorkerService("CommitProcWork", numWorkerThreads, true);
        }
        stopped = false;
        stoppedMainLoop = false;
        super.start();
    }

    /**
     * Counts the request as in flight and schedules it on the worker pool, passing
     * the session id as the scheduling key.
     */
    private void sendToNextProcessor(Request request) {
        numRequestsProcessing.incrementAndGet();
        CommitWorkRequest workRequest = new CommitWorkRequest(request);
        workerPool.schedule(workRequest, request.sessionId);
    }

    /**
     * Forwards a committed request to {@code nextProcessor} on the calling thread
     * and records the associated timing metrics.
     *
     * @throws RequestProcessor.RequestProcessorException if the downstream processor fails
     */
    private void processWrite(Request request) throws RequestProcessor.RequestProcessorException {
        processCommitMetrics(request, true);
        long timeBeforeFinalProc = Time.currentElapsedTime();
        nextProcessor.processRequest(request);
        ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(Time.currentElapsedTime() - timeBeforeFinalProc);
    }

    /**
     * Loads both batch sizes from their system properties.
     *
     * <p>The commit batch size is validated before it is stored in the shared static,
     * so a bad property value can never leave a non-positive limit visible to an
     * instance whose main loop is already running.
     *
     * @throws IllegalArgumentException if the configured commit batch size is not positive
     */
    private static void initBatchSizes() {
        maxReadBatchSize = Integer.getInteger(ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE, -1);

        int configuredCommitBatchSize = Integer.getInteger(ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE, 1);
        if (configuredCommitBatchSize <= 0) {
            String errorMsg = "maxCommitBatchSize must be positive, was " + configuredCommitBatchSize;
            throw new IllegalArgumentException(errorMsg);
        }
        maxCommitBatchSize = configuredCommitBatchSize;

        LOG.info("Configuring CommitProcessor with readBatchSize {} commitBatchSize {}", maxReadBatchSize, maxCommitBatchSize);
    }

    /**
     * Records queueing latency for a request that is about to be forwarded.
     *
     * @param request the request being forwarded
     * @param isWrite selects the write metrics when {@code true}, the read metric otherwise;
     *        timestamps equal to {@code -1} are treated as unset and skipped
     */
    private static void processCommitMetrics(Request request, boolean isWrite) {
        if (isWrite) {
            if (request.commitProcQueueStartTime != -1L && request.commitRecvTime != -1L) {
                // Both timestamps set: the request was queued here and its commit was received.
                long currentTime = Time.currentElapsedTime();
                ServerMetrics.getMetrics().WRITE_COMMITPROC_TIME.add(currentTime - request.commitProcQueueStartTime);
                ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.add(currentTime - request.commitRecvTime);
            } else if (request.commitRecvTime != -1L) {
                // Only the commit timestamp is set.
                ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.add(Time.currentElapsedTime() - request.commitRecvTime);
            }
        } else if (request.commitProcQueueStartTime != -1L) {
            ServerMetrics.getMetrics().READ_COMMITPROC_TIME.add(Time.currentElapsedTime() - request.commitProcQueueStartTime);
        }
    }

    /** Returns the current read batch limit; a negative value disables the limit. */
    public static int getMaxReadBatchSize() {
        return maxReadBatchSize;
    }

    /** Returns the maximum number of commits processed per main-loop iteration. */
    public static int getMaxCommitBatchSize() {
        return maxCommitBatchSize;
    }

    /**
     * Sets the read batch limit for all instances.
     *
     * @param size the new limit; a negative value disables the limit
     */
    public static void setMaxReadBatchSize(int size) {
        maxReadBatchSize = size;
        LOG.info("Configuring CommitProcessor with readBatchSize {}", maxReadBatchSize);
    }

    /**
     * Sets the commit batch size for all instances.
     *
     * @param size the new size; non-positive values are ignored and the current value is kept
     */
    public static void setMaxCommitBatchSize(int size) {
        if (size > 0) {
            maxCommitBatchSize = size;
            LOG.info("Configuring CommitProcessor with commitBatchSize {}", maxCommitBatchSize);
        }
    }

    /** Wakes the main loop if it is waiting for work in {@link #run()}. */
    @SuppressFBWarnings(value={"NN_NAKED_NOTIFY"})
    private synchronized void wakeup() {
        notifyAll();
    }

    /** Wakes any thread blocked in {@link #waitForEmptyPool()}. */
    private void wakeupOnEmpty() {
        synchronized (emptyPoolSync) {
            emptyPoolSync.notifyAll();
        }
    }

    /**
     * Queues a committed request and wakes the main loop. The call is ignored when
     * the processor is stopped or {@code request} is {@code null}.
     *
     * @param request the committed request; its {@code commitRecvTime} is stamped here
     */
    public void commit(Request request) {
        if (stopped || request == null) {
            return;
        }
        LOG.debug("Committing request:: {}", request);
        request.commitRecvTime = Time.currentElapsedTime();
        ServerMetrics.getMetrics().COMMITS_QUEUED.add(1L);
        committedRequests.add(request);
        wakeup();
    }

    /**
     * Queues an incoming request and wakes the main loop. The call is ignored when
     * the processor is stopped.
     *
     * @param request the incoming request; its {@code commitProcQueueStartTime} is stamped here
     */
    @Override
    public void processRequest(Request request) {
        if (stopped) {
            return;
        }
        LOG.debug("Processing request:: {}", request);
        request.commitProcQueueStartTime = Time.currentElapsedTime();
        queuedRequests.add(request);
        // Requests that need a commit are tracked a second time so that run() can
        // match incoming commits against the oldest outstanding one.
        if (needCommit(request)) {
            queuedWriteRequests.add(request);
            numWriteQueuedRequests.incrementAndGet();
        } else {
            numReadQueuedRequests.incrementAndGet();
        }
        wakeup();
    }

    /**
     * Sets both stop flags, wakes every waiter, discards the queued requests and
     * stops the worker pool without waiting for it.
     */
    private void halt() {
        stoppedMainLoop = true;
        stopped = true;
        wakeupOnEmpty();
        wakeup();
        queuedRequests.clear();
        if (workerPool != null) {
            workerPool.stop();
        }
    }

    /**
     * Halts this processor, waits up to the configured shutdown timeout for the
     * worker pool, then shuts down the downstream processor.
     */
    @Override
    public void shutdown() {
        LOG.info("Shutting down");

        halt();

        if (workerPool != null) {
            workerPool.join(workerShutdownTimeoutMS);
        }

        if (nextProcessor != null) {
            nextProcessor.shutdown();
        }
    }

    /**
     * Unit of work that forwards a single request to {@code nextProcessor} from a
     * worker pool thread.
     */
    private class CommitWorkRequest
    extends WorkerService.WorkRequest {
        private final Request request;

        CommitWorkRequest(Request request) {
            this.request = request;
        }

        /**
         * Halts the enclosing processor unless it is already stopped.
         * NOTE(review): presumably called by WorkerService when the work item fails
         * or cannot be run — confirm against WorkerService.
         */
        @Override
        public void cleanup() {
            if (!CommitProcessor.this.stopped) {
                LOG.error("Exception thrown by downstream processor, unable to continue.");
                CommitProcessor.this.halt();
            }
        }

        /**
         * Forwards the request downstream and records timing metrics. The in-flight
         * counter is always decremented, and the main loop is signalled when it
         * reaches zero.
         *
         * @throws RequestProcessor.RequestProcessorException if the downstream processor fails
         */
        @Override
        public void doWork() throws RequestProcessor.RequestProcessorException {
            try {
                CommitProcessor.processCommitMetrics(this.request, CommitProcessor.this.needCommit(this.request));

                long timeBeforeFinalProc = Time.currentElapsedTime();
                CommitProcessor.this.nextProcessor.processRequest(this.request);
                if (CommitProcessor.this.needCommit(this.request)) {
                    ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(Time.currentElapsedTime() - timeBeforeFinalProc);
                } else {
                    ServerMetrics.getMetrics().READ_FINAL_PROC_TIME.add(Time.currentElapsedTime() - timeBeforeFinalProc);
                }
            } finally {
                if (CommitProcessor.this.numRequestsProcessing.decrementAndGet() == 0) {
                    CommitProcessor.this.wakeupOnEmpty();
                }
            }
        }
    }
}

