/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.Flow;
import io.helidon.common.reactive.RequestedCounter;
import io.helidon.common.reactive.SingleSubscriberHolder;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Publisher that emits the elements of a fixed collection, in iteration order,
 * to the subscriber accepted by its {@link SingleSubscriberHolder}.
 *
 * <p>Items are handed to {@code onNext} only while the {@link RequestedCounter}
 * reports outstanding demand. Emission is guarded by the {@code publishing} flag,
 * which is acquired with {@code compareAndSet}, so at most one thread runs the
 * emission loop at any time. Once the internal queue has been drained the
 * subscriber holder is closed with {@code onComplete}; any exception raised while
 * emitting closes it with {@code onError} instead.
 *
 * <p>Completion is signalled from the emission loop, which only runs when there
 * is outstanding demand; an empty collection therefore completes on the first
 * {@code request} call rather than directly on subscription.
 *
 * @param <T> type of the published items
 */
final class FixedItemsPublisher<T> implements Flow.Publisher<T> {
    /** Items still waiting to be emitted; only touched while {@link #publishing} is held. */
    private final Queue<T> queue;
    private final SingleSubscriberHolder<T> subscriber;
    private final RequestedCounter requested;
    /** {@code true} while some thread is emitting (or while {@code onSubscribe} is running). */
    private final AtomicBoolean publishing;

    /**
     * Creates a publisher over a snapshot of {@code items}.
     *
     * <p>The collection is copied, so later changes to {@code items} do not affect
     * what is published. A {@code null} element is never passed to {@code onNext};
     * it is skipped, although it still consumes one unit of demand.
     *
     * @param items the items to publish
     */
    FixedItemsPublisher(Collection<T> items) {
        this.queue = new LinkedList<>(items);
        this.subscriber = new SingleSubscriberHolder<>();
        this.requested = new RequestedCounter();
        this.publishing = new AtomicBoolean(false);
    }

    /**
     * Registers {@code s} with the subscriber holder and, if the registration is
     * accepted, hands it a subscription and starts emitting any items that were
     * requested from within {@code onSubscribe}.
     *
     * @param s the subscriber to register
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
        if (!this.subscriber.register(s)) {
            return;
        }
        // Hold the publishing flag so that a request() issued from inside
        // onSubscribe cannot trigger onNext before onSubscribe has returned.
        this.publishing.set(true);
        try {
            s.onSubscribe(new Flow.Subscription() {

                @Override
                public void request(long n) {
                    FixedItemsPublisher.this.requested.increment(n, t -> FixedItemsPublisher.this.tryComplete(t));
                    FixedItemsPublisher.this.tryPublish();
                }

                @Override
                public void cancel() {
                    FixedItemsPublisher.this.subscriber.cancel();
                }
            });
        } finally {
            this.publishing.set(false);
        }
        // Serve demand that was registered while onSubscribe was still running.
        this.tryPublish();
    }

    /**
     * Emits queued items while there is demand, then signals completion once the
     * queue is empty.
     *
     * <p>Only the thread that wins the {@code publishing} flag emits. After the
     * flag is released the method re-checks for demand when items are left over:
     * a concurrent {@code request} whose {@code compareAndSet} failed while this
     * thread still held the flag would otherwise leave items and demand pending
     * with nobody emitting.
     */
    private void tryPublish() {
        boolean immediateRetry = true;
        while (immediateRetry) {
            immediateRetry = false;

            if (this.subscriber.isClosed()
                    || this.requested.get() <= 0L
                    || !this.publishing.compareAndSet(false, true)) {
                // Nothing to do, or another thread is emitting and will perform
                // its own re-check after it releases the flag.
                return;
            }

            // Determined while the flag is held, because the queue must not be
            // read after the flag has been released.
            boolean itemsRemain = false;
            try {
                Flow.Subscriber<? super T> sub = this.subscriber.get();
                while (!this.subscriber.isClosed() && this.requested.tryDecrement() && !this.queue.isEmpty()) {
                    T item = this.queue.poll();
                    if (item != null) {
                        sub.onNext(item);
                    }
                }
                if (this.queue.isEmpty()) {
                    this.tryComplete();
                } else {
                    itemsRemain = true;
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status before reporting the failure.
                Thread.currentThread().interrupt();
                this.tryComplete(e);
            } catch (Exception e) {
                this.tryComplete(e);
            } finally {
                // Give other threads a chance to publish.
                this.publishing.set(false);
            }

            // Demand may have arrived between the last tryDecrement() and the
            // release of the flag; go around again so it is not lost.
            immediateRetry = itemsRemain;
        }
    }

    /** Closes the subscriber holder, signalling {@code onComplete}. */
    private void tryComplete() {
        this.subscriber.close(Flow.Subscriber::onComplete);
    }

    /**
     * Closes the subscriber holder, signalling {@code onError}.
     *
     * @param t the failure to report to the subscriber
     */
    private void tryComplete(Throwable t) {
        this.subscriber.close(sub -> sub.onError(t));
    }
}

