/*
 * Decompiled with CFR 0.152.
 */
package co.paralleluniverse.strands.channels.reactivestreams;

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.FiberFactory;
import co.paralleluniverse.fibers.Instrumented;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.SuspendableAction2;
import co.paralleluniverse.strands.SuspendableCallable;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.ReceivePort;
import co.paralleluniverse.strands.channels.SendPort;
import co.paralleluniverse.strands.channels.reactivestreams.ChannelPublisher;
import co.paralleluniverse.strands.channels.reactivestreams.ChannelSubscriber;
import co.paralleluniverse.strands.channels.reactivestreams.ChannelSubscription;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Instrumented
class ChannelProcessor<T, R>
implements Processor<T, R> {
    /** Upstream-facing end; {@code onSubscribe}, {@code onError} and {@code onComplete} delegate to it. */
    private final ChannelSubscriber<T> subscriber;
    /** Downstream-facing end; {@code subscribe} delegates to it. */
    private final ChannelPublisher<R> publisher;
    /** Factory used by {@code start()} to create the fiber; never {@code null} after construction. */
    private final FiberFactory ff;
    /** User-supplied action passed to the constructor; exposed to the fiber body through {@code access$400}. */
    private final SuspendableAction2<? extends ReceivePort<? super T>, ? extends SendPort<? extends R>> transformer;
    /** Receive side exposed through {@code access$200}; the constructor sets it to {@link #subscriber}. */
    private final ReceivePort<T> in;
    /** Send side exposed through {@code access$300}; the constructor sets it to its {@code out} argument. */
    private final SendPort<R> out;
    /** Number of ends connected so far; {@code connected()} starts the fiber when it reaches 2. */
    private final AtomicInteger connectedEnds = new AtomicInteger();
    /** Upstream subscription stored by {@code onSubscribe}; read when a downstream subscription is cancelled. */
    private volatile Subscription subscription;
    /**
     * Fallback factory selected by the constructor when it is given a {@code null}
     * {@link FiberFactory}; it wraps the target directly in a new {@link Fiber}.
     */
    private static final FiberFactory defaultFiberFactory = new FiberFactory(){

        public <T> Fiber<T> newFiber(SuspendableCallable<T> target) {
            // Parameterized (not raw) so the result matches Fiber<T> without an unchecked conversion.
            return new Fiber<T>(target);
        }
    };

    /**
     * Wires an internal {@link ChannelSubscriber} and {@link ChannelPublisher} around the given
     * channels. Both are subclassed so that a failed subscription raises
     * {@code FailedSubscriptionException}, and so that cancelling a downstream subscription also
     * cancels the stored upstream subscription, if there is one.
     *
     * @param ff          factory for the fiber created in {@code start()}; {@code null} selects
     *                    {@code defaultFiberFactory}
     * @param batch       forwarded unchanged to the {@link ChannelSubscriber} constructor
     * @param in          channel handed to the internal {@link ChannelSubscriber}
     * @param out         channel handed to the internal {@link ChannelPublisher} and kept as the send side
     * @param transformer action stored for the fiber created in {@code start()}
     */
    public ChannelProcessor(FiberFactory ff, boolean batch, Channel<T> in, Channel<R> out, SuspendableAction2<? extends ReceivePort<? super T>, ? extends SendPort<? extends R>> transformer) {
        if (ff != null) {
            this.ff = ff;
        } else {
            this.ff = defaultFiberFactory;
        }
        this.transformer = transformer;
        this.out = out;

        this.subscriber = new ChannelSubscriber<T>(in, batch) {
            @Override
            protected void failedSubscribe(Subscription s) {
                super.failedSubscribe(s);
                // Tells onSubscribe(...) that this end must not be counted as connected.
                throw new FailedSubscriptionException();
            }
        };
        this.in = this.subscriber;

        // The constructor argument itself (possibly null) is forwarded here, not this.ff.
        this.publisher = new ChannelPublisher<R>(ff, out, true) {
            @Override
            protected void failedSubscribe(Subscriber<? super R> s, Throwable t) {
                super.failedSubscribe(s, t);
                // Tells subscribe(...) that this end must not be counted as connected.
                throw new FailedSubscriptionException();
            }

            @Override
            protected ChannelSubscription<R> newChannelSubscription(Subscriber<? super R> s, Object channel) {
                return new ChannelSubscription<R>(s, (ReceivePort)channel) {
                    @Override
                    public void cancel() {
                        super.cancel();
                        // Read the volatile field once so the null check and the call see the same value.
                        final Subscription upstream = ChannelProcessor.this.subscription;
                        if (upstream == null) {
                            return;
                        }
                        upstream.cancel();
                    }
                };
            }
        };
    }

    /**
     * Records that one more end has connected. The second connection starts the fiber;
     * any further connection is treated as an internal error.
     *
     * @throws AssertionError if more than two connections are counted
     */
    private void connected() {
        final int ends = connectedEnds.incrementAndGet();
        if (ends > 2) {
            throw new AssertionError();
        }
        if (ends == 2) {
            start();
        }
    }

    /**
     * Obtains a new fiber from {@code ff} for an anonymous {@link SuspendableCallable} and starts it.
     * Called from {@code connected()} once two ends have connected.
     *
     * <p>The callable's {@code run()} body was not recovered by the decompiler; what is shown
     * below is a placeholder that throws {@link IllegalStateException}.
     * NOTE(review): the {@code @Instrumented} call-site names reference {@code access$200} (in),
     * {@code access$300} (out), {@code access$400} (transformer) and
     * {@code SuspendableAction2.call}, which suggests the original invoked the transformer with
     * the in/out ports. Confirm against the original source before relying on this.
     */
    private void start() {
        this.ff.newFiber((SuspendableCallable)new SuspendableCallable<Void>(){

            /*
             * Exception decompiling
             */
            // The annotation below is instrumentation metadata; its strings name the synthetic
            // accessors of the enclosing class, so those accessor names must not be changed.
            @Instrumented(methodOptimized=false, methodStart=93, methodEnd=101, suspendableCallSites={93, 93, 93, 93, 94, 96, 98, 100}, suspendableCallSiteNames={"co/paralleluniverse/strands/channels/reactivestreams/ChannelProcessor.access$200(Lco/paralleluniverse/strands/channels/reactivestreams/ChannelProcessor;)Lco/paralleluniverse/strands/channels/ReceivePort;", "co/paralleluniverse/strands/SuspendableAction2.call(Ljava/lang/Object;Ljava/lang/Object;)V", "co/paralleluniverse/strands/channels/reactivestreams/ChannelProcessor.access$400(Lco/paralleluniverse/strands/channels/reactivestreams/ChannelProcessor;)Lco/paralleluniverse/strands/SuspendableAction2;", "co/paralleluniverse/strands/channels/reactivestreams/ChannelProcessor.access$300(Lco/paralleluniverse/strands/channels/reactivestreams/ChannelProcessor;)Lco/paralleluniverse/strands/channels/SendPort;"}, suspendableCallSitesOffsetsAfterInstr={104, 149, 207, 261, 295, 352, 413, 452})
            public Void run() throws SuspendExecution, InterruptedException {
                /*
                 * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
                 * 
                 * org.benf.cfr.reader.util.ConfusedCFRException: non catch before exception catch block
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op02WithProcessedDataAndRefs.insertExceptionBlocks(Op02WithProcessedDataAndRefs.java:2354)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:415)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
                 *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
                 *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
                 *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
                 *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
                 *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
                 *     at org.benf.cfr.reader.Main.main(Main.java:54)
                 */
                // Placeholder emitted by the decompiler, not the original logic.
                throw new IllegalStateException("Decompilation failed");
            }
        }).start();
    }

    /**
     * Hands the downstream subscriber to the internal publisher and, when that does not fail,
     * counts the downstream end as connected.
     *
     * @param s the downstream subscriber
     */
    public void subscribe(Subscriber<? super R> s) {
        try {
            publisher.subscribe(s);
            connected();
        } catch (FailedSubscriptionException ignored) {
            // Thrown by the publisher's overridden failedSubscribe hook after the superclass
            // hook has run; its only purpose is to skip connected() for this end.
        }
    }

    /**
     * Passes the upstream subscription to the internal subscriber, stores it for later
     * cancellation, and counts the upstream end as connected.
     *
     * @param s the upstream subscription
     */
    public void onSubscribe(Subscription s) {
        try {
            subscriber.onSubscribe(s);
            subscription = s;
            connected();
        } catch (FailedSubscriptionException ignored) {
            // Thrown by the subscriber's overridden failedSubscribe hook after the superclass
            // hook has run; the subscription is then neither stored nor counted.
        }
    }

    /**
     * Forwards an upstream error signal to the internal subscriber.
     *
     * @param cause the error reported by the upstream
     */
    public void onError(Throwable cause) {
        subscriber.onError(cause);
    }

    /**
     * Forwards the upstream completion signal to the internal subscriber.
     */
    public void onComplete() {
        subscriber.onComplete();
    }

    // Compiler-generated accessor returning the private field `in`. Its name and descriptor appear
    // verbatim in the @Instrumented call-site strings inside start(), so it must not be renamed.
    static /* synthetic */ ReceivePort access$200(ChannelProcessor x0) {
        return x0.in;
    }

    // Compiler-generated accessor returning the private field `out`. Its name and descriptor appear
    // verbatim in the @Instrumented call-site strings inside start(), so it must not be renamed.
    static /* synthetic */ SendPort access$300(ChannelProcessor x0) {
        return x0.out;
    }

    // Compiler-generated accessor returning the private field `transformer`. Its name and descriptor
    // appear verbatim in the @Instrumented call-site strings inside start(), so it must not be renamed.
    static /* synthetic */ SuspendableAction2 access$400(ChannelProcessor x0) {
        return x0.transformer;
    }

    /*
     * Exception decompiling
     */
    /**
     * Receives the next element from the upstream.
     *
     * <p>The body was not recovered by the decompiler; what is shown is a placeholder that
     * throws {@link IllegalStateException}.
     * NOTE(review): the {@code @Instrumented} metadata lists {@code ChannelSubscriber.onNext}
     * as the only suspendable call site, which suggests the original delegated to
     * {@code subscriber.onNext(...)}. Confirm against the original source.
     *
     * @param var1_1 the element delivered by the upstream
     */
    @Suspendable
    @Instrumented(methodOptimized=false, methodStart=128, methodEnd=129, suspendableCallSites={128}, suspendableCallSiteNames={"co/paralleluniverse/strands/channels/reactivestreams/ChannelSubscriber.onNext(Ljava/lang/Object;)V"}, suspendableCallSitesOffsetsAfterInstr={99})
    public void onNext(T var1_1) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: non catch before exception catch block
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op02WithProcessedDataAndRefs.insertExceptionBlocks(Op02WithProcessedDataAndRefs.java:2354)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:415)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        // Placeholder emitted by the decompiler, not the original logic.
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Internal marker exception. The overridden {@code failedSubscribe} hooks throw it, and
     * {@code subscribe}/{@code onSubscribe} catch it, so that an end whose subscription failed
     * is not counted as connected.
     */
    private static class FailedSubscriptionException extends RuntimeException {
        private FailedSubscriptionException() {
            super();
        }
    }
}

