/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pekko.remote.artery.aeron;

import io.aeron.Aeron;
import io.aeron.ConcurrentPublication;
import io.aeron.Publication;
import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.hints.ThreadHints;
import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.remote.artery.EnvelopeBuffer;
import org.apache.pekko.remote.artery.EnvelopeBufferPool;
import org.apache.pekko.remote.artery.RemotingFlightRecorder;
import org.apache.pekko.remote.artery.aeron.AeronSink$;
import org.apache.pekko.remote.artery.aeron.TaskRunner;
import org.apache.pekko.remote.artery.aeron.TaskRunner$Add$;
import org.apache.pekko.remote.artery.aeron.TaskRunner$Remove$;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Inlet;
import org.apache.pekko.stream.Inlet$;
import org.apache.pekko.stream.SinkShape;
import org.apache.pekko.stream.SinkShape$;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.GraphStageWithMaterializedValue;
import org.apache.pekko.stream.stage.InHandler;
import org.apache.pekko.util.PrettyDuration;
import org.apache.pekko.util.PrettyDuration$;
import scala.Function0;
import scala.Function1;
import scala.Tuple2;
import scala.Tuple2$;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import scala.runtime.LambdaDeserialize;
import scala.runtime.function.JProcedure1;
import scala.util.Failure$;
import scala.util.Success$;
import scala.util.Try;
import scala.util.control.NoStackTrace;

