/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream;

import io.activej.common.Utils;
import io.activej.csp.ChannelSupplier;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.eventloop.Eventloop;
import io.activej.promise.Promise;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.jetbrains.annotations.NotNull;

final class StreamSuppliers {
    /**
     * Package-private no-argument constructor with an empty body.
     * The nested supplier classes below are all {@code static}, so none of them needs an instance
     * of this class.
     */
    StreamSuppliers() {
    }

    /**
     * Stream supplier that re-publishes, on its own eventloop, the items of a {@link StreamSupplier}
     * that is streamed into an {@link InternalConsumer} created for another eventloop.
     *
     * <p>Hand-off protocol, as implemented below:
     * <ul>
     *   <li>the {@link InternalConsumer} collects incoming items into a list;</li>
     *   <li>whenever the shared {@code iterator} slot is {@code null}, the consumer publishes the
     *       list's iterator there and schedules {@link #onWakeUp()} on this supplier's eventloop;</li>
     *   <li>this supplier drains the published iterator with {@code send}, clears the slot once the
     *       iterator is exhausted and wakes the consumer up again;</li>
     *   <li>the {@code END_OF_STREAM} sentinel in the slot makes this supplier send end-of-stream.</li>
     * </ul>
     *
     * <p>NOTE(review): the state shared by both sides ({@code iterator}, {@code isReady} and the two
     * {@code wakingUp} flags) is {@code volatile}; any further thread-confinement guarantees depend on
     * {@code Eventloop} semantics that are not visible in this file.
     *
     * @param <T> type of the streamed items
     */
    static final class OfAnotherEventloop<T>
    extends AbstractStreamSupplier<T> {
        /** Buffered item count at which the internal consumer tries to hand its batch over and suspends. */
        private static final int MAX_BUFFER_SIZE = 100;
        /** Sentinel published in {@code iterator} to signal end-of-stream; compared by identity. */
        private static final Iterator<?> END_OF_STREAM = Collections.emptyIterator();
        /** Hand-off slot: {@code null} means "empty, the consumer may publish the next batch". */
        private volatile Iterator<T> iterator;
        /** Set in {@link #onResumed()}, cleared in {@link #onSuspended()}, read by the internal consumer. */
        private volatile boolean isReady;
        private final StreamSupplier<T> anotherEventloopSupplier;
        private final InternalConsumer internalConsumer;
        /** {@code true} while an {@link #onWakeUp()} call has been scheduled and has not yet run. */
        private volatile boolean wakingUp;

        /**
         * @param anotherEventloop         eventloop handed to {@code Eventloop.initWithEventloop} while
         *                                 the internal consumer is constructed
         * @param anotherEventloopSupplier supplier that is later streamed into the internal consumer
         */
        public OfAnotherEventloop(@NotNull Eventloop anotherEventloop, @NotNull StreamSupplier<T> anotherEventloopSupplier) {
            this.anotherEventloopSupplier = anotherEventloopSupplier;
            this.internalConsumer = (InternalConsumer) Eventloop.initWithEventloop(anotherEventloop, () -> new InternalConsumer());
        }

        /** Submits {@code runnable} to this supplier's eventloop. */
        void execute(Runnable runnable) {
            this.eventloop.execute(runnable);
        }

        /** Schedules {@link #onWakeUp()} on this supplier's eventloop unless one is already pending. */
        void wakeUp() {
            if (this.wakingUp) {
                return;
            }
            this.wakingUp = true;
            this.execute(this::onWakeUp);
        }

        /** Wake-up callback: clears the pending flag and flushes, unless this supplier is complete. */
        void onWakeUp() {
            if (this.isComplete()) {
                return;
            }
            this.wakingUp = false;
            this.flush();
        }

        /** Starts an external task on this supplier's eventloop; {@link #onComplete()} completes it. */
        @Override
        protected void onInit() {
            this.eventloop.startExternalTask();
        }

        /** Asks the internal consumer's eventloop to start streaming the wrapped supplier into it. */
        @Override
        protected void onStarted() {
            this.internalConsumer.execute(() -> this.anotherEventloopSupplier.streamTo(this.internalConsumer));
        }

        /** Marks this side as ready and forwards whatever batch is currently published. */
        @Override
        protected void onResumed() {
            this.isReady = true;
            this.flush();
        }

        /** Marks this side as not ready and wakes the consumer so that it can observe the new state. */
        @Override
        protected void onSuspended() {
            this.isReady = false;
            this.internalConsumer.wakeUp();
        }

        /** Completes the external task started in {@link #onInit()}. */
        @Override
        protected void onComplete() {
            this.eventloop.completeExternalTask();
        }

        /**
         * Sends items of the published batch downstream while {@code isReady()} holds.
         * With no batch published it only wakes the consumer. After draining, it sends
         * end-of-stream for the sentinel, or releases the slot and wakes the consumer
         * when the batch is exhausted.
         */
        private void flush() {
            if (this.iterator == null) {
                this.internalConsumer.wakeUp();
                return;
            }
            // Work on a local copy of the volatile slot for the rest of the method.
            Iterator<T> batch = this.iterator;
            while (this.isReady() && batch.hasNext()) {
                this.send(batch.next());
            }
            if (batch == END_OF_STREAM) {
                this.sendEndOfStream();
            } else if (!batch.hasNext()) {
                // Batch fully delivered: free the slot and request the next one.
                this.iterator = null;
                this.internalConsumer.wakeUp();
            }
        }

        /** Forwards the acknowledgement to the internal consumer on its eventloop. */
        @Override
        protected void onAcknowledge() {
            this.internalConsumer.execute(this.internalConsumer::acknowledge);
        }

        /** Closes the internal consumer with {@code e} on its eventloop. */
        @Override
        protected void onError(Exception e) {
            this.internalConsumer.execute(() -> this.internalConsumer.closeEx(e));
        }

        /** Drops the reference to the published batch. */
        @Override
        protected void onCleanup() {
            this.iterator = null;
        }

        /**
         * Consumer side of the hand-off. It is created inside {@code Eventloop.initWithEventloop}
         * (see the outer constructor), buffers received items and publishes them to the outer
         * supplier in batches.
         */
        final class InternalConsumer
        extends AbstractStreamConsumer<T> {
            /** Items received since the last published batch; replaced by a new list on every hand-off. */
            private List<T> list = new ArrayList<>();
            /** Data acceptor that buffers items and, at {@code MAX_BUFFER_SIZE}, flushes and suspends. */
            private final StreamDataAcceptor<T> toList = item -> {
                this.list.add(item);
                if (this.list.size() == MAX_BUFFER_SIZE) {
                    this.flush();
                    this.suspend();
                }
            };
            /** {@code true} while an {@link #onWakeUp()} call has been scheduled and has not yet run. */
            volatile boolean wakingUp;

            InternalConsumer() {
            }

            /** Submits {@code runnable} to this consumer's eventloop. */
            void execute(Runnable runnable) {
                this.eventloop.execute(runnable);
            }

            /** Schedules {@link #onWakeUp()} on this consumer's eventloop unless one is already pending. */
            void wakeUp() {
                if (this.wakingUp) {
                    return;
                }
                this.wakingUp = true;
                this.execute(this::onWakeUp);
            }

            /**
             * Wake-up callback: unless this consumer is complete, clears the pending flag, tries to
             * publish the buffer, then resumes into the buffer if the outer supplier is ready and
             * suspends otherwise.
             */
            void onWakeUp() {
                if (this.isComplete()) {
                    return;
                }
                this.wakingUp = false;
                this.flush();
                if (OfAnotherEventloop.this.isReady) {
                    this.resume(this.toList);
                } else {
                    this.suspend();
                }
            }

            /** Starts an external task on this consumer's eventloop; {@link #onComplete()} completes it. */
            @Override
            protected void onInit() {
                this.eventloop.startExternalTask();
            }

            /** Publishes the remaining buffer, or the end-of-stream sentinel if the buffer is empty. */
            @Override
            protected void onEndOfStream() {
                this.flush();
            }

            /**
             * Publishes the next batch into the outer {@code iterator} slot and wakes the outer
             * supplier. It does nothing while the slot is occupied, or when there is neither
             * buffered data nor an end-of-stream to report.
             */
            private void flush() {
                if (OfAnotherEventloop.this.iterator != null) {
                    return;
                }
                if (this.isEndOfStream() && this.list.isEmpty()) {
                    // The sentinel is an empty iterator that is only compared by identity,
                    // so its element type is irrelevant and the unchecked cast is safe.
                    @SuppressWarnings("unchecked")
                    Iterator<T> endMarker = (Iterator<T>) END_OF_STREAM;
                    OfAnotherEventloop.this.iterator = endMarker;
                } else if (!this.list.isEmpty()) {
                    OfAnotherEventloop.this.iterator = this.list.iterator();
                    // The published iterator keeps the old list; continue buffering into a new one.
                    this.list = new ArrayList<>();
                } else {
                    return;
                }
                OfAnotherEventloop.this.wakeUp();
            }

            /** Closes the outer supplier with {@code e} on the outer supplier's eventloop. */
            @Override
            protected void onError(Exception e) {
                OfAnotherEventloop.this.execute(() -> OfAnotherEventloop.this.closeEx(e));
            }

            /** Completes the external task started in {@link #onInit()}. */
            @Override
            protected void onComplete() {
                this.eventloop.completeExternalTask();
            }

            /** Drops the buffer. */
            @Override
            protected void onCleanup() {
                this.list = null;
            }
        }
    }

