/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.transfer.s3.internal;

import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
import software.amazon.awssdk.utils.Logger;

@SdkInternalApi
public final class S3CrtRequestBodyStreamAdapter
implements HttpRequestBodyStream {
    static final long DEFAULT_REQUEST_SIZE = 8L;
    private static final Logger LOG = Logger.loggerFor(S3CrtRequestBodyStreamAdapter.class);
    private final AtomicReference<SubscriptionStatus> subscriptionStatus = new AtomicReference<SubscriptionStatus>(SubscriptionStatus.NOT_SUBSCRIBED);
    private final BlockingQueue<Subscription> subscriptionQueue = new LinkedBlockingQueue<Subscription>(1);
    private final BlockingDeque<Event> eventBuffer = new LinkedBlockingDeque<Event>();
    private final Publisher<ByteBuffer> bodyPublisher;
    private volatile Subscription subscription;
    private Subscriber<? super ByteBuffer> subscriber;
    private long pending = 0L;

    public S3CrtRequestBodyStreamAdapter(Publisher<ByteBuffer> bodyPublisher) {
        this.bodyPublisher = bodyPublisher;
        this.subscriber = this.createSubscriber();
    }

    public boolean sendRequestBody(ByteBuffer outBuffer) {
        LOG.trace(() -> "Getting data to fill buffer of size " + outBuffer.remaining());
        this.waitForSubscription();
        block5: while (outBuffer.hasRemaining()) {
            Event ev;
            if (this.eventBuffer.isEmpty() && this.pending == 0L) {
                this.pending = 8L;
                this.subscription.request(this.pending);
            }
            if (!(ev = this.takeFirstEvent()).subscriber().equals(this.subscriber)) {
                LOG.debug(() -> "Received an event for a previous publisher. Discarding. Event was: " + ev);
                continue;
            }
            switch (ev.type()) {
                case DATA: {
                    ByteBuffer srcBuffer = ((DataEvent)ev).data();
                    ByteBuffer bufferToWrite = srcBuffer.duplicate();
                    int nBytesToWrite = Math.min(outBuffer.remaining(), srcBuffer.remaining());
                    if (bufferToWrite.remaining() > nBytesToWrite) {
                        bufferToWrite.limit(bufferToWrite.position() + nBytesToWrite);
                    }
                    outBuffer.put(bufferToWrite);
                    srcBuffer.position(bufferToWrite.limit());
                    if (!srcBuffer.hasRemaining()) {
                        --this.pending;
                        continue block5;
                    }
                    this.eventBuffer.push(ev);
                    continue block5;
                }
                case COMPLETE: {
                    this.eventBuffer.push(ev);
                    this.pending = 0L;
                    return true;
                }
                case ERROR: {
                    this.eventBuffer.push(ev);
                    Throwable t = ((ErrorEvent)ev).error();
                    if (t instanceof RuntimeException) {
                        throw (RuntimeException)t;
                    }
                    throw new RuntimeException(t);
                }
            }
            throw new IllegalStateException("Unknown event type: " + (Object)((Object)ev.type()));
        }
        return false;
    }

    public boolean resetPosition() {
        this.subscription.cancel();
        this.subscription = null;
        this.subscriber = this.createSubscriber();
        this.subscriptionStatus.set(SubscriptionStatus.NOT_SUBSCRIBED);
        this.eventBuffer.clear();
        this.pending = 0L;
        return true;
    }

    private Event takeFirstEvent() {
        try {
            return this.eventBuffer.takeFirst();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for next event", e);
        }
    }

    public SubscriberImpl createSubscriber() {
        return new SubscriberImpl(this::setSubscription, this.eventBuffer);
    }

    private void setSubscription(Subscription subscription) {
        if (this.subscriptionStatus.compareAndSet(SubscriptionStatus.SUBSCRIBING, SubscriptionStatus.SUBSCRIBED)) {
            this.subscriptionQueue.add(subscription);
        } else {
            LOG.error(() -> "The supplier stopped waiting for the subscription. This is likely because it took longer than the timeout to arrive. Cancelling the subscription");
            subscription.cancel();
        }
    }

    private void waitForSubscription() {
        if (!this.subscriptionStatus.compareAndSet(SubscriptionStatus.NOT_SUBSCRIBED, SubscriptionStatus.SUBSCRIBING)) {
            return;
        }
        this.bodyPublisher.subscribe(this.subscriber);
        try {
            this.subscription = this.subscriptionQueue.poll(5L, TimeUnit.SECONDS);
            if (this.subscription == null) {
                if (!this.subscriptionStatus.compareAndSet(SubscriptionStatus.SUBSCRIBING, SubscriptionStatus.TIMED_OUT)) {
                    this.subscriptionQueue.take().cancel();
                }
                throw new RuntimeException("Publisher did not respond with a subscription within 5 seconds");
            }
        }
        catch (InterruptedException e) {
            LOG.error(() -> "Interrupted while waiting for subscription", (Throwable)e);
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for subscription", e);
        }
    }

    private static enum SubscriptionStatus {
        NOT_SUBSCRIBED,
        SUBSCRIBING,
        SUBSCRIBED,
        TIMED_OUT;

    }

    private static final class ErrorEvent
    implements Event {
        private final Subscriber<? super ByteBuffer> subscriber;
        private final Throwable error;

        ErrorEvent(Subscriber<? super ByteBuffer> subscriber, Throwable error) {
            this.subscriber = subscriber;
            this.error = error;
        }

        @Override
        public Subscriber<? super ByteBuffer> subscriber() {
            return this.subscriber;
        }

        @Override
        public EventType type() {
            return EventType.ERROR;
        }

        public Throwable error() {
            return this.error;
        }
    }

    private static final class CompleteEvent
    implements Event {
        private final Subscriber<? super ByteBuffer> subscriber;

        CompleteEvent(Subscriber<? super ByteBuffer> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public Subscriber<? super ByteBuffer> subscriber() {
            return this.subscriber;
        }

        @Override
        public EventType type() {
            return EventType.COMPLETE;
        }
    }

    private static final class DataEvent
    implements Event {
        private final Subscriber<? super ByteBuffer> subscriber;
        private final ByteBuffer data;

        DataEvent(Subscriber<? super ByteBuffer> subscriber, ByteBuffer data) {
            this.subscriber = subscriber;
            this.data = data;
        }

        @Override
        public Subscriber<? super ByteBuffer> subscriber() {
            return this.subscriber;
        }

        @Override
        public EventType type() {
            return EventType.DATA;
        }

        public ByteBuffer data() {
            return this.data;
        }
    }

    static interface Event {
        public Subscriber<? super ByteBuffer> subscriber();

        public EventType type();
    }

    private static enum EventType {
        DATA,
        COMPLETE,
        ERROR;

    }

    static class SubscriberImpl
    implements Subscriber<ByteBuffer> {
        private final Consumer<Subscription> subscriptionSetter;
        private final Deque<Event> eventBuffer;
        private boolean subscribed = false;

        SubscriberImpl(Consumer<Subscription> subscriptionSetter, Deque<Event> eventBuffer) {
            this.subscriptionSetter = subscriptionSetter;
            this.eventBuffer = eventBuffer;
        }

        public void onSubscribe(Subscription subscription) {
            if (subscription == null) {
                throw new NullPointerException("Subscription must not be null");
            }
            if (this.subscribed) {
                subscription.cancel();
                return;
            }
            this.subscriptionSetter.accept(subscription);
            this.subscribed = true;
        }

        public void onNext(ByteBuffer byteBuffer) {
            if (byteBuffer == null) {
                throw new NullPointerException("byteBuffer must not be null");
            }
            LOG.trace(() -> "Received new data of size: " + byteBuffer.remaining());
            this.eventBuffer.add(new DataEvent(this, byteBuffer));
        }

        public void onError(Throwable throwable) {
            this.eventBuffer.add(new ErrorEvent(this, throwable));
        }

        public void onComplete() {
            this.eventBuffer.add(new CompleteEvent(this));
        }
    }
}