public class AeronSink
extends GraphStageWithMaterializedValue<SinkShape<EnvelopeBuffer>, Future<Done>> {
    public final String org$apache$pekko$remote$artery$aeron$AeronSink$$channel;
    public final int org$apache$pekko$remote$artery$aeron$AeronSink$$streamId;
    public final Aeron org$apache$pekko$remote$artery$aeron$AeronSink$$aeron;
    public final TaskRunner org$apache$pekko$remote$artery$aeron$AeronSink$$taskRunner;
    public final EnvelopeBufferPool org$apache$pekko$remote$artery$aeron$AeronSink$$pool;
    public final Duration org$apache$pekko$remote$artery$aeron$AeronSink$$giveUpAfter;
    public final RemotingFlightRecorder org$apache$pekko$remote$artery$aeron$AeronSink$$flightRecorder;
    private final Inlet in;
    private final SinkShape shape;

    public AeronSink(String channel, int streamId, Aeron aeron, TaskRunner taskRunner, EnvelopeBufferPool pool, Duration giveUpAfter, RemotingFlightRecorder flightRecorder) {
        this.org$apache$pekko$remote$artery$aeron$AeronSink$$channel = channel;
        this.org$apache$pekko$remote$artery$aeron$AeronSink$$streamId = streamId;
        this.org$apache$pekko$remote$artery$aeron$AeronSink$$aeron = aeron;
        this.org$apache$pekko$remote$artery$aeron$AeronSink$$taskRunner = taskRunner;
        this.org$apache$pekko$remote$artery$aeron$AeronSink$$pool = pool;
        this.org$apache$pekko$remote$artery$aeron$AeronSink$$giveUpAfter = giveUpAfter;
        this.org$apache$pekko$remote$artery$aeron$AeronSink$$flightRecorder = flightRecorder;
        this.in = Inlet$.MODULE$.apply("AeronSink");
        this.shape = SinkShape$.MODULE$.apply(this.in());
    }

    public Inlet<EnvelopeBuffer> in() {
        return this.in;
    }

    public SinkShape<EnvelopeBuffer> shape() {
        return this.shape;
    }

    public Tuple2<GraphStageLogic, Future<Done>> createLogicAndMaterializedValue(Attributes inheritedAttributes) {
        Promise completed = Promise$.MODULE$.apply();
        InHandler logic = new InHandler(completed, this){
            private final Promise completed$1;
            private EnvelopeBuffer envelopeInFlight;
            private final ConcurrentPublication pub;
            private Try completedValue;
            private final int spinning;
            private int backoffCount;
            private int lastMsgSize;
            private final OfferTask offerTask;
            private final TaskRunner.Add addOfferTask;
            private boolean offerTaskInProgress;
            private long delegateTaskStartTime;
            private long countBeforeDelegate;
            private final /* synthetic */ AeronSink $outer;
            {
                this.completed$1 = completed$2;
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                super($outer.shape());
                this.envelopeInFlight = null;
                this.pub = $outer.org$apache$pekko$remote$artery$aeron$AeronSink$$aeron.addPublication($outer.org$apache$pekko$remote$artery$aeron$AeronSink$$channel, $outer.org$apache$pekko$remote$artery$aeron$AeronSink$$streamId);
                this.completedValue = Success$.MODULE$.apply((Object)Done$.MODULE$);
                this.backoffCount = this.spinning = 2 * $outer.org$apache$pekko$remote$artery$aeron$AeronSink$$taskRunner.idleCpuLevel();
                this.lastMsgSize = 0;
                this.offerTask = new OfferTask((Publication)this.pub, null, this.lastMsgSize, (AsyncCallback<BoxedUnit>)this.getAsyncCallback((Function1)(JProcedure1 & Serializable)_$1 -> this.taskOnOfferSuccess()), $outer.org$apache$pekko$remote$artery$aeron$AeronSink$$giveUpAfter, (AsyncCallback<BoxedUnit>)this.getAsyncCallback((Function1)(JProcedure1 & Serializable)_$2 -> this.onGiveUp()), (AsyncCallback<BoxedUnit>)this.getAsyncCallback((Function1)(JProcedure1 & Serializable)_$3 -> this.onPublicationClosed()));
                this.addOfferTask = TaskRunner$Add$.MODULE$.apply(this.offerTask);
                this.offerTaskInProgress = false;
                this.delegateTaskStartTime = 0L;
                this.countBeforeDelegate = 0L;
                this.setHandler($outer.in(), this);
            }

            public void preStart() {
                this.setKeepGoing(true);
                this.pull(this.$outer.in());
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$flightRecorder.aeronSinkStarted(this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$channel, this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$streamId);
            }

            public void postStop() {
                try {
                    this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$taskRunner.command(TaskRunner$Remove$.MODULE$.apply(this.addOfferTask.task()));
                    this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$flightRecorder.aeronSinkTaskRunnerRemoved(this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$channel, this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$streamId);
                    this.pub.close();
                    this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$flightRecorder.aeronSinkPublicationClosed(this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$channel, this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$streamId);
                }
                finally {
                    this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$flightRecorder.aeronSinkStopped(this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$channel, this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$streamId);
                    this.completed$1.complete(this.completedValue);
                }
            }

            public void onPush() {
                this.envelopeInFlight = (EnvelopeBuffer)this.grab(this.$outer.in());
                this.backoffCount = this.spinning;
                this.lastMsgSize = this.envelopeInFlight.byteBuffer().limit();
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$flightRecorder.aeronSinkEnvelopeGrabbed(this.lastMsgSize);
                this.publish();
            }

            private void publish() {
                long result;
                while ((result = this.pub.offer((DirectBuffer)this.envelopeInFlight.aeronBuffer(), 0, this.lastMsgSize)) < 0L) {
                    if (result == -4L) {
                        this.onPublicationClosed();
                        return;
                    }
                    if (result == -1L) {
                        this.delegateBackoff();
                        return;
                    }
                    --this.backoffCount;
                    if (this.backoffCount > 0) {
                        ThreadHints.onSpinWait();
                        continue;
                    }
                    this.delegateBackoff();
                    return;
                }
                ++this.countBeforeDelegate;
                this.onOfferSuccess();
            }

            private void delegateBackoff() {
                this.offerTaskInProgress = true;
                this.offerTask.buffer_$eq(this.envelopeInFlight.aeronBuffer());
                this.offerTask.msgSize_$eq(this.lastMsgSize);
                this.delegateTaskStartTime = System.nanoTime();
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$taskRunner.command(this.addOfferTask);
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$flightRecorder.aeronSinkDelegateToTaskRunner(this.countBeforeDelegate);
            }

            private void taskOnOfferSuccess() {
                this.countBeforeDelegate = 0L;
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$flightRecorder.aeronSinkReturnFromTaskRunner(System.nanoTime() - this.delegateTaskStartTime);
                this.onOfferSuccess();
            }

            private void onOfferSuccess() {
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$flightRecorder.aeronSinkEnvelopeOffered(this.lastMsgSize);
                this.offerTaskInProgress = false;
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$pool.release(this.envelopeInFlight);
                this.offerTask.buffer_$eq(null);
                this.envelopeInFlight = null;
                if (this.isClosed(this.$outer.in())) {
                    this.completeStage();
                    return;
                }
                this.pull(this.$outer.in());
            }

            private void onGiveUp() {
                this.offerTaskInProgress = false;
                GaveUpMessageException cause = new GaveUpMessageException(new StringBuilder(35).append("Gave up sending message to ").append(this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$channel).append(" after ").append(PrettyDuration.PrettyPrintableDuration$.MODULE$.pretty$extension(PrettyDuration$.MODULE$.PrettyPrintableDuration(this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$giveUpAfter))).append(".").toString());
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$flightRecorder.aeronSinkGaveUpEnvelope(cause.getMessage());
                this.completedValue = Failure$.MODULE$.apply((Throwable)cause);
                this.failStage(cause);
            }

            private void onPublicationClosed() {
                this.offerTaskInProgress = false;
                PublicationClosedException cause = new PublicationClosedException(new StringBuilder(35).append("Aeron Publication to [").append(this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$channel).append("] was closed.").toString());
                this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$flightRecorder.aeronSinkPublicationClosedUnexpectedly(this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$channel, this.$outer.org$apache$pekko$remote$artery$aeron$AeronSink$$streamId);
                this.completedValue = Failure$.MODULE$.apply((Throwable)cause);
                this.failStage(cause);
            }

            public void onUpstreamFinish() {
                if (!this.offerTaskInProgress) {
                    InHandler.onUpstreamFinish$((InHandler)this);
                    return;
                }
            }

            public void onUpstreamFailure(Throwable cause) {
                this.completedValue = Failure$.MODULE$.apply(cause);
                InHandler.onUpstreamFailure$((InHandler)this, (Throwable)cause);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$init$$$anonfun$1(scala.runtime.BoxedUnit ), $init$$$anonfun$2(scala.runtime.BoxedUnit ), $init$$$anonfun$3(scala.runtime.BoxedUnit )}, serializedLambda);
            }
        };
        return Tuple2$.MODULE$.apply((Object)logic, (Object)completed.future());
    }

    public static final class GaveUpMessageException
    extends RuntimeException
    implements NoStackTrace {
        public GaveUpMessageException(String msg) {
            super(msg);
            NoStackTrace.$init$((NoStackTrace)this);
        }

        public Throwable scala$util$control$NoStackTrace$$super$fillInStackTrace() {
            return super.fillInStackTrace();
        }
    }

    public static final class OfferTask
    implements Function0<Object> {
        private final Publication pub;
        private UnsafeBuffer buffer;
        private int msgSize;
        private final AsyncCallback<BoxedUnit> onOfferSuccess;
        private final AsyncCallback<BoxedUnit> onGiveUp;
        private final AsyncCallback<BoxedUnit> onPublicationClosed;
        private final long giveUpAfterNanos;
        private long n;
        private long startTime;

        public OfferTask(Publication pub, UnsafeBuffer buffer, int msgSize, AsyncCallback<BoxedUnit> onOfferSuccess, Duration giveUpAfter, AsyncCallback<BoxedUnit> onGiveUp, AsyncCallback<BoxedUnit> onPublicationClosed) {
            long l;
            this.pub = pub;
            this.buffer = buffer;
            this.msgSize = msgSize;
            this.onOfferSuccess = onOfferSuccess;
            this.onGiveUp = onGiveUp;
            this.onPublicationClosed = onPublicationClosed;
            Duration duration = giveUpAfter;
            if (duration instanceof FiniteDuration) {
                FiniteDuration f = (FiniteDuration)duration;
                l = f.toNanos();
            } else {
                l = -1L;
            }
            this.giveUpAfterNanos = l;
            this.n = 0L;
            this.startTime = 0L;
        }

        public UnsafeBuffer buffer() {
            return this.buffer;
        }

        public void buffer_$eq(UnsafeBuffer x$1) {
            this.buffer = x$1;
        }

        public int msgSize() {
            return this.msgSize;
        }

        public void msgSize_$eq(int x$1) {
            this.msgSize = x$1;
        }

        public long giveUpAfterNanos() {
            return this.giveUpAfterNanos;
        }

        public long n() {
            return this.n;
        }

        public void n_$eq(long x$1) {
            this.n = x$1;
        }

        public long startTime() {
            return this.startTime;
        }

        public void startTime_$eq(long x$1) {
            this.startTime = x$1;
        }

        public boolean apply() {
            return this.apply$mcZ$sp();
        }

        public boolean apply$mcZ$sp() {
            if (this.n() == 0L) {
                this.startTime_$eq(this.giveUpAfterNanos() >= 0L ? System.nanoTime() : 0L);
            }
            this.n_$eq(this.n() + 1L);
            long result = this.pub.offer((DirectBuffer)this.buffer(), 0, this.msgSize());
            if (result >= 0L) {
                this.n_$eq(0L);
                this.onOfferSuccess.invoke((Object)BoxedUnit.UNIT);
                return true;
            }
            if (result == -4L) {
                this.onPublicationClosed.invoke((Object)BoxedUnit.UNIT);
                return true;
            }
            if (this.giveUpAfterNanos() >= 0L && (this.n() & (long)AeronSink$.org$apache$pekko$remote$artery$aeron$AeronSink$$$TimerCheckMask) == 0L && System.nanoTime() - this.startTime() > this.giveUpAfterNanos()) {
                this.n_$eq(0L);
                this.onGiveUp.invoke((Object)BoxedUnit.UNIT);
                return true;
            }
            return false;
        }
    }

    public static final class PublicationClosedException
    extends RuntimeException
    implements NoStackTrace {
        public PublicationClosedException(String msg) {
            super(msg);
            NoStackTrace.$init$((NoStackTrace)this);
        }

        public Throwable scala$util$control$NoStackTrace$$super$fillInStackTrace() {
            return super.fillInStackTrace();
        }
    }
}