    /**
     * Stream supplier backed by a {@link ChannelSupplier}: every time it is resumed it requests one
     * item from the channel and forwards the outcome downstream.
     *
     * @param <T> type of the streamed items
     */
    static final class OfChannelSupplier<T> extends AbstractStreamSupplier<T> {
        private final ChannelSupplier<T> supplier;

        /**
         * @param supplier channel supplier that provides the items
         */
        public OfChannelSupplier(ChannelSupplier<T> supplier) {
            this.supplier = supplier;
        }

        /**
         * Calls {@code asyncBegin()}, requests the next item from the channel and then, in the callback:
         * closes this supplier with the exception if there is one, sends end-of-stream for a
         * {@code null} item, and otherwise sends the item followed by {@code asyncResume()}.
         */
        @Override
        protected void onResumed() {
            asyncBegin();
            supplier.get().run((item, e) -> {
                if (e != null) {
                    closeEx(e);
                    return;
                }
                if (item == null) {
                    sendEndOfStream();
                    return;
                }
                send(item);
                asyncResume();
            });
        }

        /** Propagates the error to the underlying channel supplier. */
        @Override
        protected void onError(Exception e) {
            supplier.closeEx(e);
        }
    }

    /**
     * Stream supplier that streams, one after another, the suppliers obtained from a
     * {@link ChannelSupplier} of stream suppliers.
     *
     * @param <T> type of the streamed items
     */
    static final class Concat<T> extends AbstractStreamSupplier<T> {
        private ChannelSupplier<StreamSupplier<T>> iterator;
        /** Consumer attached to the supplier currently being streamed; replaced on every {@link #next()}. */
        private InternalConsumer internalConsumer = new InternalConsumer();

