/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.Flow;
import io.helidon.common.reactive.UnboundedSemaphore;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;

/**
 * A {@link Flow.Publisher} that accepts exactly one subscriber and hands items to it on the
 * thread that makes the call: either the producer thread calling {@link #submit(Object)} or
 * the subscriber thread calling {@code Subscription.request(long)}.
 *
 * <p>Submitted items are buffered in a bounded queue until the subscriber has requested them.
 * Item delivery and terminal signalling in {@code request}, {@link #submit(Object)},
 * {@link #error(Throwable)} and {@link #complete()} are serialized by a single reentrant lock.
 * {@code cancel()} does not take that lock, so the subscriber reference is always read once
 * into a local before it is used.
 *
 * @param <T> type of the items delivered to the subscriber
 * @param <U> type of the items accepted by {@link #submit(Object)}; converted by
 *            {@link #wrap(Object)}
 */
public abstract class OriginThreadPublisher<T, U>
implements Flow.Publisher<T> {
    private static final Logger LOGGER = Logger.getLogger(OriginThreadPublisher.class.getName());
    /** Capacity of the buffer holding submitted but not yet delivered items. */
    private static final int QUEUE_CAPACITY = 256;

    private final UnboundedSemaphore semaphore;
    private final AtomicBoolean hasSingleSubscriber = new AtomicBoolean(false);
    private final Lock reentrantLock = new ReentrantLock();
    /** The one subscriber; {@code null} before subscription and after cancel or a terminal signal. */
    private volatile Flow.Subscriber<? super T> singleSubscriber;
    private volatile boolean completed;
    /** Error recorded for later delivery when it could not be signalled immediately. */
    private volatile Throwable t;
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    /** Number of items for which delivery has been started. */
    private final AtomicLong nextCount = new AtomicLong();
    /** Total demand requested so far, capped at {@code Long.MAX_VALUE}; written only under the lock. */
    private volatile long reqCount = 0L;

    /**
     * Creates a publisher that reports released demand to the given semaphore.
     *
     * @param semaphore semaphore used by {@code request} and {@link #tryAcquire()}
     */
    protected OriginThreadPublisher(UnboundedSemaphore semaphore) {
        this.semaphore = semaphore;
    }

    /**
     * Creates a publisher backed by a semaphore obtained from {@code UnboundedSemaphore.create()}.
     */
    protected OriginThreadPublisher() {
        this(UnboundedSemaphore.create());
    }

    /**
     * Registers the single subscriber and calls its {@code onSubscribe} while holding the lock.
     * Any further subscriber is rejected with an {@link IllegalStateException} passed to its
     * {@code onError}.
     *
     * @param originalSubscriber the subscriber to register
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> originalSubscriber) {
        if (!this.hasSingleSubscriber.compareAndSet(false, true)) {
            originalSubscriber.onError(new IllegalStateException("Only single subscriber is allowed!"));
            return;
        }
        this.singleSubscriber = originalSubscriber;
        this.reentrantLock.lock();
        try {
            originalSubscriber.onSubscribe(new Flow.Subscription(){
                /** Set while {@code onNext} runs so that a nested {@code request} only records demand. */
                private boolean nexting;

                /**
                 * Records the demand, delivers buffered items up to the outstanding demand, then
                 * either sends a pending terminal signal or, when the buffer is empty, releases
                 * the remaining demand to the semaphore and calls {@code hookOnRequested}.
                 *
                 * @param n number of items requested; non-positive values are reported through
                 *          {@code error} as an {@link IllegalArgumentException}
                 */
                @Override
                public void request(long n) {
                    if (n <= 0L) {
                        OriginThreadPublisher.this.error(new IllegalArgumentException("Illegal value requested: " + n));
                        return;
                    }
                    // Acquire before the try block so that a failed lock() is never followed by unlock().
                    OriginThreadPublisher.this.reentrantLock.lock();
                    try {
                        // Saturating add: cumulative demand beyond Long.MAX_VALUE must not wrap negative.
                        long updated = OriginThreadPublisher.this.reqCount + n;
                        OriginThreadPublisher.this.reqCount = updated < 0L ? Long.MAX_VALUE : updated;
                        long release = n;
                        if (this.nexting) {
                            // Called from within onNext; the outer loop picks up the new demand.
                            return;
                        }
                        Flow.Subscriber<? super T> subscriber;
                        while ((subscriber = OriginThreadPublisher.this.singleSubscriber) != null
                                && !OriginThreadPublisher.this.queue.isEmpty()
                                && OriginThreadPublisher.this.reqCount > OriginThreadPublisher.this.nextCount.get()) {
                            OriginThreadPublisher.this.nextCount.incrementAndGet();
                            try {
                                this.nexting = true;
                                --release;
                                T item = OriginThreadPublisher.this.queue.remove();
                                LOGGER.finest(() -> "Publishing item: " + item);
                                subscriber.onNext(item);
                            }
                            finally {
                                this.nexting = false;
                            }
                        }
                        subscriber = OriginThreadPublisher.this.singleSubscriber;
                        if (subscriber == null) {
                            return;
                        }
                        Throwable pending = OriginThreadPublisher.this.t;
                        if (pending != null) {
                            LOGGER.finest("Completing with an error from request.");
                            subscriber.onError(pending);
                            // Terminal signal sent: drop the subscriber so it is not signalled again.
                            OriginThreadPublisher.this.singleSubscriber = null;
                        } else if (OriginThreadPublisher.this.completed && OriginThreadPublisher.this.queue.isEmpty()) {
                            LOGGER.finest("Completing from request.");
                            subscriber.onComplete();
                            // Terminal signal sent: drop the subscriber so it is not signalled again.
                            OriginThreadPublisher.this.singleSubscriber = null;
                        } else if (OriginThreadPublisher.this.queue.isEmpty()) {
                            // Demand not satisfied from the buffer is handed to the semaphore.
                            long released = n == Long.MAX_VALUE ? Long.MAX_VALUE : release;
                            long result = OriginThreadPublisher.this.semaphore.release(released);
                            LOGGER.finest(() -> "Semaphore released: " + result);
                            OriginThreadPublisher.this.hookOnRequested(released, result);
                        }
                    }
                    finally {
                        OriginThreadPublisher.this.reentrantLock.unlock();
                    }
                }

                /**
                 * Calls {@code hookOnCancel} and then drops the subscriber reference.
                 */
                @Override
                public void cancel() {
                    OriginThreadPublisher.this.hookOnCancel();
                    OriginThreadPublisher.this.singleSubscriber = null;
                }
            });
        }
        finally {
            this.reentrantLock.unlock();
        }
    }

    /**
     * Converts a submitted value into the type delivered to the subscriber. This default
     * implementation performs an unchecked cast, so subclasses whose {@code U} is not
     * assignable to {@code T} need to override it.
     *
     * @param data the submitted value
     * @return the value to buffer and deliver
     */
    @SuppressWarnings("unchecked")
    protected T wrap(U data) {
        return (T)data;
    }

    /**
     * Buffers an item and, if there is outstanding demand, immediately delivers the item at
     * the head of the buffer on the calling thread.
     *
     * <p>If the buffer is full, the failure is logged and reported through
     * {@link #error(Throwable)}. A {@link RuntimeException} raised while submitting is stored
     * for later delivery when there is no subscriber, and otherwise reported through
     * {@link #error(Throwable)} wrapped in an {@link IllegalStateException}.
     *
     * @param data the item to publish
     */
    public void submit(U data) {
        // Acquire before the try block so that a failed lock() is never followed by unlock().
        this.reentrantLock.lock();
        try {
            if (!this.queue.offer(this.wrap(data))) {
                LOGGER.severe("Unable to add an element to the publisher cache.");
                this.error(new IllegalStateException("Unable to add an element to the publisher cache."));
                return;
            }
            if (this.nextCount.get() < this.reqCount) {
                this.nextCount.incrementAndGet();
                T item = this.queue.poll();
                LOGGER.finest(() -> "Publishing item: " + (null == item ? "null" : item));
                // cancel() clears the field without the lock, so read it exactly once.
                Flow.Subscriber<? super T> subscriber = this.singleSubscriber;
                if (subscriber != null) {
                    subscriber.onNext(item);
                }
                // Otherwise the subscriber was cancelled or terminated; the item is dropped.
            } else {
                LOGGER.finest(() -> "Not publishing due to low request count: " + this.nextCount + " <= " + this.reqCount);
            }
        }
        catch (RuntimeException e) {
            if (this.singleSubscriber == null) {
                this.t = e;
            } else {
                this.error(new IllegalStateException("An error occurred when submitting data.", e));
            }
        }
        finally {
            this.reentrantLock.unlock();
        }
    }

    /**
     * If nobody has subscribed yet, and the publisher is not already completed with an empty
     * buffer, subscribes an internal subscriber that requests {@code Long.MAX_VALUE} items and
     * passes each one to {@link #drain(Object)}.
     */
    public void drain() {
        if (!(this.hasSingleSubscriber.get() || this.completed && this.queue.isEmpty())) {
            LOGGER.fine(() -> "No one registered to consumer request");
            this.subscribe(new Flow.Subscriber<T>(){

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(T item) {
                    OriginThreadPublisher.this.drain(item);
                }

                @Override
                public void onError(Throwable throwable) {
                    // Intentionally ignored: draining discards the outcome.
                }

                @Override
                public void onComplete() {
                    // Nothing to do once draining has finished.
                }
            });
        }
    }

    /**
     * Called for every item consumed by {@link #drain()}. The default implementation does
     * nothing.
     *
     * @param item the drained item
     */
    protected void drain(T item) {
    }

    /**
     * Signals an error. If a subscriber is present and the buffer is empty, the error is
     * delivered immediately and the subscriber is dropped; otherwise the error is stored so
     * that a later {@code request} can deliver it.
     *
     * @param throwable the error to signal
     * @throws IllegalStateException if the subscriber's {@code onError} throws a
     *                               {@link RuntimeException}
     */
    public void error(Throwable throwable) {
        // Acquire before the try block so that a failed lock() is never followed by unlock().
        this.reentrantLock.lock();
        try {
            Flow.Subscriber<? super T> subscriber = this.singleSubscriber;
            if (subscriber != null && this.queue.isEmpty()) {
                subscriber.onError(throwable);
                this.singleSubscriber = null;
            } else {
                this.t = throwable;
            }
        }
        catch (RuntimeException e) {
            throw new IllegalStateException("On error threw an exception!", e);
        }
        finally {
            this.reentrantLock.unlock();
        }
    }

    /**
     * Marks the publisher as completed. If a subscriber is present and the buffer is empty,
     * {@code onComplete} is delivered immediately and the subscriber is dropped; otherwise
     * completion is left for a later {@code request} to deliver once the buffer is empty.
     */
    public void complete() {
        // Acquire before the try block so that a failed lock() is never followed by unlock().
        this.reentrantLock.lock();
        try {
            this.completed = true;
            Flow.Subscriber<? super T> subscriber = this.singleSubscriber;
            if (subscriber != null && this.queue.isEmpty()) {
                LOGGER.finest("Completing by the producing thread.");
                subscriber.onComplete();
                this.singleSubscriber = null;
            } else {
                LOGGER.finest("Not completing by the producing thread.");
            }
        }
        finally {
            this.reentrantLock.unlock();
        }
    }

    /**
     * Delegates to {@code tryAcquire()} of the underlying semaphore.
     *
     * @return the value returned by the semaphore
     */
    public long tryAcquire() {
        return this.semaphore.tryAcquire();
    }

    /**
     * Reports whether {@link #complete()} has been called.
     *
     * @return {@code true} once {@link #complete()} has been called
     */
    public boolean isCompleted() {
        return this.completed;
    }

    /**
     * Reports whether the requested demand exceeds the number of items already handed out
     * plus the number currently buffered.
     *
     * @return {@code true} if more items are needed to satisfy the recorded demand
     */
    public boolean requiresMoreItems() {
        return this.reqCount - (this.nextCount.get() + (long)this.queue.size()) > 0L;
    }

    /**
     * Hook called from {@code request} after demand has been released to the semaphore.
     * The default implementation does nothing.
     *
     * @param n      the amount passed to the semaphore's {@code release}
     * @param result the value returned by the semaphore's {@code release}
     */
    protected void hookOnRequested(long n, long result) {
    }

    /**
     * Hook called when the subscription is cancelled, before the subscriber reference is
     * cleared. The default implementation does nothing.
     */
    protected void hookOnCancel() {
    }
}

