/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.GracefulShutdownContext;
import software.amazon.kinesis.shaded.org.apache.commons.logging.Log;
import software.amazon.kinesis.shaded.org.apache.commons.logging.LogFactory;

class GracefulShutdownCoordinator {
    GracefulShutdownCoordinator() {
    }

    Future<Boolean> startGracefulShutdown(Callable<Boolean> shutdownCallable) {
        FutureTask<Boolean> task = new FutureTask<Boolean>(shutdownCallable);
        Thread shutdownThread = new Thread(task, "RequestedShutdownThread");
        shutdownThread.start();
        return task;
    }

    Callable<Boolean> createGracefulShutdownCallable(Callable<GracefulShutdownContext> startWorkerShutdown) {
        return new GracefulShutdownCallable(startWorkerShutdown);
    }

    static class GracefulShutdownCallable
    implements Callable<Boolean> {
        private static final Log log = LogFactory.getLog(GracefulShutdownCallable.class);
        private final Callable<GracefulShutdownContext> startWorkerShutdown;

        GracefulShutdownCallable(Callable<GracefulShutdownContext> startWorkerShutdown) {
            this.startWorkerShutdown = startWorkerShutdown;
        }

        private boolean isWorkerShutdownComplete(GracefulShutdownContext context) {
            return context.getWorker().isShutdownComplete() || context.getWorker().getShardInfoShardConsumerMap().isEmpty();
        }

        private String awaitingLogMessage(GracefulShutdownContext context) {
            long awaitingNotification = context.getNotificationCompleteLatch().getCount();
            long awaitingFinalShutdown = context.getShutdownCompleteLatch().getCount();
            return String.format("Waiting for %d record process to complete shutdown notification, and %d record processor to complete final shutdown ", awaitingNotification, awaitingFinalShutdown);
        }

        private String awaitingFinalShutdownMessage(GracefulShutdownContext context) {
            long outstanding = context.getShutdownCompleteLatch().getCount();
            return String.format("Waiting for %d record processors to complete final shutdown", outstanding);
        }

        private boolean waitForRecordProcessors(GracefulShutdownContext context) {
            try {
                while (!context.getNotificationCompleteLatch().await(1L, TimeUnit.SECONDS)) {
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                    log.info(this.awaitingLogMessage(context));
                    if (!this.workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) continue;
                    return false;
                }
            }
            catch (InterruptedException ie) {
                log.warn("Interrupted while waiting for notification complete, terminating shutdown.  " + this.awaitingLogMessage(context));
                return false;
            }
            if (Thread.interrupted()) {
                log.warn("Interrupted before worker shutdown, terminating shutdown");
                return false;
            }
            context.getWorker().shutdown();
            if (Thread.interrupted()) {
                log.warn("Interrupted after worker shutdown, terminating shutdown");
                return false;
            }
            try {
                while (!context.getShutdownCompleteLatch().await(1L, TimeUnit.SECONDS)) {
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                    log.info(this.awaitingFinalShutdownMessage(context));
                    if (!this.workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) continue;
                    return false;
                }
            }
            catch (InterruptedException ie) {
                log.warn("Interrupted while waiting for shutdown completion, terminating shutdown. " + this.awaitingFinalShutdownMessage(context));
                return false;
            }
            return true;
        }

        private boolean workerShutdownWithRemaining(long outstanding, GracefulShutdownContext context) {
            if (this.isWorkerShutdownComplete(context) && outstanding != 0L) {
                log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding + " with a current value of " + context.getShutdownCompleteLatch().getCount() + ". shutdownComplete: " + context.getWorker().isShutdownComplete() + " -- Consumer Map: " + context.getWorker().getShardInfoShardConsumerMap().size());
                return true;
            }
            return false;
        }

        @Override
        public Boolean call() throws Exception {
            GracefulShutdownContext context;
            try {
                context = this.startWorkerShutdown.call();
            }
            catch (Exception ex) {
                log.warn("Caught exception while requesting initial worker shutdown.", ex);
                throw ex;
            }
            return context.isShutdownAlreadyCompleted() || this.waitForRecordProcessors(context);
        }
    }
}