        /**
         * @param iterator channel that yields the suppliers to concatenate; a {@code null} item
         *                 results in end-of-stream being sent
         */
        Concat(ChannelSupplier<StreamSupplier<T>> iterator) {
            this.iterator = iterator;
        }

        @Override
        protected void onStarted() {
            next();
        }

        /**
         * Acknowledges the current internal consumer, installs a new one, calls {@code resume()} and
         * requests the next supplier from the channel. A non-null supplier is streamed into the new
         * consumer and its end-of-stream triggers the next step; a {@code null} one ends this stream.
         * Any failure closes this supplier.
         */
        private void next() {
            internalConsumer.acknowledge();
            internalConsumer = new InternalConsumer();
            resume();
            iterator.get()
                    .whenResult(upstream -> {
                        if (upstream == null) {
                            sendEndOfStream();
                            return;
                        }
                        upstream.getEndOfStream()
                                .whenResult(this::next)
                                .whenException(this::closeEx);
                        upstream.streamTo(internalConsumer);
                    })
                    .whenException(this::closeEx);
        }

        /** Resumes the current internal consumer with this supplier's data acceptor. */
        @Override
        protected void onResumed() {
            internalConsumer.resume(getDataAcceptor());
        }

        /** Suspends the current internal consumer. */
        @Override
        protected void onSuspended() {
            internalConsumer.suspend();
        }

        /** Closes the current internal consumer first, then the channel of suppliers, with {@code e}. */
        @Override
        protected void onError(Exception e) {
            internalConsumer.closeEx(e);
            iterator.closeEx(e);
        }

