/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream;

import io.activej.common.Utils;
import io.activej.common.exception.FatalErrorHandlers;
import io.activej.common.function.ConsumerEx;
import io.activej.csp.ChannelConsumer;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.eventloop.Eventloop;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import org.jetbrains.annotations.NotNull;

final class StreamConsumers {
    StreamConsumers() {
    }

    static final class OfAnotherEventloop<T>
    extends AbstractStreamConsumer<T> {
        private static final int MAX_BUFFER_SIZE = 100;
        private static final Iterator<?> END_OF_STREAM = Collections.emptyIterator();
        private List<T> list = new ArrayList<T>();
        private final StreamDataAcceptor<T> toList = item -> {
            this.list.add(item);
            if (this.list.size() == 100) {
                this.flush();
                this.suspend();
            }
        };
        private final StreamConsumer<T> anotherEventloopConsumer;
        private final InternalSupplier internalSupplier;
        private volatile boolean wakingUp;

        public OfAnotherEventloop(@NotNull Eventloop anotherEventloop, @NotNull StreamConsumer<T> anotherEventloopConsumer) {
            this.anotherEventloopConsumer = anotherEventloopConsumer;
            this.internalSupplier = (InternalSupplier)Eventloop.initWithEventloop((Eventloop)anotherEventloop, () -> new InternalSupplier());
        }

        void execute(Runnable runnable) {
            this.eventloop.execute(runnable);
        }

        void wakeUp() {
            if (this.wakingUp) {
                return;
            }
            this.wakingUp = true;
            this.execute(this::onWakeUp);
        }

        void onWakeUp() {
            if (this.isComplete()) {
                return;
            }
            this.wakingUp = false;
            this.flush();
            if (this.internalSupplier.isReady) {
                this.resume(this.toList);
                this.internalSupplier.wakeUp();
            } else {
                this.suspend();
            }
        }

        @Override
        protected void onInit() {
            this.eventloop.startExternalTask();
        }

        @Override
        protected void onStarted() {
            this.internalSupplier.execute(() -> this.internalSupplier.streamTo(this.anotherEventloopConsumer));
        }

        @Override
        protected void onEndOfStream() {
            this.flush();
        }

        @Override
        protected void onComplete() {
            this.eventloop.completeExternalTask();
        }

        private void flush() {
            if (this.internalSupplier.iterator != null) {
                return;
            }
            if (this.isEndOfStream() && this.list.isEmpty()) {
                this.internalSupplier.iterator = END_OF_STREAM;
            } else if (!this.list.isEmpty()) {
                this.internalSupplier.iterator = this.list.iterator();
                this.list = new ArrayList<T>();
            } else {
                return;
            }
            this.internalSupplier.wakeUp();
        }

        @Override
        protected void onError(Exception e) {
            this.internalSupplier.execute(() -> this.internalSupplier.closeEx(e));
        }

        @Override
        protected void onCleanup() {
            this.list = null;
        }

        final class InternalSupplier
        extends AbstractStreamSupplier<T> {
            volatile Iterator<T> iterator;
            volatile boolean isReady;
            volatile boolean wakingUp;

            InternalSupplier() {
            }

            void execute(Runnable runnable) {
                this.eventloop.execute(runnable);
            }

            void wakeUp() {
                if (this.wakingUp) {
                    return;
                }
                this.wakingUp = true;
                this.execute(this::onWakeUp);
            }

            void onWakeUp() {
                if (this.isComplete()) {
                    return;
                }
                this.wakingUp = false;
                this.flush();
            }

            @Override
            protected void onInit() {
                this.eventloop.startExternalTask();
            }

            @Override
            protected void onResumed() {
                this.isReady = true;
                this.flush();
            }

            @Override
            protected void onSuspended() {
                this.isReady = false;
                OfAnotherEventloop.this.wakeUp();
            }

            @Override
            protected void onComplete() {
                this.eventloop.completeExternalTask();
            }

            private void flush() {
                if (this.iterator == null) {
                    OfAnotherEventloop.this.wakeUp();
                    return;
                }
                Iterator iterator = this.iterator;
                while (this.isReady() && iterator.hasNext()) {
                    this.send(iterator.next());
                }
                if (iterator == END_OF_STREAM) {
                    this.sendEndOfStream();
                } else if (!this.iterator.hasNext()) {
                    this.iterator = null;
                    OfAnotherEventloop.this.wakeUp();
                }
            }

            @Override
            protected void onAcknowledge() {
                OfAnotherEventloop.this.execute(OfAnotherEventloop.this::acknowledge);
            }

            @Override
            protected void onError(Exception e) {
                OfAnotherEventloop.this.execute(() -> OfAnotherEventloop.this.closeEx(e));
            }

            @Override
            protected void onCleanup() {
                this.iterator = null;
            }
        }
    }

