/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.impl.FlowControlledBuffer;
import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.grpc.stub.StreamObserver;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Base class that exposes a {@link FlowControlledBuffer} through the {@link ResultStream} API.
 * <p>
 * On top of the inherited buffer operations this class maintains:
 * <ul>
 *     <li>a single replaceable "on available" callback, run after every {@link #onNext(Object)},
 *     {@link #onError(Throwable)} and {@link #onCompleted()};</li>
 *     <li>a queue of one-shot "close requested" handlers, run once the inbound side has terminated
 *     (via {@link #onError(Throwable)} or {@link #onCompleted()});</li>
 *     <li>a guard that makes {@link #close()} perform its work only on the first invocation.</li>
 * </ul>
 *
 * @param <T> the type of element handed out by this stream
 * @param <R> the second type parameter of {@link FlowControlledBuffer}
 */
public abstract class AbstractBufferedStream<T, R>
        extends FlowControlledBuffer<T, R>
        implements ResultStream<T> {

    /** Placeholder used while no "on available" callback is registered, so the reference is never null. */
    private static final Runnable NO_OP = () -> {};

    private final AtomicReference<Runnable> onAvailableCallback = new AtomicReference<>(NO_OP);
    private final Queue<Runnable> closeHandlers = new ConcurrentLinkedQueue<>();
    /** Set once {@link #onError(Throwable)} or {@link #onCompleted()} has been received. */
    private final AtomicBoolean closeRequested = new AtomicBoolean();
    /** Set by the first call to {@link #close()}; later calls are no-ops. */
    private final AtomicBoolean clientClosed = new AtomicBoolean();

    /**
     * Creates the stream, passing all arguments unchanged to the {@link FlowControlledBuffer} constructor.
     *
     * @param clientId    forwarded to the superclass
     * @param bufferSize  forwarded to the superclass
     * @param refillBatch forwarded to the superclass
     */
    public AbstractBufferedStream(String clientId, int bufferSize, int refillBatch) {
        super(clientId, bufferSize, refillBatch);
    }

    /**
     * Returns the next element by delegating to {@code take()}.
     *
     * @return the value returned by {@code take()}
     * @throws InterruptedException if {@code take()} is interrupted
     */
    @Override
    public T next() throws InterruptedException {
        return this.take();
    }

    /**
     * Returns the next element by delegating to {@code tryTakeNow()}.
     *
     * @return the value returned by {@code tryTakeNow()}
     */
    @Override
    public T nextIfAvailable() {
        return this.tryTakeNow();
    }

    /**
     * Returns the next element by delegating to {@code tryTake(timeout, unit)}.
     *
     * @param timeout forwarded to {@code tryTake}
     * @param unit    forwarded to {@code tryTake}
     * @return the value returned by {@code tryTake(timeout, unit)}
     * @throws InterruptedException if {@code tryTake} is interrupted
     */
    @Override
    public T nextIfAvailable(long timeout, TimeUnit unit) throws InterruptedException {
        return this.tryTake(timeout, unit);
    }

    /**
     * Hands the value to the superclass and then runs the currently registered "on available" callback.
     *
     * @param value the incoming element
     */
    @Override
    public void onNext(T value) {
        super.onNext(value);
        this.onAvailableCallback.get().run();
    }

    /**
     * Passes the error to the superclass, runs the "on available" callback and then marks the stream as
     * close-requested and runs the registered close handlers.
     * <p>
     * The close signalling happens even if the "on available" callback throws.
     *
     * @param t the error reported on the inbound side
     */
    @Override
    public void onError(Throwable t) {
        super.onError(t);
        this.notifyAvailableThenRequestClose();
    }

    /**
     * Returns the error held by the superclass, if any.
     *
     * @return an {@link Optional} wrapping {@code getErrorResult()}; empty when that is {@code null}
     */
    @Override
    public Optional<Throwable> getError() {
        return Optional.ofNullable(super.getErrorResult());
    }

    /**
     * Passes completion to the superclass, runs the "on available" callback and then marks the stream as
     * close-requested and runs the registered close handlers.
     * <p>
     * The close signalling happens even if the "on available" callback throws.
     */
    @Override
    public void onCompleted() {
        super.onCompleted();
        this.notifyAvailableThenRequestClose();
    }

    /**
     * Delegates to the superclass {@code peek()}.
     *
     * @return the value returned by the superclass {@code peek()}
     */
    @Override
    public T peek() {
        return super.peek();
    }

    /**
     * Registers the callback to run whenever this stream receives an element, an error or completion,
     * replacing any previously registered callback.
     * <p>
     * When a non-null callback is registered while the stream is already closed or {@link #peek()} returns a
     * non-null value, the callback is run immediately on the calling thread.
     *
     * @param callback the callback to register, or {@code null} to clear the current one
     */
    @Override
    public void onAvailable(Runnable callback) {
        if (callback == null) {
            this.onAvailableCallback.set(NO_OP);
            return;
        }
        this.onAvailableCallback.set(callback);
        if (this.isClosed() || this.peek() != null) {
            callback.run();
        }
    }

    /**
     * Delegates to the superclass {@code isClosed()}.
     *
     * @return the value returned by the superclass {@code isClosed()}
     */
    @Override
    public boolean isClosed() {
        return super.isClosed();
    }

    /**
     * Closes this stream from the client side. Only the first invocation has an effect: it calls the
     * superclass {@code close()} and then passes {@code outboundStream()} together with
     * {@link StreamObserver#onCompleted()} to {@code ObjectUtils.doIfNotNull}.
     */
    @Override
    public void close() {
        if (this.clientClosed.getAndSet(true)) {
            return;
        }
        super.close();
        ObjectUtils.doIfNotNull(this.outboundStream(), StreamObserver::onCompleted);
    }

    /**
     * Registers a handler to run once the inbound side has terminated. If termination has already been
     * recorded, all queued handlers (including this one) are run immediately on the calling thread.
     * Each handler is removed from the queue before it runs.
     *
     * @param handler the handler to register
     * @throws NullPointerException if {@code handler} is {@code null} (the backing queue rejects nulls)
     */
    public void onCloseRequested(Runnable handler) {
        this.closeHandlers.add(handler);
        if (this.closeRequested.get()) {
            this.invokeCloseRequestHandlers();
        }
    }

    /**
     * Shared tail of {@link #onError(Throwable)} and {@link #onCompleted()}: runs the "on available"
     * callback, then records the close request and drains the close handlers.
     * <p>
     * The callback is user-supplied code. The {@code finally} block guarantees that an exception thrown by
     * it cannot prevent the close request from being recorded and the close handlers from running; the
     * exception still propagates to the caller afterwards. Should a close handler throw as well, that
     * exception replaces the callback's, as usual for {@code try}/{@code finally}.
     */
    private void notifyAvailableThenRequestClose() {
        try {
            this.onAvailableCallback.get().run();
        } finally {
            this.closeRequested.set(true);
            this.invokeCloseRequestHandlers();
        }
    }

    /**
     * Polls handlers off the queue one at a time and runs each, stopping when the queue yields {@code null}.
     * Polling (rather than iterating) removes each handler before it is run.
     */
    private void invokeCloseRequestHandlers() {
        Runnable closeHandler;
        do {
            closeHandler = this.closeHandlers.poll();
            // The final poll returns null; doIfNotNull is relied upon to skip it.
            ObjectUtils.doIfNotNull(closeHandler, Runnable::run);
        } while (closeHandler != null);
    }
}

