/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.lifecycle;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;
import lombok.NonNull;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.ConsumerState;
import software.amazon.kinesis.lifecycle.ConsumerStates;
import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
import software.amazon.kinesis.lifecycle.ShardConsumerSubscriber;
import software.amazon.kinesis.lifecycle.ShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskExecutionListener;
import software.amazon.kinesis.lifecycle.TaskOutcome;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher;

/**
 * Drives the lifecycle of record processing for a single shard.
 *
 * <p>The consumer holds a {@link ConsumerState} and, on each call to {@link #executeLifecycle()},
 * either advances initialization, advances shutdown, or (while in the processing state) runs a
 * health check against its {@link ShardConsumerSubscriber}. Tasks are created by the current
 * state and executed by {@link #executeTask(ProcessRecordsInput)}.
 *
 * <p>Synchronization visible in this class: task execution and state updates are guarded by the
 * instance monitor, while {@code shutdownReason} is guarded by a dedicated {@code shutdownLock}.
 */
@KinesisClientInternalApi
public class ShardConsumer {
    private static final Logger log = LoggerFactory.getLogger(ShardConsumer.class);

    /**
     * Limit handed to {@link ShardConsumerSubscriber#healthCheck} by {@link #healthCheck()}.
     * NOTE(review): presumably expressed in milliseconds - confirm against the subscriber.
     */
    public static final int MAX_TIME_BETWEEN_REQUEST_RESPONSE = 60000;

    /** Buffer size used by the convenience constructors that do not accept one explicitly. */
    private static final int DEFAULT_BUFFER_SIZE = 8;

    private final RecordsPublisher recordsPublisher;
    private final ExecutorService executorService;
    private final ShardInfo shardInfo;
    private final ShardConsumerArgument shardConsumerArgument;
    @NonNull
    private final Optional<Long> logWarningForTaskAfterMillis;
    // Stored and exposed through its accessor; not applied to tasks anywhere in this class.
    private final Function<ConsumerTask, ConsumerTask> taskMetricsDecorator;
    private final int bufferSize;
    private final TaskExecutionListener taskExecutionListener;
    private final String streamIdentifier;
    private ConsumerTask currentTask;
    private TaskOutcome taskOutcome;
    private CompletableFuture<Boolean> stateChangeFuture;
    private boolean needsInitialization = true;
    private volatile Instant taskDispatchedAt;
    private volatile boolean taskIsRunning = false;
    private ConsumerState currentState;
    // Guards reads and writes of shutdownReason.
    private final Object shutdownLock = new Object();
    private volatile ShutdownReason shutdownReason;
    private volatile ShutdownNotification shutdownNotification;
    private final ShardConsumerSubscriber subscriber;
    // Input that produced END_OF_SHARD; handed to the task executed during shutdown.
    private ProcessRecordsInput shardEndProcessRecordsInput;

