/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector;

import io.axoniq.axonserver.connector.ResultStream;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResultStreamPublisher<M>
implements Publisher<M> {
    private static final Logger logger = LoggerFactory.getLogger(ResultStreamPublisher.class);
    private final Supplier<ResultStream<M>> resultStreamSupplier;

    public ResultStreamPublisher(Supplier<ResultStream<M>> resultStreamSupplier) {
        this.resultStreamSupplier = resultStreamSupplier;
    }

    public void subscribe(Subscriber<? super M> s) {
        ResultStreamSubscription subscription = new ResultStreamSubscription(s, this.resultStreamSupplier.get());
        s.onSubscribe((Subscription)subscription);
        subscription.afterSubscribe();
    }

    private class ResultStreamSubscription
    implements Subscription {
        private final Subscriber<? super M> subscriber;
        private final ResultStream<M> resultStream;
        private final AtomicLong requested = new AtomicLong(0L);
        private final AtomicBoolean signalGate = new AtomicBoolean(false);
        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        private final AtomicBoolean completed = new AtomicBoolean(false);

        private ResultStreamSubscription(Subscriber<? super M> subscriber, ResultStream<M> resultStream) {
            this.subscriber = subscriber;
            this.resultStream = resultStream;
        }

        public void request(long n) {
            if (n <= 0L) {
                this.subscriber.onError((Throwable)new IllegalArgumentException("negative subscription request"));
                return;
            }
            this.requested.updateAndGet(current -> current + Math.min(Long.MAX_VALUE - current, n));
            this.signal();
        }

        public void cancel() {
            logger.debug("The call has been cancelled.");
            this.cancelled.set(true);
            this.resultStream.close();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void signal() {
            while (this.canConsume() && this.signalGate.compareAndSet(false, true)) {
                try {
                    long requests = this.requested.get();
                    long counter = 0L;
                    int i = 0;
                    while ((long)i < requests && !this.resultStream.isClosed() && this.resultStream.peek() != null) {
                        this.subscriber.onNext(this.resultStream.next());
                        --counter;
                        ++i;
                    }
                    this.requested.getAndAccumulate(counter, Long::sum);
                    if (!this.resultStream.isClosed()) continue;
                    Optional<Throwable> error = this.resultStream.getError();
                    if (error.isPresent()) {
                        this.onError(error.get());
                        continue;
                    }
                    this.subscriber.onComplete();
                    this.completed.set(true);
                }
                catch (InterruptedException e) {
                    this.signalGate.set(false);
                    Thread.currentThread().interrupt();
                }
                catch (Exception ex) {
                    this.onError(ex);
                }
                finally {
                    this.signalGate.set(false);
                }
            }
        }

        private void onError(Throwable error) {
            logger.debug("An error occurred accessing the ResultStream.", error);
            this.subscriber.onError(error);
            this.completed.set(true);
        }

        private boolean canConsume() {
            try {
                return !this.cancelled.get() && !this.completed.get() && (this.resultStream.isClosed() || this.resultStream.peek() != null && this.requested.get() > 0L);
            }
            catch (Exception e) {
                this.onError(e);
                return false;
            }
        }

        private void afterSubscribe() {
            this.resultStream.onAvailable(this::signal);
            this.signal();
        }
    }
}

