/*
 * Decompiled with CFR 0.152.
 */
package karate.com.linecorp.armeria.common.multipart;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import karate.com.linecorp.armeria.common.HttpData;
import karate.com.linecorp.armeria.common.annotation.Nullable;
import karate.com.linecorp.armeria.common.multipart.BodyPart;
import karate.com.linecorp.armeria.common.multipart.MimeParser;
import karate.com.linecorp.armeria.common.multipart.MimeParsingException;
import karate.com.linecorp.armeria.common.stream.AbortedStreamException;
import karate.com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import karate.com.linecorp.armeria.common.stream.DefaultStreamMessage;
import karate.com.linecorp.armeria.common.stream.StreamDecoder;
import karate.com.linecorp.armeria.common.stream.StreamDecoderInput;
import karate.com.linecorp.armeria.common.stream.StreamDecoderOutput;
import karate.com.linecorp.armeria.common.stream.StreamMessage;
import karate.com.linecorp.armeria.common.stream.SubscriptionOption;
import karate.com.linecorp.armeria.common.util.EventLoopGroups;
import karate.com.linecorp.armeria.internal.common.stream.AbortingSubscriber;
import karate.com.linecorp.armeria.internal.common.stream.DecodedStreamMessage;
import karate.com.linecorp.armeria.internal.common.stream.SubscriberUtil;
import karate.com.linecorp.armeria.internal.shaded.guava.math.LongMath;
import karate.io.netty.buffer.ByteBuf;
import karate.io.netty.buffer.ByteBufAllocator;
import karate.io.netty.util.concurrent.EventExecutor;
import karate.org.reactivestreams.Subscriber;
import karate.org.reactivestreams.Subscription;

/**
 * Turns a stream of {@link HttpData} into a stream of {@link BodyPart}s.
 *
 * <p>This object is both the {@link StreamDecoder} handed to the internal
 * {@link DecodedStreamMessage} and the {@link StreamMessage} that callers subscribe to. The
 * downstream subscriber is wrapped in a {@link MultipartSubscriber}, which requests body parts
 * from the decoded stream one at a time and only asks for the next one after the content of the
 * previously delivered part has completed.
 */
final class MultipartDecoder implements StreamDecoder<HttpData, BodyPart>, StreamMessage<BodyPart> {

    /** Performs the compare-and-set that lets at most one subscriber be installed. */
    private static final AtomicReferenceFieldUpdater<MultipartDecoder, MultipartSubscriber>
            delegatedSubscriberUpdater = AtomicReferenceFieldUpdater.newUpdater(
                    MultipartDecoder.class, MultipartSubscriber.class, "delegatedSubscriber");

    /** The stream that drives this decoder and to which most {@link StreamMessage} calls delegate. */
    private final DecodedStreamMessage<HttpData, BodyPart> decoded;

    /** Multipart boundary passed to the {@link MimeParser}. */
    private final String boundary;

    /** Created lazily on the first {@link #process(StreamDecoderInput, StreamDecoderOutput)} call. */
    @Nullable
    private MimeParser parser;

    /** The installed subscriber wrapper; reset to {@code null} when the wrapper cleans up. */
    @Nullable
    private volatile MultipartSubscriber delegatedSubscriber;

    /**
     * Number of body parts requested by downstream and not yet delivered. The value saturates at
     * {@link Long#MAX_VALUE}, in which case it is no longer decremented.
     * NOTE(review): not volatile; presumably only touched from the subscriber's executor - confirm.
     */
    private long demandOfMultipart;

    /**
     * Creates a decoder reading from {@code upstream}.
     *
     * @param upstream the raw multipart content
     * @param boundary the multipart boundary
     * @param alloc the allocator passed on to the {@link DecodedStreamMessage}
     */
    MultipartDecoder(StreamMessage<? extends HttpData> upstream, String boundary, ByteBufAllocator alloc) {
        this.boundary = boundary;
        decoded = new DecodedStreamMessage<HttpData, BodyPart>(upstream, this, alloc);
    }

    /** Exposes the underlying buffer of {@code in}. */
    @Override
    public ByteBuf toByteBuf(HttpData in) {
        return in.byteBuf();
    }

    /** Feeds the parser, creating it first if this is the first invocation. */
    @Override
    public void process(StreamDecoderInput in, StreamDecoderOutput<BodyPart> out) throws Exception {
        if (parser == null) {
            parser = new MimeParser(in, out, boundary, this);
        }
        parser.parse();
    }