    /**
     * Creates a consumer starting in {@link ConsumerStates#INITIAL_STATE} with the default buffer
     * size and {@code readTimeoutsToIgnoreBeforeWarning} set to {@code 0}.
     *
     * @deprecated use the overload that accepts {@code readTimeoutsToIgnoreBeforeWarning}.
     */
    @Deprecated
    public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo, Optional<Long> logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument, TaskExecutionListener taskExecutionListener) {
        this(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, shardConsumerArgument, ConsumerStates.INITIAL_STATE, ShardConsumer.metricsWrappingFunction(shardConsumerArgument.metricsFactory()), DEFAULT_BUFFER_SIZE, taskExecutionListener, 0);
    }

    /**
     * Creates a consumer starting in {@link ConsumerStates#INITIAL_STATE} with the default buffer
     * size and a metrics-wrapping task decorator built from the argument's metrics factory.
     *
     * @param readTimeoutsToIgnoreBeforeWarning forwarded unchanged to the {@link ShardConsumerSubscriber}
     */
    public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo, Optional<Long> logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument, TaskExecutionListener taskExecutionListener, int readTimeoutsToIgnoreBeforeWarning) {
        this(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, shardConsumerArgument, ConsumerStates.INITIAL_STATE, ShardConsumer.metricsWrappingFunction(shardConsumerArgument.metricsFactory()), DEFAULT_BUFFER_SIZE, taskExecutionListener, readTimeoutsToIgnoreBeforeWarning);
    }

    /**
     * Creates a consumer with an explicit initial state, decorator and buffer size, and
     * {@code readTimeoutsToIgnoreBeforeWarning} set to {@code 0}.
     *
     * @deprecated use the overload that accepts {@code readTimeoutsToIgnoreBeforeWarning}.
     */
    @Deprecated
    public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo, Optional<Long> logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument, ConsumerState initialState, Function<ConsumerTask, ConsumerTask> taskMetricsDecorator, int bufferSize, TaskExecutionListener taskExecutionListener) {
        this(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, shardConsumerArgument, initialState, taskMetricsDecorator, bufferSize, taskExecutionListener, 0);
    }

    /**
     * Primary constructor; all other constructors delegate here.
     *
     * <p>If {@code shardInfo.isCompleted()} is true the consumer is immediately marked for
     * shutdown with {@link ShutdownReason#SHARD_END}.
     *
     * @param recordsPublisher                  publisher handed to the subscriber
     * @param executorService                   executor used for the asynchronous initialize/shutdown steps
     * @param shardInfo                         shard this consumer is responsible for
     * @param logWarningForTaskAfterMillis      when present, threshold above which "no data" and
     *                                          "long running task" warnings are logged
     * @param shardConsumerArgument             argument passed to the state when creating tasks
     * @param initialState                      state the consumer starts in
     * @param taskMetricsDecorator              decorator retained by this consumer
     * @param bufferSize                        buffer size forwarded to the subscriber
     * @param taskExecutionListener             notified before and after each task execution
     * @param readTimeoutsToIgnoreBeforeWarning forwarded unchanged to the subscriber
     */
    public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo, Optional<Long> logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument, ConsumerState initialState, Function<ConsumerTask, ConsumerTask> taskMetricsDecorator, int bufferSize, TaskExecutionListener taskExecutionListener, int readTimeoutsToIgnoreBeforeWarning) {
        this.recordsPublisher = recordsPublisher;
        this.executorService = executorService;
        this.shardInfo = shardInfo;
        this.streamIdentifier = shardInfo.streamIdentifierSerOpt().orElse("single_stream_mode");
        this.shardConsumerArgument = shardConsumerArgument;
        this.logWarningForTaskAfterMillis = logWarningForTaskAfterMillis;
        this.taskExecutionListener = taskExecutionListener;
        this.currentState = initialState;
        this.taskMetricsDecorator = taskMetricsDecorator;
        this.subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, this, readTimeoutsToIgnoreBeforeWarning);
        this.bufferSize = bufferSize;
        if (this.shardInfo.isCompleted()) {
            markForShutdown(ShutdownReason.SHARD_END);
        }
    }

    /**
     * Processes one input and then either requests the next item or cancels the subscription.
     *
     * <p>The subscription is cancelled without processing when shutdown has already been
     * requested, and cancelled after processing when the task outcome is
     * {@link TaskOutcome#END_OF_SHARD} (in which case the input is remembered for the shutdown
     * task). NOTE(review): presumably invoked by the {@link ShardConsumerSubscriber} - confirm.
     *
     * @param input        records to hand to the task created by the current state
     * @param subscription subscription to cancel or to request one more item from
     */
    synchronized void handleInput(ProcessRecordsInput input, Subscription subscription) {
        if (isShutdownRequested()) {
            subscription.cancel();
            return;
        }
        processData(input);
        if (taskOutcome == TaskOutcome.END_OF_SHARD) {
            markForShutdown(ShutdownReason.SHARD_END);
            shardEndProcessRecordsInput = input;
            subscription.cancel();
            return;
        }
        subscription.request(1);
    }

    /**
     * Advances the consumer by at most one step.
     *
     * <p>Does nothing if the consumer is already shut down or a previous asynchronous step is
     * still pending. Otherwise it starts a shutdown step when shutdown was requested, or an
     * initialization step while initialization is still needed. When the current state is
     * {@code PROCESSING}, a health check is run afterwards.
     *
     * @throws RuntimeException wrapping the {@link ExecutionException} of a failed previous step
     * @throws Error            if the health check reports an {@link Error}
     */
    public void executeLifecycle() {
        if (isShutdown()) {
            return;
        }
        if (stateChangeFuture != null && !stateChangeFuture.isDone()) {
            return;
        }
        try {
            if (isShutdownRequested()) {
                stateChangeFuture = shutdownComplete();
            } else if (needsInitialization) {
                // The future is known to be done here (see the early return above).
                if (stateChangeFuture != null && stateChangeFuture.get()) {
                    subscribe();
                    needsInitialization = false;
                }
                stateChangeFuture = initializeComplete();
            }
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (RejectedExecutionException e) {
            log.debug("{} : Executor rejected the lifecycle step for shard {}; recording a failed task outcome.", streamIdentifier, shardInfo.shardId(), e);
            taskOutcome = TaskOutcome.FAILURE;
        }
        if (ConsumerStates.ShardConsumerState.PROCESSING.equals(currentState.state())) {
            Throwable failure = healthCheck();
            if (failure instanceof Error) {
                throw (Error) failure;
            }
        }
    }

    /**
     * Logs staleness warnings and asks the subscriber for any failure it has observed.
     *
     * @return the failure reported by the subscriber's health check, otherwise the pending
     *         dispatch failure (which is reset by this call), otherwise {@code null}
     */
    @VisibleForTesting
    Throwable healthCheck() {
        logNoDataRetrievedAfterTime();
        logLongRunningTask();
        Throwable failure = subscriber.healthCheck(MAX_TIME_BETWEEN_REQUEST_RESPONSE);
        if (failure != null) {
            return failure;
        }
        Throwable dispatchFailure = subscriber.getAndResetDispatchFailure();
        if (dispatchFailure != null) {
            log.warn("{} : Exception occurred while dispatching incoming data.  The incoming data has been skipped", streamIdentifier, dispatchFailure);
            return dispatchFailure;
        }
        return null;
    }

    /**
     * @return how long the currently running task has been running, or {@code null} when no
     *         task is running
     */
    Duration taskRunningTime() {
        // Read the volatile field once so the null check and the computation see the same value.
        Instant dispatchedAt = taskDispatchedAt;
        if (dispatchedAt != null && taskIsRunning) {
            return Duration.between(dispatchedAt, Instant.now());
        }
        return null;
    }

    /**
     * Builds the "task still pending" message for the current task.
     *
     * @param taken how long the task has been running; may be {@code null}
     * @return the formatted message, or {@code null} when {@code taken} is {@code null}
     */
    String longRunningTaskMessage(Duration taken) {
        if (taken != null) {
            return String.format("Previous %s task still pending for shard %s since %s ago. ", currentTask.taskType(), shardInfo.shardId(), taken);
        }
        return null;
    }

    /**
     * Warns when the time since the subscriber last saw data exceeds the configured threshold.
     * No-op when no threshold is configured or no data has arrived yet.
     */
    private void logNoDataRetrievedAfterTime() {
        logWarningForTaskAfterMillis.ifPresent(value -> {
            Instant lastDataArrival = subscriber.lastDataArrival();
            if (lastDataArrival != null) {
                Instant now = Instant.now();
                // Use the snapshot taken above so the logged instant and duration agree.
                Duration timeSince = Duration.between(lastDataArrival, now);
                if (timeSince.toMillis() > value) {
                    log.warn("{} : Last time data arrived: {} ({})", streamIdentifier, lastDataArrival, timeSince);
                }
            }
        });
    }

    /**
     * Logs (debug) that a task is still running and, when a threshold is configured and
     * exceeded, repeats the message as a warning.
     */
    private void logLongRunningTask() {
        Duration taken = taskRunningTime();
        if (taken != null) {
            String message = longRunningTaskMessage(taken);
            if (log.isDebugEnabled()) {
                log.debug("{} : {} Not submitting new task.", streamIdentifier, message);
            }
            logWarningForTaskAfterMillis.ifPresent(value -> {
                if (taken.toMillis() > value) {
                    log.warn("{} : {}", streamIdentifier, message);
                }
            });
        }
    }

    /** Asks the subscriber to start its subscriptions. */
    @VisibleForTesting
    void subscribe() {
        subscriber.startSubscriptions();
    }

    /**
     * Applies the last task outcome (if any) to the state and starts the next initialization
     * step.
     *
     * @return an already-completed {@code true} future once the state is {@code PROCESSING};
     *         otherwise a future that executes the current state's task on the executor and
     *         completes with {@code false}, or exceptionally with
     *         {@link IllegalStateException} if shutdown is requested before or after the task
     */
    @VisibleForTesting
    synchronized CompletableFuture<Boolean> initializeComplete() {
        if (taskOutcome != null) {
            updateState(taskOutcome);
        }
        if (currentState.state() == ConsumerStates.ShardConsumerState.PROCESSING) {
            return CompletableFuture.completedFuture(true);
        }
        return CompletableFuture.supplyAsync(() -> {
            if (isShutdownRequested()) {
                throw new IllegalStateException("Shutdown requested while initializing");
            }
            executeTask(null);
            if (isShutdownRequested()) {
                throw new IllegalStateException("Shutdown requested while initializing");
            }
            return false;
        }, executorService);
    }

    /**
     * Starts one asynchronous shutdown step on the executor.
     *
     * <p>The step first applies the last task outcome (or {@link TaskOutcome#SUCCESSFUL} when
     * there is none) to the state; if that leaves the consumer in a terminal state the future
     * completes with {@code true}, otherwise the current state's task is executed with the
     * remembered shard-end input and the future completes with {@code false}.
     *
     * @return future describing whether shutdown has completed
     */
    @VisibleForTesting
    CompletableFuture<Boolean> shutdownComplete() {
        return CompletableFuture.supplyAsync(() -> {
            synchronized (this) {
                if (taskOutcome != null) {
                    updateState(taskOutcome);
                } else {
                    updateState(TaskOutcome.SUCCESSFUL);
                }
                if (isShutdown()) {
                    return true;
                }
                executeTask(shardEndProcessRecordsInput);
                return false;
            }
        }, executorService);
    }

    /** Executes the current state's task against {@code input}. */
    private synchronized void processData(ProcessRecordsInput input) {
        executeTask(input);
    }

    /**
     * Creates the task for the current state and runs it on the calling thread, notifying the
     * {@link TaskExecutionListener} before and after.
     *
     * <p>When the state produces no task only the listener callbacks run and
     * {@code taskOutcome} is left untouched.
     *
     * @param input input forwarded to the state when creating the task; may be {@code null}
     */
    private synchronized void executeTask(ProcessRecordsInput input) {
        TaskExecutionListenerInput listenerInput = TaskExecutionListenerInput.builder()
                .shardInfo(shardInfo)
                .taskType(currentState.taskType())
                .build();
        taskExecutionListener.beforeTaskExecution(listenerInput);
        ConsumerTask task = currentState.createTask(shardConsumerArgument, this, input);
        if (task != null) {
            taskDispatchedAt = Instant.now();
            currentTask = task;
            taskIsRunning = true;
            TaskResult result;
            try {
                result = task.call();
            } finally {
                // Always clear the flag so a throwing task is not reported as still running.
                taskIsRunning = false;
            }
            taskOutcome = resultToOutcome(result);
            listenerInput = listenerInput.toBuilder().taskOutcome(taskOutcome).build();
        }
        taskExecutionListener.afterTaskExecution(listenerInput);
    }

    /**
     * Maps a task result to an outcome: {@code FAILURE} (after logging) when the result carries
     * an exception, {@code END_OF_SHARD} when shard end was reached, {@code SUCCESSFUL} otherwise.
     */
    private TaskOutcome resultToOutcome(TaskResult result) {
        if (result.getException() == null) {
            if (result.isShardEndReached()) {
                return TaskOutcome.END_OF_SHARD;
            }
            return TaskOutcome.SUCCESSFUL;
        }
        logTaskException(result);
        return TaskOutcome.FAILURE;
    }

    /**
     * Moves {@code currentState} according to {@code outcome}, then lets a pending shutdown
     * request override the result (see {@link #handleShutdownTransition}).
     */
    private synchronized void updateState(TaskOutcome outcome) {
        ConsumerState nextState = currentState;
        switch (outcome) {
            case SUCCESSFUL:
                nextState = currentState.successTransition();
                break;
            case END_OF_SHARD:
                // The state itself is chosen by the shutdown transition below.
                markForShutdown(ShutdownReason.SHARD_END);
                break;
            case FAILURE:
                nextState = currentState.failureTransition();
                break;
            default:
                log.error("{} : No handler for outcome of {}", streamIdentifier, outcome.name());
                nextState = currentState.failureTransition();
                break;
        }
        currentState = handleShutdownTransition(outcome, nextState);
    }

    /**
     * @return the current state's shutdown transition when shutdown has been requested and the
     *         outcome is not {@code FAILURE}; otherwise {@code nextState} unchanged
     */
    private ConsumerState handleShutdownTransition(TaskOutcome outcome, ConsumerState nextState) {
        synchronized (shutdownLock) {
            if (isShutdownRequested() && outcome != TaskOutcome.FAILURE) {
                return currentState.shutdownTransition(shutdownReason);
            }
            return nextState;
        }
    }

    /** Logs the exception carried by {@code taskResult} at debug level. */
    private void logTaskException(TaskResult taskResult) {
        if (log.isDebugEnabled()) {
            Exception taskException = taskResult.getException();
            if (taskException instanceof BlockedOnParentShardException) {
                log.debug("{} : Shard {} is blocked on completion of parent shard.", streamIdentifier, shardInfo.shardId());
            } else {
                log.debug("{} : Caught exception running {} task: ", streamIdentifier, currentTask.taskType(), taskException);
            }
        }
    }

    /**
     * Cancels the subscriber, stores the notification and marks the consumer for shutdown with
     * {@link ShutdownReason#REQUESTED}.
     *
     * @param shutdownNotification notification retained for later retrieval via its accessor
     */
    public void gracefulShutdown(ShutdownNotification shutdownNotification) {
        if (subscriber != null) {
            subscriber.cancel();
        }
        this.shutdownNotification = shutdownNotification;
        markForShutdown(ShutdownReason.REQUESTED);
    }

    /**
     * Cancels the subscriber and marks the consumer for shutdown with
     * {@link ShutdownReason#LEASE_LOST}.
     *
     * @return whether the consumer is already in a terminal state
     */
    public boolean leaseLost() {
        log.debug("{} : Shutdown({}): Lease lost triggered.", streamIdentifier, shardInfo.shardId());
        if (subscriber != null) {
            subscriber.cancel();
            log.debug("{} : Shutdown({}): Subscriber cancelled.", streamIdentifier, shardInfo.shardId());
        }
        markForShutdown(ShutdownReason.LEASE_LOST);
        return isShutdown();
    }

    /**
     * Records {@code reason} as the shutdown reason when none is set yet, or when the existing
     * reason reports that it can transition to {@code reason}.
     */
    void markForShutdown(ShutdownReason reason) {
        synchronized (shutdownLock) {
            if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) {
                shutdownReason = reason;
            }
        }
    }

    /** @return whether the current state is terminal */
    public boolean isShutdown() {
        return currentState.isTerminal();
    }

    /** @return whether a shutdown reason has been recorded */
    @VisibleForTesting
    public boolean isShutdownRequested() {
        synchronized (shutdownLock) {
            return shutdownReason != null;
        }
    }

    /**
     * @return a function wrapping non-null tasks in a {@link MetricsCollectingTaskDecorator}
     *         and mapping {@code null} to {@code null}
     */
    private static Function<ConsumerTask, ConsumerTask> metricsWrappingFunction(MetricsFactory metricsFactory) {
        return task -> {
            if (task == null) {
                return null;
            }
            return new MetricsCollectingTaskDecorator(task, metricsFactory);
        };
    }

    RecordsPublisher recordsPublisher() {
        return recordsPublisher;
    }

    ExecutorService executorService() {
        return executorService;
    }

    ShardInfo shardInfo() {
        return shardInfo;
    }

    ShardConsumerArgument shardConsumerArgument() {
        return shardConsumerArgument;
    }

    @NonNull
    Optional<Long> logWarningForTaskAfterMillis() {
        return logWarningForTaskAfterMillis;
    }

    Function<ConsumerTask, ConsumerTask> taskMetricsDecorator() {
        return taskMetricsDecorator;
    }

    int bufferSize() {
        return bufferSize;
    }

    TaskExecutionListener taskExecutionListener() {
        return taskExecutionListener;
    }

    String streamIdentifier() {
        return streamIdentifier;
    }

    ConsumerTask currentTask() {
        return currentTask;
    }

    TaskOutcome taskOutcome() {
        return taskOutcome;
    }

    CompletableFuture<Boolean> stateChangeFuture() {
        return stateChangeFuture;
    }

    boolean needsInitialization() {
        return needsInitialization;
    }

    Instant taskDispatchedAt() {
        return taskDispatchedAt;
    }

    boolean taskIsRunning() {
        return taskIsRunning;
    }

    ConsumerState currentState() {
        return currentState;
    }

    Object shutdownLock() {
        return shutdownLock;
    }

    ShutdownNotification shutdownNotification() {
        return shutdownNotification;
    }

    ShardConsumerSubscriber subscriber() {
        return subscriber;
    }

    ProcessRecordsInput shardEndProcessRecordsInput() {
        return shardEndProcessRecordsInput;
    }

    public ShutdownReason shutdownReason() {
        return shutdownReason;
    }
}

