/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.retrieval.polling;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.DiagnosticUtils;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory;
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.retrieval.polling.DataFetcher;

@KinesisClientInternalApi
public class PrefetchRecordsPublisher
implements RecordsPublisher {
    private static final Logger log = LoggerFactory.getLogger(PrefetchRecordsPublisher.class);
    // Name of the metric recorded when a fetch fails with ExpiredIteratorException.
    private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
    // Capacity (in batches) handed to the LinkedBlockingQueue created in the constructor.
    private int maxPendingProcessRecordsInput;
    // Upper bound on buffered bytes; compared against in PrefetchCounters.shouldGetNewRecords().
    private int maxByteSize;
    // Upper bound on buffered records; compared against in PrefetchCounters.shouldGetNewRecords().
    private int maxRecordsCount;
    // Value passed to GetRecordsRetrievalStrategy.getRecords(...) on every fetch.
    private final int maxRecordsPerCall;
    private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
    // Runs the prefetch daemon; shutdown() calls shutdownNow() on it.
    private final ExecutorService executorService;
    // The caller's factory wrapped in a ThreadSafeMetricsDelegatingFactory by the constructor.
    private final MetricsFactory metricsFactory;
    // Used as the pause between fetches, the queue-offer timeout and the wait-for-consumer timeout.
    private final long idleMillisBetweenCalls;
    // Written by the daemon after a getRecords call returns and after a ProvisionedThroughputExceededException;
    // null until then.
    private Instant lastSuccessfulCall;
    // Lets the very first fetch attempt skip the idle sleep (see sleepBeforeNextCall()).
    private boolean isFirstGetCallTry = true;
    private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
    // Set by start(), cleared by shutdown(); checked by throwOnIllegalState().
    private boolean started = false;
    // Operation name used when creating the metrics scope for each fetch attempt.
    private final String operation;
    private final StreamIdentifier streamId;
    // "<serialized stream id>:<shard id>", used as the prefix of log messages.
    private final String streamAndShardId;
    // The single subscriber registered through subscribe(...); null until then.
    private Subscriber<? super RecordsRetrieved> subscriber;
    @VisibleForTesting
    private final PublisherSession publisherSession;
    // The daemon holds the read lock during each retrieval attempt; restartFrom(...) takes the write lock.
    private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock();
    // Set to true by restartFrom(...) and back to false by addArrivedRecordsInput(...).
    private boolean wasReset = false;
    // Updated by drainQueueForRequests() each time a batch is handed to the subscriber.
    private Instant lastEventDeliveryTime = Instant.EPOCH;
    // Returned by getLastSuccessfulRequestDetails(); NOTE(review): nothing in this class updates it - confirm intent.
    private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();

    public PrefetchRecordsPublisher(int maxPendingProcessRecordsInput, int maxByteSize, int maxRecordsCount, int maxRecordsPerCall, @NonNull GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull ExecutorService executorService, long idleMillisBetweenCalls, @NonNull MetricsFactory metricsFactory, @NonNull String operation, @NonNull String shardId) {
        if (getRecordsRetrievalStrategy == null) {
            throw new NullPointerException("getRecordsRetrievalStrategy");
        }
        if (executorService == null) {
            throw new NullPointerException("executorService");
        }
        if (metricsFactory == null) {
            throw new NullPointerException("metricsFactory");
        }
        if (operation == null) {
            throw new NullPointerException("operation");
        }
        if (shardId == null) {
            throw new NullPointerException("shardId");
        }
        this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
        this.maxRecordsPerCall = maxRecordsPerCall;
        this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
        this.maxByteSize = maxByteSize;
        this.maxRecordsCount = maxRecordsCount;
        this.publisherSession = new PublisherSession(new LinkedBlockingQueue<PrefetchRecordsRetrieved>(this.maxPendingProcessRecordsInput), new PrefetchCounters(), this.getRecordsRetrievalStrategy.dataFetcher());
        this.executorService = executorService;
        this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory);
        this.idleMillisBetweenCalls = idleMillisBetweenCalls;
        this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
        Validate.notEmpty((CharSequence)operation, (String)"Operation cannot be empty", (Object[])new Object[0]);
        this.operation = operation;
        this.streamId = this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier();
        this.streamAndShardId = this.streamId.serialize() + ":" + shardId;
    }

    /**
     * Initializes the publisher session at the given position and submits the prefetch daemon to
     * the executor. Calling it again after a successful start only logs a message.
     *
     * @param extendedSequenceNumber sequence number to start fetching from
     * @param initialPositionInStreamExtended initial stream position handed to the data fetcher
     * @throws IllegalStateException if the executor service has already been shut down
     */
    @Override
    public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) {
        if (executorService.isShutdown()) {
            throw new IllegalStateException("ExecutorService has been shutdown.");
        }
        if (started) {
            log.info("{} : Skipping publisher start as it was already started.", streamAndShardId);
            return;
        }
        log.info("{} : Starting Prefetching thread and initializing publisher session.", streamAndShardId);
        publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended);
        executorService.execute(defaultGetRecordsCacheDaemon);
        // Only reached when initialization and submission both succeeded.
        started = true;
    }

    /**
     * Verifies that the publisher can serve requests.
     *
     * @throws IllegalStateException if the executor has been shut down, or if {@code start} has
     *         not been called
     */
    private void throwOnIllegalState() {
        final String problem;
        if (executorService.isShutdown()) {
            problem = "Shutdown has been called on the cache, can't accept new requests.";
        } else if (!started) {
            problem = "Cache has not been initialized, make sure to call start.";
        } else {
            return;
        }
        throw new IllegalStateException(problem);
    }

    /**
     * Returns the batch at the head of the prefetch queue without removing it.
     *
     * @return the head batch, or {@code null} when the queue is empty
     * @throws IllegalStateException if the publisher is shut down or was never started
     */
    private PrefetchRecordsRetrieved peekNextResult() {
        throwOnIllegalState();
        final PrefetchRecordsRetrieved head = publisherSession.peekNextRecord();
        return head;
    }

    /**
     * Stops prefetching: flags the daemon to exit its loop, interrupts it through
     * {@code shutdownNow()} on the executor, and marks the publisher as not started.
     */
    @Override
    public void shutdown() {
        // Raise the flag before interrupting so the daemon loop sees it when it wakes up.
        defaultGetRecordsCacheDaemon.isShutdown = true;
        executorService.shutdownNow();
        started = false;
    }

    /**
     * @return the {@link RequestDetails} instance held by this publisher
     */
    @Override
    public RequestDetails getLastSuccessfulRequestDetails() {
        return lastSuccessfulRequestDetails;
    }

    /**
     * Discards everything that has been prefetched and repositions the session at the supplied
     * batch. Runs under the write lock so it cannot interleave with a retrieval attempt.
     *
     * @param recordsRetrieved a batch previously produced by this publisher
     * @throws IllegalArgumentException if the batch is not a {@code PrefetchRecordsRetrieved}
     */
    @Override
    public void restartFrom(RecordsRetrieved recordsRetrieved) {
        final boolean producedHere = recordsRetrieved instanceof PrefetchRecordsRetrieved;
        if (!producedHere) {
            throw new IllegalArgumentException("Provided RecordsRetrieved was not produced by the PrefetchRecordsPublisher");
        }
        final PrefetchRecordsRetrieved restartPoint = (PrefetchRecordsRetrieved) recordsRetrieved;
        final ReentrantReadWriteLock.WriteLock exclusive = resetLock.writeLock();
        exclusive.lock();
        try {
            publisherSession.reset(restartPoint);
            // Tells a daemon blocked in addArrivedRecordsInput to drop the batch it is holding.
            wasReset = true;
        } finally {
            exclusive.unlock();
        }
    }

    /**
     * Registers the subscriber and hands it a subscription whose {@code request} adds demand and
     * attempts a delivery, and whose {@code cancel} drops all outstanding demand.
     *
     * @param s subscriber that will receive prefetched batches
     */
    public void subscribe(Subscriber<? super RecordsRetrieved> s) {
        subscriber = s;
        final Subscription demandControl = new Subscription() {
            @Override
            public void request(long n) {
                publisherSession.requestedResponses().addAndGet(n);
                drainQueueForRequests();
            }

            @Override
            public void cancel() {
                // Zero demand stops further batches from being dispatched.
                publisherSession.requestedResponses().set(0L);
            }
        };
        subscriber.onSubscribe(demandControl);
    }

    /**
     * Handles a delivery acknowledgement from the subscriber: the session evicts the acknowledged
     * batch and triggers the next delivery, then the delayed-delivery diagnostic runs.
     *
     * @param recordsDeliveryAck acknowledgement for a previously published batch
     */
    @Override
    public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
        final Runnable dispatchNext = this::drainQueueForRequests;
        publisherSession.handleRecordsDeliveryAck(recordsDeliveryAck, streamAndShardId, dispatchNext);
        DiagnosticUtils.takeDelayedDeliveryActionIfRequired(streamAndShardId, lastEventDeliveryTime, log);
    }

    /**
     * Puts a freshly fetched batch on the prefetch queue, retrying until it fits, and then adds
     * it to the prefetch counters. The caller is expected to hold the read lock of
     * {@code resetLock} (the daemon's run loop acquires it before each attempt).
     *
     * @param recordsRetrieved the batch to enqueue
     * @throws InterruptedException if the thread is interrupted while waiting for queue space
     * @throws PositionResetException if {@code restartFrom} ran while this call was waiting
     */
    private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException {
        // Clear the flag so that only a reset happening during this call is detected below.
        this.wasReset = false;
        // Each offer waits up to idleMillisBetweenCalls for space in the bounded queue.
        while (!this.publisherSession.offerRecords(recordsRetrieved, this.idleMillisBetweenCalls)) {
            // Briefly give up the read lock so a restartFrom(...) waiting on the write lock can run;
            // re-acquiring it then blocks until that reset has finished.
            this.resetLock.readLock().unlock();
            this.resetLock.readLock().lock();
            if (!this.wasReset) continue;
            // A reset happened: this batch belongs to the old position and must be discarded.
            throw new PositionResetException();
        }
        this.publisherSession.prefetchCounters().added(recordsRetrieved.processRecordsInput);
    }

    /**
     * Delivers the batch at the head of the prefetch queue to the subscriber when there is
     * outstanding demand and that batch has not been dispatched yet. The batch stays on the queue;
     * it is only marked as dispatched here.
     *
     * @throws IllegalStateException if the publisher is shut down or was never started
     */
    @VisibleForTesting
    synchronized void drainQueueForRequests() {
        final PrefetchRecordsRetrieved recordsToDeliver = peekNextResult();
        if (!publisherSession.hasDemandToPublish() || !canDispatchRecord(recordsToDeliver)) {
            return;
        }
        // Pass the batch with its own static type: an Object-typed argument does not type-check
        // against Subscriber<? super RecordsRetrieved>.onNext.
        subscriber.onNext(recordsToDeliver.prepareForPublish());
        recordsToDeliver.dispatched();
        lastEventDeliveryTime = Instant.now();
    }

    /**
     * @param recordsToDeliver head of the prefetch queue, possibly {@code null}
     * @return {@code true} when a batch is present and has not been dispatched yet
     */
    private static boolean canDispatchRecord(PrefetchRecordsRetrieved recordsToDeliver) {
        if (recordsToDeliver == null) {
            return false;
        }
        return !recordsToDeliver.isDispatched();
    }

    /**
     * Determines the highest sequence number known after receiving a batch.
     *
     * @param processRecordsInput the batch that was just fetched
     * @return the sequence number of the batch's last record, or the session's current highest
     *         sequence number when the batch has no records
     */
    private String calculateHighestSequenceNumber(ProcessRecordsInput processRecordsInput) {
        final List<KinesisClientRecord> batch = processRecordsInput.records();
        if (batch == null || batch.isEmpty()) {
            return publisherSession.highestSequenceNumber();
        }
        final int lastIndex = batch.size() - 1;
        return batch.get(lastIndex).sequenceNumber();
    }

    /**
     * @return the session object holding this publisher's queue, counters and demand
     */
    public PublisherSession getPublisherSession() {
        return publisherSession;
    }

    /**
     * Tracks how many records and bytes are currently buffered in the prefetch queue and decides
     * whether more may be fetched. All reads and writes of the counters that matter for that
     * decision happen while holding this object's monitor.
     */
    private class PrefetchCounters {
        private long size = 0L;
        private long byteSize = 0L;

        private PrefetchCounters() {
        }

        /**
         * Accounts for a batch that was added to the queue.
         *
         * @param result the batch that was enqueued
         */
        public synchronized void added(ProcessRecordsInput result) {
            this.size += this.getSize(result);
            this.byteSize += this.getByteSize(result);
        }

        /**
         * Accounts for a batch that left the queue and wakes any thread blocked in
         * {@link #waitForConsumer()}.
         *
         * @param result the batch that was evicted
         */
        public synchronized void removed(ProcessRecordsInput result) {
            this.size -= this.getSize(result);
            this.byteSize -= this.getByteSize(result);
            this.notifyAll();
        }

        /** @return number of records in the batch */
        private long getSize(ProcessRecordsInput result) {
            return result.records().size();
        }

        /** @return sum of {@code data().limit()} over all records in the batch */
        private long getByteSize(ProcessRecordsInput result) {
            return result.records().stream().mapToLong(record -> record.data().limit()).sum();
        }

        /**
         * Waits up to {@code idleMillisBetweenCalls} for buffer space when the buffer is full.
         * A single timed wait is enough: the daemon loop re-checks the condition on its next pass.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public synchronized void waitForConsumer() throws InterruptedException {
            if (!this.shouldGetNewRecords()) {
                log.debug("{} : Queue is full waiting for consumer for {} ms", (Object)PrefetchRecordsPublisher.this.streamAndShardId, (Object)PrefetchRecordsPublisher.this.idleMillisBetweenCalls);
                this.wait(PrefetchRecordsPublisher.this.idleMillisBetweenCalls);
            }
        }

        /**
         * @return {@code true} while both the record count and the byte count are below their
         *         configured maximums
         */
        public synchronized boolean shouldGetNewRecords() {
            if (log.isDebugEnabled()) {
                log.debug("{} : Current Prefetch Counter States: {}", (Object)PrefetchRecordsPublisher.this.streamAndShardId, (Object)this.toString());
            }
            return this.size < (long)PrefetchRecordsPublisher.this.maxRecordsCount && this.byteSize < (long)PrefetchRecordsPublisher.this.maxByteSize;
        }

        /**
         * Zeroes both counters. Synchronized like every other mutator: {@link #removed} is called
         * from the acknowledgement path, which does not hold the publisher's reset lock, so
         * without the monitor the two updates could interleave or go unseen.
         */
        synchronized void reset() {
            this.size = 0L;
            this.byteSize = 0L;
        }

        @Override
        public String toString() {
            return String.format("{ Requests: %d, Records: %d, Bytes: %d }", PrefetchRecordsPublisher.this.publisherSession.prefetchRecordsQueue().size(), this.size, this.byteSize);
        }
    }

    /**
     * Background task that keeps the prefetch queue filled. Each loop iteration makes one
     * retrieval attempt while holding the read lock of {@code resetLock}; the loop ends when the
     * shutdown flag is set or the thread is interrupted, after which the retrieval strategy is
     * shut down.
     */
    private class DefaultGetRecordsCacheDaemon
    implements Runnable {
        /** Set by the enclosing publisher's {@code shutdown()}; volatile so the loop observes it. */
        volatile boolean isShutdown = false;

        private DefaultGetRecordsCacheDaemon() {
        }

        @Override
        public void run() {
            while (!this.isShutdown) {
                if (Thread.currentThread().isInterrupted()) {
                    log.warn("{} : Prefetch thread was interrupted.", streamAndShardId);
                    break;
                }
                resetLock.readLock().lock();
                try {
                    this.makeRetrievalAttempt();
                }
                catch (PositionResetException pre) {
                    log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId);
                }
                finally {
                    resetLock.readLock().unlock();
                }
            }
            this.callShutdownOnStrategy();
        }

        /**
         * Performs one attempt: fetches and enqueues a batch when the counters allow it,
         * otherwise waits for the consumer to free space.
         *
         * @throws PositionResetException if the position was reset while the batch was waiting
         *         for queue space
         */
        private void makeRetrievalAttempt() {
            if (publisherSession.prefetchCounters().shouldGetNewRecords()) {
                this.fetchAndEnqueue();
            } else {
                try {
                    publisherSession.prefetchCounters().waitForConsumer();
                }
                catch (InterruptedException ie) {
                    log.info("{} :  Thread was interrupted while waiting for the consumer.  Shutdown has probably been started", streamAndShardId);
                    // Restore the flag so the run loop's interrupt check can end the daemon.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Fetches one batch, enqueues it and tries to deliver it. The metrics scope is created
         * here, and not before the buffer check, so that every scope created is also ended.
         */
        private void fetchAndEnqueue() {
            MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
            try {
                this.sleepBeforeNextCall();
                GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
                lastSuccessfulCall = Instant.now();
                List<KinesisClientRecord> records = getRecordsResult.records().stream().map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
                ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder()
                        .records(records)
                        .millisBehindLatest(getRecordsResult.millisBehindLatest())
                        .cacheEntryTime(lastSuccessfulCall)
                        .isAtShardEnd(getRecordsRetrievalStrategy.dataFetcher().isShardEndReached())
                        .childShards(getRecordsResult.childShards())
                        .build();
                PrefetchRecordsRetrieved recordsRetrieved = new PrefetchRecordsRetrieved(processRecordsInput, calculateHighestSequenceNumber(processRecordsInput), getRecordsResult.nextShardIterator(), PrefetchRecordsRetrieved.generateBatchUniqueIdentifier());
                publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber);
                addArrivedRecordsInput(recordsRetrieved);
                drainQueueForRequests();
            }
            catch (PositionResetException pse) {
                // Must escape to run(); the catch-all below would otherwise swallow it.
                throw pse;
            }
            catch (RetryableRetrievalException rre) {
                log.info("{} :  Timeout occurred while waiting for response from Kinesis.  Will retry the request.", streamAndShardId);
            }
            catch (InterruptedException e) {
                log.info("{} :  Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId);
                // Restore the flag so the run loop's interrupt check can end the daemon.
                Thread.currentThread().interrupt();
            }
            catch (ExpiredIteratorException e) {
                log.info("{} :  records threw ExpiredIteratorException - restarting after greatest seqNum passed to customer", streamAndShardId, e);
                MetricsUtil.addStreamId(scope, streamId);
                scope.addData(EXPIRED_ITERATOR_METRIC, 1.0, StandardUnit.COUNT, MetricsLevel.SUMMARY);
                publisherSession.dataFetcher().restartIterator();
            }
            catch (ProvisionedThroughputExceededException e) {
                // Counts as a call for pacing purposes, so the next attempt waits the idle time.
                lastSuccessfulCall = Instant.now();
                log.error("{} :  Exception thrown while fetching records from Kinesis", streamAndShardId, e);
            }
            catch (SdkException e) {
                log.error("{} :  Exception thrown while fetching records from Kinesis", streamAndShardId, e);
            }
            catch (Throwable e) {
                log.error("{} :  Unexpected exception was thrown. This could probably be an issue or a bug. Please search for the exception/error online to check what is going on. If the issue persists or is a recurring problem, feel free to open an issue on, https://github.com/awslabs/amazon-kinesis-client.", streamAndShardId, e);
            }
            finally {
                MetricsUtil.endScope(scope);
            }
        }

        /** Shuts the retrieval strategy down unless it already reports being shut down. */
        private void callShutdownOnStrategy() {
            if (!getRecordsRetrievalStrategy.isShutdown()) {
                getRecordsRetrievalStrategy.shutdown();
            }
        }

        /**
         * Paces fetches: the first attempt runs immediately, attempts before any successful call
         * wait the full idle time, and later attempts wait for whatever remains of the idle time
         * since the last successful call.
         *
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        private void sleepBeforeNextCall() throws InterruptedException {
            if (lastSuccessfulCall == null && isFirstGetCallTry) {
                isFirstGetCallTry = false;
                return;
            }
            if (lastSuccessfulCall == null) {
                Thread.sleep(idleMillisBetweenCalls);
                return;
            }
            long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
            if (timeSinceLastCall < idleMillisBetweenCalls) {
                Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall);
            }
        }
    }

    /**
     * Signals that {@code restartFrom} repositioned the publisher while a fetched batch was still
     * waiting for queue space, so that batch has to be dropped.
     */
    private static class PositionResetException
    extends RuntimeException {
        private PositionResetException() {
            super();
        }
    }

    /**
     * One prefetched batch together with the bookkeeping needed to restart from it: the last
     * sequence number of the batch, the shard iterator that follows it, a unique batch id, and a
     * flag recording whether the batch was already handed to the subscriber.
     */
    static class PrefetchRecordsRetrieved
    implements RecordsRetrieved {
        final ProcessRecordsInput processRecordsInput;
        final String lastBatchSequenceNumber;
        final String shardIterator;
        final BatchUniqueIdentifier batchUniqueIdentifier;
        boolean dispatched = false;

        public PrefetchRecordsRetrieved(ProcessRecordsInput processRecordsInput, String lastBatchSequenceNumber, String shardIterator, BatchUniqueIdentifier batchUniqueIdentifier) {
            this.processRecordsInput = processRecordsInput;
            this.lastBatchSequenceNumber = lastBatchSequenceNumber;
            this.shardIterator = shardIterator;
            this.batchUniqueIdentifier = batchUniqueIdentifier;
        }

        /** @return a fresh identifier built from a random UUID and an empty second component */
        public static BatchUniqueIdentifier generateBatchUniqueIdentifier() {
            final String randomId = UUID.randomUUID().toString();
            return new BatchUniqueIdentifier(randomId, "");
        }

        /**
         * @return a copy of this batch whose input carries the current time as cache exit time;
         *         the copy starts out with {@code dispatched == false}
         */
        PrefetchRecordsRetrieved prepareForPublish() {
            final ProcessRecordsInput stamped = processRecordsInput.toBuilder().cacheExitTime(Instant.now()).build();
            return new PrefetchRecordsRetrieved(stamped, lastBatchSequenceNumber, shardIterator, batchUniqueIdentifier);
        }

        /** Marks this batch as handed to the subscriber. */
        void dispatched() {
            dispatched = true;
        }

        @Override
        public ProcessRecordsInput processRecordsInput() {
            return processRecordsInput;
        }

        @Override
        public BatchUniqueIdentifier batchUniqueIdentifier() {
            return batchUniqueIdentifier;
        }

        public String lastBatchSequenceNumber() {
            return lastBatchSequenceNumber;
        }

        public String shardIterator() {
            return shardIterator;
        }

        public boolean isDispatched() {
            return dispatched;
        }

        /** Field-by-field equality over all five properties, including the dispatched flag. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof PrefetchRecordsRetrieved)) {
                return false;
            }
            final PrefetchRecordsRetrieved that = (PrefetchRecordsRetrieved) o;
            return that.canEqual(this)
                    && java.util.Objects.equals(processRecordsInput(), that.processRecordsInput())
                    && java.util.Objects.equals(lastBatchSequenceNumber(), that.lastBatchSequenceNumber())
                    && java.util.Objects.equals(shardIterator(), that.shardIterator())
                    && java.util.Objects.equals(batchUniqueIdentifier(), that.batchUniqueIdentifier())
                    && isDispatched() == that.isDispatched();
        }

        protected boolean canEqual(Object other) {
            return other instanceof PrefetchRecordsRetrieved;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            int result = 1;
            result = result * prime + hashOrSentinel(processRecordsInput());
            result = result * prime + hashOrSentinel(lastBatchSequenceNumber());
            result = result * prime + hashOrSentinel(shardIterator());
            result = result * prime + hashOrSentinel(batchUniqueIdentifier());
            result = result * prime + (isDispatched() ? 79 : 97);
            return result;
        }

        /** Hash of {@code value}, with 43 standing in for {@code null}. */
        private static int hashOrSentinel(Object value) {
            return value == null ? 43 : value.hashCode();
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("PrefetchRecordsPublisher.PrefetchRecordsRetrieved(");
            text.append("processRecordsInput=").append(processRecordsInput());
            text.append(", lastBatchSequenceNumber=").append(lastBatchSequenceNumber());
            text.append(", shardIterator=").append(shardIterator());
            text.append(", batchUniqueIdentifier=").append(batchUniqueIdentifier());
            text.append(", dispatched=").append(isDispatched());
            return text.append(")").toString();
        }
    }

    /**
     * Mutable state of one publishing session: outstanding subscriber demand, the queue of
     * prefetched batches, the counters describing that queue, the data fetcher, and the position
     * information needed to restart.
     */
    static final class PublisherSession {
        private final AtomicLong requestedResponses = new AtomicLong(0L);
        @VisibleForTesting
        private final LinkedBlockingQueue<PrefetchRecordsRetrieved> prefetchRecordsQueue;
        private final PrefetchCounters prefetchCounters;
        private final DataFetcher dataFetcher;
        private InitialPositionInStreamExtended initialPositionInStreamExtended;
        private String highestSequenceNumber;

        public PublisherSession(LinkedBlockingQueue<PrefetchRecordsRetrieved> prefetchRecordsQueue, PrefetchCounters prefetchCounters, DataFetcher dataFetcher) {
            this.prefetchRecordsQueue = prefetchRecordsQueue;
            this.prefetchCounters = prefetchCounters;
            this.dataFetcher = dataFetcher;
        }

        /** Records the starting position and initializes the data fetcher with it. */
        void init(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) {
            this.initialPositionInStreamExtended = initialPositionInStreamExtended;
            highestSequenceNumber = extendedSequenceNumber.sequenceNumber();
            dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);
        }

        /**
         * Drops all demand and buffered batches, then points the data fetcher at the iterator
         * that follows the given batch.
         */
        void reset(PrefetchRecordsRetrieved prefetchRecordsRetrieved) {
            requestedResponses.set(0L);
            prefetchRecordsQueue.clear();
            prefetchCounters.reset();
            highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber();
            final String resumeIterator = prefetchRecordsRetrieved.shardIterator();
            dataFetcher.resetIterator(resumeIterator, highestSequenceNumber, initialPositionInStreamExtended);
        }

        /**
         * Processes an acknowledgement. When it matches the batch at the head of the queue, that
         * batch is evicted and the follow-up action runs; any other acknowledgement is logged as
         * stale and ignored.
         */
        void handleRecordsDeliveryAck(RecordsDeliveryAck recordsDeliveryAck, String streamAndShardId, Runnable nextEventDispatchAction) {
            final PrefetchRecordsRetrieved head = peekNextRecord();
            final boolean ackMatchesHead = head != null
                    && head.batchUniqueIdentifier().equals(recordsDeliveryAck.batchUniqueIdentifier());
            if (!ackMatchesHead) {
                final BatchUniqueIdentifier expectedId = head == null ? null : head.batchUniqueIdentifier();
                log.info("{} :  Received a stale notification with id {} instead of expected id {} at {}. Will ignore.", new Object[]{streamAndShardId, recordsDeliveryAck.batchUniqueIdentifier(), expectedId, Instant.now()});
                return;
            }
            evictPublishedRecordAndUpdateDemand(streamAndShardId);
            nextEventDispatchAction.run();
        }

        /**
         * Removes the head of the queue and, if there was one, updates counters and demand.
         *
         * @return the evicted batch, or {@code null} when the queue was empty
         */
        @VisibleForTesting
        RecordsRetrieved evictPublishedRecordAndUpdateDemand(String streamAndShardId) {
            final PrefetchRecordsRetrieved evicted = prefetchRecordsQueue.poll();
            if (evicted == null) {
                log.info("{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer was reset.", streamAndShardId);
            } else {
                updateDemandTrackersOnPublish(evicted);
            }
            return evicted;
        }

        /** @return {@code true} while the subscriber has requested more batches than were delivered */
        boolean hasDemandToPublish() {
            return requestedResponses.get() > 0L;
        }

        /** @return the head of the queue without removing it, or {@code null} when empty */
        PrefetchRecordsRetrieved peekNextRecord() {
            return prefetchRecordsQueue.peek();
        }

        /**
         * Tries to enqueue a batch, waiting up to the given number of milliseconds for space.
         *
         * @return {@code true} if the batch was enqueued
         */
        boolean offerRecords(PrefetchRecordsRetrieved recordsRetrieved, long idleMillisBetweenCalls) throws InterruptedException {
            return prefetchRecordsQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS);
        }

        private void updateDemandTrackersOnPublish(PrefetchRecordsRetrieved result) {
            prefetchCounters.removed(result.processRecordsInput);
            requestedResponses.decrementAndGet();
        }

        public AtomicLong requestedResponses() {
            return requestedResponses;
        }

        public LinkedBlockingQueue<PrefetchRecordsRetrieved> prefetchRecordsQueue() {
            return prefetchRecordsQueue;
        }

        public PrefetchCounters prefetchCounters() {
            return prefetchCounters;
        }

        public DataFetcher dataFetcher() {
            return dataFetcher;
        }

        public InitialPositionInStreamExtended initialPositionInStreamExtended() {
            return initialPositionInStreamExtended;
        }

        public String highestSequenceNumber() {
            return highestSequenceNumber;
        }

        public PublisherSession initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) {
            this.initialPositionInStreamExtended = initialPositionInStreamExtended;
            return this;
        }

        public PublisherSession highestSequenceNumber(String highestSequenceNumber) {
            this.highestSequenceNumber = highestSequenceNumber;
            return this;
        }

        /** Field-by-field equality over all six properties. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof PublisherSession)) {
                return false;
            }
            final PublisherSession that = (PublisherSession) o;
            return java.util.Objects.equals(requestedResponses(), that.requestedResponses())
                    && java.util.Objects.equals(prefetchRecordsQueue(), that.prefetchRecordsQueue())
                    && java.util.Objects.equals(prefetchCounters(), that.prefetchCounters())
                    && java.util.Objects.equals(dataFetcher(), that.dataFetcher())
                    && java.util.Objects.equals(initialPositionInStreamExtended(), that.initialPositionInStreamExtended())
                    && java.util.Objects.equals(highestSequenceNumber(), that.highestSequenceNumber());
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            int result = 1;
            result = result * prime + hashOr43(requestedResponses());
            result = result * prime + hashOr43(prefetchRecordsQueue());
            result = result * prime + hashOr43(prefetchCounters());
            result = result * prime + hashOr43(dataFetcher());
            result = result * prime + hashOr43(initialPositionInStreamExtended());
            result = result * prime + hashOr43(highestSequenceNumber());
            return result;
        }

        /** Hash of {@code value}, with 43 standing in for {@code null}. */
        private static int hashOr43(Object value) {
            return value == null ? 43 : value.hashCode();
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("PrefetchRecordsPublisher.PublisherSession(");
            text.append("requestedResponses=").append(requestedResponses());
            text.append(", prefetchRecordsQueue=").append(prefetchRecordsQueue());
            text.append(", prefetchCounters=").append(prefetchCounters());
            text.append(", dataFetcher=").append(dataFetcher());
            text.append(", initialPositionInStreamExtended=").append(initialPositionInStreamExtended());
            text.append(", highestSequenceNumber=").append(highestSequenceNumber());
            return text.append(")").toString();
        }
    }
}

