/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream.processor;

import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.processor.StreamTransformer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.jetbrains.annotations.NotNull;

public abstract class StreamFilter<I, O>
implements StreamTransformer<I, O> {
    private final Input input = new Input();
    private final Output output = new Output();

    protected StreamFilter() {
        this.input.getAcknowledgement().whenException(this.output::closeEx);
        this.output.getAcknowledgement().whenResult(this.input::acknowledge).whenException(this.input::closeEx);
    }

    public static <T> StreamFilter<T, T> create(final Predicate<T> predicate) {
        return new StreamFilter<T, T>(){

            @Override
            @NotNull
            protected StreamDataAcceptor<T> onResumed(@NotNull StreamDataAcceptor<T> output) {
                return item -> {
                    if (predicate.test(item)) {
                        output.accept(item);
                    }
                };
            }
        };
    }

    public static <I, O> StreamFilter<I, O> mapper(final Function<I, O> function) {
        return new StreamFilter<I, O>(){

            @Override
            @NotNull
            protected StreamDataAcceptor<I> onResumed(@NotNull StreamDataAcceptor<O> output) {
                return item -> output.accept(function.apply(item));
            }
        };
    }

    @Override
    public StreamConsumer<I> getInput() {
        return this.input;
    }

    @Override
    public StreamSupplier<O> getOutput() {
        return this.output;
    }

    private void sync() {
        StreamDataAcceptor dataAcceptor = this.output.getDataAcceptor();
        if (dataAcceptor != null) {
            this.input.resume(this.onResumed(this.isOneToMany() ? this.output.getBufferedDataAcceptor() : dataAcceptor));
        } else {
            this.input.suspend();
        }
    }

    protected boolean isOneToMany() {
        return false;
    }

    protected void onStarted(@NotNull StreamDataAcceptor<O> output) {
    }

    protected void onEndOfStream(@NotNull StreamDataAcceptor<O> output) {
    }

    @NotNull
    protected abstract StreamDataAcceptor<I> onResumed(@NotNull StreamDataAcceptor<O> var1);

    private final class Output
    extends AbstractStreamSupplier<O> {
        private Output() {
        }

        @Override
        protected void onResumed() {
            StreamFilter.this.sync();
        }

        @Override
        protected void onSuspended() {
            StreamFilter.this.sync();
        }
    }

    private final class Input
    extends AbstractStreamConsumer<I> {
        private Input() {
        }

        @Override
        protected void onStarted() {
            StreamFilter.this.onStarted(StreamFilter.this.output::send);
            StreamFilter.this.sync();
        }

        @Override
        protected void onEndOfStream() {
            StreamFilter.this.onEndOfStream(StreamFilter.this.output::send);
            StreamFilter.this.output.sendEndOfStream();
        }
    }
}

