/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp;

import io.activej.async.function.AsyncSupplier;
import io.activej.async.process.AsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufPool;
import io.activej.common.MemSize;
import io.activej.common.Utils;
import io.activej.common.collection.Try;
import io.activej.common.exception.FatalErrorHandlers;
import io.activej.common.function.BiConsumerEx;
import io.activej.common.function.FunctionEx;
import io.activej.common.recycle.Recyclers;
import io.activej.csp.AbstractChannelSupplier;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.queue.ChannelBuffer;
import io.activej.csp.queue.ChannelZeroBuffer;
import io.activej.eventloop.Eventloop;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.SettablePromise;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.ToIntFunction;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public final class ChannelSuppliers {
    @NotNull
    private static final MemSize DEFAULT_BUFFER_SIZE = MemSize.kilobytes((long)8L);

    public static <T> ChannelSupplier<T> concat(ChannelSupplier<? extends T> supplier1, ChannelSupplier<? extends T> supplier2) {
        return ChannelSuppliers.concat(Arrays.asList(supplier1, supplier2));
    }

    @SafeVarargs
    public static <T> ChannelSupplier<T> concat(ChannelSupplier<? extends T> ... suppliers) {
        return ChannelSuppliers.concat(Arrays.asList(suppliers));
    }

    public static <T> ChannelSupplier<T> concat(List<ChannelSupplier<? extends T>> suppliers) {
        return new ChannelSupplierConcat<T>(suppliers.iterator(), true);
    }

    public static <T> ChannelSupplier<T> concat(Iterator<? extends ChannelSupplier<? extends T>> iterator) {
        return new ChannelSupplierConcat(iterator, false);
    }

    public static <T, A, R> Promise<R> collect(ChannelSupplier<T> supplier, A initialValue, BiConsumerEx<A, T> accumulator, FunctionEx<A, R> finisher) {
        return Promise.ofCallback(cb -> ChannelSuppliers.toCollectorImpl(supplier, initialValue, accumulator, finisher, cb));
    }

    private static <T, A, R> void toCollectorImpl(ChannelSupplier<T> supplier, A accumulatedValue, BiConsumerEx<A, T> accumulator, FunctionEx<A, R> finisher, SettablePromise<R> cb) {
        Object item;
        Promise<T> promise;
        while ((promise = supplier.get()).isResult() && (item = promise.getResult()) != null) {
            try {
                accumulator.accept(accumulatedValue, item);
            }
            catch (Exception ex) {
                FatalErrorHandlers.handleError((Throwable)ex, cb);
                supplier.closeEx(ex);
                cb.setException(ex);
                return;
            }
        }
        promise.run((value, e) -> {
            if (e == null) {
                if (value != null) {
                    try {
                        accumulator.accept(accumulatedValue, value);
                    }
                    catch (Exception ex) {
                        FatalErrorHandlers.handleError((Throwable)ex, (Object)cb);
                        supplier.closeEx(ex);
                        cb.setException(ex);
                        return;
                    }
                    ChannelSuppliers.toCollectorImpl(supplier, accumulatedValue, accumulator, finisher, cb);
                } else {
                    Object result;
                    try {
                        result = finisher.apply(accumulatedValue);
                    }
                    catch (Exception ex) {
                        FatalErrorHandlers.handleError((Throwable)ex, (Object)cb);
                        cb.setException(ex);
                        return;
                    }
                    cb.set(result);
                }
            } else {
                Recyclers.recycle((Object)accumulatedValue);
                cb.setException(e);
            }
        });
    }

    public static <T> Promise<Void> streamTo(Promise<ChannelSupplier<T>> supplier, Promise<ChannelConsumer<T>> consumer) {
        return Promises.toTuple((Promise)supplier.toTry(), (Promise)consumer.toTry()).then(t -> ChannelSuppliers.streamTo((Try)t.getValue1(), (Try)t.getValue2()));
    }

    public static <T> Promise<Void> streamTo(Try<ChannelSupplier<T>> supplier, Try<ChannelConsumer<T>> consumer) {
        if (supplier.isSuccess() && consumer.isSuccess()) {
            return ChannelSuppliers.streamTo((ChannelSupplier)supplier.get(), (ChannelConsumer)consumer.get());
        }
        Exception exception = new Exception("Channel stream failed");
        supplier.consume(AsyncCloseable::close, exception::addSuppressed);
        consumer.consume(AsyncCloseable::close, exception::addSuppressed);
        return Promise.ofException((Exception)exception);
    }

    public static <T> Promise<Void> streamTo(ChannelSupplier<T> supplier, ChannelConsumer<T> consumer) {
        return Promise.ofCallback(cb -> ChannelSuppliers.streamToImpl(supplier, consumer, (SettablePromise<Void>)cb));
    }

    private static <T> void streamToImpl(ChannelSupplier<T> supplier, ChannelConsumer<T> consumer, SettablePromise<Void> cb) {
        Object item2;
        Promise<T> supplierPromise;
        while ((supplierPromise = supplier.get()).isResult() && (item2 = supplierPromise.getResult()) != null) {
            Promise<Void> consumerPromise = consumer.accept(item2);
            if (consumerPromise.isResult()) continue;
            consumerPromise.run(($, e) -> {
                if (e == null) {
                    ChannelSuppliers.streamToImpl(supplier, consumer, cb);
                } else {
                    supplier.closeEx(e);
                    cb.trySetException(e);
                }
            });
            return;
        }
        supplierPromise.run((item, e1) -> {
            if (e1 == null) {
                consumer.accept(item).run(($, e2) -> {
                    if (e2 == null) {
                        if (item != null) {
                            ChannelSuppliers.streamToImpl(supplier, consumer, cb);
                        } else {
                            cb.trySet(null);
                        }
                    } else {
                        supplier.closeEx(e2);
                        cb.trySetException(e2);
                    }
                });
            } else {
                consumer.closeEx(e1);
                cb.trySetException(e1);
            }
        });
    }

    public static <T> ChannelSupplier<T> prefetch(int count, ChannelSupplier<T> actual) {
        ChannelBuffer buffer = new ChannelBuffer(count);
        actual.streamTo(buffer.getConsumer());
        return buffer.getSupplier();
    }

    public static <T> ChannelSupplier<T> prefetch(ChannelSupplier<T> actual) {
        ChannelZeroBuffer buffer = new ChannelZeroBuffer();
        actual.streamTo(buffer.getConsumer());
        return buffer.getSupplier();
    }

    public static <T, V> ChannelSupplier<V> remap(final ChannelSupplier<T> supplier, final Function<? super T, ? extends Iterator<? extends V>> fn) {
        return new AbstractChannelSupplier<V>(supplier){
            Iterator<? extends V> iterator;
            boolean endOfStream;
            {
                super(closeable);
                this.iterator = Utils.iteratorOf();
            }

            @Override
            protected Promise<V> doGet() {
                if (this.iterator.hasNext()) {
                    return Promise.of(this.iterator.next());
                }
                return Promise.ofCallback(this::next);
            }

            private void next(SettablePromise<V> cb) {
                if (!this.endOfStream) {
                    supplier.get().run((item, e) -> {
                        if (e == null) {
                            if (item == null) {
                                this.endOfStream = true;
                            }
                            this.iterator = (Iterator)fn.apply(item);
                            if (this.iterator.hasNext()) {
                                cb.set(this.iterator.next());
                            } else {
                                this.next(cb);
                            }
                        } else {
                            cb.setException(e);
                        }
                    });
                } else {
                    cb.set(null);
                }
            }
        };
    }

    public static ChannelSupplier<ByteBuf> inputStreamAsChannelSupplier(Executor executor, MemSize bufSize, InputStream is) {
        return ChannelSuppliers.inputStreamAsChannelSupplier(executor, bufSize.toInt(), is);
    }

    public static ChannelSupplier<ByteBuf> inputStreamAsChannelSupplier(Executor executor, InputStream is) {
        return ChannelSuppliers.inputStreamAsChannelSupplier(executor, DEFAULT_BUFFER_SIZE, is);
    }

    public static ChannelSupplier<ByteBuf> inputStreamAsChannelSupplier(final Executor executor, final int bufSize, final InputStream inputStream) {
        return new AbstractChannelSupplier<ByteBuf>(){

            @Override
            protected Promise<ByteBuf> doGet() {
                return Promise.ofBlocking((Executor)executor, () -> {
                    int readBytes;
                    ByteBuf buf = ByteBufPool.allocate((int)bufSize);
                    try {
                        readBytes = inputStream.read(buf.array(), 0, bufSize);
                    }
                    catch (IOException e) {
                        buf.recycle();
                        throw e;
                    }
                    if (readBytes != -1) {
                        buf.moveTail(readBytes);
                        return buf;
                    }
                    buf.recycle();
                    return null;
                });
            }

            protected void onClosed(@NotNull Exception e) {
                executor.execute(() -> {
                    try {
                        inputStream.close();
                    }
                    catch (IOException iOException) {
                        // empty catch block
                    }
                });
            }
        };
    }

    public static InputStream channelSupplierAsInputStream(final Eventloop eventloop, final ChannelSupplier<ByteBuf> channelSupplier) {
        return new InputStream(){
            @Nullable
            private ByteBuf current = null;
            private boolean isClosed;
            private boolean isEOS;

            @Override
            public int read() throws IOException {
                return this.doRead(ByteBuf::readByte);
            }

            @Override
            public int read(byte @NotNull [] b, int off, int len) throws IOException {
                return this.doRead(buf -> buf.read(b, off, Math.min(buf.readRemaining(), len)));
            }

            private int doRead(ToIntFunction<ByteBuf> reader) throws IOException {
                if (this.isClosed) {
                    throw new IOException("Stream Closed");
                }
                if (this.isEOS) {
                    return -1;
                }
                ByteBuf peeked = this.current;
                if (peeked == null) {
                    ByteBuf buf;
                    do {
                        if ((buf = (ByteBuf)this.submit(channelSupplier::get)) != null) continue;
                        this.isEOS = true;
                        return -1;
                    } while (!buf.canRead());
                    peeked = buf;
                }
                int result = reader.applyAsInt(peeked);
                if (peeked.canRead()) {
                    this.current = peeked;
                } else {
                    this.current = null;
                    peeked.recycle();
                }
                return result;
            }

            @Override
            public void close() throws IOException {
                if (this.isClosed) {
                    return;
                }
                this.isClosed = true;
                this.current = (ByteBuf)Utils.nullify((Object)this.current, ByteBuf::recycle);
                this.submit(() -> {
                    channelSupplier.close();
                    return Promise.complete();
                });
            }

            private <T> T submit(AsyncSupplier<T> supplier) throws IOException {
                CompletableFuture future = eventloop.submit(() -> supplier.get());
                try {
                    return future.get();
                }
                catch (InterruptedException e) {
                    this.close();
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
                catch (ExecutionException e) {
                    this.close();
                    Throwable cause = e.getCause();
                    if (cause instanceof IOException) {
                        throw (IOException)cause;
                    }
                    if (cause instanceof RuntimeException) {
                        throw (RuntimeException)cause;
                    }
                    if (cause instanceof Exception) {
                        throw new IOException(cause);
                    }
                    if (cause instanceof Error) {
                        throw (Error)cause;
                    }
                    throw new RuntimeException(cause);
                }
            }
        };
    }

    public static final class ChannelSupplierConcat<T>
    extends AbstractChannelSupplier<T> {
        private final Iterator<? extends ChannelSupplier<? extends T>> iterator;
        private final boolean ownership;
        ChannelSupplier<? extends T> current = ChannelSupplier.of();

        public ChannelSupplierConcat(Iterator<? extends ChannelSupplier<? extends T>> iterator, boolean ownership) {
            this.iterator = iterator;
            this.ownership = ownership;
        }

        @Override
        protected Promise<T> doGet() {
            return this.current.get().then((value, e) -> {
                if (e == null) {
                    if (value != null) {
                        return Promise.of((Object)value);
                    }
                    if (this.iterator.hasNext()) {
                        this.current = this.iterator.next();
                        return this.get();
                    }
                    return Promise.of(null);
                }
                this.closeEx((Exception)e);
                return Promise.ofException((Exception)e);
            });
        }

        protected void onClosed(@NotNull Exception e) {
            this.current.closeEx(e);
            if (this.ownership) {
                this.iterator.forEachRemaining(Recyclers::recycle);
            } else {
                Recyclers.recycle(this.iterator);
            }
        }
    }

    public static final class ChannelSupplierOfException<T>
    extends AbstractChannelSupplier<T> {
        private final Exception e;

        public ChannelSupplierOfException(Exception e) {
            this.e = e;
        }

        @Override
        protected Promise<T> doGet() {
            return Promise.ofException((Exception)this.e);
        }
    }

    public static final class ChannelSupplierOfIterator<T>
    extends AbstractChannelSupplier<T> {
        private final Iterator<? extends T> iterator;
        private final boolean ownership;

        public ChannelSupplierOfIterator(Iterator<? extends T> iterator, boolean ownership) {
            this.iterator = iterator;
            this.ownership = ownership;
        }

        @Override
        protected Promise<T> doGet() {
            return Promise.of(this.iterator.hasNext() ? this.iterator.next() : null);
        }

        protected void onCleanup() {
            if (this.ownership) {
                this.iterator.forEachRemaining(Recyclers::recycle);
            } else {
                Recyclers.recycle(this.iterator);
            }
        }
    }

    public static final class ChannelSupplierOfValue<T>
    extends AbstractChannelSupplier<T> {
        private T item;

        public T getValue() {
            return this.item;
        }

        public T takeValue() {
            T item = this.item;
            this.item = null;
            return item;
        }

        public ChannelSupplierOfValue(@NotNull T item) {
            this.item = item;
        }

        @Override
        protected Promise<T> doGet() {
            T item = this.takeValue();
            return Promise.of(item);
        }

        protected void onCleanup() {
            this.item = Utils.nullify(this.item, Recyclers::recycle);
        }
    }

    public static class ChannelSupplierEmpty<T>
    extends AbstractChannelSupplier<T> {
        @Override
        protected Promise<T> doGet() {
            return Promise.of(null);
        }
    }
}

