/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream;

import io.activej.common.Checks;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.eventloop.Eventloop;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * Skeleton implementation of {@link StreamConsumer} that tracks the bound supplier,
 * the current data acceptor, the end-of-stream flag and the acknowledgement promise,
 * and exposes overridable lifecycle hooks ({@link #onInit()}, {@link #onStarted()},
 * {@link #onEndOfStream()}, {@link #onError(Exception)}, {@link #onComplete()},
 * {@link #onCleanup()}) for subclasses.
 *
 * <p>The instance is tied to the eventloop returned by
 * {@link Eventloop#getCurrentEventloop()} at construction time. When checks are
 * enabled for this class, most public methods assert that they are invoked from
 * that eventloop's thread.
 *
 * @param <T> type of the items received by this consumer
 */
public abstract class AbstractStreamConsumer<T> implements StreamConsumer<T> {
    /** Whether the optional eventloop-thread assertions are switched on for this class. */
    private static final boolean CHECK = Checks.isEnabled(AbstractStreamConsumer.class);

    /** Supplier bound by {@link #consume(StreamSupplier)}; {@code null} until then. */
    private StreamSupplier<T> supplier;
    /** Completed by {@link #acknowledge()} or failed by {@link #closeEx(Exception)}. */
    private final SettablePromise<Void> acknowledgement = new SettablePromise<>();
    /** Set once the stream has ended, was acknowledged, or was closed with an error. */
    private boolean endOfStream;
    /** Guards {@link #onInit()} so that it runs at most once. */
    private boolean initialized;
    /** Acceptor currently offered to the supplier; {@code null} means "suspended". */
    @Nullable
    private StreamDataAcceptor<T> dataAcceptor;
    /** Eventloop this consumer was created in. */
    protected final Eventloop eventloop = Eventloop.getCurrentEventloop();

    /**
     * Schedules lazy initialization on the owning eventloop: via {@code post} when
     * already on the eventloop thread, otherwise via {@code execute}.
     * Initialization may happen earlier if {@link #consume(StreamSupplier)},
     * {@link #acknowledge()} or {@link #closeEx(Exception)} is called first.
     */
    public AbstractStreamConsumer() {
        if (eventloop.inEventloopThread()) {
            eventloop.post(this::ensureInitialized);
        } else {
            // NOTE(review): execute() is presumably the cross-thread submission path — confirm against Eventloop.
            eventloop.execute(this::ensureInitialized);
        }
    }

    /**
     * Binds this consumer to the given supplier.
     *
     * <p>Fails the state check if a supplier is already bound. If the acknowledgement
     * is already complete, the call returns without storing the supplier. Otherwise
     * the supplier is stored, {@link #onStarted()} is invoked unless the supplier
     * reports an exception, and this consumer subscribes to the supplier's
     * end-of-stream promise.
     *
     * @param streamSupplier supplier to consume from
     */
    @Override
    public final void consume(@NotNull StreamSupplier<T> streamSupplier) {
        checkInEventloopThread();
        Checks.checkState(!isStarted());
        ensureInitialized();
        if (acknowledgement.isComplete()) {
            return;
        }
        supplier = streamSupplier;
        if (!streamSupplier.isException()) {
            onStarted();
        }
        streamSupplier.getEndOfStream()
                .whenResult(this::endOfStream)
                .whenException(this::closeEx);
    }

    /**
     * @return the acceptor most recently passed to {@link #resume(StreamDataAcceptor)},
     *         or {@code null} when suspended or after cleanup
     */
    @Override
    @Nullable
    public final StreamDataAcceptor<T> getDataAcceptor() {
        return dataAcceptor;
    }

    /** Hook invoked exactly once before any other lifecycle hook; no-op by default. */
    protected void onInit() {
    }

    /**
     * Hook invoked from {@link #consume(StreamSupplier)} after the supplier has been
     * stored, provided the supplier is not already in an exceptional state; no-op by default.
     */
    protected void onStarted() {
    }

    /** @return {@code true} once a supplier has been bound via {@link #consume(StreamSupplier)} */
    public final boolean isStarted() {
        return supplier != null;
    }

    /** @return the bound supplier, or {@code null} if {@link #isStarted()} is {@code false} */
    public final StreamSupplier<T> getSupplier() {
        return supplier;
    }

    /**
     * Reacts to the supplier's end-of-stream promise completing successfully.
     * Marks the stream as ended and fires {@link #onEndOfStream()}; does nothing
     * if the flag was already set.
     */
    private void endOfStream() {
        checkInEventloopThread();
        if (endOfStream) {
            return;
        }
        endOfStream = true;
        onEndOfStream();
    }

    /** Hook invoked when the supplier signals end of stream; no-op by default. */
    protected void onEndOfStream() {
    }

    /**
     * Replaces the current data acceptor and, if a supplier is bound, asks it to
     * pick up the change via {@code updateDataAcceptor()}.
     *
     * <p>Ignored after end of stream, and when the given acceptor is the same
     * instance as the current one.
     *
     * @param dataAcceptor new acceptor, or {@code null} to suspend
     */
    public final void resume(@Nullable StreamDataAcceptor<T> dataAcceptor) {
        checkInEventloopThread();
        if (endOfStream) {
            return;
        }
        if (this.dataAcceptor == dataAcceptor) {
            return;
        }
        this.dataAcceptor = dataAcceptor;
        if (!isStarted()) {
            // No supplier yet: the stored acceptor is simply kept for later.
            return;
        }
        supplier.updateDataAcceptor();
    }

    /** Shorthand for {@code resume(null)}. */
    public final void suspend() {
        resume(null);
    }

    /**
     * Completes the acknowledgement promise successfully and marks the stream as
     * ended. Cleanup runs only if this call is the one that completed the promise.
     */
    public final void acknowledge() {
        checkInEventloopThread();
        ensureInitialized();
        endOfStream = true;
        if (acknowledgement.trySet(null)) {
            cleanup();
        }
    }

    /** @return the acknowledgement promise of this consumer */
    @Override
    public final Promise<Void> getAcknowledgement() {
        checkInEventloopThread();
        return acknowledgement;
    }

    /**
     * @return {@code true} after the supplier signalled end of stream, or after
     *         {@link #acknowledge()} or {@link #closeEx(Exception)} was called
     */
    public final boolean isEndOfStream() {
        return endOfStream;
    }

    /**
     * Fails the acknowledgement promise with the given exception and marks the
     * stream as ended. {@link #onError(Exception)} and cleanup run only if this
     * call is the one that completed the promise.
     *
     * @param e cause of the failure
     */
    public final void closeEx(@NotNull Exception e) {
        checkInEventloopThread();
        ensureInitialized();
        endOfStream = true;
        if (acknowledgement.trySetException(e)) {
            onError(e);
            cleanup();
        }
    }

    /**
     * Hook invoked when this consumer is closed with an exception, before cleanup;
     * no-op by default.
     *
     * @param e exception the consumer was closed with
     */
    protected void onError(Exception e) {
    }

    /** Asserts, when checks are enabled for this class, that the caller is on the eventloop thread. */
    private void checkInEventloopThread() {
        if (CHECK) {
            // The Object cast is kept deliberately: it pins the same checkState overload
            // as before, since the overload set of Checks is not visible from this file.
            Checks.checkState(eventloop.inEventloopThread(), (Object) "Not in eventloop thread");
        }
    }

    /** Runs {@link #onInit()} the first time it is called; later calls are no-ops. */
    private void ensureInitialized() {
        if (!initialized) {
            // Flag is flipped first so a re-entrant call from onInit() does not recurse.
            initialized = true;
            onInit();
        }
    }

    /**
     * Finalizes the consumer after its acknowledgement has been completed:
     * fires {@link #onComplete()} synchronously, posts {@link #onCleanup()} to the
     * eventloop, resets the acknowledgement callbacks and drops the data acceptor.
     */
    private void cleanup() {
        onComplete();
        eventloop.post(this::onCleanup);
        acknowledgement.resetCallbacks();
        dataAcceptor = null;
    }

    /**
     * Hook invoked synchronously once the acknowledgement has been completed,
     * either successfully or exceptionally; no-op by default.
     */
    protected void onComplete() {
    }

    /** Hook posted to the eventloop during cleanup, i.e. run after {@link #onComplete()}; no-op by default. */
    protected void onCleanup() {
    }
}