    /**
     * Closes the parser, if one was created. A {@link MimeParsingException} raised while closing
     * is reported to the installed subscriber.
     */
    @Override
    public void processOnComplete(StreamDecoderInput in, StreamDecoderOutput<BodyPart> out) {
        if (parser == null) {
            return;
        }
        try {
            parser.close();
        } catch (MimeParsingException e) {
            final MultipartSubscriber current = delegatedSubscriber;
            assert (current != null) : "processOnComplete is called after subscription";
            current.onError(e);
        }
    }

    /**
     * Closes the parser, if one was created, and reports {@code cause} to the installed subscriber.
     */
    @Override
    public void processOnError(Throwable cause) {
        closeParserQuietly();
        final MultipartSubscriber current = delegatedSubscriber;
        assert (current != null) : "processOnError is called after subscription";
        current.onError(cause);
    }

    /** Closes the parser if present, discarding any {@link MimeParsingException} it raises. */
    private void closeParserQuietly() {
        if (parser == null) {
            return;
        }
        try {
            parser.close();
        } catch (MimeParsingException ignored) {
            // Deliberately dropped: the caller reports its own cause to the subscriber instead.
        }
    }

    @Override
    public boolean isOpen() {
        return decoded.isOpen();
    }

    @Override
    public boolean isEmpty() {
        return decoded.isEmpty();
    }

    /** Returns the outstanding body-part demand tracked by this decoder. */
    @Override
    public long demand() {
        return demandOfMultipart;
    }

    @Override
    public CompletableFuture<Void> whenComplete() {
        return decoded.whenComplete();
    }

    /**
     * Installs {@code subscriber} as the only subscriber of this stream. If another subscriber
     * is already installed, the new one is failed via
     * {@code SubscriberUtil.failLateSubscriber} and the decoded stream is left untouched.
     */
    @Override
    public void subscribe(Subscriber<? super BodyPart> subscriber, EventExecutor executor,
                          SubscriptionOption... options) {
        Objects.requireNonNull(subscriber, "subscriber");
        Objects.requireNonNull(executor, "executor");
        Objects.requireNonNull(options, "options");

        final MultipartSubscriber candidate = new MultipartSubscriber(subscriber, executor);
        if (delegatedSubscriberUpdater.compareAndSet(this, null, candidate)) {
            decoded.subscribe(candidate, executor, options);
            return;
        }

        // Lost the race (or subscribed twice): reject the newcomer.
        final MultipartSubscriber existing = delegatedSubscriber;
        assert (existing != null);
        SubscriberUtil.failLateSubscriber(executor, subscriber, existing.subscriber);
    }

    /** Aborts this stream with the shared {@link AbortedStreamException}. */
    @Override
    public void abort() {
        abort(AbortedStreamException.get());
    }

    /**
     * Aborts the decoded stream with {@code cause}. When nothing is subscribed yet, a wrapper
     * around an {@link AbortingSubscriber} is installed first so that the subscriber slot is
     * occupied before the abort is propagated.
     */
    @Override
    public void abort(Throwable cause) {
        Objects.requireNonNull(cause, "cause");
        if (delegatedSubscriber == null) {
            delegatedSubscriberUpdater.compareAndSet(
                    this, null,
                    new MultipartSubscriber(AbortingSubscriber.get(cause),
                                            EventLoopGroups.directEventLoop()));
        }
        decoded.abort(cause);
    }

    /**
     * Creates the publisher for the content of a new body part.
     * NOTE(review): presumably invoked by {@link MimeParser} - confirm against the parser.
     */
    BodyPartPublisher onBodyPartBegin() {
        return new BodyPartPublisher();
    }

    /** Forwards a request for more body-part data to the installed subscriber, if there is one. */
    void requestUpstreamForBodyPartData() {
        final MultipartSubscriber current = delegatedSubscriber;
        if (current == null) {
            return;
        }
        current.requestUpstreamForBodyPartData();
    }

    /**
     * Sits between the decoded stream and the real subscriber. It is the {@link Subscriber} of
     * the decoded stream and the {@link Subscription} handed to the real subscriber, which lets
     * it translate downstream demand into one-at-a-time requests.
     */
    private final class MultipartSubscriber implements Subscriber<BodyPart>, Subscription {

        private final Subscriber<? super BodyPart> subscriber;
        private final EventExecutor executor;

        /** Subscription to the decoded stream; set in {@link #onSubscribe(Subscription)}. */
        @Nullable
        private Subscription subscription;

        /** Content of the body part most recently handed downstream, until it completes. */
        @Nullable
        private StreamMessage<? extends HttpData> currentExposedBodyPartPublisher;

        private boolean cancelled;

