/*
 * Decompiled with CFR 0.152.
 */
package com.uber.cadence.internal.worker;

import com.uber.cadence.internal.common.BackoffThrottler;
import com.uber.cadence.internal.common.InternalUtils;
import com.uber.cadence.internal.worker.ExecutorThreadFactory;
import com.uber.cadence.internal.worker.PollerOptions;
import com.uber.cadence.internal.worker.ShutdownableTaskExecutor;
import com.uber.cadence.internal.worker.SuspendableWorker;
import com.uber.cadence.internal.worker.Throttler;
import com.uber.cadence.internal.worker.autoscaler.AutoScaler;
import com.uber.cadence.internal.worker.autoscaler.AutoScalerFactory;
import com.uber.m3.tally.Scope;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a fixed-size pool of poll threads. Each thread repeatedly invokes
 * {@link PollTask#poll()} and hands every non-null result to the supplied
 * {@link ShutdownableTaskExecutor}. Polling can be throttled, backed off after
 * failures, and suspended/resumed at runtime.
 *
 * @param <T> type of the task returned by the poll task and consumed by the task executor
 */
public final class Poller<T> implements SuspendableWorker {
    private static final Logger log = LoggerFactory.getLogger(Poller.class);

    private final String identity;
    private final ShutdownableTaskExecutor<T> taskExecutor;
    private final PollTask<T> pollTask;
    private final PollerOptions pollerOptions;
    private final Scope metricsScope;
    private final AutoScaler pollerAutoScaler;

    /**
     * Holds a latch while polling is suspended and {@code null} otherwise. Poll threads
     * await the latch; {@link #resumePolling()} clears the reference and counts it down.
     */
    private final AtomicReference<CountDownLatch> suspendLatch = new AtomicReference<>();

    /** Created by {@link #start()}; {@code null} until then. */
    private ThreadPoolExecutor pollExecutor;
    private BackoffThrottler pollBackoffThrottler;
    /** Only created by {@link #start()} when a positive maximum poll rate is configured. */
    private Throttler pollRateThrottler;

    /**
     * Reports failures that escape a poll iteration. Thrift transport timeouts are
     * logged as warnings, everything else as errors.
     */
    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (t, e) -> {
        if (e instanceof TTransportException
                && ((TTransportException) e).getType() == TTransportException.TIMED_OUT) {
            log.warn("Failure in thread " + t.getName(), e);
            return;
        }
        log.error("Failure in thread " + t.getName(), e);
    };

    /**
     * Creates a poller. No threads are started until {@link #start()} is called.
     *
     * @param identity identity string, used in {@link #toString()}
     * @param pollTask source of tasks
     * @param taskExecutor consumer of polled tasks
     * @param pollerOptions thread count, throttling and backoff configuration
     * @param metricsScope scope used to emit poller metrics
     * @throws NullPointerException if any argument is {@code null}
     */
    public Poller(String identity, PollTask<T> pollTask, ShutdownableTaskExecutor<T> taskExecutor, PollerOptions pollerOptions, Scope metricsScope) {
        Objects.requireNonNull(identity, "identity cannot be null");
        Objects.requireNonNull(pollTask, "poll service should not be null");
        Objects.requireNonNull(taskExecutor, "taskExecutor should not be null");
        Objects.requireNonNull(pollerOptions, "pollerOptions should not be null");
        Objects.requireNonNull(metricsScope, "metricsScope should not be null");
        this.identity = identity;
        this.pollTask = pollTask;
        this.taskExecutor = taskExecutor;
        this.pollerOptions = pollerOptions;
        this.metricsScope = metricsScope;
        this.pollerAutoScaler = AutoScalerFactory.getInstance().createAutoScaler(pollerOptions);
    }

    /**
     * Builds the throttlers and the poll thread pool, submits one poll loop per
     * configured poll thread, and starts the auto-scaler.
     */
    @Override
    public void start() {
        if (log.isDebugEnabled()) {
            log.debug("start(): " + this.toString());
        }
        if (pollerOptions.getMaximumPollRatePerSecond() > 0.0) {
            pollRateThrottler = new Throttler("poller", pollerOptions.getMaximumPollRatePerSecond(), pollerOptions.getMaximumPollRateIntervalMilliseconds());
        }
        int pollThreadCount = pollerOptions.getPollThreadCount();
        // The queue only ever holds poll loops re-submitting themselves, so it is sized
        // to the number of loops.
        pollExecutor = new ThreadPoolExecutor(pollThreadCount, pollThreadCount, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(pollThreadCount));
        pollExecutor.setThreadFactory(new ExecutorThreadFactory(pollerOptions.getPollThreadNamePrefix(), pollerOptions.getUncaughtExceptionHandler()));
        pollBackoffThrottler = new BackoffThrottler(pollerOptions.getPollBackoffInitialInterval(), pollerOptions.getPollBackoffMaximumInterval(), pollerOptions.getPollBackoffCoefficient());
        for (int i = 0; i < pollThreadCount; ++i) {
            pollExecutor.execute(new PollLoopTask(new PollExecutionTask()));
            metricsScope.counter("cadence-poller-start").inc(1L);
        }
        pollerAutoScaler.start();
    }

    /** Returns {@code true} once {@link #start()} has created the poll executor. */
    @Override
    public boolean isStarted() {
        return pollExecutor != null;
    }

    /**
     * Returns {@code true} when both the poll executor and the task executor are shut down.
     * Must not be called before {@link #start()}: the poll executor is {@code null} until then.
     */
    @Override
    public boolean isShutdown() {
        return pollExecutor.isShutdown() && taskExecutor.isShutdown();
    }

    /**
     * Returns {@code true} when both the poll executor and the task executor have terminated.
     * Must not be called before {@link #start()}: the poll executor is {@code null} until then.
     */
    @Override
    public boolean isTerminated() {
        return pollExecutor.isTerminated() && taskExecutor.isTerminated();
    }

    /**
     * Stops polling immediately, then shuts the task executor down via its
     * {@code shutdown()} and stops the auto-scaler. Does nothing if the poller was
     * never started.
     */
    @Override
    public void shutdown() {
        log.debug("shutdown");
        if (!isStarted()) {
            return;
        }
        // Interrupt the poll threads and give them up to a second to exit before the
        // task executor is shut down.
        pollExecutor.shutdownNow();
        boolean interrupted = false;
        try {
            pollExecutor.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Finish the remaining cleanup first; the flag is restored below.
            interrupted = true;
        }
        try {
            taskExecutor.shutdown();
            pollerAutoScaler.stop();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stops polling and the task executor immediately and stops the auto-scaler.
     * Does nothing if the poller was never started.
     */
    @Override
    public void shutdownNow() {
        if (log.isDebugEnabled()) {
            log.debug("shutdownNow poller=" + pollerOptions.getPollThreadNamePrefix());
        }
        if (!isStarted()) {
            return;
        }
        pollExecutor.shutdownNow();
        taskExecutor.shutdownNow();
        // Mirror shutdown(): the auto-scaler must not be left running after an immediate stop.
        pollerAutoScaler.stop();
    }

    /**
     * Waits for the poll executor and then the task executor to terminate. The time
     * returned by the first wait is used as the budget for the second.
     *
     * @param timeout maximum total time to wait
     * @param unit unit of {@code timeout}
     */
    @Override
    public void awaitTermination(long timeout, TimeUnit unit) {
        if (!isStarted()) {
            return;
        }
        long remainingMillis = InternalUtils.awaitTermination(pollExecutor, unit.toMillis(timeout));
        InternalUtils.awaitTermination(taskExecutor, remainingMillis);
    }

    /**
     * Makes poll threads block before their next poll until {@link #resumePolling()}
     * is called. Calling this while already suspended keeps the existing latch.
     */
    @Override
    public void suspendPolling() {
        log.info("suspendPolling");
        // Install a latch only when none is present. Replacing an existing latch would
        // strand the threads already awaiting it, because resumePolling() only counts
        // down the latch it finds in the reference.
        suspendLatch.compareAndSet(null, new CountDownLatch(1));
    }

    /** Clears the suspension latch, if any, and releases the threads waiting on it. */
    @Override
    public void resumePolling() {
        log.info("resumePolling");
        CountDownLatch existing = suspendLatch.getAndSet(null);
        if (existing != null) {
            existing.countDown();
        }
    }

    /** Returns {@code true} while a suspension latch is installed. */
    @Override
    public boolean isSuspended() {
        return suspendLatch.get() != null;
    }

    @Override
    public String toString() {
        return "Poller{options=" + pollerOptions + ", identity=" + identity + '}';
    }

    /**
     * A single poll iteration: acquire a slot from the auto-scaler, poll once, and
     * pass a non-null result to the task executor.
     */
    private class PollExecutionTask
    implements ThrowingRunnable {
        /** Pause between capacity checks while the task executor reports no capacity. */
        private static final int EXECUTOR_CAPACITY_CHECK_INTERVAL_MS = 100;
        /** Pause before each capacity check. */
        private static final int EXECUTOR_CAPACITY_CHECK_OFFSET_MS = 10;

        PollExecutionTask() {
        }

        @Override
        public void run() throws Exception {
            try {
                pollerAutoScaler.acquire();
                try {
                    T task = pollTask.poll();
                    if (task == null) {
                        pollerAutoScaler.increaseNoopPollCount();
                        return;
                    }
                    pollerAutoScaler.increaseActionablePollCount();
                    taskExecutor.process(task);
                }
                finally {
                    // Runs whether or not the poll produced a task or threw.
                    checkIfTaskHasExecutorHasCapacity();
                }
            }
            finally {
                pollerAutoScaler.release();
            }
        }

        /**
         * When the "poll only if executor has capacity" option is enabled, blocks until
         * the task executor reports capacity. Returns early, with the thread's interrupt
         * flag set, if the thread is interrupted while waiting.
         */
        private void checkIfTaskHasExecutorHasCapacity() {
            if (!pollerOptions.getPollOnlyIfExecutorHasCapacity()) {
                return;
            }
            try {
                while (true) {
                    Thread.sleep(EXECUTOR_CAPACITY_CHECK_OFFSET_MS);
                    if (taskExecutor.hasCapacity()) {
                        return;
                    }
                    // Avoid a busy loop while the executor is saturated.
                    Thread.sleep(EXECUTOR_CAPACITY_CHECK_INTERVAL_MS);
                }
            } catch (InterruptedException e) {
                // Stop waiting instead of spinning, and keep the interrupt visible to callers.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Wraps one poll iteration with termination checks, backoff, rate throttling and
     * suspension handling, and re-submits itself to the poll executor after every
     * iteration until the executor is terminating.
     */
    private class PollLoopTask
    implements Runnable {
        private final ThrowingRunnable task;

        PollLoopTask(ThrowingRunnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            try {
                if (pollExecutor.isTerminating()) {
                    return;
                }
                pollBackoffThrottler.throttle();
                // Re-check after each potentially blocking step.
                if (pollExecutor.isTerminating()) {
                    return;
                }
                if (pollRateThrottler != null) {
                    pollRateThrottler.throttle();
                }
                CountDownLatch suspender = suspendLatch.get();
                if (suspender != null) {
                    if (log.isDebugEnabled()) {
                        log.debug("poll task suspending latchCount=" + suspender.getCount());
                    }
                    suspender.await();
                }
                if (pollExecutor.isTerminating()) {
                    return;
                }
                task.run();
                pollBackoffThrottler.success();
            }
            catch (Throwable e) {
                pollBackoffThrottler.failure();
                // Do not report interruptions, whether thrown directly (e.g. while awaiting
                // the suspension latch) or wrapped as the cause of another exception.
                boolean interruption = e instanceof InterruptedException
                        || e.getCause() instanceof InterruptedException;
                if (!interruption) {
                    uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
                }
            }
            finally {
                if (!pollExecutor.isTerminating()) {
                    // Schedule the next iteration of this loop.
                    pollExecutor.execute(this);
                } else {
                    log.debug("poll loop done");
                }
            }
        }
    }

    /** A {@link Runnable}-like task whose {@code run} may throw any {@link Throwable}. */
    static interface ThrowingRunnable {
        public void run() throws Throwable;
    }

    /**
     * Source of tasks for a {@link Poller}.
     *
     * @param <TT> type of the polled task
     */
    public static interface PollTask<TT> {
        /**
         * Polls once for a task.
         *
         * @return the polled task; the poller treats {@code null} as "nothing to do"
         * @throws TException if the poll fails
         */
        public TT poll() throws TException;
    }
}

