/*
 * Decompiled with CFR 0.152.
 */
package com.artipie.asto.streams;

import com.artipie.asto.ByteArray;
import com.artipie.asto.Content;
import com.artipie.asto.Key;
import com.artipie.asto.Storage;
import com.artipie.asto.misc.UncheckedIOSupplier;
import com.artipie.asto.misc.UncheckedRunnable;
import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import io.reactivex.Scheduler;
import io.reactivex.processors.UnicastProcessor;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.DefaultSubscriber;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;

public final class StorageValuePipeline<R> {
    private final Storage asto;
    private final Key read;
    private final Key write;

    public StorageValuePipeline(Storage asto, Key read, Key write) {
        this.asto = asto;
        this.read = read;
        this.write = write;
    }

    public StorageValuePipeline(Storage asto, Key key) {
        this(asto, key, key);
    }

    public CompletionStage<Void> process(BiConsumer<Optional<InputStream>, OutputStream> action) {
        return this.processWithResult((opt, input) -> {
            action.accept((Optional<InputStream>)opt, (OutputStream)input);
            return null;
        }).thenAccept(nothing -> {});
    }

    public CompletionStage<R> processWithResult(BiFunction<Optional<InputStream>, OutputStream, R> action) {
        AtomicReference res = new AtomicReference();
        return ((CompletableFuture)((CompletableFuture)this.asto.exists(this.read).thenCompose(exists -> {
            CompletionStage stage = exists != false ? this.asto.value(this.read).thenApply(content -> Optional.of(new ContentAsInputStream((Content)content).inputStream())) : CompletableFuture.completedFuture(Optional.empty());
            return stage;
        })).thenCompose(optional -> {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1050)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        })).thenApply(nothing -> res.get());
    }

    static class PublishingOutputStream
    extends OutputStream {
        private static final long DEFAULT_TIMESPAN = 100L;
        private static final int DEFAULT_BUF_SIZE = 4096;
        private final UnicastProcessor<ByteBuffer> pub = UnicastProcessor.create();
        private final UnicastProcessor<Byte> bufproc = UnicastProcessor.create();

        PublishingOutputStream() {
            this(100L, TimeUnit.MILLISECONDS, 4096);
        }

        PublishingOutputStream(long timespan, TimeUnit unit, int count) {
            this.bufproc.buffer(timespan, unit, count).doOnNext(list -> this.pub.onNext((Object)ByteBuffer.wrap(new ByteArray((List<Byte>)list).primitiveBytes()))).subscribeOn(Schedulers.io()).doOnComplete(() -> this.pub.onComplete()).subscribe();
        }

        @Override
        public void write(int b) throws IOException {
            this.bufproc.onNext((Object)((byte)b));
        }

        @Override
        public void close() throws IOException {
            super.close();
            this.bufproc.onComplete();
        }

        Publisher<ByteBuffer> publisher() {
            return this.pub;
        }
    }

    static class ContentAsInputStream
    extends DefaultSubscriber<ByteBuffer> {
        private final Content content;
        private final Scheduler scheduler;
        private final PipedOutputStream out;
        private final PipedInputStream input;
        private final WritableByteChannel channel;

        ContentAsInputStream(Content content) {
            this(content, Schedulers.io());
        }

        ContentAsInputStream(Content content, Scheduler scheduler) {
            this.content = content;
            this.scheduler = scheduler;
            this.out = new PipedOutputStream();
            this.input = new UncheckedIOSupplier<PipedInputStream>(() -> new PipedInputStream(this.out)).get();
            this.channel = Channels.newChannel(this.out);
        }

        public void onNext(ByteBuffer buffer) {
            Objects.requireNonNull(buffer);
            UncheckedRunnable.newIoRunnable(() -> {
                while (buffer.hasRemaining()) {
                    this.channel.write(buffer);
                }
            }).run();
        }

        public void onError(Throwable err) {
            UncheckedRunnable.newIoRunnable(this.input::close).run();
        }

        public void onComplete() {
            UncheckedRunnable.newIoRunnable(this.out::close).run();
        }

        InputStream inputStream() {
            Flowable.fromPublisher((Publisher)this.content).subscribeOn(this.scheduler).subscribe((FlowableSubscriber)this);
            return this.input;
        }
    }
}

