/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.lifecycle;

import com.google.common.annotations.VisibleForTesting;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import lombok.NonNull;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.ConsumerState;
import software.amazon.kinesis.lifecycle.ConsumerStates;
import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
import software.amazon.kinesis.lifecycle.ShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskExecutionListener;
import software.amazon.kinesis.lifecycle.TaskOutcome;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;

@KinesisClientInternalApi
public class ShardConsumer {
    private static final Logger log = LoggerFactory.getLogger(ShardConsumer.class);
    public static final int MAX_TIME_BETWEEN_REQUEST_RESPONSE = 35000;
    private final RecordsPublisher recordsPublisher;
    private final ExecutorService executorService;
    private final Scheduler scheduler;
    private final ShardInfo shardInfo;
    private final ShardConsumerArgument shardConsumerArgument;
    @NonNull
    private final Optional<Long> logWarningForTaskAfterMillis;
    private final Function<ConsumerTask, ConsumerTask> taskMetricsDecorator;
    private final int bufferSize;
    private final TaskExecutionListener taskExecutionListener;
    private ConsumerTask currentTask;
    private TaskOutcome taskOutcome;
    private final AtomicReference<Throwable> processFailure = new AtomicReference<Object>(null);
    private final AtomicReference<Throwable> dispatchFailure = new AtomicReference<Object>(null);
    private CompletableFuture<Boolean> stateChangeFuture;
    private boolean needsInitialization = true;
    private volatile Instant taskDispatchedAt;
    private volatile boolean taskIsRunning = false;
    private ConsumerState currentState;
    private volatile ShutdownReason shutdownReason;
    private volatile ShutdownNotification shutdownNotification;
    private final InternalSubscriber subscriber;
    private final Object lockObject = new Object();
    private Instant lastRequestTime = null;

