/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.nexus;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.util.AsciiString;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.Loopback;
import reactor.core.publisher.BlockingSink;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.core.scheduler.TimedScheduler;
import reactor.ipc.Channel;
import reactor.ipc.netty.common.DuplexSocket;
import reactor.ipc.netty.common.NettyInbound;
import reactor.ipc.netty.http.HttpChannel;
import reactor.ipc.netty.http.HttpClient;
import reactor.ipc.netty.http.HttpServer;
import reactor.ipc.util.FlowSerializerUtils;
import reactor.util.Logger;
import reactor.util.Loggers;

public final class Nexus
extends DuplexSocket<ByteBuf, ByteBuf, Channel<ByteBuf, ByteBuf>>
implements Function<HttpChannel, Publisher<Void>>,
Loopback {
    // HTTP server the stream endpoint is registered on; started in doStart and stopped in doShutdown.
    final HttpServer server;
    // Accumulated graph state: LastGraphStateMap merges every incoming GraphEvent into it.
    final GraphEvent lastState;
    // Reusable system event, refreshed by the periodic task scheduled in doStart.
    final SystemEvent lastSystemState;
    // Hub that receives the merged cannons and that every HTTP exchange reads from (see apply).
    final FluxProcessor<Event, Event> eventStream;
    // Scheduler on which events are published to each connected channel (see apply).
    final Scheduler group;
    // Mapping applied to each outgoing event in apply; assigned a LastGraphStateMap in the constructor.
    final Function<Event, Event> lastStateMerge;
    // Timer running the periodic monitor() and system-stats tasks.
    final TimedScheduler timer;
    // Sink accepting event publishers; the constructor merges whatever is submitted into eventStream.
    final BlockingSink<Publisher<Event>> cannons;
    // Value sent for the Access-Control-Allow-Origin response header.
    static final AsciiString ALL = new AsciiString((CharSequence)"*");
    // Upstream clients registered through federate(); swapped atomically via FEDERATED.
    volatile FederatedClient[] federatedClients;
    // Period of the system-stats task in milliseconds (set by withSystemStats).
    long systemStatsPeriod;
    // Whether doStart schedules the system-stats task.
    boolean systemStats;
    // Set by useCapacity; not read by any code visible in this file.
    long websocketCapacity = 1L;
    // CAS handle over federatedClients used by federate().
    static final AtomicReferenceFieldUpdater<Nexus, FederatedClient[]> FEDERATED = AtomicReferenceFieldUpdater.newUpdater(Nexus.class, FederatedClient[].class, "federatedClients");
    static final Logger log = Loggers.getLogger(Nexus.class);
    // Path the event stream is served from.
    static final String API_STREAM_URL = "/nexus/stream";
    // Shared encoder turning an event's toString() output into a ByteBuf.
    static final Function<Event, ByteBuf> BUFFER_STRING_FUNCTION = new StringToBuffer();

    /**
     * Creates a Nexus bound to {@code 127.0.0.1} on the default port.
     *
     * @return a new Nexus
     */
    public static Nexus create() {
        final String loopback = "127.0.0.1";
        return create(loopback);
    }

    /**
     * Creates a Nexus bound to {@code 127.0.0.1} on the given port.
     *
     * @param port the port to listen on
     * @return a new Nexus
     */
    public static Nexus create(int port) {
        final String loopback = "127.0.0.1";
        return create(loopback, port);
    }

    /**
     * Creates a Nexus bound to the given address on {@code DEFAULT_PORT}.
     *
     * @param bindAddress the address to bind the HTTP server to
     * @return a new Nexus
     */
    public static Nexus create(String bindAddress) {
        int port = DEFAULT_PORT;
        return create(bindAddress, port);
    }

    /**
     * Creates a Nexus served by a new {@link HttpServer} bound to the given address and port.
     *
     * @param bindAddress the address to bind the HTTP server to
     * @param port the port to listen on
     * @return a new Nexus
     */
    public static Nexus create(String bindAddress, int port) {
        HttpServer httpServer = HttpServer.create(bindAddress, port);
        return create(httpServer);
    }

    /**
     * Creates a Nexus on top of an existing server and registers it as the GET handler
     * for {@code /nexus/stream}.
     *
     * @param server the HTTP server that will serve the event stream
     * @return the Nexus registered on {@code server}
     */
    public static Nexus create(HttpServer server) {
        Nexus instance = new Nexus(server);
        log.info("Warping Nexus...");
        // The Nexus itself is the route handler (it implements Function<HttpChannel, Publisher<Void>>).
        server.get(API_STREAM_URL, instance);
        return instance;
    }

    /**
     * Standalone entry point: starts a default Nexus and then blocks the main thread.
     *
     * @param args ignored
     * @throws Exception if starting or waiting is interrupted or fails
     */
    public static void main(String ... args) throws Exception {
        log.info("Deploying Nexus... ");
        Nexus.create().startAndAwait();
        log.info("CTRL-C to return...");
        // Nothing ever counts this latch down: the main thread waits here until the process is terminated.
        new CountDownLatch(1).await();
    }

    /**
     * Wires the event pipeline around the given server.
     *
     * <p>Publishers submitted to the {@code cannons} sink are merged and fed into
     * {@code eventStream}; the initial graph and system events are labelled with the
     * server's listen address.
     *
     * @param server the HTTP server this Nexus serves its stream from
     */
    Nexus(HttpServer server) {
        this.server = server;
        // NOTE(review): the boolean presumably turns auto-cancel off so the hub outlives its subscribers — confirm against EmitterProcessor.create(boolean).
        this.eventStream = EmitterProcessor.create((boolean)false);
        this.lastStateMerge = new LastGraphStateMap();
        this.timer = Schedulers.newTimer((String)"nexus-poller");
        this.group = Schedulers.newParallel((String)"nexus", (int)4);
        // Local processor (shadows the field of the same name) that receives publishers of events.
        EmitterProcessor cannons = EmitterProcessor.create();
        // The hub is subscribed to the merged publishers before the sink is exposed below.
        Flux.merge((Publisher)cannons).subscribe(this.eventStream);
        this.cannons = cannons.connectSink();
        this.lastState = new GraphEvent(server.getListenAddress().toString(), FlowSerializerUtils.createGraph());
        this.lastSystemState = new SystemEvent(server.getListenAddress().toString());
    }

    /**
     * Handles one exchange on the stream endpoint.
     *
     * <p>Sets a wildcard {@code Access-Control-Allow-Origin} header, then sends the encoded
     * event stream (local events plus any federated upstreams), upgrading to a text websocket
     * first when the channel is a websocket request. Inbound text of the form
     * {@code action\nargument} is only logged.
     *
     * @param channel the HTTP exchange to serve
     * @return the publisher produced by sending the stream on {@code channel}
     */
    @Override
    public Publisher<Void> apply(HttpChannel channel) {
        channel.responseHeader((CharSequence)HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, (CharSequence)ALL);
        Flux eventStream = this.eventStream.publishOn(this.group).map(this.lastStateMerge);
        Mono p = channel.isWebsocket() ? channel.upgradeToTextWebsocket().then(channel.send((Publisher)this.federateAndEncode(channel, (Flux<Event>)eventStream))) : channel.send((Publisher)this.federateAndEncode(channel, (Flux<Event>)eventStream));
        channel.receiveString().subscribe(command -> {
            int indexArg = command.indexOf("\n");
            // Commands without a separator, or starting with one, are ignored.
            if (indexArg > 0) {
                String action = command.substring(0, indexArg);
                // indexOf never returns an index >= length(), so the remainder always exists (possibly empty).
                String arg = command.substring(indexArg + 1);
                log.info("Received [" + action + "] [" + arg + ']');
            }
        });
        return p;
    }

    /**
     * @return the processor all events flow through ({@code eventStream})
     */
    public Object connectedInput() {
        return eventStream;
    }

    /**
     * @return the HTTP server the stream is served from
     */
    public Object connectedOutput() {
        return server;
    }

    /**
     * Registers additional upstream Nexus endpoints whose streams are merged into every
     * response produced by this instance.
     *
     * <p>The client array is replaced atomically, so concurrent callers never lose each
     * other's registrations. A {@code null} or empty argument is a no-op.
     *
     * @param urls target stream URLs, one {@link FederatedClient} is created per entry
     * @return this Nexus
     */
    public final Nexus federate(String ... urls) {
        if (urls == null || urls.length == 0) {
            return this;
        }
        // Build the new clients once: a lost CAS race only requires re-copying the current
        // array, not allocating a fresh HttpClient for every retry.
        FederatedClient[] additions = new FederatedClient[urls.length];
        for (int i = 0; i < urls.length; ++i) {
            additions[i] = new FederatedClient(urls[i]);
        }
        FederatedClient[] current;
        FederatedClient[] updated;
        do {
            current = this.federatedClients;
            int n = current != null ? current.length : 0;
            updated = new FederatedClient[n + additions.length];
            if (n > 0) {
                System.arraycopy(current, 0, updated, 0, n);
            }
            System.arraycopy(additions, 0, updated, n, additions.length);
        } while (!FEDERATED.compareAndSet(this, current, updated));
        return this;
    }

    /**
     * @return the HTTP server backing this Nexus
     */
    public HttpServer getServer() {
        return server;
    }

    /**
     * Opens a new sink whose items are turned into {@link MetricEvent}s and pushed into
     * the event stream.
     *
     * @return a sink connected to a fresh processor registered with the cannons
     */
    public final BlockingSink<Object> metricCannon() {
        UnicastProcessor source = UnicastProcessor.create();
        Function toMetric = new MetricMapper();
        this.cannons.submit((Object)source.map(toMetric));
        return source.connectSink();
    }

    /**
     * Monitors {@code o} with the default period (a non-positive period is passed on,
     * which the three-argument overload replaces by 400 ms).
     *
     * @param o the object to scan periodically
     * @return {@code o}
     */
    public final <E> E monitor(E o) {
        final long defaultPeriod = -1L;
        return monitor(o, defaultPeriod);
    }

    /**
     * Monitors {@code o} at the given period; no unit is passed on, so the three-argument
     * overload interprets the period as milliseconds.
     *
     * @param o the object to scan periodically
     * @param period the scan period
     * @return {@code o}
     */
    public final <E> E monitor(E o, long period) {
        final TimeUnit unspecified = null;
        return monitor(o, period, unspecified);
    }

    /**
     * Periodically scans {@code o} and publishes the result as graph events.
     *
     * <p>The scan task stops itself (by throwing the cancel exception) once the processor
     * it feeds has been cancelled.
     *
     * @param o the object to scan
     * @param period the scan period; values {@code <= 0} fall back to 400
     * @param unit the unit of {@code period}; {@code null} means milliseconds
     * @return {@code o}
     */
    public final <E> E monitor(E o, long period, TimeUnit unit) {
        long interval = 400L;
        if (period > 0L) {
            interval = period;
        }
        TimeUnit intervalUnit = unit;
        if (intervalUnit == null) {
            intervalUnit = TimeUnit.MILLISECONDS;
        }
        UnicastProcessor scans = UnicastProcessor.create();
        log.info("State Monitoring Starting on " + FlowSerializerUtils.getName(o));
        this.timer.schedulePeriodically(() -> {
            if (scans.isCancelled()) {
                log.info("State Monitoring stopping on " + FlowSerializerUtils.getName((Object)o));
                throw Exceptions.failWithCancel();
            }
            scans.onNext((Object)FlowSerializerUtils.scan((Object)o));
        }, 0L, interval, intervalUnit);
        // Registered after scheduling, as before: scans pass through GraphMapper into the hub.
        this.cannons.submit((Object)scans.map((Function)new GraphMapper()));
        return o;
    }

    /**
     * Starts this Nexus without a channel handler.
     *
     * @return the start signal returned by {@code start(null)}
     * @throws InterruptedException declared by the signature; propagated from {@code start(null)}
     */
    public final Mono<Void> start() throws InterruptedException {
        return start(null);
    }

    /**
     * Starts this Nexus, blocks until the start signal completes, then logs the stream URL.
     *
     * @throws InterruptedException declared by the signature; propagated from {@link #start()}
     */
    public final void startAndAwait() throws InterruptedException {
        this.start().block();
        InetSocketAddress bound = this.server.getListenAddress();
        String endpoint = "http://" + bound.getHostName() + ":" + bound.getPort() + API_STREAM_URL;
        log.info("Nexus Warped. Transmitting signal to troops under " + endpoint);
    }

    /**
     * Opens a new sink whose items are turned into {@link GraphEvent}s and pushed into
     * the event stream.
     *
     * @return a sink connected to a fresh processor registered with the cannons
     */
    public final BlockingSink<Object> streamCannon() {
        UnicastProcessor source = UnicastProcessor.create();
        Function toGraph = new GraphMapper();
        this.cannons.submit((Object)source.map(toGraph));
        return source.connectSink();
    }

    /**
     * Records the websocket capacity. The value is only stored; no code visible in this
     * file reads it back.
     *
     * @param capacity the capacity to store
     * @return this Nexus
     */
    public final Nexus useCapacity(long capacity) {
        websocketCapacity = capacity;
        return this;
    }

    /**
     * Enables system statistics with a period of one second.
     *
     * @return this Nexus
     */
    public final Nexus withSystemStats() {
        final long everySecond = 1L;
        return withSystemStats(true, everySecond);
    }

    /**
     * Enables or disables system statistics with a period expressed in seconds.
     *
     * @param enabled whether the system-stats task is scheduled on start
     * @param period the period in seconds
     * @return this Nexus
     */
    public final Nexus withSystemStats(boolean enabled, long period) {
        TimeUnit seconds = TimeUnit.SECONDS;
        return withSystemStats(enabled, period, seconds);
    }

    /**
     * Enables or disables system statistics and sets their period.
     *
     * @param enabled whether the system-stats task is scheduled on start
     * @param period the period; values below 1 fall back to 1000 ms
     * @param unit the unit of {@code period}; {@code null} falls back to 1000 ms
     * @return this Nexus
     */
    public final Nexus withSystemStats(boolean enabled, long period, TimeUnit unit) {
        if (unit == null || period < 1L) {
            this.systemStatsPeriod = 1000L;
        } else {
            // The period is stored in milliseconds regardless of the caller's unit.
            this.systemStatsPeriod = TimeUnit.MILLISECONDS.convert(period, unit);
        }
        this.systemStats = enabled;
        return this;
    }

    /**
     * Starts the HTTP server, first scheduling the periodic system-stats task when
     * system statistics are enabled.
     *
     * @param handler not used by this implementation
     * @return the start signal of the underlying server
     */
    @Override
    protected Mono<Void> doStart(Function<? super Channel<ByteBuf, ByteBuf>, ? extends Publisher<Void>> handler) {
        if (!this.systemStats) {
            return this.server.start();
        }
        UnicastProcessor statsSource = UnicastProcessor.create();
        this.cannons.submit((Object)statsSource);
        log.info("System Monitoring Starting");
        this.timer.schedulePeriodically(() -> {
            if (!statsSource.isCancelled()) {
                statsSource.onNext((Object)this.lastSystemState.scan());
                return;
            }
            // Throwing the cancel exception is how the periodic task stops itself.
            log.info("System Monitoring Stopped");
            throw Exceptions.failWithCancel();
        }, 0L, this.systemStatsPeriod, TimeUnit.MILLISECONDS);
        return this.server.start();
    }

    /**
     * Stops the periodic tasks, completes the event pipeline and shuts the server down.
     *
     * <p>The {@code group} scheduler created in the constructor is released as well, once
     * the server shutdown has terminated (successfully or not), so that in-flight
     * completion signals published on it are not cut short.
     *
     * @return the shutdown signal of the underlying server
     */
    @Override
    protected Mono<Void> doShutdown() {
        this.timer.shutdown();
        this.cannons.finish();
        this.eventStream.onComplete();
        // Without this the "nexus" parallel scheduler threads outlive the Nexus.
        return this.server.shutdown()
                .doOnSuccess(ignored -> this.group.shutdown())
                .doOnError(ignored -> this.group.shutdown());
    }

    /**
     * Encodes the local event stream to buffers and, when federated clients are registered,
     * merges it with the streams received from those upstreams.
     *
     * @param c the channel being served; handed to each {@link FederatedMerger}
     * @param stream the local events to encode
     * @return the encoded local stream, merged with upstream buffers when federation is configured
     */
    Flux<? extends ByteBuf> federateAndEncode(HttpChannel c, Flux<Event> stream) {
        Object[] upstreams = this.federatedClients;
        if (upstreams != null && upstreams.length != 0) {
            Flux mergedUpstreams = Flux.merge((Publisher)Flux.fromArray((Object[])upstreams).map((Function)new FederatedMerger(c)));
            return Flux.merge((Publisher[])new Publisher[]{stream.map(BUFFER_STRING_FUNCTION), mergedUpstreams});
        }
        // No federation configured: only the local stream is sent.
        return stream.map(BUFFER_STRING_FUNCTION);
    }

    /**
     * Wraps an object into a {@link GraphEvent} labelled with the server's listen address.
     * Objects that are exactly a {@code FlowSerializerUtils.Graph} are used as-is; anything
     * else is scanned first.
     */
    final class GraphMapper
    implements Function<Object, Event> {
        GraphMapper() {
        }

        @Override
        public Event apply(Object o) {
            String host = Nexus.this.server.getListenAddress().toString();
            FlowSerializerUtils.Graph graph;
            if (FlowSerializerUtils.Graph.class.equals(o.getClass())) {
                graph = (FlowSerializerUtils.Graph)o;
            } else {
                graph = FlowSerializerUtils.scan((Object)o);
            }
            return new GraphEvent(host, graph);
        }
    }

    /**
     * Produces a {@link MetricEvent} labelled with the server's listen address for every
     * input; the input value itself is not carried into the event.
     */
    final class MetricMapper
    implements Function<Object, Event> {
        MetricMapper() {
        }

        @Override
        public Event apply(Object o) {
            String host = Nexus.this.server.getListenAddress().toString();
            return new MetricEvent(host);
        }
    }

    /**
     * Folds every {@link GraphEvent} into the Nexus-wide accumulated graph and emits that
     * accumulated event instead; all other events pass through unchanged.
     */
    class LastGraphStateMap
    implements Function<Event, Event> {
        LastGraphStateMap() {
        }

        @Override
        public Event apply(Event event) {
            if (!GraphEvent.class.equals(event.getClass())) {
                return event;
            }
            GraphEvent incoming = (GraphEvent)event;
            Nexus.this.lastState.graph.mergeWith(incoming.graph);
            return Nexus.this.lastState;
        }

        public String toString() {
            return "ScanIfGraphEvent";
        }
    }

    /**
     * Pairs an upstream stream URL with the HTTP client used to reach it.
     */
    static final class FederatedClient {
        final HttpClient client;
        final String targetAPI;

        /**
         * @param targetAPI the URL of the upstream stream endpoint
         */
        public FederatedClient(String targetAPI) {
            this.client = HttpClient.create();
            this.targetAPI = targetAPI;
        }
    }

    /**
     * Opens a websocket to a federated client's target and exposes what it receives.
     * The channel passed at construction is stored but not used by {@code apply}.
     */
    static final class FederatedMerger
    implements Function<FederatedClient, Publisher<ByteBuf>> {
        final HttpChannel c;

        public FederatedMerger(HttpChannel c) {
            this.c = c;
        }

        @Override
        public Publisher<ByteBuf> apply(FederatedClient o) {
            String target = o.targetAPI;
            return o.client
                    .ws(target)
                    .flatMap(NettyInbound::receive);
        }
    }

    /**
     * Encodes an event as the UTF-8 bytes of its {@code toString()} output, wrapped in an
     * unpooled buffer.
     */
    static class StringToBuffer
    implements Function<Event, ByteBuf> {
        StringToBuffer() {
        }

        /**
         * @param event the event to encode
         * @return a buffer wrapping the UTF-8 encoding of {@code event.toString()}
         */
        @Override
        public ByteBuf apply(Event event) {
            // The Charset overload cannot fail, unlike the charset-name lookup it replaces.
            return Unpooled.wrappedBuffer((byte[])event.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    /**
     * Event carrying JVM statistics and the state of the threads found by {@link #scan()}.
     *
     * <p>{@code scan()} is called from the timer task scheduled in {@code doStart}, while
     * {@code toString()} runs where the event is encoded (after {@code publishOn} in
     * {@code apply}); access to the thread map is therefore guarded by the map's monitor
     * and {@link #getThreads()} hands out a snapshot.
     */
    static final class SystemEvent
    extends Event {
        // Each value strongly references its key, so the weak keys alone never let an entry
        // be collected; scan() prunes terminated threads explicitly.
        final Map<Thread, ThreadState> threads = new WeakHashMap<Thread, ThreadState>();
        static final Runtime runtime = Runtime.getRuntime();
        static final JvmStats jvmStats = new JvmStats();

        public SystemEvent(String hostname) {
            super(hostname);
        }

        public JvmStats getJvmStats() {
            return jvmStats;
        }

        /**
         * @return a snapshot of the currently tracked thread states
         */
        public Collection<ThreadState> getThreads() {
            synchronized (this.threads) {
                return new ArrayList<ThreadState>(this.threads.values());
            }
        }

        public String toString() {
            return "{ " + FlowSerializerUtils.property((String)"jvmStats", (Object)this.getJvmStats()) + ", " + FlowSerializerUtils.property((String)"threads", this.getThreads()) + ", " + FlowSerializerUtils.property((String)"type", (Object)this.getType()) + ", " + FlowSerializerUtils.property((String)"timestamp", (Object)System.currentTimeMillis()) + ", " + FlowSerializerUtils.property((String)"nexusHost", (Object)this.getNexusHost()) + " }";
        }

        /**
         * Refreshes the tracked threads: drops those that have terminated and adds any
         * thread returned by {@link Thread#enumerate(Thread[])} that is not tracked yet.
         *
         * @return this event
         */
        SystemEvent scan() {
            int active = Thread.activeCount();
            Thread[] currentThreads = new Thread[active];
            int n = Thread.enumerate(currentThreads);
            synchronized (this.threads) {
                // Terminated threads would otherwise accumulate for the lifetime of the Nexus.
                this.threads.keySet().removeIf(t -> t.getState() == Thread.State.TERMINATED);
                for (int i = 0; i < n; ++i) {
                    Thread candidate = currentThreads[i];
                    if (!this.threads.containsKey(candidate)) {
                        this.threads.put(candidate, new ThreadState(candidate));
                    }
                }
            }
            return this;
        }

        /**
         * Live view over one thread: every getter reads the thread's current state.
         */
        static final class ThreadState {
            final transient Thread thread;

            public ThreadState(Thread thread) {
                this.thread = thread;
            }

            /**
             * @return the hash code of the thread's context class loader, or -1 when it has none
             */
            public long getContextHash() {
                ClassLoader loader = this.thread.getContextClassLoader();
                return loader != null ? loader.hashCode() : -1L;
            }

            public long getId() {
                return this.thread.getId();
            }

            public String getName() {
                return this.thread.getName();
            }

            public int getPriority() {
                return this.thread.getPriority();
            }

            public Thread.State getState() {
                return this.thread.getState();
            }

            /**
             * @return the name of the thread's group, or {@code null} when it has none
             */
            public String getThreadGroup() {
                // Read once: Thread.getThreadGroup() starts returning null when the thread
                // dies, which could happen between a null check and a second call.
                ThreadGroup group = this.thread.getThreadGroup();
                return group != null ? group.getName() : null;
            }

            public boolean isAlive() {
                return this.thread.isAlive();
            }

            public boolean isDaemon() {
                return this.thread.isDaemon();
            }

            public boolean isInterrupted() {
                return this.thread.isInterrupted();
            }

            public String toString() {
                // Captured once so the presence check and the printed value cannot disagree.
                String group = this.getThreadGroup();
                long contextHash = this.getContextHash();
                return "{ " + FlowSerializerUtils.property((String)"id", (Object)this.getId()) + ", " + FlowSerializerUtils.property((String)"name", (Object)this.getName()) + ", " + FlowSerializerUtils.property((String)"alive", (Object)this.isAlive()) + ", " + FlowSerializerUtils.property((String)"state", (Object)this.getState().name()) + (group != null ? ", " + FlowSerializerUtils.property((String)"threadGroup", (Object)group) : "") + (contextHash != -1L ? ", " + FlowSerializerUtils.property((String)"contextHash", (Object)contextHash) : "") + ", " + FlowSerializerUtils.property((String)"interrupted", (Object)this.isInterrupted()) + ", " + FlowSerializerUtils.property((String)"priority", (Object)this.getPriority()) + ", " + FlowSerializerUtils.property((String)"daemon", (Object)this.isDaemon()) + " }";
            }
        }

        /**
         * Live view over {@link Runtime} and thread-count figures; every getter reads the
         * current value.
         */
        static final class JvmStats {
            JvmStats() {
            }

            public int getActiveThreads() {
                return Thread.activeCount();
            }

            public int getAvailableProcessors() {
                return runtime.availableProcessors();
            }

            public long getFreeMemory() {
                return runtime.freeMemory();
            }

            public long getMaxMemory() {
                return runtime.maxMemory();
            }

            // NOTE(review): this reports Runtime.totalMemory(), not totalMemory() minus
            // freeMemory() — confirm what consumers of "usedMemory" expect before changing it.
            public long getUsedMemory() {
                return runtime.totalMemory();
            }

            public String toString() {
                return "{ " + FlowSerializerUtils.property((String)"freeMemory", (Object)this.getFreeMemory()) + ", " + FlowSerializerUtils.property((String)"maxMemory", (Object)this.getMaxMemory()) + ", " + FlowSerializerUtils.property((String)"usedMemory", (Object)this.getUsedMemory()) + ", " + FlowSerializerUtils.property((String)"activeThreads", (Object)this.getActiveThreads()) + ", " + FlowSerializerUtils.property((String)"availableProcessors", (Object)this.getAvailableProcessors()) + " }";
            }
        }
    }

    /**
     * Event marking a metric emission; it serializes only the host, type and a timestamp
     * taken at serialization time.
     */
    static final class MetricEvent
    extends Event {
        public MetricEvent(String hostname) {
            super(hostname);
        }

        public String toString() {
            StringBuilder json = new StringBuilder("{ ");
            json.append(FlowSerializerUtils.property((String)"nexusHost", (Object)this.getNexusHost()));
            json.append(", ");
            json.append(FlowSerializerUtils.property((String)"type", (Object)this.getType()));
            json.append(", ");
            json.append(FlowSerializerUtils.property((String)"timestamp", (Object)System.currentTimeMillis()));
            json.append(" }");
            return json.toString();
        }
    }

    /**
     * Event describing one log record, stamped with the creating thread's id and the
     * creation time.
     *
     * <p>When exactly three extra arguments are supplied they are interpreted as
     * {@code kind}, {@code data} and {@code origin}; otherwise those three stay {@code null}
     * and are left out of the serialized form.
     */
    static final class LogEvent
    extends Event {
        final String message;
        final String category;
        final Level level;
        final long threadId;
        final String origin;
        final String data;
        final String kind;
        final long timestamp = System.currentTimeMillis();

        /**
         * @param name the host label passed to {@link Event}
         * @param category the log category
         * @param level the log level; {@code toString()} dereferences it
         * @param message the log message
         * @param args optional {@code [kind, data, origin]} triple; any other length is ignored
         */
        public LogEvent(String name, String category, Level level, String message, Object ... args) {
            super(name);
            this.threadId = Thread.currentThread().getId();
            this.message = message;
            this.level = level;
            this.category = category;
            if (args != null && args.length == 3) {
                // Guarded like data below: a null element must not abort event creation.
                this.kind = args[0] != null ? args[0].toString() : null;
                this.data = args[1] != null ? args[1].toString() : null;
                this.origin = FlowSerializerUtils.getIdOrDefault((Object)args[2]);
            } else {
                this.origin = null;
                this.kind = null;
                this.data = null;
            }
        }

        public String getCategory() {
            return this.category;
        }

        public String getData() {
            return this.data;
        }

        public String getKind() {
            return this.kind;
        }

        public Level getLevel() {
            return this.level;
        }

        public String getMessage() {
            return this.message;
        }

        public String getOrigin() {
            return this.origin;
        }

        public long getThreadId() {
            return this.threadId;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public String toString() {
            return "{ " + FlowSerializerUtils.property((String)"timestamp", (Object)this.getTimestamp()) + ", " + FlowSerializerUtils.property((String)"level", (Object)this.getLevel().getName()) + ", " + FlowSerializerUtils.property((String)"category", (Object)this.getCategory()) + (this.kind != null ? ", " + FlowSerializerUtils.property((String)"kind", (Object)this.getKind()) : "") + (this.origin != null ? ", " + FlowSerializerUtils.property((String)"origin", (Object)this.getOrigin()) : "") + (this.data != null ? ", " + FlowSerializerUtils.property((String)"data", (Object)this.getData()) : "") + ", " + FlowSerializerUtils.property((String)"message", (Object)this.getMessage()) + ", " + FlowSerializerUtils.property((String)"threadId", (Object)this.getThreadId()) + ", " + FlowSerializerUtils.property((String)"type", (Object)this.getType()) + ", " + FlowSerializerUtils.property((String)"nexusHost", (Object)this.getNexusHost()) + " }";
        }
    }

    /**
     * Event listing the ids of streams that were removed; serialized under the
     * {@code streams} property.
     */
    static final class RemovedGraphEvent
    extends Event {
        final Collection<String> ids;

        public RemovedGraphEvent(String name, Collection<String> ids) {
            super(name);
            this.ids = ids;
        }

        /**
         * @return the ids given at construction (the same collection instance)
         */
        public Collection<String> getStreams() {
            return ids;
        }

        public String toString() {
            StringBuilder json = new StringBuilder("{ ");
            json.append(FlowSerializerUtils.property((String)"streams", this.getStreams()));
            json.append(", ");
            json.append(FlowSerializerUtils.property((String)"type", (Object)this.getType()));
            json.append(", ");
            json.append(FlowSerializerUtils.property((String)"timestamp", (Object)System.currentTimeMillis()));
            json.append(", ");
            json.append(FlowSerializerUtils.property((String)"nexusHost", (Object)this.getNexusHost()));
            json.append(" }");
            return json.toString();
        }
    }

    /**
     * Event carrying a flow graph; serialized under the {@code streams} property.
     */
    static final class GraphEvent
    extends Event {
        final FlowSerializerUtils.Graph graph;

        public GraphEvent(String name, FlowSerializerUtils.Graph graph) {
            super(name);
            this.graph = graph;
        }

        /**
         * @return the graph given at construction
         */
        public FlowSerializerUtils.Graph getStreams() {
            return graph;
        }

        public String toString() {
            StringBuilder json = new StringBuilder("{ ");
            json.append(FlowSerializerUtils.property((String)"streams", (Object)this.getStreams()));
            json.append(", ");
            json.append(FlowSerializerUtils.property((String)"type", (Object)this.getType()));
            json.append(", ");
            json.append(FlowSerializerUtils.property((String)"timestamp", (Object)System.currentTimeMillis()));
            json.append(", ");
            json.append(FlowSerializerUtils.property((String)"nexusHost", (Object)this.getNexusHost()));
            json.append(" }");
            return json.toString();
        }
    }

    /**
     * Base type of everything pushed through the Nexus stream; carries the label of the
     * host that produced it.
     */
    static class Event {
        final String nexusHost;

        /**
         * @param nexusHost the host label to attach to this event
         */
        public Event(String nexusHost) {
            this.nexusHost = nexusHost;
        }

        /**
         * @return the host label given at construction
         */
        public String getNexusHost() {
            return nexusHost;
        }

        /**
         * @return the simple name of this event's runtime class
         */
        public String getType() {
            Class<?> runtimeType = getClass();
            return runtimeType.getSimpleName();
        }
    }
}

