/*
 * Decompiled with CFR 0.152.
 */
package io.temporal.internal.worker;

import com.uber.m3.tally.Scope;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.internal.BackoffThrottler;
import io.temporal.internal.common.GrpcUtils;
import io.temporal.internal.worker.ExecutorThreadFactory;
import io.temporal.internal.worker.PollerOptions;
import io.temporal.internal.worker.ShutdownManager;
import io.temporal.internal.worker.ShutdownableTaskExecutor;
import io.temporal.internal.worker.SuspendableWorker;
import io.temporal.internal.worker.Throttler;
import io.temporal.internal.worker.WorkerLifecycleState;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class Poller<T>
implements SuspendableWorker {
    private final String identity;
    private final ShutdownableTaskExecutor<T> taskExecutor;
    private final PollTask<T> pollTask;
    private final PollerOptions pollerOptions;
    private static final Logger log = LoggerFactory.getLogger(Poller.class);
    private ThreadPoolExecutor pollExecutor;
    private final Scope workerMetricsScope;
    private final AtomicReference<CountDownLatch> suspendLatch = new AtomicReference();
    private Throttler pollRateThrottler;
    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = new PollerUncaughtExceptionHandler();

    public Poller(String identity, PollTask<T> pollTask, ShutdownableTaskExecutor<T> taskExecutor, PollerOptions pollerOptions, Scope workerMetricsScope) {
        Objects.requireNonNull(identity, "identity cannot be null");
        Objects.requireNonNull(pollTask, "poll service should not be null");
        Objects.requireNonNull(taskExecutor, "taskExecutor should not be null");
        Objects.requireNonNull(pollerOptions, "pollerOptions should not be null");
        Objects.requireNonNull(workerMetricsScope, "workerMetricsScope should not be null");
        this.identity = identity;
        this.pollTask = pollTask;
        this.taskExecutor = taskExecutor;
        this.pollerOptions = pollerOptions;
        this.workerMetricsScope = workerMetricsScope;
    }

    @Override
    public boolean start() {
        log.info("start: {}", (Object)this);
        if (this.pollerOptions.getMaximumPollRatePerSecond() > 0.0) {
            this.pollRateThrottler = new Throttler("poller", this.pollerOptions.getMaximumPollRatePerSecond(), this.pollerOptions.getMaximumPollRateIntervalMilliseconds());
        }
        this.pollExecutor = new ThreadPoolExecutor(this.pollerOptions.getPollThreadCount(), this.pollerOptions.getPollThreadCount(), 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(this.pollerOptions.getPollThreadCount()));
        this.pollExecutor.setThreadFactory(new ExecutorThreadFactory(this.pollerOptions.getPollThreadNamePrefix(), this.pollerOptions.getUncaughtExceptionHandler()));
        for (int i = 0; i < this.pollerOptions.getPollThreadCount(); ++i) {
            this.pollExecutor.execute(new PollLoopTask(new PollExecutionTask()));
            this.workerMetricsScope.counter("temporal_poller_start").inc(1L);
        }
        return true;
    }

    @Override
    public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
        log.info("shutdown: {}", (Object)this);
        WorkerLifecycleState lifecycleState = this.getLifecycleState();
        switch (lifecycleState) {
            case NOT_STARTED: 
            case TERMINATED: {
                return CompletableFuture.completedFuture(null);
            }
        }
        return shutdownManager.shutdownExecutorNow(this.pollExecutor, this + "#pollExecutor", Duration.ofSeconds(1L)).exceptionally(e -> {
            log.error("Unexpected exception during shutdown", e);
            return null;
        });
    }

    @Override
    public void awaitTermination(long timeout, TimeUnit unit) {
        WorkerLifecycleState lifecycleState = this.getLifecycleState();
        switch (lifecycleState) {
            case NOT_STARTED: 
            case TERMINATED: {
                return;
            }
        }
        long timeoutMillis = unit.toMillis(timeout);
        ShutdownManager.awaitTermination(this.pollExecutor, timeoutMillis);
    }

    @Override
    public void suspendPolling() {
        if (this.suspendLatch.compareAndSet(null, new CountDownLatch(1))) {
            log.info("Suspend Polling: {}", (Object)this);
        } else {
            log.info("Polling is already suspended: {}", (Object)this);
        }
    }

    @Override
    public void resumePolling() {
        CountDownLatch existing = this.suspendLatch.getAndSet(null);
        if (existing != null) {
            log.info("Resume Polling {}", (Object)this);
            existing.countDown();
        }
    }

    @Override
    public boolean isSuspended() {
        return this.suspendLatch.get() != null;
    }

    @Override
    public boolean isShutdown() {
        return this.pollExecutor.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return this.pollExecutor.isTerminated() && this.taskExecutor.isTerminated();
    }

    @Override
    public WorkerLifecycleState getLifecycleState() {
        if (this.pollExecutor == null) {
            return WorkerLifecycleState.NOT_STARTED;
        }
        if (this.suspendLatch.get() != null) {
            return WorkerLifecycleState.SUSPENDED;
        }
        if (this.pollExecutor.isShutdown()) {
            if (this.pollExecutor.isTerminated() && this.taskExecutor.isTerminated()) {
                return WorkerLifecycleState.TERMINATED;
            }
            return WorkerLifecycleState.SHUTDOWN;
        }
        return WorkerLifecycleState.ACTIVE;
    }

    public String toString() {
        return String.format("Poller{name=%s, identity=%s}", this.pollerOptions.getPollThreadNamePrefix(), this.identity);
    }

    private final class PollerUncaughtExceptionHandler
    implements Thread.UncaughtExceptionHandler {
        private PollerUncaughtExceptionHandler() {
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            if (!Poller.this.pollExecutor.isShutdown() || !this.shouldIgnoreDuringShutdown(e)) {
                this.logPollErrors(t, e);
            } else {
                this.logPollExceptionsSuppressedDuringShutdown(t, e);
            }
        }

        private void logPollErrors(Thread t, Throwable e) {
            StatusRuntimeException te;
            if (e instanceof StatusRuntimeException && (te = (StatusRuntimeException)e).getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
                log.info("DEADLINE_EXCEEDED in poller thread {}", (Object)t.getName(), (Object)e);
                return;
            }
            log.warn("Failure in poller thread {}", (Object)t.getName(), (Object)e);
        }

        private void logPollExceptionsSuppressedDuringShutdown(Thread t, Throwable e) {
            log.trace("Failure in thread {} is suppressed, considered normal during shutdown", (Object)t.getName(), (Object)e);
        }

        private boolean shouldIgnoreDuringShutdown(Throwable ex) {
            if (ex instanceof StatusRuntimeException && GrpcUtils.isChannelShutdownException((StatusRuntimeException)ex)) {
                return true;
            }
            return ex instanceof RejectedExecutionException || ex instanceof InterruptedException || ex.getCause() instanceof InterruptedException;
        }
    }

    private class PollExecutionTask
    implements ThrowingRunnable {
        private PollExecutionTask() {
        }

        @Override
        public void run() throws Exception {
            Object task = Poller.this.pollTask.poll();
            if (task != null) {
                Poller.this.taskExecutor.process(task);
            }
        }
    }

    private class PollLoopTask
    implements Runnable {
        private final ThrowingRunnable task;
        private final BackoffThrottler pollBackoffThrottler;

        PollLoopTask(ThrowingRunnable task) {
            this.task = task;
            this.pollBackoffThrottler = new BackoffThrottler(Poller.this.pollerOptions.getBackoffInitialInterval(), Poller.this.pollerOptions.getBackoffCongestionInitialInterval(), Poller.this.pollerOptions.getBackoffMaximumInterval(), Poller.this.pollerOptions.getBackoffCoefficient(), Poller.this.pollerOptions.getBackoffMaximumJitterCoefficient());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                CountDownLatch suspender;
                long throttleMs = this.pollBackoffThrottler.getSleepTime();
                if (throttleMs > 0L) {
                    Thread.sleep(throttleMs);
                }
                if (Poller.this.pollRateThrottler != null) {
                    Poller.this.pollRateThrottler.throttle();
                }
                if ((suspender = (CountDownLatch)Poller.this.suspendLatch.get()) != null) {
                    if (log.isDebugEnabled()) {
                        log.debug("poll task suspending latchCount=" + suspender.getCount());
                    }
                    suspender.await();
                }
                if (this.shouldTerminate()) {
                    return;
                }
                this.task.run();
                this.pollBackoffThrottler.success();
            }
            catch (Throwable e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                } else {
                    this.pollBackoffThrottler.failure(e instanceof StatusRuntimeException ? ((StatusRuntimeException)e).getStatus().getCode() : Status.Code.UNKNOWN);
                }
                Poller.this.uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
            }
            finally {
                if (!this.shouldTerminate()) {
                    Poller.this.pollExecutor.execute(this);
                } else {
                    log.info("poll loop is terminated: {}", (Object)Poller.this.pollTask.getClass().getSimpleName());
                }
            }
        }

        private boolean shouldTerminate() {
            return Poller.this.pollExecutor.isShutdown() || Thread.currentThread().isInterrupted();
        }
    }

    static interface ThrowingRunnable {
        public void run() throws Throwable;
    }

    public static interface PollTask<TT> {
        public TT poll();
    }
}

