/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream.processor;

import io.activej.common.initializer.WithInitializer;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.dsl.HasStreamInputs;
import io.activej.datastream.dsl.HasStreamOutput;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public final class StreamJoin<K, L, R, V>
implements HasStreamInputs,
HasStreamOutput<V>,
WithInitializer<StreamJoin<K, L, R, V>> {
    private final Comparator<K> keyComparator;
    private final Input<L> left;
    private final Input<R> right;
    private final Output output;
    private final ArrayDeque<L> leftDeque = new ArrayDeque();
    private final ArrayDeque<R> rightDeque = new ArrayDeque();
    private final Function<L, K> leftKeyFunction;
    private final Function<R, K> rightKeyFunction;
    private final Joiner<K, L, R, V> joiner;

    private StreamJoin(@NotNull Comparator<K> keyComparator, @NotNull Function<L, K> leftKeyFunction, @NotNull Function<R, K> rightKeyFunction, @NotNull Joiner<K, L, R, V> joiner) {
        this.keyComparator = keyComparator;
        this.joiner = joiner;
        this.left = new Input<L>(this.leftDeque);
        this.right = new Input<R>(this.rightDeque);
        this.leftKeyFunction = leftKeyFunction;
        this.rightKeyFunction = rightKeyFunction;
        this.output = new Output();
    }

    public static <K, L, R, V> StreamJoin<K, L, R, V> create(Comparator<K> keyComparator, Function<L, K> leftKeyFunction, Function<R, K> rightKeyFunction, Joiner<K, L, R, V> joiner) {
        return new StreamJoin<K, L, R, V>(keyComparator, leftKeyFunction, rightKeyFunction, joiner);
    }

    public StreamConsumer<L> getLeft() {
        return this.left;
    }

    public StreamConsumer<R> getRight() {
        return this.right;
    }

    @Override
    public List<? extends StreamConsumer<?>> getInputs() {
        return Arrays.asList(this.left, this.right);
    }

    @Override
    public StreamSupplier<V> getOutput() {
        return this.output;
    }

    private final class Input<I>
    extends AbstractStreamConsumer<I>
    implements StreamDataAcceptor<I> {
        private final Deque<I> deque;

        public Input(Deque<I> deque) {
            this.deque = deque;
        }

        @Override
        public void accept(I item) {
            boolean wasEmpty = this.deque.isEmpty();
            this.deque.addLast(item);
            if (wasEmpty) {
                StreamJoin.this.output.join();
            }
        }

        @Override
        protected void onStarted() {
            StreamJoin.this.output.join();
        }

        @Override
        protected void onEndOfStream() {
            StreamJoin.this.output.join();
            StreamJoin.this.output.getAcknowledgement().whenResult(this::acknowledge).whenException(this::closeEx);
        }

        @Override
        protected void onError(Exception e) {
            StreamJoin.this.output.closeEx(e);
        }
    }

    public static interface Joiner<K, L, R, V> {
        public void onInnerJoin(K var1, L var2, R var3, StreamDataAcceptor<V> var4);

        public void onLeftJoin(K var1, L var2, StreamDataAcceptor<V> var3);
    }

    private final class Output
    extends AbstractStreamSupplier<V> {
        private Output() {
        }

        void join() {
            this.resume();
        }

        @Override
        protected void onResumed() {
            StreamDataAcceptor<Object> acceptor = this::send;
            if (this.isReady() && !StreamJoin.this.leftDeque.isEmpty() && !StreamJoin.this.rightDeque.isEmpty()) {
                Object leftValue = StreamJoin.this.leftDeque.peek();
                Object leftKey = StreamJoin.this.leftKeyFunction.apply(leftValue);
                Object rightValue = StreamJoin.this.rightDeque.peek();
                Object rightKey = StreamJoin.this.rightKeyFunction.apply(rightValue);
                while (true) {
                    int compare;
                    if ((compare = StreamJoin.this.keyComparator.compare(leftKey, rightKey)) < 0) {
                        StreamJoin.this.joiner.onLeftJoin(leftKey, leftValue, acceptor);
                        StreamJoin.this.leftDeque.poll();
                        if (StreamJoin.this.leftDeque.isEmpty()) break;
                        leftValue = StreamJoin.this.leftDeque.peek();
                        leftKey = StreamJoin.this.leftKeyFunction.apply(leftValue);
                        continue;
                    }
                    if (compare > 0) {
                        StreamJoin.this.rightDeque.poll();
                        if (StreamJoin.this.rightDeque.isEmpty()) break;
                        rightValue = StreamJoin.this.rightDeque.peek();
                        rightKey = StreamJoin.this.rightKeyFunction.apply(rightValue);
                        continue;
                    }
                    StreamJoin.this.joiner.onInnerJoin(leftKey, leftValue, rightValue, acceptor);
                    StreamJoin.this.leftDeque.poll();
                    if (StreamJoin.this.leftDeque.isEmpty() || !this.isReady()) break;
                    leftValue = StreamJoin.this.leftDeque.peek();
                    leftKey = StreamJoin.this.leftKeyFunction.apply(leftValue);
                }
            }
            if (this.isReady()) {
                if (StreamJoin.this.left.isEndOfStream() && StreamJoin.this.right.isEndOfStream()) {
                    this.sendEndOfStream();
                } else {
                    StreamJoin.this.left.resume(StreamJoin.this.left);
                    StreamJoin.this.right.resume(StreamJoin.this.right);
                }
            } else {
                StreamJoin.this.left.suspend();
                StreamJoin.this.right.suspend();
            }
        }

        @Override
        protected void onError(Exception e) {
            StreamJoin.this.left.closeEx(e);
            StreamJoin.this.right.closeEx(e);
        }

        @Override
        protected void onCleanup() {
            StreamJoin.this.leftDeque.clear();
            StreamJoin.this.rightDeque.clear();
        }
    }

    public static abstract class ValueJoiner<K, L, R, V>
    implements Joiner<K, L, R, V> {
        public abstract V doInnerJoin(K var1, L var2, R var3);

        @Nullable
        public V doLeftJoin(K key, L left) {
            return null;
        }

        @Override
        public final void onInnerJoin(K key, L left, R right, StreamDataAcceptor<V> output) {
            V result = this.doInnerJoin(key, left, right);
            if (result != null) {
                output.accept(result);
            }
        }

        @Override
        public final void onLeftJoin(K key, L left, StreamDataAcceptor<V> output) {
            V result = this.doLeftJoin(key, left);
            if (result != null) {
                output.accept(result);
            }
        }
    }

    public static abstract class InnerJoiner<K, L, R, V>
    implements Joiner<K, L, R, V> {
        @Override
        public void onLeftJoin(K key, L left, StreamDataAcceptor<V> output) {
        }
    }
}

