/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream.processor;

import io.activej.common.Checks;
import io.activej.common.initializer.WithInitializer;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.dsl.HasStreamInputs;
import io.activej.datastream.dsl.HasStreamOutput;
import io.activej.eventloop.Eventloop;
import java.util.ArrayList;
import java.util.List;

public final class StreamUnion<T>
implements HasStreamOutput<T>,
HasStreamInputs,
WithInitializer<StreamUnion<T>> {
    private final List<Input> inputs = new ArrayList<Input>();
    private final Output output = new Output();
    private boolean started;

    private StreamUnion() {
    }

    public static <T> StreamUnion<T> create() {
        StreamUnion<T> union = new StreamUnion<T>();
        Eventloop.getCurrentEventloop().post(union::start);
        return union;
    }

    @Override
    public List<? extends StreamConsumer<?>> getInputs() {
        return this.inputs;
    }

    @Override
    public StreamSupplier<T> getOutput() {
        return this.output;
    }

    public StreamConsumer<T> newInput() {
        Checks.checkState((!this.started ? 1 : 0) != 0, (Object)"Cannot add new inputs after StreamUnion has been started");
        Input input = new Input();
        this.inputs.add(input);
        input.getAcknowledgement().whenException(this.output::closeEx);
        this.output.getAcknowledgement().whenResult(input::acknowledge).whenException(input::closeEx);
        return input;
    }

    private void start() {
        this.started = true;
        this.sync();
    }

    private void sync() {
        if (!this.started) {
            return;
        }
        if (this.inputs.stream().allMatch(AbstractStreamConsumer::isEndOfStream)) {
            this.output.sendEndOfStream();
            return;
        }
        for (Input input : this.inputs) {
            input.resume(this.output.getDataAcceptor());
        }
    }

    private final class Output
    extends AbstractStreamSupplier<T> {
        private Output() {
        }

        @Override
        protected void onResumed() {
            StreamUnion.this.sync();
        }

        @Override
        protected void onSuspended() {
            StreamUnion.this.sync();
        }
    }

    private final class Input
    extends AbstractStreamConsumer<T> {
        private Input() {
        }

        @Override
        protected void onStarted() {
            StreamUnion.this.sync();
        }

        @Override
        protected void onEndOfStream() {
            StreamUnion.this.sync();
        }
    }
}

