/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.BiConsumerChain;
import io.helidon.common.reactive.ConsumerChain;
import io.helidon.common.reactive.EmittingPublisher;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
 * A {@link Flow.Publisher} that accepts items through {@link #emit(Object)} and
 * forwards them to a wrapped {@code EmittingPublisher}. Items the wrapped emitter
 * does not accept at the moment are kept in an internal queue and re-offered
 * whenever the subscriber is subscribed or signals a request.
 *
 * <p>The lifecycle is tracked by an internal state machine
 * ({@code READY_TO_EMIT}, {@code CANCELLED}, {@code FAILED}, {@code COMPLETING},
 * {@code COMPLETED}); every transition visible in this class starts from
 * {@code READY_TO_EMIT}, except {@code COMPLETING -> COMPLETED}, which happens once
 * the buffer has been drained.
 *
 * @param <T> type of the emitted items
 */
public class BufferedEmittingPublisher<T> implements Flow.Publisher<T> {
    private final AtomicReference<State> state = new AtomicReference<>(State.READY_TO_EMIT);
    private final ConcurrentLinkedQueue<T> buffer = new ConcurrentLinkedQueue<>();
    private final EmittingPublisher<T> emitter = new EmittingPublisher<>();
    /** Number of drain requests that have been registered but not yet consumed by a drain pass. */
    private final AtomicLong deferredDrains = new AtomicLong(0L);
    /** Guard flag: {@code true} while some thread is inside the drain section. */
    private final AtomicBoolean draining = new AtomicBoolean(false);
    private final AtomicReference<Throwable> error = new AtomicReference<>();
    private BiConsumer<Long, Long> requestCallback = null;
    private Consumer<? super T> onEmitCallback = null;
    /** Set once the emitter reported itself unbounded while the buffer was empty. */
    private boolean safeToSkipBuffer = false;

    /**
     * Creates a new publisher in the ready-to-emit state with an empty buffer.
     */
    protected BufferedEmittingPublisher() {
    }

    /**
     * Static factory for a new {@code BufferedEmittingPublisher}.
     *
     * @param <T> type of the emitted items
     * @return a new publisher instance
     */
    public static <T> BufferedEmittingPublisher<T> create() {
        return new BufferedEmittingPublisher<T>();
    }

    /**
     * Wires the drain logic to the wrapped emitter's subscribe/request/cancel signals
     * and then subscribes {@code subscriber} to the wrapped emitter.
     *
     * @param subscriber the subscriber to hand over to the wrapped emitter
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        this.emitter.onSubscribe(() -> this.state.get().drain(this));
        this.emitter.onRequest((n, cnt) -> {
            if (this.requestCallback != null) {
                this.requestCallback.accept(n, cnt);
            }
            // Every request signal is a chance to flush buffered items.
            this.state.get().drain(this);
        });
        // Cancellation is only recorded while still in the ready-to-emit state.
        this.emitter.onCancel(() -> this.state.compareAndSet(State.READY_TO_EMIT, State.CANCELLED));
        this.emitter.subscribe(subscriber);
    }

    /**
     * Registers a callback that is invoked on each request signal from the wrapped
     * emitter, before the buffer is drained. Repeated registrations are combined with
     * the previously registered callback through {@code BiConsumerChain.combine}.
     *
     * @param requestCallback callback receiving the two values the wrapped emitter
     *                        passes to its request listener
     */
    public void onRequest(BiConsumer<Long, Long> requestCallback) {
        this.requestCallback = this.requestCallback == null ? requestCallback : BiConsumerChain.combine(this.requestCallback, requestCallback);
    }

    /**
     * Registers a callback that is invoked with every item after the wrapped emitter
     * accepted it. Repeated registrations are combined with the previously registered
     * callback through {@code ConsumerChain.combine}.
     *
     * @param onEmitCallback callback receiving each successfully emitted item
     */
    public void onEmit(Consumer<T> onEmitCallback) {
        this.onEmitCallback = this.onEmitCallback == null ? onEmitCallback : ConsumerChain.combine(this.onEmitCallback, onEmitCallback);
    }

    /**
     * Emits the item directly if possible, otherwise buffers it.
     *
     * @param item item to emit
     * @return {@code 0} when the item was accepted by the wrapped emitter right away,
     *         otherwise the buffer size observed after the item was queued
     * @throws IllegalStateException if the publisher is cancelled, failed, completing
     *                               or completed
     */
    public int emit(T item) {
        return this.state.get().emit(this, item);
    }

    /**
     * Records {@code throwable} and, if the publisher is still ready to emit, switches
     * to the failed state and signals the failure through the wrapped emitter.
     *
     * @param throwable the failure cause
     */
    public void fail(Throwable throwable) {
        this.error.set(throwable);
        if (this.state.compareAndSet(State.READY_TO_EMIT, State.FAILED)) {
            this.emitter.fail(throwable);
        }
    }

    /**
     * Switches from ready-to-emit to completing and drains the buffer; the wrapped
     * emitter is completed by the drain pass that finds the buffer empty.
     * Has no effect in any other state.
     */
    public void complete() {
        if (this.state.compareAndSet(State.READY_TO_EMIT, State.COMPLETING)) {
            State.READY_TO_EMIT.drain(this);
        }
    }

    /**
     * Switches from ready-to-emit straight to completed and completes the wrapped
     * emitter without draining buffered items. Has no effect in any other state.
     */
    public void completeNow() {
        if (this.state.compareAndSet(State.READY_TO_EMIT, State.COMPLETED)) {
            this.emitter.complete();
        }
    }

    /**
     * Removes every buffered item and passes each one to {@code consumer}.
     *
     * @param consumer receiver of the removed items
     */
    public void clearBuffer(Consumer<T> consumer) {
        // Poll until the queue reports empty (null): a separate isEmpty() check followed
        // by poll() could hand null to the consumer when another thread drains in between.
        T item;
        while ((item = this.buffer.poll()) != null) {
            consumer.accept(item);
        }
    }

    /**
     * @return whatever the wrapped emitter reports from {@code isUnbounded()}
     */
    public boolean isUnbounded() {
        return this.emitter.isUnbounded();
    }

    /**
     * @return whatever the wrapped emitter reports from {@code hasRequests()}
     */
    public boolean hasRequests() {
        return this.emitter.hasRequests();
    }

    /**
     * @return {@code true} if the publisher is in the completed state
     */
    public boolean isCompleted() {
        return this.state.get() == State.COMPLETED;
    }

    /**
     * @return {@code true} if the publisher is in the cancelled state
     */
    public boolean isCancelled() {
        return this.state.get() == State.CANCELLED;
    }

    /**
     * @return the current number of buffered items
     */
    public int bufferSize() {
        return this.buffer.size();
    }

    /**
     * Registers a drain request and runs drain passes unless another thread already
     * holds the {@code draining} flag, in which case this call returns immediately and
     * leaves its request counted in {@code deferredDrains}.
     */
    private void drainBuffer() {
        long drains;
        this.deferredDrains.incrementAndGet();
        do {
            if (this.draining.getAndSet(true)) {
                // Somebody else is draining; our request stays counted in deferredDrains.
                return;
            }
            try {
                // Take one pending request (never going below zero); the previous value is returned.
                drains = this.deferredDrains.getAndUpdate(d -> d == 0L ? 0L : d - 1L);
                if (drains > 0L) {
                    this.actualDrain();
                    --drains;
                }
            } finally {
                // Always release the flag: if a callback or the emitter throws during the
                // drain, leaving it set would block every later drain attempt.
                this.draining.set(false);
            }
            // Go around again if requests were registered while the flag was held.
        } while (drains < this.deferredDrains.get());
    }

    /**
     * Offers buffered items to the wrapped emitter, head first, until the buffer is
     * empty or the emitter declines an item. When the buffer ends up empty while the
     * publisher is completing, the wrapped emitter is completed.
     */
    private void actualDrain() {
        T head;
        // Capture the head once so the emitted item and the item given to the callback
        // are the same reference, and so a queue emptied concurrently ends the loop
        // instead of offering null to the emitter.
        while ((head = this.buffer.peek()) != null && this.emitter.emit(head)) {
            this.buffer.poll();
            Consumer<? super T> callback = this.onEmitCallback;
            if (callback != null) {
                callback.accept(head);
            }
        }
        if (this.buffer.isEmpty() && this.state.compareAndSet(State.COMPLETING, State.COMPLETED)) {
            this.emitter.complete();
        }
    }

    /**
     * Emits {@code item} directly when nothing is buffered and the wrapped emitter
     * accepts it; otherwise appends it to the buffer and triggers a drain. Runs while
     * holding this instance's monitor.
     *
     * @param item item to emit or buffer
     * @return {@code 0} if emitted directly, otherwise the buffer size observed after
     *         queuing and draining
     */
    private int emitOrBuffer(T item) {
        synchronized (this) {
            try {
                if (this.buffer.isEmpty() && this.emitter.emit(item)) {
                    if (this.onEmitCallback != null) {
                        this.onEmitCallback.accept(item);
                    }
                    return 0;
                }
                this.buffer.add(item);
                this.state.get().drain(this);
                return this.buffer.size();
            } finally {
                // Once the emitter is unbounded and nothing is buffered, later emits
                // take the unsynchronized path (see State.READY_TO_EMIT#emit).
                if (!this.safeToSkipBuffer && this.isUnbounded() && this.buffer.isEmpty()) {
                    this.safeToSkipBuffer = true;
                }
            }
        }
    }

    /**
     * Unsynchronized variant used once {@code safeToSkipBuffer} is set: offers the item
     * straight to the wrapped emitter and only buffers it when the emitter declines.
     *
     * @param item item to emit or buffer
     * @return {@code 0} if emitted directly, otherwise the buffer size observed after queuing
     */
    private int unboundedEmitOrBuffer(T item) {
        if (this.emitter.emit(item)) {
            if (this.onEmitCallback != null) {
                this.onEmitCallback.accept(item);
            }
            return 0;
        }
        this.buffer.add(item);
        return this.buffer.size();
    }

    /**
     * Lifecycle states; each state decides how {@code emit} and {@code drain} behave.
     */
    private enum State {
        /** Items are accepted; drain requests flush the buffer. */
        READY_TO_EMIT {
            @Override
            <T> int emit(BufferedEmittingPublisher<T> publisher, T item) {
                if (publisher.safeToSkipBuffer) {
                    return publisher.unboundedEmitOrBuffer(item);
                }
                return publisher.emitOrBuffer(item);
            }

            @Override
            <T> void drain(BufferedEmittingPublisher<T> publisher) {
                publisher.drainBuffer();
            }
        },
        /** Emitting is rejected; draining does nothing. */
        CANCELLED {
            @Override
            <T> int emit(BufferedEmittingPublisher<T> publisher, T item) {
                throw new IllegalStateException("Emitter is cancelled!");
            }

            @Override
            <T> void drain(BufferedEmittingPublisher<T> publisher) {
            }
        },
        /** Emitting is rejected; draining re-signals the recorded error to the emitter. */
        FAILED {
            @Override
            <T> int emit(BufferedEmittingPublisher<T> publisher, T item) {
                throw new IllegalStateException("Emitter is in failed state!");
            }

            @Override
            <T> void drain(BufferedEmittingPublisher<T> publisher) {
                publisher.emitter.fail(publisher.error.get());
            }
        },
        /** Emitting is rejected; draining still flushes what is left in the buffer. */
        COMPLETING {
            @Override
            <T> int emit(BufferedEmittingPublisher<T> publisher, T item) {
                throw new IllegalStateException("Emitter is completing!");
            }

            @Override
            <T> void drain(BufferedEmittingPublisher<T> publisher) {
                READY_TO_EMIT.drain(publisher);
            }
        },
        /** Emitting is rejected; draining re-signals completion to the emitter. */
        COMPLETED {
            @Override
            <T> int emit(BufferedEmittingPublisher<T> publisher, T item) {
                throw new IllegalStateException("Emitter is completed!");
            }

            @Override
            <T> void drain(BufferedEmittingPublisher<T> publisher) {
                publisher.emitter.complete();
            }
        };

        /**
         * Handles an emit attempt in this state.
         *
         * @param publisher the publisher the item is emitted through
         * @param item      the item to emit
         * @param <T>       item type
         * @return {@code 0} if emitted directly, otherwise the observed buffer size
         */
        abstract <T> int emit(BufferedEmittingPublisher<T> publisher, T item);

        /**
         * Handles a drain request in this state.
         *
         * @param publisher the publisher whose buffer should be drained
         * @param <T>       item type
         */
        abstract <T> void drain(BufferedEmittingPublisher<T> publisher);
    }
}