    static final class ToCollector<T, A, R>
    extends AbstractStreamConsumer<T> {
        private final SettablePromise<R> resultPromise = new SettablePromise();
        private final Collector<T, A, R> collector;
        private A accumulator;

        public ToCollector(Collector<T, A, R> collector) {
            this.collector = collector;
        }

        public Promise<R> getResult() {
            return this.resultPromise;
        }

        @Override
        protected void onInit() {
            this.resultPromise.whenResult(this::acknowledge);
        }

        @Override
        protected void onStarted() {
            Object accumulator = this.collector.supplier().get();
            this.accumulator = accumulator;
            BiConsumer consumer = this.collector.accumulator();
            this.resume(item -> consumer.accept(accumulator, item));
        }

        @Override
        protected void onEndOfStream() {
            this.resultPromise.set(this.collector.finisher().apply(this.accumulator));
        }

        @Override
        protected void onError(Exception e) {
            this.resultPromise.setException(e);
        }

        @Override
        protected void onCleanup() {
            this.accumulator = null;
        }
    }

    static final class OfChannelConsumer<T>
    extends AbstractStreamConsumer<T> {
        private final ChannelConsumer<T> consumer;
        private boolean working;

        OfChannelConsumer(ChannelConsumer<T> consumer) {
            this.consumer = consumer;
        }

        @Override
        protected void onStarted() {
            this.flush();
        }

        private void flush() {
            this.resume(item -> {
                Promise promise = this.consumer.accept(item).whenException(this::closeEx);
                if (promise.isComplete()) {
                    return;
                }
                this.suspend();
                this.working = true;
                promise.whenResult(() -> {
                    this.working = false;
                    if (!this.isEndOfStream()) {
                        this.flush();
                    } else {
                        this.sendEndOfStream();
                    }
                });
            });
        }

        @Override
        protected void onEndOfStream() {
            if (!this.working) {
                this.sendEndOfStream();
            }
        }

        private void sendEndOfStream() {
            this.consumer.acceptEndOfStream().whenResult(this::acknowledge).whenException(this::closeEx);
        }

        @Override
        protected void onError(Exception e) {
            this.consumer.closeEx(e);
        }
    }

    static final class OfPromise<T>
    extends AbstractStreamConsumer<T> {
        private Promise<? extends StreamConsumer<T>> promise;
        private final InternalSupplier internalSupplier = new InternalSupplier();

        public OfPromise(@NotNull Promise<? extends StreamConsumer<T>> promise) {
            this.promise = promise;
        }

        @Override
        protected void onInit() {
            this.promise.whenResult(consumer -> {
                consumer.getAcknowledgement().whenResult(this::acknowledge).whenException(this::closeEx);
                this.getAcknowledgement().whenException(e -> consumer.closeEx((Exception)e));
                this.internalSupplier.streamTo(consumer);
            }).whenException(this::closeEx);
        }

        @Override
        protected void onEndOfStream() {
            this.internalSupplier.sendEndOfStream();
        }

        @Override
        protected void onCleanup() {
            this.promise = null;
        }

        private class InternalSupplier
        extends AbstractStreamSupplier<T> {
            private InternalSupplier() {
            }

            @Override
            protected void onResumed() {
                OfPromise.this.resume(this.getDataAcceptor());
            }

            @Override
            protected void onSuspended() {
                OfPromise.this.suspend();
            }
        }
    }

    static final class ClosingWithError<T>
    extends AbstractStreamConsumer<T> {
        private Exception error;

        ClosingWithError(Exception e) {
            this.error = e;
        }

        @Override
        protected void onInit() {
            this.error = (Exception)Utils.nullify((Object)this.error, this::closeEx);
        }
    }

    static final class OfConsumer<T>
    extends AbstractStreamConsumer<T> {
        private final ConsumerEx<T> consumer;

        OfConsumer(ConsumerEx<T> consumer) {
            this.consumer = consumer;
        }

        @Override
        protected void onStarted() {
            this.resume(item -> {
                try {
                    this.consumer.accept(item);
                }
                catch (Exception ex) {
                    FatalErrorHandlers.handleError((Throwable)ex, (Object)this);
                    this.closeEx(ex);
                }
            });
        }

        @Override
        protected void onEndOfStream() {
            this.acknowledge();
        }
    }

    static final class Skip<T>
    extends AbstractStreamConsumer<T> {
        Skip() {
        }

        @Override
        protected void onStarted() {
            this.resume(item -> {});
        }

        @Override
        protected void onEndOfStream() {
            this.acknowledge();
        }
    }

    static final class Idle<T>
    extends AbstractStreamConsumer<T> {
        Idle() {
        }

        @Override
        protected void onEndOfStream() {
            this.acknowledge();
        }
    }
}

