/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream.processor;

import io.activej.common.Checks;
import io.activej.common.initializer.WithInitializer;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.dsl.HasStreamInputs;
import io.activej.datastream.dsl.HasStreamOutput;
import io.activej.datastream.processor.StreamReducers;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public final class StreamReducer<K, O, A>
implements HasStreamInputs,
HasStreamOutput<O>,
WithInitializer<StreamReducer<K, O, A>> {
    public static final int DEFAULT_BUFFER_SIZE = 2000;
    private final List<Input> inputs = new ArrayList<Input>();
    private final Output output = new Output();
    private int bufferSize = 2000;
    @Nullable
    private Input<?> lastInput;
    @Nullable
    private K key = null;
    @Nullable
    private A accumulator;
    private final PriorityQueue<Input<?>> priorityQueue;
    private int streamsAwaiting;
    private int streamsOpen;

    private StreamReducer(@NotNull PriorityQueue<Input<?>> priorityQueue) {
        this.priorityQueue = priorityQueue;
    }

    public static <K, O, A> StreamReducer<K, O, A> create(Comparator<K> keyComparator) {
        return new StreamReducer<K, O, A>(new PriorityQueue(1, (input1, input2) -> {
            int compare = keyComparator.compare(((Input)input1).headKey, ((Input)input2).headKey);
            if (compare != 0) {
                return compare;
            }
            return ((Input)input1).index - ((Input)input2).index;
        }));
    }

    public static <K extends Comparable<K>, O, A> StreamReducer<K, O, A> create() {
        return new StreamReducer<K, O, A>(new PriorityQueue(1, (input1, input2) -> {
            int compare = ((Comparable)((Input)input1).headKey).compareTo((Comparable)((Input)input2).headKey);
            if (compare != 0) {
                return compare;
            }
            return ((Input)input1).index - ((Input)input2).index;
        }));
    }

    public StreamReducer<K, O, A> withBufferSize(int bufferSize) {
        Checks.checkArgument((bufferSize >= 0 ? 1 : 0) != 0, (String)"bufferSize must be positive value, got %s", (Object[])new Object[]{bufferSize});
        this.bufferSize = bufferSize;
        return this;
    }

    public <I> StreamConsumer<I> newInput(Function<I, K> keyFunction, StreamReducers.Reducer<K, I, O, A> reducer) {
        return this.addInput(new SimpleInput<I>(keyFunction, reducer));
    }

    public <I> Input<I> addInput(Input<I> input) {
        this.inputs.add(input);
        input.await();
        ++this.streamsOpen;
        return input;
    }

    @Override
    public List<? extends StreamConsumer<?>> getInputs() {
        return this.inputs;
    }

    @Override
    public StreamSupplier<O> getOutput() {
        return this.output;
    }

    public abstract class Input<I>
    extends AbstractStreamConsumer<I>
    implements StreamDataAcceptor<I>,
    Function<I, K>,
    StreamReducers.Reducer<K, I, O, A> {
        private I headItem;
        private K headKey;
        private final int index;
        private final PriorityQueue<Input<?>> priorityQueue;
        private final ArrayDeque<I> deque = new ArrayDeque();
        private final int bufferSize;

        protected Input() {
            this.index = StreamReducer.this.inputs.size();
            this.priorityQueue = StreamReducer.this.priorityQueue;
            this.bufferSize = StreamReducer.this.bufferSize;
        }

        @Override
        protected void onStarted() {
            this.resume(this);
        }

        @Override
        public void accept(I item) {
            if (this.headItem == null) {
                this.headItem = item;
                this.headKey = this.apply(this.headItem);
                this.priorityQueue.offer(this);
                if (this.advance() == 0) {
                    StreamReducer.this.output.reduce();
                }
            } else {
                this.deque.offer(item);
                if (this.deque.size() == this.bufferSize) {
                    this.suspend();
                    StreamReducer.this.output.reduce();
                }
            }
        }

        @Override
        protected void onEndOfStream() {
            this.closeInput();
            if (this.headItem == null) {
                this.advance();
            }
            StreamReducer.this.output.reduce();
            StreamReducer.this.output.getAcknowledgement().whenResult(this::acknowledge).whenException(this::closeEx);
        }

        @Override
        protected void onError(Exception e) {
            StreamReducer.this.output.closeEx(e);
        }

        @Override
        protected void onCleanup() {
            this.deque.clear();
        }

        protected int await() {
            return ++StreamReducer.this.streamsAwaiting;
        }

        protected int advance() {
            return --StreamReducer.this.streamsAwaiting;
        }

        protected void closeInput() {
            StreamReducer.this.streamsOpen--;
        }

        protected void continueReduce() {
            StreamReducer.this.output.reduce();
        }
    }

    private final class Output
    extends AbstractStreamSupplier<O> {
        private Output() {
        }

        void reduce() {
            this.resume();
        }

        @Override
        protected void onResumed() {
            Input input;
            while (StreamReducer.this.streamsAwaiting == 0 && (input = (Input)StreamReducer.this.priorityQueue.poll()) != null) {
                if (input.isComplete()) continue;
                if (StreamReducer.this.key != null && input.headKey.equals(StreamReducer.this.key)) {
                    StreamReducer.this.accumulator = input.onNextItem(this.getBufferedDataAcceptor(), StreamReducer.this.key, input.headItem, StreamReducer.this.accumulator);
                } else {
                    if (StreamReducer.this.lastInput != null) {
                        StreamReducer.this.lastInput.onComplete(this.getBufferedDataAcceptor(), StreamReducer.this.key, StreamReducer.this.accumulator);
                    }
                    StreamReducer.this.key = input.headKey;
                    StreamReducer.this.accumulator = input.onFirstItem(this.getBufferedDataAcceptor(), StreamReducer.this.key, input.headItem);
                }
                input.headItem = input.deque.poll();
                StreamReducer.this.lastInput = input;
                if (input.headItem != null) {
                    input.headKey = input.apply(input.headItem);
                    StreamReducer.this.priorityQueue.offer(input);
                    continue;
                }
                if (input.isEndOfStream()) continue;
                input.await();
                break;
            }
            for (Input input2 : StreamReducer.this.inputs) {
                if (input2.deque.size() > StreamReducer.this.bufferSize / 2) continue;
                input2.resume(input2);
            }
            if (StreamReducer.this.streamsOpen == 0 && StreamReducer.this.priorityQueue.isEmpty()) {
                if (StreamReducer.this.lastInput != null) {
                    StreamReducer.this.lastInput.onComplete(this.getBufferedDataAcceptor(), StreamReducer.this.key, StreamReducer.this.accumulator);
                    StreamReducer.this.lastInput = null;
                    StreamReducer.this.key = null;
                    StreamReducer.this.accumulator = null;
                }
                StreamReducer.this.output.sendEndOfStream();
            }
        }

        @Override
        protected void onError(Exception e) {
            for (Input input : StreamReducer.this.inputs) {
                input.closeEx(e);
            }
        }

        @Override
        protected void onCleanup() {
            StreamReducer.this.priorityQueue.clear();
        }
    }

    public class SimpleInput<I>
    extends Input<I> {
        private final Function<I, K> keyFunction;
        private final StreamReducers.Reducer<K, I, O, A> reducer;

        public SimpleInput(Function<I, K> keyFunction, StreamReducers.Reducer<K, I, O, A> reducer) {
            this.keyFunction = keyFunction;
            this.reducer = reducer;
        }

        @Override
        public K apply(I item) {
            return this.keyFunction.apply(item);
        }

        @Override
        public A onFirstItem(StreamDataAcceptor<O> stream, K key, I firstValue) {
            return this.reducer.onFirstItem(stream, key, firstValue);
        }

        @Override
        public A onNextItem(StreamDataAcceptor<O> stream, K key, I nextValue, A accumulator) {
            return this.reducer.onNextItem(stream, key, nextValue, accumulator);
        }

        @Override
        public void onComplete(StreamDataAcceptor<O> stream, K key, A accumulator) {
            this.reducer.onComplete(stream, key, accumulator);
        }
    }
}

