/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.retrieval.polling;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.retrieval.polling.KinesisDataFetcher;

@KinesisClientInternalApi
public class PrefetchRecordsPublisher
implements RecordsPublisher {
    private static final Logger log = LoggerFactory.getLogger(PrefetchRecordsPublisher.class);
    private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
    LinkedBlockingQueue<ProcessRecordsInput> getRecordsResultQueue;
    private int maxPendingProcessRecordsInput;
    private int maxByteSize;
    private int maxRecordsCount;
    private final int maxRecordsPerCall;
    private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
    private final ExecutorService executorService;
    private final MetricsFactory metricsFactory;
    private final long idleMillisBetweenCalls;
    private Instant lastSuccessfulCall;
    private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
    private PrefetchCounters prefetchCounters;
    private boolean started = false;
    private final String operation;
    private final KinesisDataFetcher dataFetcher;
    private final String shardId;
    private Subscriber<? super ProcessRecordsInput> subscriber;
    private final AtomicLong requestedResponses = new AtomicLong(0L);

    public PrefetchRecordsPublisher(int maxPendingProcessRecordsInput, int maxByteSize, int maxRecordsCount, int maxRecordsPerCall, @NonNull GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull ExecutorService executorService, long idleMillisBetweenCalls, @NonNull MetricsFactory metricsFactory, @NonNull String operation, @NonNull String shardId) {
        if (getRecordsRetrievalStrategy == null) {
            throw new NullPointerException("getRecordsRetrievalStrategy");
        }
        if (executorService == null) {
            throw new NullPointerException("executorService");
        }
        if (metricsFactory == null) {
            throw new NullPointerException("metricsFactory");
        }
        if (operation == null) {
            throw new NullPointerException("operation");
        }
        if (shardId == null) {
            throw new NullPointerException("shardId");
        }
        this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
        this.maxRecordsPerCall = maxRecordsPerCall;
        this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
        this.maxByteSize = maxByteSize;
        this.maxRecordsCount = maxRecordsCount;
        this.getRecordsResultQueue = new LinkedBlockingQueue(this.maxPendingProcessRecordsInput);
        this.prefetchCounters = new PrefetchCounters();
        this.executorService = executorService;
        this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory);
        this.idleMillisBetweenCalls = idleMillisBetweenCalls;
        this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
        Validate.notEmpty((CharSequence)operation, (String)"Operation cannot be empty", (Object[])new Object[0]);
        this.operation = operation;
        this.dataFetcher = this.getRecordsRetrievalStrategy.getDataFetcher();
        this.shardId = shardId;
    }

    @Override
    public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) {
        if (this.executorService.isShutdown()) {
            throw new IllegalStateException("ExecutorService has been shutdown.");
        }
        this.dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);
        if (!this.started) {
            log.info("Starting prefetching thread.");
            this.executorService.execute(this.defaultGetRecordsCacheDaemon);
        }
        this.started = true;
    }

    ProcessRecordsInput getNextResult() {
        if (this.executorService.isShutdown()) {
            throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
        }
        if (!this.started) {
            throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
        }
        ProcessRecordsInput result = null;
        try {
            result = this.getRecordsResultQueue.take().toBuilder().cacheExitTime(Instant.now()).build();
            this.prefetchCounters.removed(result);
            this.requestedResponses.decrementAndGet();
        }
        catch (InterruptedException e) {
            log.error("Interrupted while getting records from the cache", (Throwable)e);
        }
        return result;
    }

    @Override
    public void shutdown() {
        this.defaultGetRecordsCacheDaemon.isShutdown = true;
        this.executorService.shutdownNow();
        this.started = false;
    }

    public void subscribe(Subscriber<? super ProcessRecordsInput> s) {
        this.subscriber = s;
        this.subscriber.onSubscribe(new Subscription(){

            public void request(long n) {
                PrefetchRecordsPublisher.this.requestedResponses.addAndGet(n);
                PrefetchRecordsPublisher.this.drainQueueForRequests();
            }

            public void cancel() {
                PrefetchRecordsPublisher.this.requestedResponses.set(0L);
            }
        });
    }

    private void addArrivedRecordsInput(ProcessRecordsInput processRecordsInput) throws InterruptedException {
        this.getRecordsResultQueue.put(processRecordsInput);
        this.prefetchCounters.added(processRecordsInput);
    }

    private synchronized void drainQueueForRequests() {
        while (this.requestedResponses.get() > 0L && !this.getRecordsResultQueue.isEmpty()) {
            this.subscriber.onNext((Object)this.getNextResult());
        }
    }

    private class PrefetchCounters {
        private long size = 0L;
        private long byteSize = 0L;

        private PrefetchCounters() {
        }

        public synchronized void added(ProcessRecordsInput result) {
            this.size += this.getSize(result);
            this.byteSize += this.getByteSize(result);
        }

        public synchronized void removed(ProcessRecordsInput result) {
            this.size -= this.getSize(result);
            this.byteSize -= this.getByteSize(result);
            this.notifyAll();
        }

        private long getSize(ProcessRecordsInput result) {
            return result.records().size();
        }

        private long getByteSize(ProcessRecordsInput result) {
            return result.records().stream().mapToLong(record -> record.data().limit()).sum();
        }

        public synchronized void waitForConsumer() throws InterruptedException {
            if (!this.shouldGetNewRecords()) {
                log.debug("Queue is full waiting for consumer for {} ms", (Object)PrefetchRecordsPublisher.this.idleMillisBetweenCalls);
                this.wait(PrefetchRecordsPublisher.this.idleMillisBetweenCalls);
            }
        }

        public synchronized boolean shouldGetNewRecords() {
            if (log.isDebugEnabled()) {
                log.debug("Current Prefetch Counter States: {}", (Object)this.toString());
            }
            return this.size < (long)PrefetchRecordsPublisher.this.maxRecordsCount && this.byteSize < (long)PrefetchRecordsPublisher.this.maxByteSize;
        }

        public String toString() {
            return String.format("{ Requests: %d, Records: %d, Bytes: %d }", PrefetchRecordsPublisher.this.getRecordsResultQueue.size(), this.size, this.byteSize);
        }
    }

    private class DefaultGetRecordsCacheDaemon
    implements Runnable {
        volatile boolean isShutdown = false;

        private DefaultGetRecordsCacheDaemon() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (!this.isShutdown) {
                if (Thread.currentThread().isInterrupted()) {
                    log.warn("Prefetch thread was interrupted.");
                    break;
                }
                MetricsScope scope = MetricsUtil.createMetricsWithOperation(PrefetchRecordsPublisher.this.metricsFactory, PrefetchRecordsPublisher.this.operation);
                if (PrefetchRecordsPublisher.this.prefetchCounters.shouldGetNewRecords()) {
                    try {
                        this.sleepBeforeNextCall();
                        GetRecordsResponse getRecordsResult = PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.getRecords(PrefetchRecordsPublisher.this.maxRecordsPerCall);
                        PrefetchRecordsPublisher.this.lastSuccessfulCall = Instant.now();
                        List<KinesisClientRecord> records = getRecordsResult.records().stream().map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
                        ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).millisBehindLatest(getRecordsResult.millisBehindLatest()).cacheEntryTime(PrefetchRecordsPublisher.this.lastSuccessfulCall).isAtShardEnd(PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.getDataFetcher().isShardEndReached()).build();
                        PrefetchRecordsPublisher.this.addArrivedRecordsInput(processRecordsInput);
                        PrefetchRecordsPublisher.this.drainQueueForRequests();
                        continue;
                    }
                    catch (InterruptedException e) {
                        log.info("Thread was interrupted, indicating shutdown was called on the cache.");
                        continue;
                    }
                    catch (ExpiredIteratorException e) {
                        log.info("ShardId {}: records threw ExpiredIteratorException - restarting after greatest seqNum passed to customer", (Object)PrefetchRecordsPublisher.this.shardId, (Object)e);
                        scope.addData(PrefetchRecordsPublisher.EXPIRED_ITERATOR_METRIC, 1.0, StandardUnit.COUNT, MetricsLevel.SUMMARY);
                        PrefetchRecordsPublisher.this.dataFetcher.restartIterator();
                        continue;
                    }
                    catch (SdkClientException e) {
                        log.error("Exception thrown while fetching records from Kinesis", (Throwable)e);
                        continue;
                    }
                    catch (Throwable e) {
                        log.error("Unexpected exception was thrown. This could probably be an issue or a bug. Please search for the exception/error online to check what is going on. If the issue persists or is a recurring problem, feel free to open an issue on, https://github.com/awslabs/amazon-kinesis-client.", e);
                        continue;
                    }
                    finally {
                        MetricsUtil.endScope(scope);
                        continue;
                    }
                }
                try {
                    PrefetchRecordsPublisher.this.prefetchCounters.waitForConsumer();
                }
                catch (InterruptedException ie) {
                    log.info("Thread was interrupted while waiting for the consumer.  Shutdown has probably been started");
                }
            }
            this.callShutdownOnStrategy();
        }

        private void callShutdownOnStrategy() {
            if (!PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.isShutdown()) {
                PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.shutdown();
            }
        }

        private void sleepBeforeNextCall() throws InterruptedException {
            if (PrefetchRecordsPublisher.this.lastSuccessfulCall == null) {
                return;
            }
            long timeSinceLastCall = Duration.between(PrefetchRecordsPublisher.this.lastSuccessfulCall, Instant.now()).abs().toMillis();
            if (timeSinceLastCall < PrefetchRecordsPublisher.this.idleMillisBetweenCalls) {
                Thread.sleep(PrefetchRecordsPublisher.this.idleMillisBetweenCalls - timeSinceLastCall);
            }
        }
    }
}

