/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream.processor;

import io.activej.common.Checks;
import io.activej.common.initializer.WithInitializer;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.dsl.HasStreamInput;
import io.activej.datastream.dsl.HasStreamOutputs;
import io.activej.eventloop.Eventloop;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

public final class StreamSplitter<I, O>
implements HasStreamInput<I>,
HasStreamOutputs<O>,
WithInitializer<StreamSplitter<I, O>> {
    private final Function<StreamDataAcceptor<O>[], StreamDataAcceptor<I>> acceptorFactory;
    private final Input input;
    private final List<Output> outputs = new ArrayList<Output>();
    private StreamDataAcceptor<O>[] dataAcceptors = new StreamDataAcceptor[8];
    private boolean started;
    private int completed;

    private StreamSplitter(Function<StreamDataAcceptor<O>[], StreamDataAcceptor<I>> acceptorFactory) {
        this.acceptorFactory = acceptorFactory;
        this.input = new Input();
    }

    public static <I, O> StreamSplitter<I, O> create(BiConsumer<I, StreamDataAcceptor<O>[]> action) {
        return StreamSplitter.create((StreamDataAcceptor<O>[] acceptors) -> item -> action.accept((Object)item, (StreamDataAcceptor<O>[])acceptors));
    }

    public static <I, O> StreamSplitter<I, O> create(Function<StreamDataAcceptor<O>[], StreamDataAcceptor<I>> acceptorFactory) {
        StreamSplitter<I, O> streamSplitter = new StreamSplitter<I, O>(acceptorFactory);
        Eventloop.getCurrentEventloop().post(streamSplitter::start);
        return streamSplitter;
    }

    public StreamSupplier<O> newOutput() {
        return this.addOutput(new Output());
    }

    public StreamSupplier<O> addOutput(Output output) {
        Checks.checkState((!this.started ? 1 : 0) != 0, (Object)"Cannot add new outputs after StreamSplitter has been started");
        Checks.checkState((output.index == this.outputs.size() ? 1 : 0) != 0, (Object)"Invalid index");
        this.outputs.add(output);
        if (this.outputs.size() > this.dataAcceptors.length) {
            this.dataAcceptors = Arrays.copyOf(this.dataAcceptors, this.dataAcceptors.length * 2);
        }
        return output;
    }

    @Override
    public StreamConsumer<I> getInput() {
        return this.input;
    }

    @Override
    public List<? extends StreamSupplier<O>> getOutputs() {
        return this.outputs;
    }

    private void start() {
        if (this.outputs.isEmpty()) {
            this.input.acknowledge();
            return;
        }
        this.started = true;
        this.dataAcceptors = Arrays.copyOf(this.dataAcceptors, this.outputs.size());
        this.sync();
    }

    private void sync() {
        if (!this.started) {
            return;
        }
        if (this.outputs.stream().allMatch(Output::canProceed)) {
            this.input.resume(this.acceptorFactory.apply(this.dataAcceptors));
        } else {
            this.input.suspend();
        }
    }

    private final class Input
    extends AbstractStreamConsumer<I> {
        private Input() {
        }

        @Override
        protected void onStarted() {
            StreamSplitter.this.sync();
        }

        @Override
        protected void onEndOfStream() {
            for (Output output : StreamSplitter.this.outputs) {
                output.sendEndOfStream();
            }
        }

        @Override
        protected void onError(Exception e) {
            StreamSplitter.this.outputs.forEach(output -> output.closeEx(e));
        }
    }

    public class Output
    extends AbstractStreamSupplier<O> {
        final int index;

        public Output() {
            this.index = StreamSplitter.this.outputs.size();
        }

        @Override
        protected void onResumed() {
            ((StreamSplitter)StreamSplitter.this).dataAcceptors[this.index] = this.getDataAcceptor();
            this.sync();
        }

        @Override
        protected void onSuspended() {
            ((StreamSplitter)StreamSplitter.this).dataAcceptors[this.index] = this.getBufferedDataAcceptor();
            this.sync();
        }

        @Override
        protected void onError(Exception e) {
            StreamSplitter.this.input.closeEx(e);
        }

        @Override
        protected void onAcknowledge() {
            this.complete();
        }

        protected final void sync() {
            StreamSplitter.this.sync();
        }

        protected final void complete() {
            if (++StreamSplitter.this.completed == StreamSplitter.this.dataAcceptors.length) {
                StreamSplitter.this.input.acknowledge();
            }
        }

        protected boolean canProceed() {
            return this.isReady();
        }
    }
}

