/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.retrieval.polling;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.retrieval.polling.KinesisDataFetcher;

@KinesisClientInternalApi
public class PrefetchRecordsPublisher
implements RecordsPublisher {
    private static final Logger log = LoggerFactory.getLogger(PrefetchRecordsPublisher.class);
    private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
    @VisibleForTesting
    LinkedBlockingQueue<PrefetchRecordsRetrieved> getRecordsResultQueue;
    private int maxPendingProcessRecordsInput;
    private int maxByteSize;
    private int maxRecordsCount;
    private final int maxRecordsPerCall;
    private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
    private final ExecutorService executorService;
    private final MetricsFactory metricsFactory;
    private final long idleMillisBetweenCalls;
    private Instant lastSuccessfulCall;
    private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
    private PrefetchCounters prefetchCounters;
    private boolean started = false;
    private final String operation;
    private final KinesisDataFetcher dataFetcher;
    private final String shardId;
    private Subscriber<? super RecordsRetrieved> subscriber;
    private final AtomicLong requestedResponses = new AtomicLong(0L);
    private String highestSequenceNumber;
    private InitialPositionInStreamExtended initialPositionInStreamExtended;
    private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock();
    private boolean wasReset = false;

    /**
     * Creates a publisher that fetches records on a background task and buffers them until a
     * subscriber asks for them.
     *
     * @param maxPendingProcessRecordsInput capacity of the queue that holds fetched batches
     * @param maxByteSize buffered-bytes threshold; no new fetch is attempted once it is reached
     * @param maxRecordsCount buffered-records threshold; no new fetch is attempted once it is reached
     * @param maxRecordsPerCall value handed to the retrieval strategy on every fetch
     * @param getRecordsRetrievalStrategy strategy used to fetch records; also supplies the data fetcher
     * @param executorService executor on which the prefetch task is run
     * @param idleMillisBetweenCalls minimum spacing between successful fetches, also used as the
     *        wait/offer timeout while the buffer is full
     * @param metricsFactory metrics factory; wrapped in a {@link ThreadSafeMetricsDelegatingFactory}
     * @param operation operation name used when creating metrics scopes; must not be empty
     * @param shardId shard identifier used in log messages
     * @throws NullPointerException if any of the {@code @NonNull} arguments is {@code null}
     */
    public PrefetchRecordsPublisher(int maxPendingProcessRecordsInput, int maxByteSize, int maxRecordsCount, int maxRecordsPerCall, @NonNull GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull ExecutorService executorService, long idleMillisBetweenCalls, @NonNull MetricsFactory metricsFactory, @NonNull String operation, @NonNull String shardId) {
        if (getRecordsRetrievalStrategy == null) {
            throw new NullPointerException("getRecordsRetrievalStrategy");
        }
        if (executorService == null) {
            throw new NullPointerException("executorService");
        }
        if (metricsFactory == null) {
            throw new NullPointerException("metricsFactory");
        }
        if (operation == null) {
            throw new NullPointerException("operation");
        }
        if (shardId == null) {
            throw new NullPointerException("shardId");
        }
        this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
        this.maxRecordsPerCall = maxRecordsPerCall;
        this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
        this.maxByteSize = maxByteSize;
        this.maxRecordsCount = maxRecordsCount;
        // Diamond instead of a raw type so the queue is checked against the field's element type.
        this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput);
        this.prefetchCounters = new PrefetchCounters();
        this.executorService = executorService;
        this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory);
        this.idleMillisBetweenCalls = idleMillisBetweenCalls;
        this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
        Validate.notEmpty(operation, "Operation cannot be empty");
        this.operation = operation;
        this.dataFetcher = this.getRecordsRetrievalStrategy.getDataFetcher();
        this.shardId = shardId;
    }

    /**
     * Positions the data fetcher and, on the first call, submits the prefetch task to the executor.
     *
     * @param extendedSequenceNumber position to start from; its sequence number becomes the
     *        current highest sequence number
     * @param initialPositionInStreamExtended initial stream position, remembered for later resets
     * @throws IllegalStateException if the executor service has already been shut down
     */
    @Override
    public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) {
        if (executorService.isShutdown()) {
            throw new IllegalStateException("ExecutorService has been shutdown.");
        }

        this.initialPositionInStreamExtended = initialPositionInStreamExtended;
        highestSequenceNumber = extendedSequenceNumber.sequenceNumber();
        dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);

        if (started) {
            // The prefetch task is already running; nothing more to submit.
            return;
        }

        log.info("Starting prefetching thread.");
        executorService.execute(defaultGetRecordsCacheDaemon);
        // Only flagged as started once the task has actually been handed to the executor.
        started = true;
    }

    /**
     * Takes the next buffered batch from the queue, blocking until one is available.
     *
     * <p>The returned batch is a copy stamped with the cache exit time. Taking a batch also
     * updates the prefetch counters and decrements the outstanding request count.
     *
     * @return the next batch, or {@code null} if the calling thread was interrupted while waiting
     * @throws IllegalStateException if the executor service has been shut down or
     *         {@code start} has not been called
     */
    RecordsRetrieved getNextResult() {
        if (executorService.isShutdown()) {
            throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
        }
        if (!started) {
            throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
        }

        try {
            PrefetchRecordsRetrieved published = getRecordsResultQueue.take().prepareForPublish();
            prefetchCounters.removed(published.processRecordsInput);
            requestedResponses.decrementAndGet();
            return published;
        } catch (InterruptedException e) {
            log.error("Interrupted while getting records from the cache", e);
            return null;
        }
    }

    /**
     * Stops prefetching: flags the prefetch task to exit, shuts the executor down immediately
     * and marks this publisher as not started.
     */
    @Override
    public void shutdown() {
        // Raise the flag first so the task sees it when shutdownNow() interrupts it.
        final DefaultGetRecordsCacheDaemon daemon = defaultGetRecordsCacheDaemon;
        daemon.isShutdown = true;

        executorService.shutdownNow();
        started = false;
    }

    /**
     * Discards everything that is buffered and rewinds fetching to the position carried by the
     * given batch.
     *
     * <p>Runs under the reset write lock, so it waits for any retrieval attempt that currently
     * holds the read lock.
     *
     * @param recordsRetrieved a batch previously produced by this publisher type
     * @throws IllegalArgumentException if the batch is not a {@code PrefetchRecordsRetrieved}
     */
    @Override
    public void restartFrom(RecordsRetrieved recordsRetrieved) {
        final boolean producedHere = recordsRetrieved instanceof PrefetchRecordsRetrieved;
        if (!producedHere) {
            throw new IllegalArgumentException("Provided RecordsRetrieved was not produced by the PrefetchRecordsPublisher");
        }
        final PrefetchRecordsRetrieved restartPoint = (PrefetchRecordsRetrieved) recordsRetrieved;

        final ReentrantReadWriteLock.WriteLock exclusive = resetLock.writeLock();
        exclusive.lock();
        try {
            getRecordsResultQueue.clear();
            prefetchCounters.reset();

            highestSequenceNumber = restartPoint.lastBatchSequenceNumber();
            dataFetcher.resetIterator(restartPoint.shardIterator(), highestSequenceNumber, initialPositionInStreamExtended);

            // Tells a producer blocked in addArrivedRecordsInput to drop its stale batch.
            wasReset = true;
        } finally {
            exclusive.unlock();
        }
    }

    /**
     * Registers the subscriber and hands it a subscription.
     *
     * <p>Each {@code request(n)} adds {@code n} to the outstanding demand and immediately tries
     * to deliver buffered batches. {@code cancel()} only zeroes the outstanding demand.
     *
     * @param s the subscriber that will receive retrieved batches
     */
    public void subscribe(Subscriber<? super RecordsRetrieved> s) {
        subscriber = s;

        final Subscription subscription = new Subscription() {
            @Override
            public void request(long n) {
                requestedResponses.addAndGet(n);
                drainQueueForRequests();
            }

            @Override
            public void cancel() {
                requestedResponses.set(0L);
            }
        };
        subscriber.onSubscribe(subscription);
    }

    /**
     * Puts a freshly fetched batch on the result queue, waiting while the queue is full.
     *
     * <p>The caller must hold the reset read lock. Between offer attempts the read lock is
     * released and re-acquired so a pending {@code restartFrom} can take the write lock.
     *
     * @param recordsRetrieved the batch to enqueue
     * @throws InterruptedException if interrupted while waiting for queue space
     * @throws PositionResetException if a reset happened while waiting, making the batch stale
     */
    private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException {
        wasReset = false;
        final ReentrantReadWriteLock.ReadLock shared = resetLock.readLock();

        for (;;) {
            boolean accepted = getRecordsResultQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS);
            if (accepted) {
                break;
            }
            // Queue still full: briefly give up the read lock so a writer can get in.
            shared.unlock();
            shared.lock();
            if (wasReset) {
                throw new PositionResetException();
            }
        }

        prefetchCounters.added(recordsRetrieved.processRecordsInput);
    }

    /**
     * Delivers buffered batches to the subscriber while there is outstanding demand and the
     * queue is not empty.
     *
     * <p>{@code getNextResult()} returns {@code null} when the calling thread is interrupted.
     * In that case draining stops, because {@code null} must never be passed to
     * {@code Subscriber.onNext}.
     */
    private synchronized void drainQueueForRequests() {
        while (requestedResponses.get() > 0L && !getRecordsResultQueue.isEmpty()) {
            RecordsRetrieved next = getNextResult();
            if (next == null) {
                // Interrupted while taking from the queue; nothing was consumed, so stop here.
                break;
            }
            subscriber.onNext(next);
        }
    }

    /**
     * Determines the highest sequence number after the given batch.
     *
     * @param processRecordsInput the batch that was just fetched
     * @return the sequence number of the last record in the batch, or the current highest
     *         sequence number when the batch has no records
     */
    private String calculateHighestSequenceNumber(ProcessRecordsInput processRecordsInput) {
        if (processRecordsInput.records() == null || processRecordsInput.records().isEmpty()) {
            return highestSequenceNumber;
        }
        int lastIndex = processRecordsInput.records().size() - 1;
        return processRecordsInput.records().get(lastIndex).sequenceNumber();
    }

    /**
     * Tracks the number of records and bytes currently buffered, and decides whether more
     * records should be fetched.
     *
     * <p>All reads and writes of the counters go through methods synchronized on this object.
     * The same monitor is used to wake the prefetch task when a batch is removed.
     */
    private class PrefetchCounters {
        private long size = 0L;
        private long byteSize = 0L;

        private PrefetchCounters() {
        }

        /** Accounts for a batch that was placed on the queue. */
        public synchronized void added(ProcessRecordsInput result) {
            this.size += this.getSize(result);
            this.byteSize += this.getByteSize(result);
        }

        /** Accounts for a batch that left the queue and wakes anything blocked in {@link #waitForConsumer()}. */
        public synchronized void removed(ProcessRecordsInput result) {
            this.size -= this.getSize(result);
            this.byteSize -= this.getByteSize(result);
            this.notifyAll();
        }

        /** Number of records in the batch. */
        private long getSize(ProcessRecordsInput result) {
            return result.records().size();
        }

        /** Sum of the data buffer limits of all records in the batch. */
        private long getByteSize(ProcessRecordsInput result) {
            return result.records().stream().mapToLong(record -> record.data().limit()).sum();
        }

        /**
         * Waits up to the idle interval when the buffer is at or over a threshold.
         * Returns immediately when more records may be fetched.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        public synchronized void waitForConsumer() throws InterruptedException {
            if (!this.shouldGetNewRecords()) {
                log.debug("Queue is full waiting for consumer for {} ms", PrefetchRecordsPublisher.this.idleMillisBetweenCalls);
                this.wait(PrefetchRecordsPublisher.this.idleMillisBetweenCalls);
            }
        }

        /**
         * @return {@code true} while both the buffered record count and the buffered byte size
         *         are below their configured maximums
         */
        public synchronized boolean shouldGetNewRecords() {
            if (log.isDebugEnabled()) {
                log.debug("Current Prefetch Counter States: {}", this.toString());
            }
            return this.size < (long)PrefetchRecordsPublisher.this.maxRecordsCount && this.byteSize < (long)PrefetchRecordsPublisher.this.maxByteSize;
        }

        /**
         * Zeroes both counters.
         *
         * <p>Synchronized like every other mutator: {@link #removed(ProcessRecordsInput)} can run
         * on a thread that does not hold the publisher's reset lock, so an unsynchronized reset
         * could race with it.
         */
        synchronized void reset() {
            this.size = 0L;
            this.byteSize = 0L;
        }

        @Override
        public String toString() {
            return String.format("{ Requests: %d, Records: %d, Bytes: %d }", PrefetchRecordsPublisher.this.getRecordsResultQueue.size(), this.size, this.byteSize);
        }
    }

    private class DefaultGetRecordsCacheDaemon
    implements Runnable {
        volatile boolean isShutdown = false;

        private DefaultGetRecordsCacheDaemon() {
        }

        @Override
        public void run() {
            while (!this.isShutdown) {
                if (Thread.currentThread().isInterrupted()) {
                    log.warn("Prefetch thread was interrupted.");
                    break;
                }
                PrefetchRecordsPublisher.this.resetLock.readLock().lock();
                try {
                    this.makeRetrievalAttempt();
                }
                catch (PositionResetException pre) {
                    log.debug("Position was reset while attempting to add item to queue.");
                }
                finally {
                    PrefetchRecordsPublisher.this.resetLock.readLock().unlock();
                }
            }
            this.callShutdownOnStrategy();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void makeRetrievalAttempt() {
            MetricsScope scope = MetricsUtil.createMetricsWithOperation(PrefetchRecordsPublisher.this.metricsFactory, PrefetchRecordsPublisher.this.operation);
            if (PrefetchRecordsPublisher.this.prefetchCounters.shouldGetNewRecords()) {
                try {
                    this.sleepBeforeNextCall();
                    GetRecordsResponse getRecordsResult = PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.getRecords(PrefetchRecordsPublisher.this.maxRecordsPerCall);
                    PrefetchRecordsPublisher.this.lastSuccessfulCall = Instant.now();
                    List<KinesisClientRecord> records = getRecordsResult.records().stream().map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
                    ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).millisBehindLatest(getRecordsResult.millisBehindLatest()).cacheEntryTime(PrefetchRecordsPublisher.this.lastSuccessfulCall).isAtShardEnd(PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.getDataFetcher().isShardEndReached()).build();
                    PrefetchRecordsPublisher.this.highestSequenceNumber = PrefetchRecordsPublisher.this.calculateHighestSequenceNumber(processRecordsInput);
                    PrefetchRecordsRetrieved recordsRetrieved = new PrefetchRecordsRetrieved(processRecordsInput, PrefetchRecordsPublisher.this.highestSequenceNumber, getRecordsResult.nextShardIterator());
                    PrefetchRecordsPublisher.this.highestSequenceNumber = recordsRetrieved.lastBatchSequenceNumber;
                    PrefetchRecordsPublisher.this.addArrivedRecordsInput(recordsRetrieved);
                    PrefetchRecordsPublisher.this.drainQueueForRequests();
                }
                catch (PositionResetException pse) {
                    throw pse;
                }
                catch (RetryableRetrievalException rre) {
                    log.info("Timeout occurred while waiting for response from Kinesis.  Will retry the request.");
                }
                catch (InterruptedException e) {
                    log.info("Thread was interrupted, indicating shutdown was called on the cache.");
                }
                catch (ExpiredIteratorException e) {
                    log.info("ShardId {}: records threw ExpiredIteratorException - restarting after greatest seqNum passed to customer", (Object)PrefetchRecordsPublisher.this.shardId, (Object)e);
                    scope.addData(PrefetchRecordsPublisher.EXPIRED_ITERATOR_METRIC, 1.0, StandardUnit.COUNT, MetricsLevel.SUMMARY);
                    PrefetchRecordsPublisher.this.dataFetcher.restartIterator();
                }
                catch (SdkException e) {
                    log.error("Exception thrown while fetching records from Kinesis", (Throwable)e);
                }
                catch (Throwable e) {
                    log.error("Unexpected exception was thrown. This could probably be an issue or a bug. Please search for the exception/error online to check what is going on. If the issue persists or is a recurring problem, feel free to open an issue on, https://github.com/awslabs/amazon-kinesis-client.", e);
                }
                finally {
                    MetricsUtil.endScope(scope);
                }
            } else {
                try {
                    PrefetchRecordsPublisher.this.prefetchCounters.waitForConsumer();
                }
                catch (InterruptedException ie) {
                    log.info("Thread was interrupted while waiting for the consumer.  Shutdown has probably been started");
                }
            }
        }

        private void callShutdownOnStrategy() {
            if (!PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.isShutdown()) {
                PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.shutdown();
            }
        }

        private void sleepBeforeNextCall() throws InterruptedException {
            if (PrefetchRecordsPublisher.this.lastSuccessfulCall == null) {
                return;
            }
            long timeSinceLastCall = Duration.between(PrefetchRecordsPublisher.this.lastSuccessfulCall, Instant.now()).abs().toMillis();
            if (timeSinceLastCall < PrefetchRecordsPublisher.this.idleMillisBetweenCalls) {
                Thread.sleep(PrefetchRecordsPublisher.this.idleMillisBetweenCalls - timeSinceLastCall);
            }
        }
    }

    /**
     * Thrown by the producer path when the publisher was reset while a batch was waiting for
     * queue space. Caught by the prefetch task's loop.
     */
    private static class PositionResetException extends RuntimeException {
        private PositionResetException() {
        }
    }

    /**
     * Immutable holder for one fetched batch, together with the highest sequence number at the
     * time it was fetched and the shard iterator that followed it.
     */
    static class PrefetchRecordsRetrieved implements RecordsRetrieved {
        final ProcessRecordsInput processRecordsInput;
        final String lastBatchSequenceNumber;
        final String shardIterator;

        public PrefetchRecordsRetrieved(ProcessRecordsInput processRecordsInput, String lastBatchSequenceNumber, String shardIterator) {
            this.processRecordsInput = processRecordsInput;
            this.lastBatchSequenceNumber = lastBatchSequenceNumber;
            this.shardIterator = shardIterator;
        }

        /**
         * @return a copy of this batch whose input carries the current time as cache exit time
         */
        PrefetchRecordsRetrieved prepareForPublish() {
            ProcessRecordsInput stamped = processRecordsInput.toBuilder()
                    .cacheExitTime(Instant.now())
                    .build();
            return new PrefetchRecordsRetrieved(stamped, lastBatchSequenceNumber, shardIterator);
        }

        @Override
        public ProcessRecordsInput processRecordsInput() {
            return processRecordsInput;
        }

        public String lastBatchSequenceNumber() {
            return lastBatchSequenceNumber;
        }

        public String shardIterator() {
            return shardIterator;
        }

        /** Two batches are equal when all three components are equal (or both {@code null}). */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof PrefetchRecordsRetrieved)) {
                return false;
            }
            PrefetchRecordsRetrieved that = (PrefetchRecordsRetrieved) o;
            return that.canEqual(this)
                    && matches(processRecordsInput(), that.processRecordsInput())
                    && matches(lastBatchSequenceNumber(), that.lastBatchSequenceNumber())
                    && matches(shardIterator(), that.shardIterator());
        }

        /** Null-tolerant equality: {@code null} matches only {@code null}. */
        private static boolean matches(Object mine, Object theirs) {
            return mine == null ? theirs == null : mine.equals(theirs);
        }

        protected boolean canEqual(Object other) {
            return other instanceof PrefetchRecordsRetrieved;
        }

        /** Combines the three components with multiplier 59, using 43 for a {@code null} component. */
        @Override
        public int hashCode() {
            final int prime = 59;
            int hash = 1;
            Object[] components = {processRecordsInput(), lastBatchSequenceNumber(), shardIterator()};
            for (Object component : components) {
                hash = hash * prime + (component == null ? 43 : component.hashCode());
            }
            return hash;
        }

        @Override
        public String toString() {
            return new StringBuilder("PrefetchRecordsPublisher.PrefetchRecordsRetrieved(")
                    .append("processRecordsInput=").append(processRecordsInput())
                    .append(", lastBatchSequenceNumber=").append(lastBatchSequenceNumber())
                    .append(", shardIterator=").append(shardIterator())
                    .append(")")
                    .toString();
        }
    }
}

