/*
 * Decompiled with CFR 0.152.
 */
package io.activej.rpc.server;

import io.activej.codegen.DefiningClassLoader;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.exception.MalformedDataException;
import io.activej.csp.process.frames.FrameFormat;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.net.ServerSocketSettings;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.api.attribute.JmxReducers;
import io.activej.jmx.stats.EventStats;
import io.activej.jmx.stats.ExceptionStats;
import io.activej.jmx.stats.ValueStats;
import io.activej.net.AbstractServer;
import io.activej.net.socket.tcp.AsyncTcpSocket;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import io.activej.rpc.protocol.RpcControlMessage;
import io.activej.rpc.protocol.RpcMessage;
import io.activej.rpc.protocol.RpcStream;
import io.activej.rpc.server.RpcRequestHandler;
import io.activej.rpc.server.RpcServerConnection;
import io.activej.serializer.BinarySerializer;
import io.activej.serializer.SerializerBuilder;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public final class RpcServer
extends AbstractServer<RpcServer> {
    public static final ServerSocketSettings DEFAULT_SERVER_SOCKET_SETTINGS = ServerSocketSettings.create((int)16384);
    public static final MemSize DEFAULT_INITIAL_BUFFER_SIZE = ChannelSerializer.DEFAULT_INITIAL_BUFFER_SIZE;
    private MemSize initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE;
    @Nullable
    private FrameFormat frameFormat;
    private Duration autoFlushInterval = Duration.ZERO;
    private final Map<Class<?>, RpcRequestHandler<?, ?>> handlers = new LinkedHashMap();
    private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    private SerializerBuilder serializerBuilder = SerializerBuilder.create((DefiningClassLoader)DefiningClassLoader.create((ClassLoader)this.classLoader));
    private List<Class<?>> messageTypes;
    private final List<RpcServerConnection> connections = new ArrayList<RpcServerConnection>();
    private BinarySerializer<RpcMessage> serializer;
    private SettablePromise<Void> closeCallback;
    static final Duration SMOOTHING_WINDOW = Duration.ofMinutes(1L);
    private final EventStats totalConnects = EventStats.create((Duration)SMOOTHING_WINDOW);
    private final Map<InetAddress, EventStats> connectsPerAddress = new HashMap<InetAddress, EventStats>();
    private final EventStats successfulRequests = EventStats.create((Duration)SMOOTHING_WINDOW);
    private final EventStats failedRequests = EventStats.create((Duration)SMOOTHING_WINDOW);
    private final ValueStats requestHandlingTime = ValueStats.create((Duration)SMOOTHING_WINDOW).withUnit("milliseconds");
    private final ExceptionStats lastRequestHandlingException = ExceptionStats.create();
    private final ExceptionStats lastProtocolError = ExceptionStats.create();
    private boolean monitoring;

    private RpcServer(Eventloop eventloop) {
        super(eventloop);
    }

    public static RpcServer create(Eventloop eventloop) {
        return ((RpcServer)((RpcServer)new RpcServer(eventloop).withServerSocketSettings(DEFAULT_SERVER_SOCKET_SETTINGS)).withSocketSettings(DEFAULT_SOCKET_SETTINGS)).withHandler(RpcControlMessage.class, request -> {
            if (request == RpcControlMessage.PING) {
                return Promise.of((Object)((Object)RpcControlMessage.PONG));
            }
            return Promise.ofException((Exception)new MalformedDataException("Unknown message: " + (Object)request));
        });
    }

    public RpcServer withClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
        this.serializerBuilder = SerializerBuilder.create((DefiningClassLoader)DefiningClassLoader.create((ClassLoader)classLoader));
        return this;
    }

    public RpcServer withMessageTypes(Class<?> ... messageTypes) {
        return this.withMessageTypes(Arrays.asList(messageTypes));
    }

    public RpcServer withMessageTypes(@NotNull List<Class<?>> messageTypes) {
        Checks.checkArgument((new HashSet(messageTypes).size() == messageTypes.size() ? 1 : 0) != 0, (Object)"Message types must be unique");
        this.messageTypes = messageTypes;
        return this;
    }

    public RpcServer withSerializerBuilder(SerializerBuilder serializerBuilder) {
        this.serializerBuilder = serializerBuilder;
        return this;
    }

    public RpcServer withStreamProtocol(MemSize defaultPacketSize) {
        this.initialBufferSize = defaultPacketSize;
        return this;
    }

    public RpcServer withStreamProtocol(MemSize defaultPacketSize, FrameFormat frameFormat) {
        this.initialBufferSize = defaultPacketSize;
        this.frameFormat = frameFormat;
        return this;
    }

    public RpcServer withAutoFlushInterval(Duration autoFlushInterval) {
        this.autoFlushInterval = autoFlushInterval;
        return this;
    }

    public <I, O> RpcServer withHandler(Class<I> requestClass, RpcRequestHandler<I, O> handler) {
        Checks.checkArgument((!this.handlers.containsKey(requestClass) ? 1 : 0) != 0, (String)"Handler for {} has already been added", (Object[])new Object[]{requestClass});
        this.handlers.put(requestClass, handler);
        return this;
    }

    protected void serve(AsyncTcpSocket socket, InetAddress remoteAddress) {
        RpcStream stream = new RpcStream(socket, this.serializer, this.initialBufferSize, this.autoFlushInterval, this.frameFormat, true);
        RpcServerConnection connection = new RpcServerConnection(this, remoteAddress, this.handlers, stream);
        stream.setListener(connection);
        this.add(connection);
        this.ensureConnectStats(remoteAddress).recordEvent();
        this.totalConnects.recordEvent();
    }

    protected void onListen() {
        Checks.checkState((this.messageTypes != null ? 1 : 0) != 0, (Object)"Message types must be specified");
        this.serializer = this.serializerBuilder.withSubclasses("messageTypes", this.messageTypes).build(RpcMessage.class);
    }

    protected void onClose(SettablePromise<Void> cb) {
        if (this.connections.isEmpty()) {
            this.logger.info("RpcServer is closing. Active connections count: 0.");
            cb.set(null);
        } else {
            this.logger.info("RpcServer is closing. Active connections count: {}", (Object)this.connections.size());
            for (RpcServerConnection connection : new ArrayList<RpcServerConnection>(this.connections)) {
                connection.shutdown();
            }
            this.closeCallback = cb;
        }
    }

    void add(RpcServerConnection connection) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Client connected on {}", (Object)connection);
        }
        if (this.monitoring) {
            connection.startMonitoring();
        }
        this.connections.add(connection);
    }

    boolean remove(RpcServerConnection connection) {
        if (!this.connections.remove(connection)) {
            return false;
        }
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Client disconnected on {}", (Object)connection);
        }
        if (this.closeCallback != null) {
            this.logger.info("RpcServer is closing. One more connection was closed. Active connections count: {}", (Object)this.connections.size());
            if (this.connections.isEmpty()) {
                this.closeCallback.set(null);
            }
        }
        return true;
    }

    @JmxOperation(description="enable monitoring [ when monitoring is enabled more stats are collected, but it causes more overhead (for example, requestHandlingTime stats are collected only when monitoring is enabled) ]")
    public void startMonitoring() {
        this.monitoring = true;
        for (RpcServerConnection connection : this.connections) {
            connection.startMonitoring();
        }
    }

    @JmxOperation(description="disable monitoring [ when monitoring is enabled more stats are collected, but it causes more overhead (for example, requestHandlingTime stats are collected only when monitoring is enabled) ]")
    public void stopMonitoring() {
        this.monitoring = false;
        for (RpcServerConnection connection : this.connections) {
            connection.stopMonitoring();
        }
    }

    @JmxAttribute(description="when monitoring is enabled more stats are collected, but it causes more overhead (for example, requestHandlingTime stats are collected only when monitoring is enabled)")
    public boolean isMonitoring() {
        return this.monitoring;
    }

    @JmxAttribute(description="current number of connections", reducer=JmxReducers.JmxReducerSum.class)
    public int getConnectionsCount() {
        return this.connections.size();
    }

    @JmxAttribute
    public EventStats getTotalConnects() {
        return this.totalConnects;
    }

    public Map<InetAddress, EventStats> getConnectsPerAddress() {
        return this.connectsPerAddress;
    }

    private EventStats ensureConnectStats(InetAddress address) {
        return this.connectsPerAddress.computeIfAbsent(address, $ -> EventStats.create((Duration)SMOOTHING_WINDOW));
    }

    @JmxOperation(description="detailed information about connections")
    public List<RpcServerConnection> getConnections() {
        return this.connections;
    }

    @JmxAttribute(extraSubAttributes={"totalCount"}, description="number of requests which were processed correctly")
    public EventStats getSuccessfulRequests() {
        return this.successfulRequests;
    }

    @JmxAttribute(extraSubAttributes={"totalCount"}, description="request with error responses (number of requests which were handled with error)")
    public EventStats getFailedRequests() {
        return this.failedRequests;
    }

    @JmxAttribute(description="time for handling one request in milliseconds (both successful and failed)")
    public ValueStats getRequestHandlingTime() {
        return this.requestHandlingTime;
    }

    @JmxAttribute(description="exception that occurred because of business logic error (in RpcRequestHandler implementation)")
    public ExceptionStats getLastRequestHandlingException() {
        return this.lastRequestHandlingException;
    }

    @JmxAttribute(description="exception that occurred because of protocol error (serialization, deserialization, compression, decompression, etc)")
    public ExceptionStats getLastProtocolError() {
        return this.lastProtocolError;
    }
}

