/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.stream;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.ipc.stream.Ipc;
import reactor.ipc.stream.IpcDone;
import reactor.ipc.stream.IpcInit;
import reactor.ipc.stream.StreamContext;
import reactor.ipc.stream.StreamOperationsImpl;
import reactor.util.Logger;
import reactor.util.Loggers;

abstract class IpcServiceMapper {
    /** Logger used by the static helpers and the nested service adapters of this class. */
    static final Logger log = Loggers.getLogger(IpcServiceMapper.class);

    /** Private constructor: the members visible in this class are static helpers and nested types. */
    private IpcServiceMapper() {
    }

    /**
     * Runs the {@link IpcInit} hook of a service object, if it declares one.
     *
     * <p>The public methods of {@code api}'s class are scanned in the order returned by
     * {@link Class#getMethods()}. The first annotated method is validated, invoked with
     * {@code ctx}, and the scan stops. Nothing happens when no method is annotated.
     *
     * @param api the service instance to inspect
     * @param ctx the stream context handed to the hook
     * @throws IllegalStateException if the annotated method is not {@code void} with exactly one
     *         parameter assignable to {@link StreamContext}, or if the reflective call fails
     */
    public static void invokeInit(Object api, StreamContext<?> ctx) {
        for (Method candidate : api.getClass().getMethods()) {
            if (candidate.isAnnotationPresent(IpcInit.class)) {
                // Short-circuit order matters: the parameter array is only indexed once
                // the parameter count is known to be 1.
                boolean validSignature = candidate.getReturnType() == Void.TYPE
                        && candidate.getParameterCount() == 1
                        && StreamContext.class.isAssignableFrom(candidate.getParameterTypes()[0]);
                if (!validSignature) {
                    throw new IllegalStateException("IpcInit method has to be void and accepting only a single StreamContext parameter");
                }
                try {
                    candidate.invoke(api, ctx);
                }
                catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException failure) {
                    if (log.isErrorEnabled()) {
                        log.error("", (Throwable)failure);
                    }
                    throw new IllegalStateException(failure);
                }
                // Only the first annotated method is ever invoked.
                return;
            }
        }
    }

    /**
     * Runs the {@link IpcDone} hook of a service object, if it declares one.
     *
     * <p>The public methods of {@code api}'s class are scanned in the order returned by
     * {@link Class#getMethods()}. The first annotated method is validated, invoked with
     * {@code ctx}, and the scan stops. Nothing happens when no method is annotated.
     *
     * @param api the service instance to inspect
     * @param ctx the stream context handed to the hook
     * @throws IllegalStateException if the annotated method is not {@code void} with exactly one
     *         parameter assignable to {@link StreamContext}, or if the reflective call fails
     */
    public static void invokeDone(Object api, StreamContext<?> ctx) {
        for (Method m : api.getClass().getMethods()) {
            if (!m.isAnnotationPresent(IpcDone.class)) {
                continue;
            }
            // Short-circuit order matters: the parameter array is only indexed once
            // the parameter count is known to be 1.
            boolean validSignature = m.getReturnType() == Void.TYPE
                    && m.getParameterCount() == 1
                    && StreamContext.class.isAssignableFrom(m.getParameterTypes()[0]);
            if (!validSignature) {
                // The message names IpcDone (it previously said IpcInit, a copy/paste error).
                throw new IllegalStateException("IpcDone method has to be void and accepting only a single StreamContext parameter");
            }
            try {
                m.invoke(api, ctx);
            }
            catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
                if (log.isErrorEnabled()) {
                    log.error("", (Throwable)e);
                }
                throw new IllegalStateException(e);
            }
            // Only the first annotated method is ever invoked.
            return;
        }
    }

    /**
     * Builds the server-side dispatch table for a service object.
     *
     * <p>Every public method of {@code api}'s class annotated with {@link Ipc} is mapped under
     * the annotation's {@code name()}, or under the Java method name when that is empty.
     * The handler type depends on the method shape:
     * <ul>
     *   <li>{@code void m(StreamContext, Publisher)} gives {@link IpcServerReceive}
     *       (Flux / Mono variants when the second parameter type is exactly
     *       {@link Flux} or {@link Mono});</li>
     *   <li>{@code Publisher m(StreamContext)} gives {@link IpcServerSend};</li>
     *   <li>{@code Publisher m(StreamContext, Publisher)} gives {@link IpcServerMap}
     *       (Flux / Mono variants chosen the same way).</li>
     * </ul>
     * A later method with the same target name replaces an earlier one in the map.
     *
     * @param api the service instance whose annotated methods are exposed
     * @return a map from target name to handler object
     * @throws IllegalStateException if an annotated method has an unsupported signature
     */
    public static Map<String, Object> serverServiceMap(Object api) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        for (Method m : api.getClass().getMethods()) {
            if (!m.isAnnotationPresent(Ipc.class)) {
                continue;
            }
            String aname = m.getAnnotation(Ipc.class).name();
            String name = aname.isEmpty() ? m.getName() : aname;
            Class<?> rt = m.getReturnType();
            Class<?>[] params = m.getParameterTypes();
            int pc = params.length;

            if (rt == Void.TYPE) {
                // Receive-only endpoint: (StreamContext, Publisher) -> void.
                if (pc != 2) {
                    throw new IllegalStateException("Ipc annotated methods require one StreamContext and one Publisher as a parameter: " + m);
                }
                if (!StreamContext.class.isAssignableFrom(params[0])) {
                    throw new IllegalStateException("Ipc annotated methods require a first StreamContext as a parameter: " + m);
                }
                if (!Publisher.class.isAssignableFrom(params[1])) {
                    throw new IllegalStateException("Ipc annotated methods require a second Publisher as a parameter: " + m);
                }
                if (Flux.class.equals(params[1])) {
                    result.put(name, new IpcServerReceiveFlux(m, api));
                } else if (Mono.class.equals(params[1])) {
                    result.put(name, new IpcServerReceiveMono(m, api));
                } else {
                    result.put(name, new IpcServerReceive(m, api));
                }
                continue;
            }

            if (!Publisher.class.isAssignableFrom(rt)) {
                throw new IllegalStateException("Ipc annotated methods require Publisher: " + m);
            }

            if (pc == 1) {
                // Send-only endpoint: (StreamContext) -> Publisher.
                if (!StreamContext.class.isAssignableFrom(params[0])) {
                    throw new IllegalStateException("Ipc annotated methods require one StreamContext as a parameter: " + m);
                }
                result.put(name, new IpcServerSend(m, api));
                continue;
            }

            if (pc != 2) {
                throw new IllegalStateException("Ipc annotated methods require one StreamContext and one Publisher as a parameter: " + m);
            }

            // Bidirectional endpoint: (StreamContext, Publisher) -> Publisher.
            if (!StreamContext.class.isAssignableFrom(params[0])) {
                throw new IllegalStateException("Ipc annotated methods require the first parameter to be StreamContext: " + m);
            }
            if (!Publisher.class.isAssignableFrom(params[1])) {
                throw new IllegalStateException("Ipc annotated methods require the second parameter to be Publisher: " + m);
            }
            if (Flux.class.equals(params[1])) {
                result.put(name, new IpcServerMapFlux(m, api));
            } else if (Mono.class.equals(params[1])) {
                result.put(name, new IpcServerMapMono(m, api));
            } else {
                result.put(name, new IpcServerMap(m, api));
            }
        }
        return result;
    }

    /**
     * Builds the client-side dispatch table for a service interface.
     *
     * <p>Each public method of {@code api} annotated with {@link Ipc} is registered under the
     * annotation's {@code name()}, falling back to the Java method name when that is empty.
     * Supported shapes:
     * <ul>
     *   <li>{@code void m(Function)}: a umap handler; the Flux / Mono variant is selected when
     *       the method's generic string contains {@code "<"} followed by the
     *       {@link Flux} / {@link Mono} class name;</li>
     *   <li>{@code void m(Publisher)}: a send handler;</li>
     *   <li>{@code Publisher m()}: a receive handler (Flux / Mono variant by exact return type);</li>
     *   <li>{@code Publisher m(Publisher)}: a map handler (Flux / Mono variant by exact return type).</li>
     * </ul>
     *
     * @param api the service interface (or class) to scan
     * @return a map from target name to handler object
     * @throws IllegalStateException if two annotated methods resolve to the same target name
     *         or an annotated method has an unsupported signature
     */
    public static Map<String, Object> clientServiceMap(Class<?> api) {
        HashMap<String, Object> handlers = new HashMap<String, Object>();
        for (Method method : api.getMethods()) {
            if (!method.isAnnotationPresent(Ipc.class)) {
                continue;
            }
            String declared = method.getAnnotation(Ipc.class).name();
            String key = declared.isEmpty() ? method.getName() : declared;
            if (handlers.containsKey(key)) {
                throw new IllegalStateException("Overloads with the same target name are not supported");
            }

            Class<?> returnType = method.getReturnType();
            int arity = method.getParameterCount();
            Object handler;

            if (returnType == Void.TYPE) {
                if (arity == 0) {
                    throw new IllegalStateException("Ipc annotated void methods require at least one parameter");
                }
                // Only a single-parameter method can match; anything else falls to the error below.
                Class<?> only = arity == 1 ? method.getParameterTypes()[0] : null;
                if (only != null && Function.class.isAssignableFrom(only)) {
                    String signature = method.toGenericString();
                    if (signature.contains("<" + Flux.class.getName())) {
                        handler = new IpcClientUmapFlux();
                    } else if (signature.contains("<" + Mono.class.getName())) {
                        handler = new IpcClientUmapMono();
                    } else {
                        handler = new IpcClientUmap();
                    }
                } else if (only != null && Publisher.class.isAssignableFrom(only)) {
                    handler = new IpcClientSend();
                } else {
                    throw new IllegalStateException("Ipc annotated methods returning a void require 1 parameter: " + method);
                }
            } else if (Publisher.class.isAssignableFrom(returnType)) {
                if (arity > 1) {
                    throw new IllegalStateException("Ipc annotated methods returning a Publisher require 0 or 1 parameter: " + method);
                }
                boolean returnsFlux = Flux.class.equals(returnType);
                boolean returnsMono = Mono.class.equals(returnType);
                if (arity == 0) {
                    if (returnsFlux) {
                        handler = new IpcClientReceiveFlux();
                    } else if (returnsMono) {
                        handler = new IpcClientReceiveMono();
                    } else {
                        handler = new IpcClientReceive();
                    }
                } else if (Publisher.class.isAssignableFrom(method.getParameterTypes()[0])) {
                    if (returnsFlux) {
                        handler = new IpcClientMapFlux();
                    } else if (returnsMono) {
                        handler = new IpcClientMapMono();
                    } else {
                        handler = new IpcClientMap();
                    }
                } else {
                    throw new IllegalStateException("Ipc annotated methods returning a Publisher allows only Publisher as parameter: " + method);
                }
            } else {
                throw new IllegalStateException("Ipc annotated methods require Publisher: " + method);
            }

            handlers.put(key, handler);
        }
        return handlers;
    }

    /**
     * Routes an incoming stream to the server-side handler that was registered for it.
     *
     * @param streamId the identifier of the stream being opened
     * @param action   the handler object, as produced by {@link #serverServiceMap(Object)};
     *                 may be {@code null} or of an unknown type, in which case nothing is dispatched
     * @param io       the stream operations used by the handler to talk to the peer
     * @param ctx      the stream context passed on to the service method
     * @return the handler's result, or {@code false} when {@code action} is not a supported handler
     */
    public static boolean dispatchServer(long streamId, Object action, StreamOperationsImpl io, StreamContext<?> ctx) {
        if (action instanceof IpcServerSend) {
            return ((IpcServerSend)action).send(streamId, ctx, io);
        }
        if (action instanceof IpcServerReceive) {
            return ((IpcServerReceive)action).receive(streamId, ctx, io);
        }
        if (action instanceof IpcServerMap) {
            return ((IpcServerMap)action).map(streamId, ctx, io);
        }
        if (log.isErrorEnabled()) {
            // A null action must not blow up here: previously action.getClass() threw a
            // NullPointerException only when error logging was enabled, making the outcome
            // depend on the log level.
            Class<?> actionType = action == null ? null : action.getClass();
            log.error("", (Throwable)new IllegalStateException("Unsupported action: " + actionType));
        }
        return false;
    }

    /**
     * Executes a client-side call through the handler registered for the invoked method.
     *
     * @param name   the remote function name
     * @param action the handler object, as produced by {@link #clientServiceMap(Class)}
     * @param args   the arguments of the invoked method; {@code args[0]} is read for send, map and
     *               umap handlers and is not touched for receive handlers
     * @param io     the stream operations used by the handler to talk to the peer
     * @return the resulting {@link Publisher} for receive and map handlers, or {@code null} for
     *         send and umap handlers
     * @throws NullPointerException  if the handler needs {@code args[0]} and it is {@code null}
     * @throws IllegalStateException if {@code action} is {@code null} or not a supported handler
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static Publisher<?> dispatchClient(String name, Object action, Object[] args, StreamOperationsImpl io) {
        if (action instanceof IpcClientSend) {
            if (args[0] == null) {
                throw new NullPointerException("The source Publisher is null");
            }
            IpcClientSend rpcSend = (IpcClientSend)action;
            rpcSend.send(name, (Publisher)args[0], io);
            return null;
        }
        if (action instanceof IpcClientReceive) {
            IpcClientReceive rpcReceive = (IpcClientReceive)action;
            return rpcReceive.receive(name, io);
        }
        if (action instanceof IpcClientMap) {
            if (args[0] == null) {
                throw new NullPointerException("The source Publisher is null");
            }
            IpcClientMap rpcMap = (IpcClientMap)action;
            return rpcMap.map(name, (Publisher)args[0], io);
        }
        if (action instanceof IpcClientUmap) {
            if (args[0] == null) {
                throw new NullPointerException("The umapper function is null");
            }
            IpcClientUmap rpcUmap = (IpcClientUmap)action;
            Function f = (Function)args[0];
            rpcUmap.umap(name, f, io);
            return null;
        }
        // Guard the getClass() call so a null action produces the intended
        // IllegalStateException instead of an incidental NullPointerException.
        Class<?> actionType = action == null ? null : action.getClass();
        throw new IllegalStateException("Unsupported action class: " + actionType);
    }

    /** {@link IpcServerMap} variant that hands the inbound stream to the service as a {@link Flux}. */
    static final class IpcServerMapFlux extends IpcServerMap {
        public IpcServerMapFlux(Method m, Object instance) {
            super(m, instance);
        }

        /** Wraps the publisher built by the parent class with {@link Flux#from}. */
        @Override
        Publisher<?> producer(long streamId, AtomicInteger innerOnce, IpcServerMap.ServerSendSubscriber sender, StreamOperationsImpl io) {
            Publisher<?> inbound = super.producer(streamId, innerOnce, sender, io);
            return Flux.from(inbound);
        }
    }

    /** {@link IpcServerMap} variant that hands the inbound stream to the service as a {@link Mono}. */
    static final class IpcServerMapMono extends IpcServerMap {
        public IpcServerMapMono(Method m, Object instance) {
            super(m, instance);
        }

        /** Wraps the publisher built by the parent class with {@link Mono#from}. */
        @Override
        Publisher<?> producer(long streamId, AtomicInteger innerOnce, IpcServerMap.ServerSendSubscriber sender, StreamOperationsImpl io) {
            Publisher<?> inbound = super.producer(streamId, innerOnce, sender, io);
            return Mono.from(inbound);
        }
    }

    static class IpcServerMap {
        final Method m;
        final Object instance;

        public IpcServerMap(Method m, Object instance) {
            this.m = m;
            this.instance = instance;
        }

        Publisher<?> producer(long streamId, AtomicInteger innerOnce, ServerSendSubscriber sender, StreamOperationsImpl io) {
            ServerMapSubscriber parent = new ServerMapSubscriber(streamId, io, innerOnce);
            parent.sender = sender;
            io.registerSubscriber(streamId, parent);
            io.registerSubscription(streamId, (Subscription)sender);
            AtomicBoolean once = new AtomicBoolean();
            return s -> {
                if (once.compareAndSet(false, true)) {
                    parent.actual = s;
                    s.onSubscribe(parent.s);
                } else {
                    Operators.error((Subscriber)s, (Throwable)new IllegalStateException("This Publisher allows only a single subscriber"));
                }
            };
        }

        final boolean map(long streamId, StreamContext<?> ctx, StreamOperationsImpl io) {
            Publisher u;
            AtomicInteger innerOnce = new AtomicInteger(2);
            ServerSendSubscriber sender = new ServerSendSubscriber(streamId, io, innerOnce);
            Publisher<?> p = this.producer(streamId, innerOnce, sender, io);
            try {
                u = (Publisher)this.m.invoke(this.instance, ctx, p);
            }
            catch (Throwable ex) {
                if (log.isErrorEnabled()) {
                    log.error("", ex);
                }
                u = s -> Operators.error((Subscriber)s, (Throwable)ex);
            }
            if (u == null) {
                u = s -> Operators.error((Subscriber)s, (Throwable)new NullPointerException("The service implementation returned a null Publisher"));
            }
            u.subscribe((Subscriber)sender);
            return true;
        }

        static final class ServerSendSubscriber
        extends Operators.DeferredSubscription
        implements Subscriber<Object> {
            final long streamId;
            final StreamOperationsImpl io;
            final AtomicInteger once;
            boolean done;

            public ServerSendSubscriber(long streamId, StreamOperationsImpl io, AtomicInteger once) {
                this.streamId = streamId;
                this.io = io;
                this.once = once;
            }

            public void onSubscribe(Subscription s) {
                this.set(s);
            }

            public void onNext(Object t) {
                if (this.done) {
                    return;
                }
                try {
                    this.io.sendNext(this.streamId, t);
                }
                catch (IOException ex) {
                    this.cancel();
                    this.onError(ex);
                }
            }

            public void onError(Throwable t) {
                if (this.done) {
                    Operators.onErrorDropped((Throwable)t);
                    return;
                }
                this.done = true;
                if (this.once.decrementAndGet() == 0) {
                    this.io.deregister(this.streamId);
                }
                this.io.sendError(this.streamId, t);
            }

            public void onComplete() {
                if (this.done) {
                    return;
                }
                this.done = true;
                if (this.once.decrementAndGet() == 0) {
                    this.io.deregister(this.streamId);
                }
                this.io.sendComplete(this.streamId);
            }
        }

        static final class ServerMapSubscriber
        implements Subscriber<Object>,
        Subscription {
            final long streamId;
            final StreamOperationsImpl io;
            final AtomicInteger once;
            final Subscription s;
            Subscriber<Object> actual;
            ServerSendSubscriber sender;

            public ServerMapSubscriber(long streamId, StreamOperationsImpl io, AtomicInteger once) {
                this.streamId = streamId;
                this.io = io;
                this.once = once;
                this.s = new Subscription(){

                    public void request(long n) {
                        this.innerRequest(n);
                    }

                    public void cancel() {
                        this.innerCancel();
                    }
                };
            }

            public void onSubscribe(Subscription s) {
            }

            public void onNext(Object t) {
                this.actual.onNext(t);
            }

            public void onError(Throwable t) {
                if (this.once.decrementAndGet() == 0) {
                    this.io.deregister(this.streamId);
                }
                this.actual.onError(t);
            }

            public void onComplete() {
                if (this.once.decrementAndGet() == 0) {
                    this.io.deregister(this.streamId);
                }
                this.actual.onComplete();
            }

            public void innerRequest(long n) {
                if (Operators.validate((long)n)) {
                    this.io.sendRequested(this.streamId, n);
                }
            }

            public void innerCancel() {
                if (this.once.decrementAndGet() == 0) {
                    this.io.deregister(this.streamId);
                }
                this.io.sendCancel(this.streamId, "");
            }

            public void request(long n) {
                this.sender.request(n);
            }

            public void cancel() {
                this.sender.cancel();
            }
        }
    }

    /** {@link IpcServerReceive} variant that hands the inbound stream to the service as a {@link Mono}. */
    static final class IpcServerReceiveMono extends IpcServerReceive {
        public IpcServerReceiveMono(Method m, Object instance) {
            super(m, instance);
        }

        /** Wraps the publisher built by the parent class with {@link Mono#from}. */
        @Override
        Publisher<?> producer(long streamId, StreamOperationsImpl io) {
            Publisher<?> inbound = super.producer(streamId, io);
            return Mono.from(inbound);
        }
    }

    /** {@link IpcServerReceive} variant that hands the inbound stream to the service as a {@link Flux}. */
    static final class IpcServerReceiveFlux extends IpcServerReceive {
        public IpcServerReceiveFlux(Method m, Object instance) {
            super(m, instance);
        }

        /** Wraps the publisher built by the parent class with {@link Flux#from}. */
        @Override
        Publisher<?> producer(long streamId, StreamOperationsImpl io) {
            Publisher<?> inbound = super.producer(streamId, io);
            return Flux.from(inbound);
        }
    }

    static class IpcServerReceive {
        final Method m;
        final Object instance;

        public IpcServerReceive(Method m, Object instance) {
            this.m = m;
            this.instance = instance;
        }

        Publisher<?> producer(long streamId, StreamOperationsImpl io) {
            ServerReceiveSubscriber parent = new ServerReceiveSubscriber(streamId, io);
            AtomicBoolean once = new AtomicBoolean();
            return s -> {
                if (once.compareAndSet(false, true)) {
                    parent.actual = s;
                    io.registerSubscriber(streamId, parent);
                    s.onSubscribe((Subscription)parent);
                } else {
                    Operators.error((Subscriber)s, (Throwable)new IllegalStateException("This Publisher allows only a single subscriber"));
                }
            };
        }

        final boolean receive(long streamId, StreamContext<?> ctx, StreamOperationsImpl io) {
            Publisher<?> p = this.producer(streamId, io);
            try {
                this.m.invoke(this.instance, ctx, p);
            }
            catch (Throwable ex) {
                if (log.isErrorEnabled()) {
                    log.error("", ex);
                }
                io.sendCancel(streamId, ex.toString());
            }
            return true;
        }

        static final class ServerReceiveSubscriber
        implements Subscriber<Object>,
        Subscription {
            final long streamId;
            final StreamOperationsImpl io;
            Subscriber<Object> actual;

            public ServerReceiveSubscriber(long streamId, StreamOperationsImpl io) {
                this.streamId = streamId;
                this.io = io;
            }

            public void onSubscribe(Subscription s) {
            }

            public void onNext(Object t) {
                this.actual.onNext(t);
            }

            public void onError(Throwable t) {
                this.io.deregister(this.streamId);
                this.actual.onError(t);
            }

            public void onComplete() {
                this.io.deregister(this.streamId);
                this.actual.onComplete();
            }

            public void request(long n) {
                if (Operators.validate((long)n)) {
                    this.io.sendRequested(this.streamId, n);
                }
            }

            public void cancel() {
                if (this.io.deregister(this.streamId)) {
                    this.io.sendCancel(this.streamId, "");
                }
            }
        }
    }

    static final class IpcServerSend {
        final Method m;
        final Object instance;

        public IpcServerSend(Method m, Object instance) {
            this.m = m;
            this.instance = instance;
        }

        public boolean send(long streamId, StreamContext<?> ctx, StreamOperationsImpl io) {
            Publisher output;
            try {
                output = (Publisher)this.m.invoke(this.instance, ctx);
            }
            catch (Throwable ex) {
                if (log.isErrorEnabled()) {
                    log.error("", ex);
                }
                io.sendError(streamId, ex);
                return true;
            }
            if (output == null) {
                io.sendError(streamId, new IllegalStateException("The service implementation returned a null Publisher"));
                return true;
            }
            ServerSendSubscriber parent = new ServerSendSubscriber(streamId, io);
            io.registerSubscription(streamId, (Subscription)parent);
            output.subscribe((Subscriber)parent);
            return true;
        }

        static final class ServerSendSubscriber
        extends Operators.DeferredSubscription
        implements Subscriber<Object> {
            final long streamId;
            final StreamOperationsImpl io;
            boolean done;

            public ServerSendSubscriber(long streamId, StreamOperationsImpl io) {
                this.streamId = streamId;
                this.io = io;
            }

            public void onSubscribe(Subscription s) {
                this.set(s);
            }

            public void onNext(Object t) {
                if (this.done) {
                    return;
                }
                try {
                    this.io.sendNext(this.streamId, t);
                }
                catch (IOException ex) {
                    this.cancel();
                    this.onError(ex);
                }
            }

            public void onError(Throwable t) {
                if (this.done) {
                    Operators.onErrorDropped((Throwable)t);
                    return;
                }
                this.done = true;
                this.io.deregister(this.streamId);
                this.io.sendError(this.streamId, t);
            }

            public void onComplete() {
                if (this.done) {
                    return;
                }
                this.done = true;
                this.io.deregister(this.streamId);
                this.io.sendComplete(this.streamId);
            }
        }
    }

    /** {@link IpcClientUmap} variant that hands the remote stream to the mapper as a {@link Mono}. */
    static final class IpcClientUmapMono extends IpcClientUmap {
        IpcClientUmapMono() {
        }

        /** Wraps the publisher built by the parent class with {@link Mono#from}. */
        @Override
        Publisher<?> producer(IpcClientUmap.IpcUmapReceiver receiver) {
            Publisher<?> remote = super.producer(receiver);
            return Mono.from(remote);
        }
    }

    /** {@link IpcClientUmap} variant that hands the remote stream to the mapper as a {@link Flux}. */
    static final class IpcClientUmapFlux extends IpcClientUmap {
        IpcClientUmapFlux() {
        }

        /** Wraps the publisher built by the parent class with {@link Flux#from}. */
        @Override
        Publisher<?> producer(IpcClientUmap.IpcUmapReceiver receiver) {
            Publisher<?> remote = super.producer(receiver);
            return Flux.from(remote);
        }
    }

    static class IpcClientUmap {
        IpcClientUmap() {
        }

        Publisher<?> producer(IpcUmapReceiver receiver) {
            AtomicBoolean once = new AtomicBoolean();
            return s -> {
                if (once.compareAndSet(false, true)) {
                    receiver.actual = s;
                    s.onSubscribe(receiver.s);
                } else {
                    Operators.error((Subscriber)s, (Throwable)new IllegalStateException("Only one subscriber allowed"));
                }
            };
        }

        final void umap(String function, Function<Publisher<?>, Publisher<?>> mapper, StreamOperationsImpl io) {
            Publisher u;
            long streamId = io.newStreamId();
            AtomicBoolean onceInner = new AtomicBoolean();
            IpcUmapReceiver receiver = new IpcUmapReceiver(streamId, io, onceInner);
            receiver.provider = new IpcUmapProvider(streamId, io, onceInner);
            io.registerSubscriber(streamId, receiver);
            io.registerSubscription(streamId, receiver);
            io.sendNew(streamId, function);
            Publisher<?> p = this.producer(receiver);
            try {
                u = mapper.apply(p);
            }
            catch (Throwable ex) {
                u = w -> Operators.error((Subscriber)w, (Throwable)ex);
            }
            if (u == null) {
                u = w -> Operators.error((Subscriber)w, (Throwable)new NullPointerException("The umapper returned a null Publisher"));
            }
            u.subscribe((Subscriber)receiver.provider);
        }

        static final class IpcUmapProvider
        extends Operators.DeferredSubscription
        implements Subscriber<Object> {
            final long streamId;
            final StreamOperationsImpl io;
            final AtomicBoolean once;
            boolean done;

            public IpcUmapProvider(long streamId, StreamOperationsImpl io, AtomicBoolean once) {
                this.streamId = streamId;
                this.io = io;
                this.once = once;
            }

            public void onSubscribe(Subscription s) {
                this.set(s);
            }

            public void onNext(Object t) {
                if (this.done) {
                    return;
                }
                try {
                    this.io.sendNext(this.streamId, t);
                }
                catch (IOException ex) {
                    this.onError(ex);
                }
            }

            public void onError(Throwable t) {
                if (this.done) {
                    Operators.onErrorDropped((Throwable)t);
                    return;
                }
                this.done = true;
                this.cancel();
                this.io.deregister(this.streamId);
                if (this.once.compareAndSet(false, true)) {
                    this.io.sendCancel(this.streamId, "");
                }
                this.io.sendError(this.streamId, t);
            }

            public void onComplete() {
                if (this.done) {
                    return;
                }
                this.done = true;
                this.cancel();
                this.io.deregister(this.streamId);
                if (this.once.compareAndSet(false, true)) {
                    this.io.sendCancel(this.streamId, "");
                }
                this.io.sendComplete(this.streamId);
            }
        }

        static final class IpcUmapReceiver
        implements Subscriber<Object>,
        Subscription {
            final long streamId;
            final StreamOperationsImpl io;
            final AtomicBoolean once;
            Subscriber<Object> actual;
            IpcUmapProvider provider;
            Subscription s;

            public IpcUmapReceiver(final long streamId, final StreamOperationsImpl io, final AtomicBoolean once) {
                this.streamId = streamId;
                this.io = io;
                this.once = once;
                this.s = new Subscription(){

                    public void request(long n) {
                        if (Operators.validate((long)n)) {
                            io.sendRequested(streamId, n);
                        }
                    }

                    public void cancel() {
                        if (once.compareAndSet(false, true)) {
                            io.sendCancel(streamId, "");
                        }
                    }
                };
            }

            public void onSubscribe(Subscription s) {
            }

            public void onNext(Object t) {
                this.actual.onNext(t);
            }

            public void onError(Throwable t) {
                this.once.set(true);
                this.actual.onError(t);
            }

            public void onComplete() {
                this.once.set(true);
                this.actual.onComplete();
            }

            public void request(long n) {
                this.provider.request(n);
            }

            public void cancel() {
                this.provider.cancel();
            }
        }
    }

    /** {@link IpcClientMap} variant whose result is returned to the caller as a {@link Flux}. */
    static final class IpcClientMapFlux extends IpcClientMap {
        IpcClientMapFlux() {
        }

        /** Wraps the publisher built by the parent class with {@link Flux#from}. */
        @Override
        public Publisher<?> map(String function, Publisher<?> values, StreamOperationsImpl io) {
            Publisher<?> mapped = super.map(function, values, io);
            return Flux.from(mapped);
        }
    }

    /** {@link IpcClientMap} variant whose result is returned to the caller as a {@link Mono}. */
    static final class IpcClientMapMono extends IpcClientMap {
        IpcClientMapMono() {
        }

        /** Wraps the publisher built by the parent class with {@link Mono#from}. */
        @Override
        public Publisher<?> map(String function, Publisher<?> values, StreamOperationsImpl io) {
            Publisher<?> mapped = super.map(function, values, io);
            return Mono.from(mapped);
        }
    }

    /**
     * Client-side mapper for a two-way call: the signals of a source publisher are sent
     * through {@code io} under a fresh stream id, while signals delivered to the receiver
     * registered for that id are relayed to the subscriber.
     */
    static class IpcClientMap {
        IpcClientMap() {
        }

        /**
         * Builds a publisher that, for every subscriber, allocates a new stream id from
         * {@code io}, registers a receiver and a sender under that id, announces the stream
         * via {@code sendNew}, hands the receiver's subscription to the subscriber and only
         * then subscribes the sender to {@code values}.
         *
         * @param function name passed to {@code io.sendNew} together with the new stream id
         * @param values source whose signals the sender forwards through {@code io}
         * @param io stream operations used for registration and for sending signals
         * @return a publisher performing the steps above on each subscription
         */
        public Publisher<?> map(String function, Publisher<?> values, StreamOperationsImpl io) {
            return s -> {
                IpcMapSubscriber sender;
                long streamId = io.newStreamId();
                // Two halves (receiver and sender) share this counter; the half that brings
                // it down to zero deregisters the stream.
                AtomicInteger open = new AtomicInteger(2);
                IpcMapReceiverSubscriber receiver = new IpcMapReceiverSubscriber((Subscriber<Object>)s, streamId, open, io);
                receiver.sender = sender = new IpcMapSubscriber(streamId, open, io);
                io.registerSubscriber(streamId, receiver);
                io.registerSubscription(streamId, (Subscription)sender);
                io.sendNew(streamId, function);
                s.onSubscribe(receiver.s);
                values.subscribe((Subscriber)sender);
            };
        }

        /**
         * Receiving half of a mapped stream. Signals arriving here are relayed to
         * {@code actual}; the subscription given to {@code actual} (field {@code s})
         * turns its request/cancel calls into {@code sendRequested}/{@code sendCancel}
         * on {@code io}.
         */
        static final class IpcMapReceiverSubscriber
        implements Subscriber<Object>,
        Subscription {
            final Subscriber<Object> actual;
            final long streamId;
            final AtomicInteger open;
            final StreamOperationsImpl io;
            /** Subscription handed to {@code actual} by {@link IpcClientMap#map}. */
            Subscription s;
            /** Sending half; assigned by {@link IpcClientMap#map} right after construction. */
            IpcMapSubscriber sender;

            /**
             * @param actual subscriber that receives the relayed signals
             * @param streamId id used for every call made on {@code io}
             * @param open counter shared with the sending half
             * @param io stream operations used for sending and deregistration
             */
            public IpcMapReceiverSubscriber(Subscriber<Object> actual, long streamId, AtomicInteger open, StreamOperationsImpl io) {
                this.actual = actual;
                this.streamId = streamId;
                this.open = open;
                this.io = io;
                this.s = new Subscription(){

                    public void request(long n) {
                        // Inside the anonymous class a bare "this" is the anonymous
                        // Subscription, which has no innerRequest; the enclosing instance
                        // must be named explicitly.
                        IpcMapReceiverSubscriber.this.innerRequest(n);
                    }

                    public void cancel() {
                        IpcMapReceiverSubscriber.this.innerCancel();
                    }
                };
            }

            /**
             * Sends the requested amount through {@code io} when {@code Operators.validate}
             * accepts it; otherwise does nothing.
             *
             * @param n the amount requested by {@code actual}
             */
            void innerRequest(long n) {
                if (Operators.validate(n)) {
                    this.io.sendRequested(this.streamId, n);
                }
            }

            /**
             * Decrements the shared counter, deregisters the stream if that reached zero,
             * and sends a cancel with an empty reason string in every case.
             */
            void innerCancel() {
                if (this.open.decrementAndGet() == 0) {
                    this.io.deregister(this.streamId);
                }
                this.io.sendCancel(this.streamId, "");
            }

            /** No-op: the supplied subscription is not stored or used. */
            public void onSubscribe(Subscription s) {
            }

            /** Relays the element to {@code actual}. */
            public void onNext(Object t) {
                this.actual.onNext(t);
            }

            /**
             * Decrements the shared counter (deregistering the stream at zero) and then
             * relays the error to {@code actual}.
             */
            public void onError(Throwable t) {
                if (this.open.decrementAndGet() == 0) {
                    this.io.deregister(this.streamId);
                }
                this.actual.onError(t);
            }

            /**
             * Decrements the shared counter (deregistering the stream at zero) and then
             * relays completion to {@code actual}.
             */
            public void onComplete() {
                if (this.open.decrementAndGet() == 0) {
                    this.io.deregister(this.streamId);
                }
                this.actual.onComplete();
            }

            /** Delegates the demand to the sending half. */
            public void request(long n) {
                this.sender.request(n);
            }

            /** Delegates cancellation to the sending half. */
            public void cancel() {
                this.sender.cancel();
            }
        }

        /**
         * Sending half of a mapped stream: forwards each signal it receives to {@code io}
         * under {@code streamId}. The upstream subscription is stored through the
         * {@code Operators.DeferredSubscription} base class.
         */
        static final class IpcMapSubscriber
        extends Operators.DeferredSubscription
        implements Subscriber<Object> {
            final long streamId;
            final AtomicInteger open;
            final StreamOperationsImpl io;
            /** Set once a terminal signal has been handled; later signals are ignored or dropped. */
            boolean done;

            /**
             * @param streamId id used for every call made on {@code io}
             * @param open counter shared with the receiving half
             * @param io stream operations used for sending and deregistration
             */
            public IpcMapSubscriber(long streamId, AtomicInteger open, StreamOperationsImpl io) {
                this.streamId = streamId;
                this.open = open;
                this.io = io;
            }

            /** Hands the upstream subscription to the deferred-subscription base class. */
            public void onSubscribe(Subscription s) {
                super.set(s);
            }

            /**
             * Sends the element through {@code io}. If sending throws an
             * {@link IOException}, the upstream is cancelled and the exception is routed
             * to {@link #onError(Throwable)}. Ignored once {@code done} is set.
             */
            public void onNext(Object t) {
                if (this.done) {
                    return;
                }
                try {
                    this.io.sendNext(this.streamId, t);
                }
                catch (IOException ex) {
                    this.cancel();
                    this.onError(ex);
                }
            }

            /**
             * On the first terminal signal: marks this half done, decrements the shared
             * counter (deregistering the stream at zero) and sends the error through
             * {@code io}. A later error goes to {@code Operators.onErrorDropped}.
             */
            public void onError(Throwable t) {
                if (this.done) {
                    Operators.onErrorDropped(t);
                    return;
                }
                this.done = true;
                if (this.open.decrementAndGet() == 0) {
                    this.io.deregister(this.streamId);
                }
                this.io.sendError(this.streamId, t);
            }

            /**
             * On the first terminal signal: marks this half done, decrements the shared
             * counter (deregistering the stream at zero) and sends completion through
             * {@code io}. Ignored once {@code done} is set.
             */
            public void onComplete() {
                if (this.done) {
                    return;
                }
                this.done = true;
                if (this.open.decrementAndGet() == 0) {
                    this.io.deregister(this.streamId);
                }
                this.io.sendComplete(this.streamId);
            }
        }
    }

    /**
     * {@link IpcClientReceive} variant that hands its result to {@code Flux.from}.
     */
    static final class IpcClientReceiveFlux
    extends IpcClientReceive {
        IpcClientReceiveFlux() {
        }

        /**
         * Runs the inherited receive logic and passes the resulting publisher through {@code Flux.from}.
         *
         * @param function forwarded unchanged to the parent implementation
         * @param io forwarded unchanged to the parent implementation
         * @return whatever {@code Flux.from} yields for the parent's publisher
         */
        @Override
        public Publisher<?> receive(String function, StreamOperationsImpl io) {
            Publisher<?> incoming = super.receive(function, io);
            return Flux.from(incoming);
        }
    }

    /**
     * {@link IpcClientReceive} variant that hands its result to {@code Mono.from}.
     */
    static final class IpcClientReceiveMono
    extends IpcClientReceive {
        IpcClientReceiveMono() {
        }

        /**
         * Runs the inherited receive logic and passes the resulting publisher through {@code Mono.from}.
         *
         * @param function forwarded unchanged to the parent implementation
         * @param io forwarded unchanged to the parent implementation
         * @return whatever {@code Mono.from} yields for the parent's publisher
         */
        @Override
        public Publisher<?> receive(String function, StreamOperationsImpl io) {
            Publisher<?> incoming = super.receive(function, io);
            return Mono.from(incoming);
        }
    }

    /**
     * Client-side mapper for a receive-only call: the subscriber is registered with
     * {@code io} under a fresh stream id and drives the stream through the subscription
     * it is given.
     */
    static class IpcClientReceive {
        IpcClientReceive() {
        }

        /**
         * Builds a publisher that, for every subscriber, allocates a stream id, registers
         * the subscriber under it, announces the stream with {@code sendNew} and then
         * supplies an {@link IpcReceiveSubscription} through {@code onSubscribe}.
         *
         * @param function name passed to {@code io.sendNew} together with the new stream id
         * @param io stream operations used for registration and for sending signals
         * @return a publisher performing the steps above on each subscription
         */
        public Publisher<?> receive(String function, StreamOperationsImpl io) {
            return subscriber -> {
                long id = io.newStreamId();
                io.registerSubscriber(id, subscriber);
                io.sendNew(id, function);
                IpcReceiveSubscription subscription = new IpcReceiveSubscription(id, io);
                subscriber.onSubscribe(subscription);
            };
        }

        /**
         * Subscription that turns request and cancel calls into {@code sendRequested}
         * and {@code sendCancel} calls on {@code io} for one stream id.
         */
        static final class IpcReceiveSubscription
        implements Subscription {
            final long streamId;
            final StreamOperationsImpl io;

            /**
             * @param streamId id used for every call made on {@code io}
             * @param io stream operations used for sending and deregistration
             */
            public IpcReceiveSubscription(long streamId, StreamOperationsImpl io) {
                this.streamId = streamId;
                this.io = io;
            }

            /**
             * Sends the requested amount through {@code io}; amounts rejected by
             * {@code Operators.validate} are not sent.
             */
            public void request(long n) {
                if (!Operators.validate(n)) {
                    return;
                }
                io.sendRequested(streamId, n);
            }

            /**
             * Deregisters the stream and sends a cancel with an empty reason string,
             * but only when {@code deregister} reports {@code true}.
             */
            public void cancel() {
                boolean removed = io.deregister(streamId);
                if (removed) {
                    io.sendCancel(streamId, "");
                }
            }
        }
    }

    /**
     * Client-side mapper for a send-only call: the signals of a source publisher are
     * forwarded through {@code io} under a fresh stream id.
     */
    static final class IpcClientSend {
        IpcClientSend() {
        }

        /**
         * Allocates a stream id, registers a {@link SendSubscriber} as the subscription
         * for it, announces the stream with {@code sendNew} and subscribes the forwarder
         * to {@code values}.
         *
         * @param function name passed to {@code io.sendNew} together with the new stream id
         * @param values source whose signals are forwarded
         * @param io stream operations used for registration and for sending signals
         */
        public static void sendStatic(String function, Publisher<?> values, StreamOperationsImpl io) {
            long id = io.newStreamId();
            SendSubscriber forwarder = new SendSubscriber(io, id);
            io.registerSubscription(id, (Subscription)forwarder);
            io.sendNew(id, function);
            values.subscribe((Subscriber)forwarder);
        }

        /**
         * Instance-level entry point; simply calls {@link #sendStatic}.
         *
         * @param function see {@link #sendStatic}
         * @param values see {@link #sendStatic}
         * @param io see {@link #sendStatic}
         */
        public void send(String function, Publisher<?> values, StreamOperationsImpl io) {
            sendStatic(function, values, io);
        }

        /**
         * Forwards every signal it receives to {@code io} under {@code streamId}. The
         * upstream subscription is stored through the {@code Operators.DeferredSubscription}
         * base class.
         */
        static final class SendSubscriber
        extends Operators.DeferredSubscription
        implements Subscriber<Object> {
            final StreamOperationsImpl io;
            final long streamId;
            /** Set once a terminal signal has been handled. */
            boolean done;

            /**
             * @param io stream operations used for sending and deregistration
             * @param streamId id used for every call made on {@code io}
             */
            public SendSubscriber(StreamOperationsImpl io, long streamId) {
                this.io = io;
                this.streamId = streamId;
            }

            /** Hands the upstream subscription to the deferred-subscription base class. */
            public void onSubscribe(Subscription s) {
                super.set(s);
            }

            /**
             * Sends the element through {@code io} unless already done. An
             * {@link IOException} from sending cancels the upstream and is routed to
             * {@link #onError(Throwable)}.
             */
            public void onNext(Object t) {
                if (!done) {
                    try {
                        io.sendNext(streamId, t);
                    }
                    catch (IOException ex) {
                        cancel();
                        onError(ex);
                    }
                }
            }

            /**
             * First terminal signal: marks done, deregisters the stream and sends the
             * error. Any later error goes to {@code Operators.onErrorDropped}.
             */
            public void onError(Throwable t) {
                if (!done) {
                    done = true;
                    io.deregister(streamId);
                    io.sendError(streamId, t);
                } else {
                    Operators.onErrorDropped(t);
                }
            }

            /**
             * First terminal signal: marks done, deregisters the stream and sends
             * completion. Ignored once done.
             */
            public void onComplete() {
                if (!done) {
                    done = true;
                    io.deregister(streamId);
                    io.sendComplete(streamId);
                }
            }
        }
    }
}