    public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo, Optional<Long> logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument, TaskExecutionListener taskExecutionListener) {
        this(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, shardConsumerArgument, ConsumerStates.INITIAL_STATE, ShardConsumer.metricsWrappingFunction(shardConsumerArgument.metricsFactory()), 8, taskExecutionListener);
    }

    public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo, Optional<Long> logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument, ConsumerState initialState, Function<ConsumerTask, ConsumerTask> taskMetricsDecorator, int bufferSize, TaskExecutionListener taskExecutionListener) {
        this.recordsPublisher = recordsPublisher;
        this.executorService = executorService;
        this.shardInfo = shardInfo;
        this.shardConsumerArgument = shardConsumerArgument;
        this.logWarningForTaskAfterMillis = logWarningForTaskAfterMillis;
        this.taskExecutionListener = taskExecutionListener;
        this.currentState = initialState;
        this.taskMetricsDecorator = taskMetricsDecorator;
        this.scheduler = Schedulers.from((Executor)executorService);
        this.subscriber = new InternalSubscriber();
        this.bufferSize = bufferSize;
        if (this.shardInfo.isCompleted()) {
            this.markForShutdown(ShutdownReason.SHARD_END);
        }
    }

    private void startSubscriptions() {
        Flowable.fromPublisher((Publisher)this.recordsPublisher).subscribeOn(this.scheduler).observeOn(this.scheduler, true, this.bufferSize).subscribe((Subscriber)this.subscriber);
    }

    private synchronized void handleInput(ProcessRecordsInput input, Subscription subscription) {
        if (this.isShutdownRequested()) {
            subscription.cancel();
            return;
        }
        this.processData(input);
        if (this.taskOutcome == TaskOutcome.END_OF_SHARD) {
            this.markForShutdown(ShutdownReason.SHARD_END);
            subscription.cancel();
            return;
        }
        subscription.request(1L);
    }

    public void executeLifecycle() {
        Throwable t;
        if (this.isShutdown()) {
            return;
        }
        if (this.stateChangeFuture != null && !this.stateChangeFuture.isDone()) {
            return;
        }
        try {
            if (this.isShutdownRequested()) {
                this.stateChangeFuture = this.shutdownComplete();
            } else if (this.needsInitialization) {
                if (this.stateChangeFuture != null && this.stateChangeFuture.get().booleanValue()) {
                    this.subscribe();
                    this.needsInitialization = false;
                }
                this.stateChangeFuture = this.initializeComplete();
            }
        }
        catch (InterruptedException interruptedException) {
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
        if (ConsumerStates.ShardConsumerState.PROCESSING.equals((Object)this.currentState.state()) && (t = this.healthCheck()) instanceof Error) {
            throw (Error)t;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    Throwable healthCheck() {
        this.logNoDataRetrievedAfterTime();
        this.logLongRunningTask();
        Throwable failure = this.processFailure.get();
        if (!this.processFailure.compareAndSet(failure, null) && failure != null) {
            log.error("{}: processFailure was updated while resetting, this shouldn't happen.  Will retry on next health check", (Object)this.shardInfo.shardId());
            return null;
        }
        if (failure != null) {
            String logMessage = String.format("%s: Failure occurred in retrieval.  Restarting data requests", this.shardInfo.shardId());
            if (failure instanceof RetryableRetrievalException) {
                log.debug(logMessage, failure.getCause());
            } else {
                log.warn(logMessage, failure);
            }
            this.startSubscriptions();
            return failure;
        }
        Throwable expectedDispatchFailure = this.dispatchFailure.get();
        if (expectedDispatchFailure != null) {
            if (!this.dispatchFailure.compareAndSet(expectedDispatchFailure, null)) {
                log.info("{}: Unable to reset the dispatch failure, this can happen if the record processor is failing aggressively.", (Object)this.shardInfo.shardId());
                return null;
            }
            log.warn("Exception occurred while dispatching incoming data.  The incoming data has been skipped", expectedDispatchFailure);
            return expectedDispatchFailure;
        }
        Object object = this.lockObject;
        synchronized (object) {
            Instant now;
            Duration timeSinceLastResponse;
            if (this.lastRequestTime != null && (timeSinceLastResponse = Duration.between(this.lastRequestTime, now = Instant.now())).toMillis() > 35000L) {
                log.error("{}: Last request was dispatched at {}, but no response as of {} ({}).  Cancelling subscription, and restarting.", new Object[]{this.shardInfo.shardId(), this.lastRequestTime, now, timeSinceLastResponse});
                if (this.subscriber != null) {
                    this.subscriber.cancel();
                }
                this.lastRequestTime = Instant.now();
                this.startSubscriptions();
            }
        }
        return null;
    }

    Duration taskRunningTime() {
        if (this.taskDispatchedAt != null && this.taskIsRunning) {
            return Duration.between(this.taskDispatchedAt, Instant.now());
        }
        return null;
    }

    String longRunningTaskMessage(Duration taken) {
        if (taken != null) {
            return String.format("Previous %s task still pending for shard %s since %s ago. ", new Object[]{this.currentTask.taskType(), this.shardInfo.shardId(), taken});
        }
        return null;
    }

    private void logNoDataRetrievedAfterTime() {
        this.logWarningForTaskAfterMillis.ifPresent(value -> {
            Instant lastDataArrival = this.subscriber.lastDataArrival;
            if (lastDataArrival != null) {
                Instant now = Instant.now();
                Duration timeSince = Duration.between(this.subscriber.lastDataArrival, now);
                if (timeSince.toMillis() > value) {
                    log.warn("Last time data arrived: {} ({})", (Object)lastDataArrival, (Object)timeSince);
                }
            }
        });
    }

    private void logLongRunningTask() {
        Duration taken = this.taskRunningTime();
        if (taken != null) {
            String message = this.longRunningTaskMessage(taken);
            if (log.isDebugEnabled()) {
                log.debug("{} Not submitting new task.", (Object)message);
            }
            this.logWarningForTaskAfterMillis.ifPresent(value -> {
                if (taken.toMillis() > value) {
                    log.warn(message);
                }
            });
        }
    }

    @VisibleForTesting
    void subscribe() {
        this.startSubscriptions();
    }

    @VisibleForTesting
    synchronized CompletableFuture<Boolean> initializeComplete() {
        if (this.taskOutcome != null) {
            this.updateState(this.taskOutcome);
        }
        if (this.currentState.state() == ConsumerStates.ShardConsumerState.PROCESSING) {
            return CompletableFuture.completedFuture(true);
        }
        return CompletableFuture.supplyAsync(() -> {
            if (this.isShutdownRequested()) {
                throw new IllegalStateException("Shutdown requested while initializing");
            }
            this.executeTask(null);
            if (this.isShutdownRequested()) {
                throw new IllegalStateException("Shutdown requested while initializing");
            }
            return false;
        }, this.executorService);
    }

    @VisibleForTesting
    synchronized CompletableFuture<Boolean> shutdownComplete() {
        if (this.taskOutcome != null) {
            this.updateState(this.taskOutcome);
        } else {
            this.updateState(TaskOutcome.SUCCESSFUL);
        }
        if (this.isShutdown()) {
            return CompletableFuture.completedFuture(true);
        }
        return CompletableFuture.supplyAsync(() -> {
            this.executeTask(null);
            return false;
        });
    }

    private synchronized void processData(ProcessRecordsInput input) {
        this.executeTask(input);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void executeTask(ProcessRecordsInput input) {
        TaskExecutionListenerInput taskExecutionListenerInput = TaskExecutionListenerInput.builder().shardInfo(this.shardInfo).taskType(this.currentState.taskType()).build();
        this.taskExecutionListener.beforeTaskExecution(taskExecutionListenerInput);
        ConsumerTask task = this.currentState.createTask(this.shardConsumerArgument, this, input);
        if (task != null) {
            TaskResult result;
            this.taskDispatchedAt = Instant.now();
            this.currentTask = task;
            this.taskIsRunning = true;
            try {
                result = task.call();
            }
            finally {
                this.taskIsRunning = false;
            }
            this.taskOutcome = this.resultToOutcome(result);
            taskExecutionListenerInput = taskExecutionListenerInput.toBuilder().taskOutcome(this.taskOutcome).build();
        }
        this.taskExecutionListener.afterTaskExecution(taskExecutionListenerInput);
    }

    private TaskOutcome resultToOutcome(TaskResult result) {
        if (result.getException() == null) {
            if (result.isShardEndReached()) {
                return TaskOutcome.END_OF_SHARD;
            }
            return TaskOutcome.SUCCESSFUL;
        }
        this.logTaskException(result);
        return TaskOutcome.FAILURE;
    }

    private synchronized void updateState(TaskOutcome outcome) {
        ConsumerState nextState = this.currentState;
        switch (outcome) {
            case SUCCESSFUL: {
                nextState = this.currentState.successTransition();
                break;
            }
            case END_OF_SHARD: {
                this.markForShutdown(ShutdownReason.SHARD_END);
                break;
            }
            case FAILURE: {
                nextState = this.currentState.failureTransition();
                break;
            }
            default: {
                log.error("No handler for outcome of {}", (Object)outcome.name());
                nextState = this.currentState.failureTransition();
            }
        }
        this.currentState = nextState = this.handleShutdownTransition(outcome, nextState);
    }

    private ConsumerState handleShutdownTransition(TaskOutcome outcome, ConsumerState nextState) {
        if (this.isShutdownRequested() && outcome != TaskOutcome.FAILURE) {
            return this.currentState.shutdownTransition(this.shutdownReason);
        }
        return nextState;
    }

    private void logTaskException(TaskResult taskResult) {
        if (log.isDebugEnabled()) {
            Exception taskException = taskResult.getException();
            if (taskException instanceof BlockedOnParentShardException) {
                log.debug("Shard {} is blocked on completion of parent shard.", (Object)this.shardInfo.shardId());
            } else {
                log.debug("Caught exception running {} task: ", (Object)this.currentTask.taskType(), (Object)taskResult.getException());
            }
        }
    }

    public void gracefulShutdown(ShutdownNotification shutdownNotification) {
        if (this.subscriber != null) {
            this.subscriber.cancel();
        }
        this.shutdownNotification = shutdownNotification;
        this.markForShutdown(ShutdownReason.REQUESTED);
    }

    public boolean leaseLost() {
        log.debug("Shutdown({}): Lease lost triggered.", (Object)this.shardInfo.shardId());
        if (this.subscriber != null) {
            this.subscriber.cancel();
            log.debug("Shutdown({}): Subscriber cancelled.", (Object)this.shardInfo.shardId());
        }
        this.markForShutdown(ShutdownReason.LEASE_LOST);
        return this.isShutdown();
    }

    synchronized void markForShutdown(ShutdownReason reason) {
        if (this.shutdownReason == null || this.shutdownReason.canTransitionTo(reason)) {
            this.shutdownReason = reason;
        }
    }

    public boolean isShutdown() {
        return this.currentState.isTerminal();
    }

    @VisibleForTesting
    public boolean isShutdownRequested() {
        return this.shutdownReason != null;
    }

    private static Function<ConsumerTask, ConsumerTask> metricsWrappingFunction(MetricsFactory metricsFactory) {
        return task -> {
            if (task == null) {
                return null;
            }
            return new MetricsCollectingTaskDecorator((ConsumerTask)task, metricsFactory);
        };
    }

    RecordsPublisher recordsPublisher() {
        return this.recordsPublisher;
    }

    ExecutorService executorService() {
        return this.executorService;
    }

    Scheduler scheduler() {
        return this.scheduler;
    }

    ShardInfo shardInfo() {
        return this.shardInfo;
    }

    ShardConsumerArgument shardConsumerArgument() {
        return this.shardConsumerArgument;
    }

    @NonNull
    Optional<Long> logWarningForTaskAfterMillis() {
        return this.logWarningForTaskAfterMillis;
    }

    Function<ConsumerTask, ConsumerTask> taskMetricsDecorator() {
        return this.taskMetricsDecorator;
    }

    int bufferSize() {
        return this.bufferSize;
    }

    TaskExecutionListener taskExecutionListener() {
        return this.taskExecutionListener;
    }

    ConsumerTask currentTask() {
        return this.currentTask;
    }

    TaskOutcome taskOutcome() {
        return this.taskOutcome;
    }

    AtomicReference<Throwable> processFailure() {
        return this.processFailure;
    }

    AtomicReference<Throwable> dispatchFailure() {
        return this.dispatchFailure;
    }

    CompletableFuture<Boolean> stateChangeFuture() {
        return this.stateChangeFuture;
    }

    boolean needsInitialization() {
        return this.needsInitialization;
    }

    Instant taskDispatchedAt() {
        return this.taskDispatchedAt;
    }

    boolean taskIsRunning() {
        return this.taskIsRunning;
    }

    ConsumerState currentState() {
        return this.currentState;
    }

    ShutdownNotification shutdownNotification() {
        return this.shutdownNotification;
    }

    InternalSubscriber subscriber() {
        return this.subscriber;
    }

    Object lockObject() {
        return this.lockObject;
    }

    Instant lastRequestTime() {
        return this.lastRequestTime;
    }

    public ShutdownReason shutdownReason() {
        return this.shutdownReason;
    }

    private class InternalSubscriber
    implements Subscriber<ProcessRecordsInput> {
        private Subscription subscription;
        private volatile Instant lastDataArrival;

        private InternalSubscriber() {
        }

        public void onSubscribe(Subscription s) {
            this.subscription = s;
            this.subscription.request(1L);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onNext(ProcessRecordsInput input) {
            Object object;
            try {
                object = ShardConsumer.this.lockObject;
                synchronized (object) {
                    ShardConsumer.this.lastRequestTime = null;
                }
                this.lastDataArrival = Instant.now();
                ShardConsumer.this.handleInput(input.toBuilder().cacheExitTime(Instant.now()).build(), this.subscription);
            }
            catch (Throwable t) {
                log.warn("{}: Caught exception from handleInput", (Object)ShardConsumer.this.shardInfo.shardId(), (Object)t);
                ShardConsumer.this.dispatchFailure.set(t);
            }
            finally {
                this.subscription.request(1L);
                object = ShardConsumer.this.lockObject;
                synchronized (object) {
                    ShardConsumer.this.lastRequestTime = Instant.now();
                }
            }
        }

        public void onError(Throwable t) {
            log.warn("{}: onError().  Cancelling subscription, and marking self as failed.", (Object)ShardConsumer.this.shardInfo.shardId(), (Object)t);
            this.subscription.cancel();
            ShardConsumer.this.processFailure.set(t);
        }

        public void onComplete() {
            log.debug("{}: onComplete(): Received onComplete.  Activity should be triggered externally", (Object)ShardConsumer.this.shardInfo.shardId());
        }

        public void cancel() {
            if (this.subscription != null) {
                this.subscription.cancel();
            }
        }
    }
}