        private MultipartSubscriber(Subscriber<? super BodyPart> subscriber, EventExecutor executor) {
            this.subscriber = subscriber;
            this.executor = executor;
        }

        /** Runs {@code task} immediately when already on the executor, otherwise schedules it. */
        private void runOnExecutor(Runnable task) {
            if (executor.inEventLoop()) {
                task.run();
            } else {
                executor.execute(task);
            }
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscriber.onSubscribe(this);
        }

        /**
         * Delivers {@code bodyPart} downstream. Before doing so, the part's content is recorded
         * as the currently exposed publisher and a completion hook is attached that triggers
         * {@link #onNext0()} on the executor.
         */
        @Override
        public void onNext(BodyPart bodyPart) {
            if (demandOfMultipart != Long.MAX_VALUE) {
                demandOfMultipart--;
            }
            final StreamMessage<? extends HttpData> content = bodyPart.content();
            currentExposedBodyPartPublisher = content;
            content.whenComplete().handle((unused, throwable) -> {
                // Fires on both normal and exceptional completion of the part's content.
                runOnExecutor(this::onNext0);
                return null;
            });
            subscriber.onNext(bodyPart);
        }

        /** Clears the exposed publisher and requests the next part if demand remains. */
        private void onNext0() {
            currentExposedBodyPartPublisher = null;
            if (demandOfMultipart <= 0L || isComplete()) {
                return;
            }
            assert (subscription != null);
            subscription.request(1L);
        }

        /** Signals the error downstream, aborts the exposed content (if any) and cleans up. */
        @Override
        public void onError(Throwable t) {
            subscriber.onError(t);
            final StreamMessage<? extends HttpData> exposed = currentExposedBodyPartPublisher;
            if (exposed != null) {
                exposed.abort(t);
            }
            cleanup();
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
            cleanup();
        }

        /**
         * Records downstream demand for {@code n} more body parts. A non-positive {@code n}
         * aborts the whole stream with an {@link IllegalArgumentException}.
         */
        @Override
        public void request(long n) {
            if (n <= 0L) {
                MultipartDecoder.this.abort(
                        new IllegalArgumentException("Expecting only positive requests for parts"));
                return;
            }
            runOnExecutor(() -> request0(n));
        }

        /**
         * Adds {@code n} to the outstanding demand (saturating) and, if there was no demand
         * before and no part is currently exposed, asks the decoded stream for one part.
         */
        private void request0(long n) {
            final long previous = demandOfMultipart;
            demandOfMultipart = LongMath.saturatedAdd(previous, n);
            if (previous != 0L || currentExposedBodyPartPublisher != null) {
                return;
            }
            assert (subscription != null);
            subscription.request(1L);
        }

        @Override
        public void cancel() {
            runOnExecutor(this::cancel0);
        }

        /** Schedules {@link #requestUpstreamForBodyPartData0()} on the executor. */
        void requestUpstreamForBodyPartData() {
            runOnExecutor(this::requestUpstreamForBodyPartData0);
        }

        /** Asks the decoded stream for another element while the exposed content is still open. */
        private void requestUpstreamForBodyPartData0() {
            final StreamMessage<? extends HttpData> exposed = currentExposedBodyPartPublisher;
            if (exposed != null && exposed.isOpen()) {
                decoded.askUpstreamForElement();
            }
        }

        /**
         * Cancels the subscription to the decoded stream once; later invocations are no-ops.
         * Any exposed content is aborted with {@link CancelledSubscriptionException}.
         */
        private void cancel0() {
            if (cancelled) {
                return;
            }
            cancelled = true;
            assert (subscription != null);
            subscription.cancel();
            final StreamMessage<? extends HttpData> exposed = currentExposedBodyPartPublisher;
            if (exposed != null) {
                exposed.abort(CancelledSubscriptionException.get());
            }
            cleanup();
        }

        /** Drops the references to the exposed content and to this wrapper. */
        private void cleanup() {
            currentExposedBodyPartPublisher = null;
            delegatedSubscriber = null;
        }
    }

    /**
     * Publisher for the content of a single body part. Each request registers a callback on
     * {@code whenConsumed()} that asks the decoder for more data if demand is still outstanding
     * at that point.
     */
    final class BodyPartPublisher extends DefaultStreamMessage<HttpData> {

        BodyPartPublisher() {
        }

        @Override
        protected void onRequest(long n) {
            whenConsumed().thenRun(() -> {
                if (demand() > 0L) {
                    MultipartDecoder.this.requestUpstreamForBodyPartData();
                }
            });
        }
    }
}

