/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.Flow;
import io.helidon.common.reactive.RequestedCounter;
import io.helidon.common.reactive.SingleSubscriberHolder;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

abstract class BaseProcessor<T, U>
implements Flow.Processor<T, U>,
Flow.Subscription {
    private Flow.Subscription subscription;
    private final SingleSubscriberHolder<U> subscriber;
    private final RequestedCounter requested = new RequestedCounter();
    private final AtomicBoolean ready = new AtomicBoolean();
    private final AtomicBoolean subscribed = new AtomicBoolean();
    private volatile boolean done;
    private Throwable error;

    BaseProcessor() {
        this.subscriber = new SingleSubscriberHolder();
    }

    @Override
    public final void request(long n) {
        this.requested.increment(n, ex -> this.onError((Throwable)ex));
        this.tryRequest(this.subscription);
        if (this.done) {
            this.tryComplete();
        }
    }

    @Override
    public final void cancel() {
        this.subscriber.cancel();
    }

    @Override
    public final void onSubscribe(Flow.Subscription s) {
        if (this.subscription == null) {
            this.subscription = s;
            this.tryRequest(s);
        }
    }

    @Override
    public final void onNext(T item) {
        if (this.subscriber.isClosed()) {
            throw new IllegalStateException("Subscriber is closed!");
        }
        try {
            this.hookOnNext(item);
        }
        catch (Throwable ex) {
            this.onError(ex);
        }
    }

    @Override
    public final void onError(Throwable ex) {
        this.done = true;
        if (this.error == null) {
            this.error = ex;
        }
        this.tryComplete();
    }

    @Override
    public final void onComplete() {
        this.done = true;
        this.tryComplete();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super U> s) {
        if (this.subscriber.register(s)) {
            this.ready.set(true);
            s.onSubscribe(this);
            if (this.done) {
                this.tryComplete();
            }
        }
    }

    protected void submit(U item) {
        if (this.requested.tryDecrement()) {
            try {
                this.subscriber.get().onNext(item);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                this.onError(ex);
            }
            catch (ExecutionException ex) {
                this.onError(ex);
            }
            catch (Throwable ex) {
                this.onError(ex);
            }
        } else {
            this.onError(new IllegalStateException("Not enough request to submit item"));
        }
    }

    protected void hookOnNext(T item) {
    }

    protected void hookOnError(Throwable error) {
    }

    protected void hookOnComplete() {
    }

    protected final void doSubscribe(Flow.Publisher<U> delegate) {
        if (this.subscribed.compareAndSet(false, true)) {
            delegate.subscribe(new Flow.Subscriber<U>(){

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    BaseProcessor.this.tryRequest(subscription);
                }

                @Override
                public void onNext(U item) {
                    BaseProcessor.this.submit(item);
                }

                @Override
                public void onError(Throwable ex) {
                    BaseProcessor.this.onError(ex);
                }

                @Override
                public void onComplete() {
                    BaseProcessor.this.onComplete();
                }
            });
        }
    }

    private void completeOnError(Flow.Subscriber<? super U> sub, Throwable ex) {
        this.hookOnError(ex);
        sub.onError(ex);
    }

    private void tryComplete() {
        if (this.ready.get() && !this.subscriber.isClosed()) {
            if (this.error != null) {
                this.subscriber.close(sub -> this.completeOnError((Flow.Subscriber<? super U>)sub, this.error));
            } else {
                try {
                    this.hookOnComplete();
                }
                catch (Throwable ex) {
                    this.subscriber.close(sub -> this.completeOnError((Flow.Subscriber<? super U>)sub, ex));
                    return;
                }
                this.subscriber.close(Flow.Subscriber::onComplete);
            }
        }
    }

    private void tryRequest(Flow.Subscription s) {
        long n;
        if (s != null && !this.subscriber.isClosed() && (n = this.requested.get()) > 0L) {
            s.request(n);
        }
    }
}

