/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.lifecycle;

import com.google.common.annotations.VisibleForTesting;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;

class ShardConsumerSubscriber
implements Subscriber<RecordsRetrieved> {
    private static final Logger log = LoggerFactory.getLogger(ShardConsumerSubscriber.class);
    private final RecordsPublisher recordsPublisher;
    private final Scheduler scheduler;
    private final int bufferSize;
    private final ShardConsumer shardConsumer;
    private final int readTimeoutsToIgnoreBeforeWarning;
    private volatile int readTimeoutSinceLastRead = 0;
    @VisibleForTesting
    final Object lockObject = new Object();
    private Instant lastRequestTime = null;
    private RecordsRetrieved lastAccepted = null;
    private Subscription subscription;
    private volatile Instant lastDataArrival;
    private volatile Throwable dispatchFailure;
    private volatile Throwable retrievalFailure;

    @Deprecated
    ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize, ShardConsumer shardConsumer) {
        this(recordsPublisher, executorService, bufferSize, shardConsumer, 0);
    }

    ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize, ShardConsumer shardConsumer, int readTimeoutsToIgnoreBeforeWarning) {
        this.recordsPublisher = recordsPublisher;
        this.scheduler = Schedulers.from((Executor)executorService);
        this.bufferSize = bufferSize;
        this.shardConsumer = shardConsumer;
        this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void startSubscriptions() {
        Object object = this.lockObject;
        synchronized (object) {
            this.lastRequestTime = Instant.now();
            if (this.lastAccepted != null) {
                this.recordsPublisher.restartFrom(this.lastAccepted);
            }
            Flowable.fromPublisher((Publisher)this.recordsPublisher).subscribeOn(this.scheduler).observeOn(this.scheduler, true, this.bufferSize).subscribe((Subscriber)new ShardConsumerNotifyingSubscriber(this, this.recordsPublisher));
        }
    }

    Throwable healthCheck(long maxTimeBetweenRequests) {
        Throwable result = this.restartIfFailed();
        if (result == null) {
            this.restartIfRequestTimerExpired(maxTimeBetweenRequests);
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Throwable getAndResetDispatchFailure() {
        Object object = this.lockObject;
        synchronized (object) {
            Throwable failure = this.dispatchFailure;
            this.dispatchFailure = null;
            return failure;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Throwable restartIfFailed() {
        Throwable oldFailure = null;
        if (this.retrievalFailure != null) {
            Object object = this.lockObject;
            synchronized (object) {
                String logMessage = String.format("%s: Failure occurred in retrieval.  Restarting data requests", this.shardConsumer.shardInfo().shardId());
                if (this.retrievalFailure instanceof RetryableRetrievalException) {
                    log.debug(logMessage, this.retrievalFailure.getCause());
                } else {
                    log.warn(logMessage, this.retrievalFailure);
                }
                oldFailure = this.retrievalFailure;
                this.retrievalFailure = null;
            }
            this.startSubscriptions();
        }
        return oldFailure;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restartIfRequestTimerExpired(long maxTimeBetweenRequests) {
        Object object = this.lockObject;
        synchronized (object) {
            Instant now;
            Duration timeSinceLastResponse;
            if (this.lastRequestTime != null && (timeSinceLastResponse = Duration.between(this.lastRequestTime, now = Instant.now())).toMillis() > maxTimeBetweenRequests) {
                log.error("{}: Last request was dispatched at {}, but no response as of {} ({}).  Cancelling subscription, and restarting.", new Object[]{this.shardConsumer.shardInfo().shardId(), this.lastRequestTime, now, timeSinceLastResponse});
                this.cancel();
                this.startSubscriptions();
            }
        }
    }

    public void onSubscribe(Subscription s) {
        this.subscription = s;
        this.subscription.request(1L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onNext(RecordsRetrieved input) {
        Object object;
        try {
            object = this.lockObject;
            synchronized (object) {
                this.lastRequestTime = null;
            }
            this.lastDataArrival = Instant.now();
            this.shardConsumer.handleInput(input.processRecordsInput().toBuilder().cacheExitTime(Instant.now()).build(), this.subscription);
        }
        catch (Throwable t) {
            log.warn("{}: Caught exception from handleInput", (Object)this.shardConsumer.shardInfo().shardId(), (Object)t);
            Object object2 = this.lockObject;
            synchronized (object2) {
                this.dispatchFailure = t;
            }
        }
        finally {
            this.subscription.request(1L);
            object = this.lockObject;
            synchronized (object) {
                this.lastAccepted = input;
                this.lastRequestTime = Instant.now();
            }
        }
        this.readTimeoutSinceLastRead = 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onError(Throwable t) {
        Object object = this.lockObject;
        synchronized (object) {
            if (t instanceof RetryableRetrievalException && t.getMessage().contains("ReadTimeout")) {
                ++this.readTimeoutSinceLastRead;
                if (this.readTimeoutSinceLastRead > this.readTimeoutsToIgnoreBeforeWarning) {
                    this.logOnErrorReadTimeoutWarning(t);
                }
            } else {
                this.logOnErrorWarning(t);
            }
            this.subscription.cancel();
            this.retrievalFailure = t;
        }
    }

    protected void logOnErrorWarning(Throwable t) {
        log.warn("{}: onError().  Cancelling subscription, and marking self as failed. KCL will recreate the subscription as neccessary to continue processing.", (Object)this.shardConsumer.shardInfo().shardId(), (Object)t);
    }

    protected void logOnErrorReadTimeoutWarning(Throwable t) {
        log.warn("{}: onError().  Cancelling subscription, and marking self as failed. KCL will recreate the subscription as neccessary to continue processing. If you are seeing this warning frequently consider increasing the SDK timeouts by providing an OverrideConfiguration to the kinesis client. Alternatively youcan configure LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppressintermittant ReadTimeout warnings.", (Object)this.shardConsumer.shardInfo().shardId(), (Object)t);
    }

    public void onComplete() {
        log.debug("{}: onComplete(): Received onComplete.  Activity should be triggered externally", (Object)this.shardConsumer.shardInfo().shardId());
    }

    public void cancel() {
        if (this.subscription != null) {
            this.subscription.cancel();
        }
    }

    public Instant lastDataArrival() {
        return this.lastDataArrival;
    }

    public Throwable dispatchFailure() {
        return this.dispatchFailure;
    }

    Throwable retrievalFailure() {
        return this.retrievalFailure;
    }
}

