/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream;

import io.activej.async.function.AsyncConsumer;
import io.activej.async.process.AsyncCloseable;
import io.activej.common.function.ConsumerEx;
import io.activej.csp.ChannelConsumer;
import io.activej.datastream.ForwardingStreamConsumer;
import io.activej.datastream.StreamConsumers;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.processor.StreamConsumerTransformer;
import io.activej.datastream.processor.StreamTransformer;
import io.activej.eventloop.Eventloop;
import io.activej.promise.Promise;
import java.util.function.UnaryOperator;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public interface StreamConsumer<T>
extends AsyncCloseable {
    public void consume(@NotNull StreamSupplier<T> var1);

    @Nullable
    public StreamDataAcceptor<T> getDataAcceptor();

    public Promise<Void> getAcknowledgement();

    default public boolean isComplete() {
        return this.getAcknowledgement().isComplete();
    }

    default public boolean isResult() {
        return this.getAcknowledgement().isResult();
    }

    default public boolean isException() {
        return this.getAcknowledgement().isException();
    }

    public static <T> StreamConsumer<T> idle() {
        return new StreamConsumers.Idle();
    }

    public static <T> StreamConsumer<T> skip() {
        return new StreamConsumers.Skip();
    }

    public static <T> StreamConsumer<T> ofConsumer(ConsumerEx<T> consumer) {
        return new StreamConsumers.OfConsumer<T>(consumer);
    }

    public static <T> StreamConsumer<T> closingWithError(Exception e) {
        return new StreamConsumers.ClosingWithError(e);
    }

    public static <T> StreamConsumer<T> ofPromise(Promise<? extends StreamConsumer<T>> promise) {
        if (promise.isResult()) {
            return (StreamConsumer)promise.getResult();
        }
        return new StreamConsumers.OfPromise(promise);
    }

    public static <T> StreamConsumer<T> ofChannelConsumer(ChannelConsumer<T> consumer) {
        return new StreamConsumers.OfChannelConsumer<T>(consumer);
    }

    public static <T> StreamConsumer<T> ofSupplier(AsyncConsumer<StreamSupplier<T>> supplier) {
        StreamTransformer forwarder = StreamTransformer.identity();
        Promise extraAcknowledge = supplier.accept(forwarder.getOutput());
        StreamConsumer result = forwarder.getInput();
        if (extraAcknowledge == Promise.complete()) {
            return result;
        }
        return result.withAcknowledgement(ack -> ack.both(extraAcknowledge));
    }

    public static <T> StreamConsumer<T> ofAnotherEventloop(@NotNull Eventloop anotherEventloop, @NotNull StreamConsumer<T> anotherEventloopConsumer) {
        if (Eventloop.getCurrentEventloop() == anotherEventloop) {
            return anotherEventloopConsumer;
        }
        return new StreamConsumers.OfAnotherEventloop<T>(anotherEventloop, anotherEventloopConsumer);
    }

    default public <R> R transformWith(StreamConsumerTransformer<T, R> fn) {
        return fn.transform(this);
    }

    default public StreamConsumer<T> withAcknowledgement(UnaryOperator<Promise<Void>> fn) {
        Promise newAcknowledgement;
        Promise<Void> acknowledgement = this.getAcknowledgement();
        if (acknowledgement == (newAcknowledgement = (Promise)fn.apply(acknowledgement))) {
            return this;
        }
        return new ForwardingStreamConsumer<T>(this){

            @Override
            public Promise<Void> getAcknowledgement() {
                return newAcknowledgement;
            }
        };
    }
}

