/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream;

import io.activej.common.Checks;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.eventloop.Eventloop;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.util.ArrayDeque;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * Skeleton {@link StreamSupplier} that buffers items while no data acceptor is available
 * and forwards them to the consumer's acceptor once one is provided.
 *
 * <p>Subclasses produce items through {@link #send(Object)}, signal the end of the stream
 * with {@link #sendEndOfStream()}, and may react to lifecycle changes by overriding the
 * empty {@code onXxx()} hooks. The instance is bound to the eventloop that was current
 * when it was constructed; when checks are enabled for this class, most entry points
 * assert that they are invoked from that eventloop's thread.
 *
 * @param <T> type of the streamed items
 */
public abstract class AbstractStreamSupplier<T>
implements StreamSupplier<T> {
    private static final boolean CHECK = Checks.isEnabled(AbstractStreamSupplier.class);

    /** Acceptor that silently discards every item it is given. */
    public static final StreamDataAcceptor<?> NO_ACCEPTOR = item -> {};

    /** Acceptor currently exposed by the consumer; {@code null} while the supplier is not ready. */
    @Nullable
    private StreamDataAcceptor<T> dataAcceptor;
    /** Target of {@link #send(Object)}: the consumer's acceptor, the internal buffer, or {@link #NO_ACCEPTOR}. */
    private StreamDataAcceptor<T> dataAcceptorBuffered;
    /** Items accepted through {@link #send(Object)} that have not been handed to the consumer yet. */
    private final ArrayDeque<T> buffer = new ArrayDeque<>();
    private StreamConsumer<T> consumer;
    /** Set whenever another pass of the flush loop is wanted. */
    private boolean flushRequest;
    /** {@code true} while the flush loop is executing; guards against re-entrant flushes. */
    private boolean flushRunning;
    private boolean initialized;
    /** Number of outstanding {@link #asyncBegin()} calls; flushing is postponed while it is positive. */
    private int flushAsync;
    private boolean endOfStreamRequest;
    private final SettablePromise<Void> endOfStream = new SettablePromise<>();
    private final SettablePromise<Void> acknowledgement = new SettablePromise<>();
    @Nullable
    private SettablePromise<Void> flushPromise;
    protected final Eventloop eventloop = Eventloop.getCurrentEventloop();

    /**
     * Starts in buffering mode and schedules {@link #onInit()} on the eventloop:
     * via {@code post} when already on the eventloop thread, via {@code execute} otherwise.
     */
    public AbstractStreamSupplier() {
        this.dataAcceptorBuffered = this.buffer::addLast;
        if (this.eventloop.inEventloopThread()) {
            this.eventloop.post(this::ensureInitialized);
        } else {
            this.eventloop.execute(this::ensureInitialized);
        }
    }

    /**
     * Binds this supplier to {@code consumer} and starts streaming.
     *
     * <p>The consumer's acknowledgement is wired so that a result acknowledges this
     * supplier and an exception closes it. {@link #onStarted()} is invoked unless end of
     * stream has already been requested.
     *
     * @param consumer the consumer that will receive the items
     * @return the acknowledgement promise of this supplier
     * @throws IllegalStateException presumably, if the supplier has already been started
     *         (the exact exception type is defined by {@code Checks.checkState})
     */
    @Override
    public final Promise<Void> streamTo(@NotNull StreamConsumer<T> consumer) {
        this.checkInEventloopThread();
        Checks.checkState(!this.isStarted());
        this.ensureInitialized();
        this.consumer = consumer;
        consumer.getAcknowledgement().whenResult(this::acknowledge).whenException(this::closeEx);
        if (!this.isEndOfStream()) {
            this.onStarted();
        }
        // Placeholder acceptor: guarantees that updateDataAcceptor() below observes a change
        // (either to a real acceptor or to null) after the consumer has been attached.
        this.dataAcceptor = noAcceptor();
        consumer.consume(this);
        this.updateDataAcceptor();
        return this.acknowledgement;
    }

    /** Hook invoked exactly once, before any other lifecycle hook. Does nothing by default. */
    protected void onInit() {
    }

    /** Hook invoked from {@link #streamTo} unless end of stream was already requested. Does nothing by default. */
    protected void onStarted() {
    }

    /** @return {@code true} once a consumer has been attached by {@link #streamTo} */
    public final boolean isStarted() {
        return this.consumer != null;
    }

    /** @return the attached consumer, or {@code null} if streaming has not been started */
    public final StreamConsumer<T> getConsumer() {
        return this.consumer;
    }

    /**
     * Re-reads the consumer's data acceptor and switches between direct delivery and
     * buffering accordingly.
     *
     * <p>Does nothing when the supplier is not started, when the end-of-stream promise is
     * already complete, or when the acceptor has not changed. A non-null acceptor triggers
     * a flush of the buffer; a null acceptor switches {@link #send(Object)} back to
     * buffering and invokes {@link #onSuspended()} (both only while end of stream has not
     * been requested).
     */
    @Override
    public final void updateDataAcceptor() {
        this.checkInEventloopThread();
        if (!this.isStarted()) {
            return;
        }
        if (this.endOfStream.isComplete()) {
            return;
        }
        StreamDataAcceptor<T> dataAcceptor = this.consumer.getDataAcceptor();
        if (this.dataAcceptor == dataAcceptor) {
            return;
        }
        this.dataAcceptor = dataAcceptor;
        if (dataAcceptor != null) {
            if (!this.isEndOfStream()) {
                this.dataAcceptorBuffered = dataAcceptor;
            }
            this.flush();
        } else if (!this.isEndOfStream()) {
            this.dataAcceptorBuffered = this.buffer::addLast;
            this.onSuspended();
        }
    }

    /** Marks the start of an asynchronous operation; flushing is postponed until it ends. */
    protected final void asyncBegin() {
        ++this.flushAsync;
    }

    /** Marks the end of an asynchronous operation started with {@link #asyncBegin()}. */
    protected final void asyncEnd() {
        Checks.checkState(this.flushAsync > 0);
        --this.flushAsync;
    }

    /** Ends an asynchronous operation like {@link #asyncEnd()} and then calls {@link #resume()}. */
    protected final void asyncResume() {
        Checks.checkState(this.flushAsync > 0);
        --this.flushAsync;
        this.resume();
    }

    /**
     * Requests another production round: if the flush loop is running it is asked to make
     * one more pass, otherwise {@link #onResumed()} is invoked directly when the supplier
     * is ready and end of stream has not been requested.
     */
    protected final void resume() {
        if (this.flushRunning) {
            this.flushRequest = true;
        } else if (this.isReady() && !this.isEndOfStream()) {
            this.onResumed();
        }
    }

    /**
     * Passes {@code item} to the current buffered acceptor: the consumer's acceptor when
     * ready, the internal buffer when suspended, or a no-op once the stream has ended.
     *
     * @param item the item to send
     */
    public final void send(T item) {
        this.dataAcceptorBuffered.accept(item);
    }

    /**
     * Requests the end of the stream. Items sent afterwards are discarded; items already
     * buffered are still flushed to the consumer.
     *
     * <p>If an asynchronous operation is pending, one {@link #asyncEnd()} is performed first.
     *
     * @return a non-null promise, as returned by {@link #getFlushPromise()}; repeated calls
     *         after the first request return that promise without further side effects
     */
    public final Promise<Void> sendEndOfStream() {
        this.checkInEventloopThread();
        if (this.endOfStreamRequest) {
            // The nullable flushPromise field must not leak to callers: hand out the same
            // promise the first call returned (getFlushPromise() yields endOfStream here).
            return this.getFlushPromise();
        }
        if (this.flushAsync > 0) {
            this.asyncEnd();
        }
        this.endOfStreamRequest = true;
        this.dataAcceptorBuffered = noAcceptor();
        this.flush();
        return this.getFlushPromise();
    }

    /**
     * Returns a promise tied to the delivery of buffered items.
     *
     * @return the end-of-stream promise if end of stream was requested; otherwise the
     *         pending flush promise if one exists; otherwise an already completed promise
     *         if a data acceptor is present; otherwise a newly created flush promise
     */
    @NotNull
    public final Promise<Void> getFlushPromise() {
        if (this.isEndOfStream()) {
            return this.endOfStream;
        }
        if (this.flushPromise != null) {
            return this.flushPromise;
        }
        if (this.dataAcceptor != null) {
            return Promise.complete();
        }
        this.flushPromise = new SettablePromise<>();
        return this.flushPromise;
    }

    /** Asserts, when checks are enabled for this class, that the caller is on the eventloop thread. */
    private void checkInEventloopThread() {
        if (CHECK) {
            Checks.checkState(this.eventloop.inEventloopThread(), "Not in eventloop thread");
        }
    }

    /** Returns {@link #NO_ACCEPTOR} typed for the caller's item type. */
    @SuppressWarnings("unchecked")
    private static <T> StreamDataAcceptor<T> noAcceptor() {
        // Safe: NO_ACCEPTOR ignores its argument, so it can stand in for any item type.
        return (StreamDataAcceptor<T>) NO_ACCEPTOR;
    }

    /** Runs {@link #onInit()} the first time it is called; later calls are no-ops. */
    private void ensureInitialized() {
        if (!this.initialized) {
            this.initialized = true;
            this.onInit();
        }
    }

    /**
     * Drains the buffer into the consumer's acceptor and, once it is empty, completes the
     * pending flush promise or, if end of stream was requested, the end-of-stream promise.
     */
    private void flush() {
        this.checkInEventloopThread();
        this.flushRequest = true;
        // A running loop will pick up the request itself; an async operation postpones it.
        if (this.flushRunning || this.flushAsync > 0) {
            return;
        }
        if (this.endOfStream.isComplete()) {
            return;
        }
        if (!this.isStarted()) {
            return;
        }
        this.flushRunning = true;
        while (this.flushRequest) {
            this.flushRequest = false;
            // Readiness is re-checked per item: accepting an item may suspend the supplier.
            while (this.isReady() && !this.buffer.isEmpty()) {
                T item = this.buffer.pollFirst();
                this.dataAcceptor.accept(item);
            }
            if (this.isReady() && !this.isEndOfStream()) {
                // May call send()/resume(), which sets flushRequest for another pass.
                this.onResumed();
            }
        }
        this.flushRunning = false;
        if (this.flushAsync > 0) {
            return;
        }
        if (!this.buffer.isEmpty()) {
            return;
        }
        if (this.endOfStream.isComplete()) {
            return;
        }
        if (!this.endOfStreamRequest) {
            if (this.flushPromise != null) {
                // Detach before completing so callbacks observe a clean state.
                SettablePromise<Void> flushPromise = this.flushPromise;
                this.flushPromise = null;
                flushPromise.set(null);
            }
            return;
        }
        // Everything is delivered and end of stream was requested: finish the stream.
        this.dataAcceptor = null;
        if (this.flushPromise != null) {
            this.flushPromise.set(null);
        }
        this.endOfStream.set(null);
    }

    /** Hook invoked when the supplier may produce items. Does nothing by default. */
    protected void onResumed() {
    }

    /** Hook invoked when the consumer's acceptor becomes unavailable. Does nothing by default. */
    protected void onSuspended() {
    }

    /** @return the consumer's current data acceptor, or {@code null} when not ready */
    @Nullable
    public final StreamDataAcceptor<T> getDataAcceptor() {
        return this.dataAcceptor;
    }

    /** @return the acceptor that {@link #send(Object)} currently delegates to */
    @NotNull
    public final StreamDataAcceptor<T> getBufferedDataAcceptor() {
        return this.dataAcceptorBuffered;
    }

    /** @return {@code true} if a data acceptor is currently set */
    public final boolean isReady() {
        return this.dataAcceptor != null;
    }

    /** @return the promise completed when the stream has ended or failed */
    @Override
    public final Promise<Void> getEndOfStream() {
        this.checkInEventloopThread();
        return this.endOfStream;
    }

    /** @return {@code true} once end of stream has been requested or the supplier was closed with an error */
    public final boolean isEndOfStream() {
        return this.endOfStreamRequest;
    }

    /**
     * Completes the acknowledgement; on the first successful completion invokes
     * {@link #onAcknowledge()}, closes the supplier and performs cleanup.
     */
    private void acknowledge() {
        this.ensureInitialized();
        if (this.acknowledgement.trySet(null)) {
            this.onAcknowledge();
            this.close();
            this.cleanup();
        }
    }

    /** Hook invoked when the consumer has acknowledged the stream. Does nothing by default. */
    protected void onAcknowledge() {
    }

    /** @return the promise completed when the consumer acknowledges the stream or the supplier fails */
    @Override
    public final Promise<Void> getAcknowledgement() {
        return this.acknowledgement;
    }

    /**
     * Closes the supplier with an error: stops accepting and delivering items and fails
     * the flush, end-of-stream and acknowledgement promises that are not yet complete.
     * {@link #onError(Exception)} and cleanup run only if the acknowledgement was failed
     * by this call.
     *
     * @param e the cause of the failure
     */
    public final void closeEx(@NotNull Exception e) {
        this.checkInEventloopThread();
        this.ensureInitialized();
        this.endOfStreamRequest = true;
        this.dataAcceptor = null;
        this.dataAcceptorBuffered = noAcceptor();
        if (this.flushPromise != null) {
            this.flushPromise.trySetException(e);
        }
        this.endOfStream.trySetException(e);
        if (this.acknowledgement.trySetException(e)) {
            this.onError(e);
            this.cleanup();
        }
    }

    /**
     * Hook invoked when the supplier is closed with an error. Does nothing by default.
     *
     * @param e the cause of the failure
     */
    protected void onError(Exception e) {
    }

    /**
     * Invokes {@link #onComplete()}, schedules {@link #onCleanup()} on the eventloop,
     * drops buffered items and resets callbacks of a remaining flush promise.
     */
    private void cleanup() {
        this.onComplete();
        this.eventloop.post(this::onCleanup);
        this.buffer.clear();
        if (this.flushPromise != null) {
            this.flushPromise.resetCallbacks();
        }
    }

    /** Hook invoked synchronously after acknowledgement or failure. Does nothing by default. */
    protected void onComplete() {
    }

    /** Hook posted to the eventloop after acknowledgement or failure. Does nothing by default. */
    protected void onCleanup() {
    }
}