        /** Drops the reference to the channel of suppliers. */
        @Override
        protected void onCleanup() {
            iterator = null;
        }

        /** Consumer with no overrides of its own; it relies on the implicit (private) constructor. */
        private class InternalConsumer extends AbstractStreamConsumer<T> {
        }
    }

    /**
     * Stream supplier that forwards the items of a {@link StreamSupplier} which only becomes
     * available when a {@link Promise} completes.
     *
     * @param <T> type of the streamed items
     */
    static final class OfPromise<T> extends AbstractStreamSupplier<T> {
        private Promise<? extends StreamSupplier<T>> promise;
        /** Consumer into which the promised supplier is streamed. */
        private final InternalConsumer internalConsumer = new InternalConsumer();

        /**
         * @param promise promise of the supplier to stream from
         */
        public OfPromise(Promise<? extends StreamSupplier<T>> promise) {
            this.promise = promise;
        }

        /**
         * Wires the promised supplier in once the promise has a result: a failure of this stream
         * closes the upstream supplier, the upstream end-of-stream or failure is mirrored on this
         * supplier, and the upstream is streamed into the internal consumer. A failed promise
         * closes this supplier.
         */
        @Override
        protected void onInit() {
            promise
                    .whenResult(upstream -> {
                        getEndOfStream()
                                .whenException(failure -> upstream.closeEx((Exception) failure));
                        upstream.getEndOfStream()
                                .whenResult(this::sendEndOfStream)
                                .whenException(this::closeEx);
                        upstream.streamTo(internalConsumer);
                    })
                    .whenException(this::closeEx);
        }

        /** Resumes the internal consumer with this supplier's data acceptor. */
        @Override
        protected void onResumed() {
            internalConsumer.resume(getDataAcceptor());
        }

        /** Suspends the internal consumer. */
        @Override
        protected void onSuspended() {
            internalConsumer.suspend();
        }

        /** Passes the acknowledgement on to the internal consumer. */
        @Override
        protected void onAcknowledge() {
            internalConsumer.acknowledge();
        }

        /** Drops the reference to the promise. */
        @Override
        protected void onCleanup() {
            promise = null;
        }

        /** Consumer with no overrides of its own; it relies on the implicit (private) constructor. */
        private class InternalConsumer extends AbstractStreamConsumer<T> {
        }
    }

    /**
     * Stream supplier that sends the elements of an {@link Iterator} and then end-of-stream.
     *
     * @param <T> type of the streamed items
     */
    static final class OfIterator<T> extends AbstractStreamSupplier<T> {
        private final Iterator<T> iterator;

        /**
         * @param iterator source of the items to send
         */
        public OfIterator(@NotNull Iterator<T> iterator) {
            this.iterator = iterator;
        }

        /**
         * Sends elements for as long as {@code isReady()} holds and the iterator has more.
         * End-of-stream is sent only once the iterator has nothing left; otherwise the remaining
         * elements wait for the next call.
         */
        @Override
        protected void onResumed() {
            final Iterator<T> source = iterator;
            while (isReady() && source.hasNext()) {
                send(source.next());
            }
            if (source.hasNext()) {
                return;
            }
            sendEndOfStream();
        }
    }

    /**
     * Stream supplier that overrides none of the {@link AbstractStreamSupplier} callbacks and never
     * calls {@code send} or {@code sendEndOfStream} itself.
     *
     * @param <T> type of the streamed items
     */
    static final class Idle<T> extends AbstractStreamSupplier<T> {
        // Uses the implicit no-argument constructor, which has the same (package) access as the class.
    }

    /**
     * Stream supplier that sends end-of-stream from its {@code onInit} callback without sending
     * any item.
     *
     * @param <T> type of the streamed items
     */
    static final class Closing<T> extends AbstractStreamSupplier<T> {
        // Uses the implicit no-argument constructor, which has the same (package) access as the class.

        /** Immediately signals end-of-stream. */
        @Override
        protected void onInit() {
            sendEndOfStream();
        }
    }

    static final class ClosingWithError<T>
    extends AbstractStreamSupplier<T> {
        private Exception error;

        ClosingWithError(Exception e) {
            this.error = e;
        }

        @Override
        protected void onInit() {
            this.error = (Exception)Utils.nullify((Object)this.error, this::closeEx);
        }
    }
}

