/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pekko.remote.artery.aeron;

import io.aeron.Aeron;
import io.aeron.FragmentAssembler;
import io.aeron.Subscription;
import io.aeron.exceptions.DriverTimeoutException;
import io.aeron.logbuffer.FragmentHandler;
import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.agrona.hints.ThreadHints;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.remote.artery.EnvelopeBuffer;
import org.apache.pekko.remote.artery.EnvelopeBufferPool;
import org.apache.pekko.remote.artery.RemotingFlightRecorder;
import org.apache.pekko.remote.artery.aeron.AeronSource$;
import org.apache.pekko.remote.artery.aeron.TaskRunner;
import org.apache.pekko.remote.artery.aeron.TaskRunner$Add$;
import org.apache.pekko.remote.artery.aeron.TaskRunner$Remove$;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.Outlet$;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.SourceShape$;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.GraphStageWithMaterializedValue;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.function.JProcedure1;
import scala.runtime.java8.JFunction1;
import scala.util.control.NonFatal$;

public class AeronSource
extends GraphStageWithMaterializedValue<SourceShape<EnvelopeBuffer>, AeronLifecycle> {
    public final String org$apache$pekko$remote$artery$aeron$AeronSource$$channel;
    public final int org$apache$pekko$remote$artery$aeron$AeronSource$$streamId;
    public final Aeron org$apache$pekko$remote$artery$aeron$AeronSource$$aeron;
    public final TaskRunner org$apache$pekko$remote$artery$aeron$AeronSource$$taskRunner;
    public final EnvelopeBufferPool org$apache$pekko$remote$artery$aeron$AeronSource$$pool;
    public final RemotingFlightRecorder org$apache$pekko$remote$artery$aeron$AeronSource$$flightRecorder;
    public final int org$apache$pekko$remote$artery$aeron$AeronSource$$spinning;
    private final Outlet out;
    private final SourceShape shape;

    public AeronSource(String channel, int streamId, Aeron aeron, TaskRunner taskRunner, EnvelopeBufferPool pool, RemotingFlightRecorder flightRecorder, int spinning) {
        this.org$apache$pekko$remote$artery$aeron$AeronSource$$channel = channel;
        this.org$apache$pekko$remote$artery$aeron$AeronSource$$streamId = streamId;
        this.org$apache$pekko$remote$artery$aeron$AeronSource$$aeron = aeron;
        this.org$apache$pekko$remote$artery$aeron$AeronSource$$taskRunner = taskRunner;
        this.org$apache$pekko$remote$artery$aeron$AeronSource$$pool = pool;
        this.org$apache$pekko$remote$artery$aeron$AeronSource$$flightRecorder = flightRecorder;
        this.org$apache$pekko$remote$artery$aeron$AeronSource$$spinning = spinning;
        this.out = Outlet$.MODULE$.apply("AeronSource");
        this.shape = SourceShape$.MODULE$.apply(this.out());
    }

    public Outlet<EnvelopeBuffer> out() {
        return this.out;
    }

    public SourceShape<EnvelopeBuffer> shape() {
        return this.shape;
    }

    public Tuple2<GraphStageLogic, AeronLifecycle> createLogicAndMaterializedValue(Attributes inheritedAttributes) {
        OutHandler logic = new OutHandler(this){
            private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
            private final Subscription subscription;
            private int backoffCount;
            private long delegateTaskStartTime;
            private long countBeforeDelegate;
            private final MessageHandler messageHandler;
            private final TaskRunner.Add addPollTask;
            private boolean delegatingToTaskRunner;
            private List pendingUnavailableImages;
            private final AsyncCallback onUnavailableImageCb;
            private final AsyncCallback getStatusCb;
            private final /* synthetic */ AeronSource $outer;
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                super($outer.shape());
                StageLogging.$init$((StageLogging)this);
                this.subscription = $outer.org$apache$pekko$remote$artery$aeron$AeronSource$$aeron.addSubscription($outer.org$apache$pekko$remote$artery$aeron$AeronSource$$channel, $outer.org$apache$pekko$remote$artery$aeron$AeronSource$$streamId);
                this.backoffCount = $outer.org$apache$pekko$remote$artery$aeron$AeronSource$$spinning;
                this.delegateTaskStartTime = 0L;
                this.countBeforeDelegate = 0L;
                this.messageHandler = new MessageHandler($outer.org$apache$pekko$remote$artery$aeron$AeronSource$$pool);
                this.addPollTask = TaskRunner$Add$.MODULE$.apply(AeronSource$.MODULE$.org$apache$pekko$remote$artery$aeron$AeronSource$$$pollTask(this.subscription, this.messageHandler, (AsyncCallback<EnvelopeBuffer>)this.getAsyncCallback((Function1)(JProcedure1 & Serializable)data -> this.taskOnMessage((EnvelopeBuffer)data))));
                this.delegatingToTaskRunner = false;
                this.pendingUnavailableImages = package$.MODULE$.Nil();
                this.onUnavailableImageCb = this.getAsyncCallback((Function1)(JFunction1.mcVI.sp & Serializable)sessionId -> {
                    this.pendingUnavailableImages = this.pendingUnavailableImages.$colon$colon((Object)BoxesRunTime.boxToInteger((int)sessionId));
                    this.freeSessionBuffers();
                });
                this.getStatusCb = this.getAsyncCallback((Function1)(JProcedure1 & Serializable)promise -> promise.success((Object)BoxesRunTime.boxToLong((long)this.subscription.channelStatus())));
                this.setHandler($outer.out(), this);
            }

            public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
                return this.org$apache$pekko$stream$stage$StageLogging$$_log;
            }

            public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter x$1) {
                this.org$apache$pekko$stream$stage$StageLogging$$_log = x$1;
            }

            public Class logSource() {
                return AeronSource.class;
            }

            public void preStart() {
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceStarted(this.$outer.org$apache$pekko$remote$artery$aeron$AeronSource$$channel, this.$outer.org$apache$pekko$remote$artery$aeron$AeronSource$$streamId);
            }

            public void postStop() {
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSource$$taskRunner.command(TaskRunner$Remove$.MODULE$.apply(this.addPollTask.task()));
                try {
                    try {
                        this.subscription.close();
                    }
                    catch (DriverTimeoutException e) {
                        this.log().debug("DriverTimeout when closing subscription. {}", (Object)((Object)e));
                    }
                }
                finally {
                    this.$outer.org$apache$pekko$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceStopped(this.$outer.org$apache$pekko$remote$artery$aeron$AeronSource$$channel, this.$outer.org$apache$pekko$remote$artery$aeron$AeronSource$$streamId);
                }
            }

            public void onPull() {
                this.backoffCount = this.$outer.org$apache$pekko$remote$artery$aeron$AeronSource$$spinning;
                this.subscriberLoop();
            }

            private void subscriberLoop() {
                while (true) {
                    this.messageHandler.reset();
                    int fragmentsRead = this.subscription.poll((FragmentHandler)this.messageHandler.fragmentsHandler(), 1);
                    EnvelopeBuffer msg = this.messageHandler.messageReceived();
                    this.messageHandler.reset();
                    if (fragmentsRead > 0) {
                        ++this.countBeforeDelegate;
                        if (msg == null) continue;
                        this.onMessage(msg);
                        return;
                    }
                    --this.backoffCount;
                    if (this.backoffCount <= 0) break;
                    ThreadHints.onSpinWait();
                }
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceDelegateToTaskRunner(this.countBeforeDelegate);
                this.delegatingToTaskRunner = true;
                this.delegateTaskStartTime = System.nanoTime();
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSource$$taskRunner.command(this.addPollTask);
            }

            public Future channelEndpointStatus() {
                Promise promise = Promise$.MODULE$.apply();
                this.getStatusCb.invoke((Object)promise);
                return promise.future();
            }

            private void taskOnMessage(EnvelopeBuffer data) {
                this.countBeforeDelegate = 0L;
                this.delegatingToTaskRunner = false;
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceReturnFromTaskRunner(System.nanoTime() - this.delegateTaskStartTime);
                this.freeSessionBuffers();
                this.onMessage(data);
            }

            private void onMessage(EnvelopeBuffer data) {
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceReceived(data.byteBuffer().limit());
                this.push(this.$outer.out(), data);
            }

            private void freeSessionBuffers() {
                if (!this.delegatingToTaskRunner) {
                    this.loop$1(this.pendingUnavailableImages);
                    this.pendingUnavailableImages = package$.MODULE$.Nil();
                    return;
                }
            }

            public void onUnavailableImage(int sessionId) {
                try {
                    this.onUnavailableImageCb.invoke((Object)BoxesRunTime.boxToInteger((int)sessionId));
                }
                catch (Throwable throwable) {
                    Option option;
                    Throwable throwable2 = throwable;
                    if (throwable2 != null && !(option = NonFatal$.MODULE$.unapply(throwable2)).isEmpty()) {
                        Throwable throwable3 = (Throwable)option.get();
                    }
                    throw throwable;
                }
            }

            private final void loop$1(List remaining) {
                List list;
                while (true) {
                    list = remaining;
                    Nil$ nil$ = package$.MODULE$.Nil();
                    List list2 = list;
                    if (!(nil$ != null ? !nil$.equals(list2) : list2 != null)) {
                        return;
                    }
                    if (!(list instanceof .colon.colon)) break;
                    .colon.colon colon2 = (.colon.colon)list;
                    List list3 = colon2.next$access$1();
                    int sessionId = BoxesRunTime.unboxToInt((Object)colon2.head());
                    List tail = list3;
                    this.messageHandler.fragmentsHandler().freeSessionBuffer(sessionId);
                    remaining = tail;
                }
                throw new MatchError((Object)list);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$init$$$anonfun$1(org.apache.pekko.remote.artery.EnvelopeBuffer ), $init$$$anonfun$2(int ), $init$$$anonfun$3(scala.concurrent.Promise )}, serializedLambda);
            }
        };
        return Tuple2$.MODULE$.apply((Object)logic, (Object)logic);
    }

    public static interface AeronLifecycle {
        public void onUnavailableImage(int var1);

        public Future<Object> channelEndpointStatus();
    }

    public static class Fragments
    extends FragmentAssembler {
        public Fragments(Function1<EnvelopeBuffer, BoxedUnit> onMessage, EnvelopeBufferPool pool) {
            super(AeronSource$.MODULE$.org$apache$pekko$remote$artery$aeron$AeronSource$$$Fragments$superArg$1(onMessage, pool));
        }
    }

    public static class MessageHandler {
        private EnvelopeBuffer messageReceived = null;
        private final Fragments fragmentsHandler;

        public MessageHandler(EnvelopeBufferPool pool) {
            this.fragmentsHandler = new Fragments((Function1<EnvelopeBuffer, BoxedUnit>)(JProcedure1 & Serializable)data -> this.messageReceived_$eq((EnvelopeBuffer)data), pool);
        }

        public void reset() {
            this.messageReceived_$eq(null);
        }

        public EnvelopeBuffer messageReceived() {
            return this.messageReceived;
        }

        public void messageReceived_$eq(EnvelopeBuffer x$1) {
            this.messageReceived = x$1;
        }

        public Fragments fragmentsHandler() {
            return this.fragmentsHandler;
        }
    }
}

