/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.retrieval.fanout;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.fanout.MultipleSubscriberException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

@KinesisClientInternalApi
public class FanOutRecordsPublisher
implements RecordsPublisher {
    private static final Logger log = LoggerFactory.getLogger(FanOutRecordsPublisher.class);
    private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.ACQUIRE_TIMEOUT);
    private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT);
    private final KinesisAsyncClient kinesis;
    private final String shardId;
    private final String consumerArn;
    private final Object lockObject = new Object();
    private final AtomicInteger subscribeToShardId = new AtomicInteger(0);
    private RecordFlow flow;
    private String currentSequenceNumber;
    private InitialPositionInStreamExtended initialPositionInStreamExtended;
    private boolean isFirstConnection = true;
    private Subscriber<? super RecordsRetrieved> subscriber;
    private long availableQueueSpace = 0L;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) {
        Object object = this.lockObject;
        synchronized (object) {
            log.debug("[{}] Initializing Publisher @ Sequence: {} -- Initial Position: {}", new Object[]{this.shardId, extendedSequenceNumber, initialPositionInStreamExtended});
            this.initialPositionInStreamExtended = initialPositionInStreamExtended;
            this.currentSequenceNumber = extendedSequenceNumber.sequenceNumber();
            this.isFirstConnection = true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void shutdown() {
        Object object = this.lockObject;
        synchronized (object) {
            if (this.flow != null) {
                this.flow.cancel();
            }
            this.flow = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void restartFrom(RecordsRetrieved recordsRetrieved) {
        Object object = this.lockObject;
        synchronized (object) {
            if (this.flow != null) {
                this.flow.cancel();
            }
            this.flow = null;
            if (!(recordsRetrieved instanceof FanoutRecordsRetrieved)) {
                throw new IllegalArgumentException("Provided ProcessRecordsInput not created from the FanOutRecordsPublisher");
            }
            this.currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrieved).continuationSequenceNumber();
        }
    }

    private boolean hasValidSubscriber() {
        return this.subscriber != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void subscribeToShard(String sequenceNumber) {
        Object object = this.lockObject;
        synchronized (object) {
            SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder().shardId(this.shardId).consumerARN(this.consumerArn);
            SubscribeToShardRequest request = this.isFirstConnection ? (SubscribeToShardRequest)IteratorBuilder.request(builder, sequenceNumber, this.initialPositionInStreamExtended).build() : (SubscribeToShardRequest)IteratorBuilder.reconnectRequest(builder, sequenceNumber, this.initialPositionInStreamExtended).build();
            Instant connectionStart = Instant.now();
            int subscribeInvocationId = this.subscribeToShardId.incrementAndGet();
            String instanceId = this.shardId + "-" + subscribeInvocationId;
            log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ {} id: {} -- Starting subscribe to shard", new Object[]{this.shardId, connectionStart, instanceId});
            this.flow = new RecordFlow(this, connectionStart, instanceId);
            this.kinesis.subscribeToShard(request, (SubscribeToShardResponseHandler)this.flow);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void errorOccurred(RecordFlow triggeringFlow, Throwable t) {
        Object object = this.lockObject;
        synchronized (object) {
            if (!this.hasValidSubscriber()) {
                log.warn("{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null", new Object[]{this.shardId, this.flow.connectionStartedAt, this.flow.subscribeToShardId});
                return;
            }
            Throwable propagationThrowable = t;
            ThrowableCategory category = this.throwableCategory(propagationThrowable);
            if (this.isActiveFlow(triggeringFlow)) {
                if (this.flow != null) {
                    String logMessage = String.format("%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s", this.shardId, this.flow.connectionStartedAt, this.flow.subscribeToShardId, category.throwableTypeString);
                    switch (category.throwableType) {
                        case READ_TIMEOUT: {
                            log.debug(logMessage, propagationThrowable);
                            propagationThrowable = new RetryableRetrievalException(category.throwableTypeString, (Exception)propagationThrowable.getCause());
                            break;
                        }
                        case ACQUIRE_TIMEOUT: {
                            this.logAcquireTimeoutMessage(t);
                        }
                        default: {
                            log.warn(logMessage, propagationThrowable);
                        }
                    }
                    this.flow.cancel();
                }
                log.debug("{}: availableQueueSpace zeroing from {}", (Object)this.shardId, (Object)this.availableQueueSpace);
                this.availableQueueSpace = 0L;
                try {
                    this.handleFlowError(propagationThrowable);
                }
                catch (Throwable innerThrowable) {
                    log.warn("{}: Exception while calling subscriber.onError", (Object)this.shardId, (Object)innerThrowable);
                }
                this.subscriber = null;
                this.flow = null;
            } else if (triggeringFlow != null) {
                log.debug("{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- {} -> triggeringFlow wasn't the active flow.  Didn't dispatch error", new Object[]{this.shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId, category.throwableTypeString});
                triggeringFlow.cancel();
            }
        }
    }

    protected void logAcquireTimeoutMessage(Throwable t) {
        log.error("An acquire timeout occurred which usually indicates that the KinesisAsyncClient supplied has a low maximum streams limit.  Please use the software.amazon.kinesis.common.KinesisClientUtil to setup the client, or refer to the class to setup the client manually.");
    }

    private void handleFlowError(Throwable t) {
        if (t.getCause() instanceof ResourceNotFoundException) {
            log.debug("{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.", (Object)this.shardId);
            FanoutRecordsRetrieved response = new FanoutRecordsRetrieved(ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build(), null);
            this.subscriber.onNext((Object)response);
            this.subscriber.onComplete();
        } else {
            this.subscriber.onError(t);
        }
    }

    private ThrowableCategory throwableCategory(Throwable t) {
        Throwable current = t;
        StringBuilder builder = new StringBuilder();
        do {
            if (current.getMessage() != null && current.getMessage().startsWith("Acquire operation")) {
                return ACQUIRE_TIMEOUT_CATEGORY;
            }
            if (current.getClass().getName().equals("io.netty.handler.timeout.ReadTimeoutException")) {
                return READ_TIMEOUT_CATEGORY;
            }
            if (current.getCause() == null) {
                builder.append(current.getClass().getName()).append(": ").append(current.getMessage());
                continue;
            }
            builder.append(current.getClass().getSimpleName());
            builder.append("/");
        } while ((current = current.getCause()) != null);
        return new ThrowableCategory(ThrowableType.OTHER, builder.toString());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void recordsReceived(RecordFlow triggeringFlow, SubscribeToShardEvent recordBatchEvent) {
        Object object = this.lockObject;
        synchronized (object) {
            if (!this.hasValidSubscriber()) {
                log.debug("{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Subscriber is null.", new Object[]{this.shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId});
                triggeringFlow.cancel();
                if (this.flow != null) {
                    this.flow.cancel();
                }
                return;
            }
            if (!this.isActiveFlow(triggeringFlow)) {
                log.debug("{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Received records for an inactive flow.", new Object[]{this.shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId});
                return;
            }
            List<KinesisClientRecord> records = recordBatchEvent.records().stream().map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
            ProcessRecordsInput input = ProcessRecordsInput.builder().cacheEntryTime(Instant.now()).millisBehindLatest(recordBatchEvent.millisBehindLatest()).isAtShardEnd(recordBatchEvent.continuationSequenceNumber() == null).records(records).build();
            FanoutRecordsRetrieved recordsRetrieved = new FanoutRecordsRetrieved(input, recordBatchEvent.continuationSequenceNumber());
            try {
                this.subscriber.onNext((Object)recordsRetrieved);
                this.currentSequenceNumber = recordBatchEvent.continuationSequenceNumber();
            }
            catch (Throwable t) {
                log.warn("{}: Unable to call onNext for subscriber.  Failing publisher.", (Object)this.shardId);
                this.errorOccurred(triggeringFlow, t);
            }
            if (this.availableQueueSpace <= 0L) {
                log.debug("{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0", new Object[]{this.shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId});
            } else {
                --this.availableQueueSpace;
                if (this.availableQueueSpace > 0L) {
                    triggeringFlow.request(1L);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onComplete(RecordFlow triggeringFlow) {
        Object object = this.lockObject;
        synchronized (object) {
            log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", new Object[]{this.shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId});
            triggeringFlow.cancel();
            if (!this.hasValidSubscriber()) {
                log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", new Object[]{this.shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId});
                return;
            }
            if (!this.isActiveFlow(triggeringFlow)) {
                log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {} -- Received spurious onComplete from unexpected flow. Ignoring.", new Object[]{this.shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId});
                return;
            }
            if (this.currentSequenceNumber != null) {
                log.debug("{}: Shard hasn't ended resubscribing.", (Object)this.shardId);
                this.subscribeToShard(this.currentSequenceNumber);
            } else {
                log.debug("{}: Shard has ended completing subscriber.", (Object)this.shardId);
                this.subscriber.onComplete();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(final Subscriber<? super RecordsRetrieved> s) {
        Object object = this.lockObject;
        synchronized (object) {
            if (this.subscriber != null) {
                log.error("{}: A subscribe occurred while there was an active subscriber.  Sending error to current subscriber", (Object)this.shardId);
                MultipleSubscriberException multipleSubscriberException = new MultipleSubscriberException();
                this.subscriber.onError((Throwable)multipleSubscriberException);
                this.subscriber = null;
                s.onError((Throwable)multipleSubscriberException);
                this.terminateExistingFlow();
                return;
            }
            this.terminateExistingFlow();
            this.subscriber = s;
            try {
                this.subscribeToShard(this.currentSequenceNumber);
            }
            catch (Throwable t) {
                this.errorOccurred(this.flow, t);
                return;
            }
            if (this.flow == null) {
                this.errorOccurred(this.flow, new IllegalStateException("SubscribeToShard failed"));
                return;
            }
            this.subscriber.onSubscribe(new Subscription(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void request(long n) {
                    Object object = FanOutRecordsPublisher.this.lockObject;
                    synchronized (object) {
                        if (FanOutRecordsPublisher.this.subscriber != s) {
                            log.warn("{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match.", (Object)FanOutRecordsPublisher.this.shardId, (Object)n);
                            return;
                        }
                        if (FanOutRecordsPublisher.this.flow == null) {
                            log.debug("{}: (FanOutRecordsPublisher/Subscription#request) - Request called for a null flow.", (Object)FanOutRecordsPublisher.this.shardId);
                            FanOutRecordsPublisher.this.errorOccurred(FanOutRecordsPublisher.this.flow, new IllegalStateException("Attempted to request on a null flow."));
                            return;
                        }
                        long previous = FanOutRecordsPublisher.this.availableQueueSpace;
                        FanOutRecordsPublisher.this.availableQueueSpace = FanOutRecordsPublisher.this.availableQueueSpace + n;
                        if (previous <= 0L) {
                            FanOutRecordsPublisher.this.flow.request(1L);
                        }
                    }
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void cancel() {
                    Object object = FanOutRecordsPublisher.this.lockObject;
                    synchronized (object) {
                        if (FanOutRecordsPublisher.this.subscriber != s) {
                            log.warn("{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match.", (Object)FanOutRecordsPublisher.this.shardId);
                            return;
                        }
                        if (!FanOutRecordsPublisher.this.hasValidSubscriber()) {
                            log.warn("{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber", (Object)FanOutRecordsPublisher.this.shardId);
                        }
                        FanOutRecordsPublisher.this.subscriber = null;
                        if (FanOutRecordsPublisher.this.flow != null) {
                            log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}", new Object[]{FanOutRecordsPublisher.this.shardId, FanOutRecordsPublisher.this.flow.connectionStartedAt, FanOutRecordsPublisher.this.flow.subscribeToShardId});
                            FanOutRecordsPublisher.this.flow.cancel();
                            FanOutRecordsPublisher.this.availableQueueSpace = 0L;
                        }
                    }
                }
            });
        }
    }

    private void terminateExistingFlow() {
        if (this.flow != null) {
            RecordFlow current = this.flow;
            this.flow = null;
            current.cancel();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean isActiveFlow(RecordFlow requester) {
        Object object = this.lockObject;
        synchronized (object) {
            return requester == this.flow;
        }
    }

    private void rejectSubscription(SdkPublisher<SubscribeToShardEventStream> publisher) {
        publisher.subscribe((Subscriber)new Subscriber<SubscribeToShardEventStream>(){
            Subscription localSub;

            public void onSubscribe(Subscription s) {
                this.localSub = s;
                this.localSub.cancel();
            }

            public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
                this.localSub.cancel();
            }

            public void onError(Throwable t) {
                this.localSub.cancel();
            }

            public void onComplete() {
                this.localSub.cancel();
            }
        });
    }

    public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) {
        this.kinesis = kinesis;
        this.shardId = shardId;
        this.consumerArn = consumerArn;
    }

    static class RecordSubscription
    implements Subscriber<SubscribeToShardEventStream> {
        private static final Logger log = LoggerFactory.getLogger(RecordSubscription.class);
        private final FanOutRecordsPublisher parent;
        private final RecordFlow flow;
        private final Instant connectionStartedAt;
        private final String subscribeToShardId;
        private Subscription subscription;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void request(long n) {
            Object object = this.parent.lockObject;
            synchronized (object) {
                this.subscription.request(n);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void cancel() {
            Object object = this.parent.lockObject;
            synchronized (object) {
                log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#cancel) @ {} id: {} -- Cancel called", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId});
                this.flow.isCancelled = true;
                if (this.subscription != null) {
                    this.subscription.cancel();
                } else {
                    log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#cancel) @ {} id: {} -- SDK subscription is null", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId});
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onSubscribe(Subscription s) {
            Object object = this.parent.lockObject;
            synchronized (object) {
                this.subscription = s;
                if (this.flow.shouldSubscriptionCancel()) {
                    if (this.flow.isCancelled) {
                        log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Subscription was cancelled before onSubscribe", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId});
                    }
                    if (this.flow.isDisposed) {
                        log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- RecordFlow has been disposed cancelling subscribe", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId});
                    }
                    log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- RecordFlow requires cancelling", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId});
                    this.cancel();
                }
                log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Outstanding: {} items so requesting an item", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId, this.parent.availableQueueSpace});
                if (this.parent.availableQueueSpace > 0L) {
                    this.request(1L);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onNext(SubscribeToShardEventStream recordBatchEvent) {
            Object object = this.parent.lockObject;
            synchronized (object) {
                if (this.flow.shouldSubscriptionCancel()) {
                    log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onNext) @ {} id: {} -- RecordFlow requires cancelling", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId});
                    this.cancel();
                    return;
                }
                recordBatchEvent.accept(new SubscribeToShardResponseHandler.Visitor(){

                    public void visit(SubscribeToShardEvent event) {
                        flow.recordsReceived(event);
                    }
                });
            }
        }

        public void onError(Throwable t) {
            log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onError) @ {} id: {} -- {}: {}", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId, t.getClass().getName(), t.getMessage()});
        }

        public void onComplete() {
            log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onComplete) @ {} id: {} -- Allowing RecordFlow to call onComplete", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId});
        }

        public RecordSubscription(FanOutRecordsPublisher parent, RecordFlow flow, Instant connectionStartedAt, String subscribeToShardId) {
            this.parent = parent;
            this.flow = flow;
            this.connectionStartedAt = connectionStartedAt;
            this.subscribeToShardId = subscribeToShardId;
        }
    }

    static class RecordFlow
    implements SubscribeToShardResponseHandler {
        private static final Logger log = LoggerFactory.getLogger(RecordFlow.class);
        private final FanOutRecordsPublisher parent;
        private final Instant connectionStartedAt;
        private final String subscribeToShardId;
        private RecordSubscription subscription;
        private boolean isDisposed = false;
        private boolean isErrorDispatched = false;
        private boolean isCancelled = false;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) {
            Object object = this.parent.lockObject;
            synchronized (object) {
                log.debug("{}: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ {} id: {} -- Subscribe", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId});
                if (!this.parent.isActiveFlow(this)) {
                    this.isDisposed = true;
                    log.debug("{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- parent is disposed", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId});
                    this.parent.rejectSubscription((SdkPublisher<SubscribeToShardEventStream>)publisher);
                    return;
                }
                try {
                    log.debug("{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- creating record subscription", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId});
                    this.subscription = new RecordSubscription(this.parent, this, this.connectionStartedAt, this.subscribeToShardId);
                    publisher.subscribe((Subscriber)this.subscription);
                    this.parent.isFirstConnection = false;
                }
                catch (Throwable t) {
                    log.debug("{}: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ {} id: {} -- throwable during record subscription: {}", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId, t.getMessage()});
                    this.parent.errorOccurred(this, t);
                }
            }
        }

        public void responseReceived(SubscribeToShardResponse response) {
            log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId});
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void exceptionOccurred(Throwable throwable) {
            Object object = this.parent.lockObject;
            synchronized (object) {
                log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- {}: {}", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId, throwable.getClass().getName(), throwable.getMessage()});
                if (this.isDisposed) {
                    log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- This flow has been disposed, not dispatching error. {}: {}", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId, throwable.getClass().getName(), throwable.getMessage()});
                    this.isErrorDispatched = true;
                }
                this.isDisposed = true;
                if (!this.isErrorDispatched) {
                    this.parent.errorOccurred(this, throwable);
                    this.isErrorDispatched = true;
                } else {
                    log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- An error has previously been dispatched, not dispatching this error {}: {}", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId, throwable.getClass().getName(), throwable.getMessage()});
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void complete() {
            Object object = this.parent.lockObject;
            synchronized (object) {
                log.debug("{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Connection completed", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId});
                if (this.isCancelled) {
                    log.warn("{}: complete called on a cancelled subscription.  Ignoring completion", (Object)this.parent.shardId);
                    return;
                }
                if (this.isDisposed) {
                    log.warn("{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId});
                    return;
                }
                this.parent.onComplete(this);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void cancel() {
            Object object = this.parent.lockObject;
            synchronized (object) {
                this.isDisposed = true;
                this.isCancelled = true;
                if (this.subscription != null) {
                    try {
                        this.subscription.cancel();
                    }
                    catch (Throwable t) {
                        log.error("{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Exception while trying to cancel failed subscription: {}", new Object[]{this.parent.shardId, this.connectionStartedAt, this.subscribeToShardId, t.getMessage(), t});
                    }
                }
            }
        }

        private boolean shouldSubscriptionCancel() {
            return this.isDisposed || this.isCancelled || !this.parent.isActiveFlow(this);
        }

        public void request(long n) {
            if (this.subscription != null && !this.shouldSubscriptionCancel()) {
                this.subscription.request(n);
            }
        }

        private void recordsReceived(SubscribeToShardEvent event) {
            this.parent.recordsReceived(this, event);
        }

        public RecordFlow(FanOutRecordsPublisher parent, Instant connectionStartedAt, String subscribeToShardId) {
            this.parent = parent;
            this.connectionStartedAt = connectionStartedAt;
            this.subscribeToShardId = subscribeToShardId;
        }
    }

    static class FanoutRecordsRetrieved
    implements RecordsRetrieved {
        private final ProcessRecordsInput processRecordsInput;
        private final String continuationSequenceNumber;

        @Override
        public ProcessRecordsInput processRecordsInput() {
            return this.processRecordsInput;
        }

        public FanoutRecordsRetrieved(ProcessRecordsInput processRecordsInput, String continuationSequenceNumber) {
            this.processRecordsInput = processRecordsInput;
            this.continuationSequenceNumber = continuationSequenceNumber;
        }

        public String continuationSequenceNumber() {
            return this.continuationSequenceNumber;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof FanoutRecordsRetrieved)) {
                return false;
            }
            FanoutRecordsRetrieved other = (FanoutRecordsRetrieved)o;
            if (!other.canEqual(this)) {
                return false;
            }
            ProcessRecordsInput this$processRecordsInput = this.processRecordsInput();
            ProcessRecordsInput other$processRecordsInput = other.processRecordsInput();
            if (this$processRecordsInput == null ? other$processRecordsInput != null : !((Object)this$processRecordsInput).equals(other$processRecordsInput)) {
                return false;
            }
            String this$continuationSequenceNumber = this.continuationSequenceNumber();
            String other$continuationSequenceNumber = other.continuationSequenceNumber();
            return !(this$continuationSequenceNumber == null ? other$continuationSequenceNumber != null : !this$continuationSequenceNumber.equals(other$continuationSequenceNumber));
        }

        protected boolean canEqual(Object other) {
            return other instanceof FanoutRecordsRetrieved;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            ProcessRecordsInput $processRecordsInput = this.processRecordsInput();
            result = result * 59 + ($processRecordsInput == null ? 43 : ((Object)$processRecordsInput).hashCode());
            String $continuationSequenceNumber = this.continuationSequenceNumber();
            result = result * 59 + ($continuationSequenceNumber == null ? 43 : $continuationSequenceNumber.hashCode());
            return result;
        }

        public String toString() {
            return "FanOutRecordsPublisher.FanoutRecordsRetrieved(processRecordsInput=" + this.processRecordsInput() + ", continuationSequenceNumber=" + this.continuationSequenceNumber() + ")";
        }
    }

    private static class ThrowableCategory {
        @NonNull
        final ThrowableType throwableType;
        @NonNull
        final String throwableTypeString;

        ThrowableCategory(ThrowableType throwableType) {
            this(throwableType, throwableType.value);
        }

        ThrowableCategory(ThrowableType throwableType, String throwableTypeString) {
            this.throwableType = throwableType;
            this.throwableTypeString = throwableTypeString;
        }
    }

    private static enum ThrowableType {
        ACQUIRE_TIMEOUT("AcquireTimeout"),
        READ_TIMEOUT("ReadTimeout"),
        OTHER("Other");

        String value;

        private ThrowableType(String value) {
            this.value = value;
        }
    }
}

