/*
 * Decompiled with CFR 0.152.
 */
package io.activej.rpc.client;

import io.activej.async.callback.Callback;
import io.activej.async.exception.AsyncTimeoutException;
import io.activej.async.service.EventloopService;
import io.activej.codegen.DefiningClassLoader;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.Utils;
import io.activej.common.initializer.WithInitializer;
import io.activej.common.time.CurrentTimeProvider;
import io.activej.csp.process.frames.FrameFormat;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanWithStats;
import io.activej.eventloop.net.SocketSettings;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.api.attribute.JmxReducers;
import io.activej.jmx.stats.ExceptionStats;
import io.activej.net.socket.tcp.AsyncTcpSocket;
import io.activej.net.socket.tcp.AsyncTcpSocketNio;
import io.activej.net.socket.tcp.AsyncTcpSocketSsl;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.SettablePromise;
import io.activej.rpc.client.IRpcClient;
import io.activej.rpc.client.RpcClientConnection;
import io.activej.rpc.client.RpcClientConnectionPool;
import io.activej.rpc.client.jmx.RpcConnectStats;
import io.activej.rpc.client.jmx.RpcRequestStats;
import io.activej.rpc.client.sender.DiscoveryService;
import io.activej.rpc.client.sender.RpcSender;
import io.activej.rpc.client.sender.RpcStrategy;
import io.activej.rpc.protocol.RpcException;
import io.activej.rpc.protocol.RpcMessage;
import io.activej.rpc.protocol.RpcStream;
import io.activej.serializer.BinarySerializer;
import io.activej.serializer.SerializerBuilder;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RpcClient
implements IRpcClient,
EventloopService,
WithInitializer<RpcClient>,
EventloopJmxBeanWithStats {
    // When true, start/stop/sendRequest verify they are invoked from the eventloop thread.
    private static final boolean CHECK = Checks.isEnabled(RpcClient.class);
    // Defaults; the two durations and the packet size are looked up through ApplicationSettings.
    public static final SocketSettings DEFAULT_SOCKET_SETTINGS = SocketSettings.createDefault();
    public static final Duration DEFAULT_CONNECT_TIMEOUT = ApplicationSettings.getDuration(RpcClient.class, (String)"connectTimeout", (Duration)Duration.ZERO);
    public static final Duration DEFAULT_RECONNECT_INTERVAL = ApplicationSettings.getDuration(RpcClient.class, (String)"reconnectInterval", (Duration)Duration.ZERO);
    public static final MemSize DEFAULT_PACKET_SIZE = ApplicationSettings.getMemSize(RpcClient.class, (String)"packetSize", (MemSize)ChannelSerializer.DEFAULT_INITIAL_BUFFER_SIZE);
    // Shared exception instances: the first fails start(), the second is handed to callbacks by NoSenderAvailable.
    private static final RpcException START_EXCEPTION = new RpcException("Could not establish initial connection");
    private static final RpcException NO_SENDER_AVAILABLE_EXCEPTION = new RpcException("No senders available");
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private final Eventloop eventloop;
    private SocketSettings socketSettings = DEFAULT_SOCKET_SETTINGS;
    // SSL is used for new connections only when sslContext is non-null (see connect()).
    private SSLContext sslContext;
    private Executor sslExecutor;
    private RpcStrategy strategy = new NoServersStrategy();
    // Addresses currently reported by the discovery service.
    private List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
    // Established connections keyed by server address.
    private final Map<InetSocketAddress, RpcClientConnection> connections = new HashMap<InetSocketAddress, RpcClientConnection>();
    // Last discovery result; passed back to the discovery service on the next discover() call.
    private Map<Object, InetSocketAddress> previouslyDiscovered;
    private MemSize defaultPacketSize = DEFAULT_PACKET_SIZE;
    @Nullable
    private FrameFormat frameFormat;
    private Duration autoFlushInterval = Duration.ZERO;
    private Duration keepAliveInterval = Duration.ZERO;
    // Must be set before start(); registered as subclasses when the serializer is built.
    private List<Class<?>> messageTypes;
    private long connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT.toMillis();
    private long reconnectIntervalMillis = DEFAULT_RECONNECT_INTERVAL.toMillis();
    // When true, start() completes successfully even if no sender could be created.
    private boolean forcedStart;
    private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    private SerializerBuilder serializerBuilder = SerializerBuilder.create((DefiningClassLoader)DefiningClassLoader.create((ClassLoader)this.classLoader));
    // Built in start() from serializerBuilder and messageTypes.
    private BinarySerializer<RpcMessage> serializer;
    // Current sender; rebuilt from the strategy whenever the set of connections changes.
    private RpcSender requestSender = new NoSenderAvailable();
    // Non-null once stop() has been called; doubles as the "stopping" flag throughout the class.
    @Nullable
    private SettablePromise<Void> stopPromise;
    // Address-to-connection lookup handed to the strategy when it creates a sender.
    private final RpcClientConnectionPool pool = this.connections::get;
    static final Duration SMOOTHING_WINDOW = Duration.ofMinutes(1L);
    // JMX / monitoring state.
    private boolean monitoring = false;
    private final RpcRequestStats generalRequestsStats = RpcRequestStats.create(SMOOTHING_WINDOW);
    private final Map<Class<?>, RpcRequestStats> requestStatsPerClass = new HashMap();
    private final Map<InetSocketAddress, RpcConnectStats> connectsStatsPerAddress = new HashMap<InetSocketAddress, RpcConnectStats>();
    private final ExceptionStats lastProtocolError = ExceptionStats.create();
    private final AsyncTcpSocketNio.JmxInspector statsSocket = new AsyncTcpSocketNio.JmxInspector();

    /**
     * Creates a client bound to the given eventloop. Private: instances are
     * obtained through {@link #create(Eventloop)}.
     *
     * @param eventloop eventloop this client runs in
     */
    private RpcClient(Eventloop eventloop) {
        this.eventloop = eventloop;
    }

    /**
     * Static factory: returns a new client bound to {@code eventloop} with all
     * other settings left at their defaults.
     *
     * @param eventloop eventloop the client runs in
     * @return a new, not yet started client
     */
    public static RpcClient create(Eventloop eventloop) {
        return new RpcClient(eventloop);
    }

    /**
     * Sets the class loader and replaces the serializer builder with a fresh one
     * backed by a {@link DefiningClassLoader} created from that class loader.
     * Any builder set earlier via {@link #withSerializerBuilder} is discarded.
     *
     * @param classLoader parent class loader for generated serializer classes
     * @return this client, for chaining
     */
    public RpcClient withClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
        DefiningClassLoader definingClassLoader = DefiningClassLoader.create(classLoader);
        this.serializerBuilder = SerializerBuilder.create(definingClassLoader);
        return this;
    }

    /**
     * Sets the socket settings passed to {@code AsyncTcpSocketNio.connect} for
     * every outgoing connection.
     *
     * @param socketSettings settings for new sockets
     * @return this client, for chaining
     */
    public RpcClient withSocketSettings(SocketSettings socketSettings) {
        this.socketSettings = socketSettings;
        return this;
    }

    /**
     * Varargs convenience overload; delegates to {@link #withMessageTypes(List)}.
     *
     * @param messageTypes message classes, must be unique
     * @return this client, for chaining
     */
    public RpcClient withMessageTypes(Class<?> ... messageTypes) {
        List<Class<?>> typesAsList = Arrays.asList(messageTypes);
        return withMessageTypes(typesAsList);
    }

    /**
     * Sets the message classes that are registered as subclasses when the
     * serializer is built in {@link #start()}.
     *
     * @param messageTypes message classes; the list is stored as-is (not copied)
     * @return this client, for chaining
     * @throws IllegalArgumentException presumably, via {@code Checks.checkArgument},
     *         when the list contains duplicates — confirm against {@code Checks}
     */
    public RpcClient withMessageTypes(List<Class<?>> messageTypes) {
        // A set drops duplicates, so equal sizes means every type is unique.
        Checks.checkArgument(new HashSet<>(messageTypes).size() == messageTypes.size(), "Message types must be unique");
        this.messageTypes = messageTypes;
        return this;
    }

    /**
     * Replaces the serializer builder used by {@link #start()} to build the
     * {@code RpcMessage} serializer. Note that a later call to
     * {@link #withClassLoader} overwrites this builder again.
     *
     * @param serializerBuilder builder to use
     * @return this client, for chaining
     */
    public RpcClient withSerializerBuilder(SerializerBuilder serializerBuilder) {
        this.serializerBuilder = serializerBuilder;
        return this;
    }

    /**
     * Sets the strategy that supplies the discovery service and creates the
     * request sender from the connection pool.
     *
     * @param requestSendingStrategy strategy to use
     * @return this client, for chaining
     */
    public RpcClient withStrategy(RpcStrategy requestSendingStrategy) {
        this.strategy = requestSendingStrategy;
        return this;
    }

    /**
     * Sets the default packet size passed to every {@code RpcStream} created
     * for a new connection. The frame format is left unchanged.
     *
     * @param defaultPacketSize packet size for new streams
     * @return this client, for chaining
     */
    public RpcClient withStreamProtocol(MemSize defaultPacketSize) {
        this.defaultPacketSize = defaultPacketSize;
        return this;
    }

    /**
     * Sets both the default packet size and the frame format passed to every
     * {@code RpcStream} created for a new connection.
     *
     * @param defaultPacketSize packet size for new streams
     * @param frameFormat frame format for new streams, may be {@code null}
     * @return this client, for chaining
     */
    public RpcClient withStreamProtocol(MemSize defaultPacketSize, @Nullable FrameFormat frameFormat) {
        this.defaultPacketSize = defaultPacketSize;
        this.frameFormat = frameFormat;
        return this;
    }

    /**
     * Sets the auto-flush interval passed to every {@code RpcStream} created
     * for a new connection.
     *
     * @param autoFlushInterval interval handed to new streams
     * @return this client, for chaining
     */
    public RpcClient withAutoFlush(Duration autoFlushInterval) {
        this.autoFlushInterval = autoFlushInterval;
        return this;
    }

    /**
     * Sets the keep-alive interval; its millisecond value is passed to every
     * {@code RpcClientConnection} created for a new connection.
     *
     * @param keepAliveInterval interval handed to new connections
     * @return this client, for chaining
     */
    public RpcClient withKeepAlive(Duration keepAliveInterval) {
        this.keepAliveInterval = keepAliveInterval;
        return this;
    }

    /**
     * Sets the connect timeout, stored in milliseconds and passed to
     * {@code AsyncTcpSocketNio.connect} for every connection attempt.
     *
     * @param connectTimeout timeout for establishing a connection
     * @return this client, for chaining
     */
    public RpcClient withConnectTimeout(Duration connectTimeout) {
        this.connectTimeoutMillis = connectTimeout.toMillis();
        return this;
    }

    /**
     * Sets the delay, stored in milliseconds, before a reconnect is attempted
     * after a connection fails or closes.
     *
     * @param reconnectInterval delay between a failure and the next attempt
     * @return this client, for chaining
     */
    public RpcClient withReconnectInterval(Duration reconnectInterval) {
        this.reconnectIntervalMillis = reconnectInterval.toMillis();
        return this;
    }

    /**
     * Enables SSL: when {@code sslContext} is non-null, each new socket is
     * wrapped with {@code AsyncTcpSocketSsl.wrapClientSocket} using the given
     * context and executor.
     *
     * @param sslContext SSL context for client sockets
     * @param sslExecutor executor passed to the SSL socket wrapper
     * @return this client, for chaining
     */
    public RpcClient withSslEnabled(SSLContext sslContext, Executor sslExecutor) {
        this.sslContext = sslContext;
        this.sslExecutor = sslExecutor;
        return this;
    }

    /**
     * Replaces the logger used for connection and discovery messages.
     *
     * @param logger logger to use
     * @return this client, for chaining
     */
    public RpcClient withLogger(Logger logger) {
        this.logger = logger;
        return this;
    }

    /**
     * Makes {@link #start()} complete successfully even when no sender could be
     * created after the initial connection attempts.
     *
     * @return this client, for chaining
     */
    public RpcClient withForcedStart() {
        forcedStart = true;
        return this;
    }

    /**
     * @return the socket settings used for outgoing connections
     */
    public SocketSettings getSocketSettings() {
        return socketSettings;
    }

    /**
     * @return the eventloop this client is bound to
     */
    @NotNull
    public Eventloop getEventloop() {
        return eventloop;
    }

    /**
     * Starts the client.
     * <ol>
     *   <li>Builds the {@code RpcMessage} serializer from the configured message types.</li>
     *   <li>Asks the strategy's discovery service for the initial set of addresses and
     *       registers connect stats for each of them.</li>
     *   <li>Connects to every discovered address; individual connect failures are
     *       swallowed here (they are logged and retried by {@code connect}).</li>
     *   <li>Fails with {@code START_EXCEPTION} if no sender is available afterwards,
     *       unless forced start is enabled.</li>
     *   <li>On success, begins the rediscovery loop.</li>
     * </ol>
     *
     * @return promise that completes once the initial connection attempts are finished
     */
    @NotNull
    public Promise<Void> start() {
        if (CHECK) {
            Checks.checkState(this.eventloop.inEventloopThread(), "Not in eventloop thread");
        }
        Checks.checkNotNull(this.messageTypes, "Message types must be specified");
        // A non-null stopPromise means stop() was already called on this instance.
        Checks.checkState(this.stopPromise == null);
        this.serializer = this.serializerBuilder.withSubclasses("messageTypes", this.messageTypes).build(RpcMessage.class);
        return Promise.<Map<Object, InetSocketAddress>>ofCallback(cb -> this.strategy.getDiscoveryService().discover(null, cb))
                .map(result -> {
                    this.previouslyDiscovered = result;
                    Collection<InetSocketAddress> discovered = result.values();
                    this.addresses = new ArrayList<>(discovered);
                    for (InetSocketAddress address : discovered) {
                        this.connectsStatsPerAddress.computeIfAbsent(address, $ -> new RpcConnectStats(this.eventloop));
                    }
                    return discovered;
                })
                .then(discovered -> Promises.all(
                        discovered.stream()
                                .map(address -> {
                                    this.logger.info("Connecting: {}", address);
                                    // Map both outcomes to null so one failed connect does not fail the whole start.
                                    return this.connect(address).map(($, e) -> null);
                                }))
                        .then(() -> !this.forcedStart && this.requestSender instanceof NoSenderAvailable
                                ? Promise.<Void>ofException(START_EXCEPTION)
                                : Promise.complete()))
                .whenResult(this::rediscover);
    }

    /**
     * Stops the client. The first call creates the stop promise; it is completed
     * immediately when there are no connections, otherwise every connection is
     * asked to shut down and the promise is completed later, once the connection
     * map is empty. Subsequent calls return the same promise.
     *
     * @return promise that completes when the client has stopped
     */
    @NotNull
    public Promise<Void> stop() {
        if (CHECK) {
            Checks.checkState(eventloop.inEventloopThread(), "Not in eventloop thread");
        }
        if (stopPromise == null) {
            stopPromise = new SettablePromise<>();
            if (connections.size() == 0) {
                stopPromise.set(null);
            } else {
                for (RpcClientConnection openConnection : connections.values()) {
                    openConnection.shutdown();
                }
            }
        }
        return stopPromise;
    }

    /**
     * Opens a TCP connection to {@code address} and, on success, registers an
     * {@code RpcClientConnection} for it and rebuilds the request sender.
     * <p>
     * If the client has been stopped or the address is no longer in the
     * discovered set by the time the socket connects, the socket is closed and
     * nothing is registered. On failure the error is logged and, unless the
     * client is stopping, a reconnect is scheduled via
     * {@code processClosedConnection}.
     *
     * @param address server address to connect to
     * @return promise of the connection attempt, with the result discarded
     */
    private Promise<Void> connect(InetSocketAddress address) {
        return AsyncTcpSocketNio.connect(address, this.connectTimeoutMillis, this.socketSettings)
                .whenResult(asyncTcpSocketImpl -> {
                    if (this.stopPromise != null || !this.addresses.contains(address)) {
                        asyncTcpSocketImpl.close();
                        return;
                    }
                    this.statsSocket.onConnect(asyncTcpSocketImpl);
                    asyncTcpSocketImpl.setInspector(this.statsSocket);
                    // Declared as the AsyncTcpSocket interface: the value is either the plain NIO
                    // socket or its SSL wrapper, and the wrapper is not an AsyncTcpSocketNio.
                    AsyncTcpSocket socket = this.sslContext == null
                            ? asyncTcpSocketImpl
                            : AsyncTcpSocketSsl.wrapClientSocket(asyncTcpSocketImpl, this.sslContext, this.sslExecutor);
                    RpcStream stream = new RpcStream(socket, this.serializer, this.defaultPacketSize, this.autoFlushInterval, this.frameFormat, false);
                    RpcClientConnection connection = new RpcClientConnection(this.eventloop, this, address, stream, this.keepAliveInterval.toMillis());
                    stream.setListener(connection);
                    if (this.isMonitoring()) {
                        connection.startMonitoring();
                    }
                    this.connections.put(address, connection);
                    // The pool changed, so ask the strategy for a new sender; fall back when it returns null.
                    this.requestSender = Utils.nonNullElseGet(this.strategy.createSender(this.pool), () -> new NoSenderAvailable());
                    this.connectsStatsPerAddress.get(address).recordSuccessfulConnect();
                    this.logger.info("Connection to {} established", address);
                })
                .whenException(e -> {
                    this.logger.warn("Connection {} failed: {}", address, e);
                    if (this.stopPromise == null) {
                        this.processClosedConnection(address);
                    }
                })
                .toVoid();
    }

    /**
     * Unregisters the connection for {@code address}. Does nothing when no
     * connection is registered; otherwise rebuilds the request sender, logs the
     * closure and hands over to {@code processClosedConnection}.
     *
     * @param address address whose connection has closed
     */
    void removeConnection(InetSocketAddress address) {
        RpcClientConnection removed = connections.remove(address);
        if (removed != null) {
            requestSender = Utils.nonNullElseGet(strategy.createSender(pool), NoSenderAvailable::new);
            logger.info("Connection closed: {}", address);
            processClosedConnection(address);
        }
    }

    /**
     * Reacts to a closed or failed connection.
     * <p>
     * While stopping: completes the stop promise once no connections remain.
     * While running: if the address is still in the discovered set, records a
     * failed connect and schedules a reconnect after the reconnect interval.
     *
     * @param address address of the closed or failed connection
     */
    private void processClosedConnection(InetSocketAddress address) {
        if (stopPromise != null) {
            if (connections.size() == 0) {
                stopPromise.set(null);
            }
            return;
        }
        if (!addresses.contains(address)) {
            return;
        }
        connectsStatsPerAddress.get(address).recordFailedConnect();
        eventloop.delayBackground(reconnectIntervalMillis, () -> {
            // stop() may have been called while the reconnect was pending.
            if (stopPromise != null) {
                return;
            }
            logger.info("Reconnecting: {}", address);
            connect(address);
        });
    }

    /**
     * Sends a request through the current sender. A non-positive timeout is
     * reported to the callback immediately as an {@link AsyncTimeoutException}
     * without contacting the sender.
     *
     * @param request request object
     * @param timeout timeout handed to the sender; must be positive to be sent
     * @param cb callback receiving the response or the failure
     */
    @Override
    public <I, O> void sendRequest(I request, int timeout, Callback<O> cb) {
        if (CHECK) {
            Checks.checkState(eventloop.inEventloopThread(), "Not in eventloop thread");
        }
        if (timeout <= 0) {
            cb.accept(null, new AsyncTimeoutException("RPC request has timed out"));
            return;
        }
        requestSender.sendRequest(request, timeout, cb);
    }

    /**
     * Sends a request through the current sender using the sender's
     * two-argument overload.
     *
     * @param request request object
     * @param cb callback receiving the response or the failure
     */
    @Override
    public <I, O> void sendRequest(I request, Callback<O> cb) {
        if (CHECK) {
            Checks.checkState(eventloop.inEventloopThread(), "Not in eventloop thread");
        }
        requestSender.sendRequest(request, cb);
    }

    /**
     * One step of the rediscovery loop. Asks the discovery service for changes
     * relative to the previously discovered addresses; on success applies them
     * and immediately asks again, on failure logs a warning and retries after
     * one second. The loop ends once the client is stopping.
     */
    private void rediscover() {
        if (stopPromise != null) {
            return;
        }
        Callback<Map<Object, InetSocketAddress>> onDiscovered = (result, e) -> {
            if (stopPromise != null) {
                return;
            }
            if (e != null) {
                logger.warn("Could not discover addresses", e);
                eventloop.delayBackground(Duration.ofSeconds(1L), this::rediscover);
                return;
            }
            updateAddresses(result);
            rediscover();
        };
        strategy.getDiscoveryService().discover(previouslyDiscovered, onDiscovered);
    }

    /**
     * Applies a new discovery result.
     * <p>
     * Addresses that disappeared have their connection (if any) shut down and
     * their connect stats removed; the request sender is rebuilt when at least
     * one address was dropped. Addresses that are new get fresh connect stats
     * and a connection attempt.
     *
     * @param newAddresses latest result from the discovery service
     */
    private void updateAddresses(Map<Object, InetSocketAddress> newAddresses) {
        this.previouslyDiscovered = newAddresses;
        List<InetSocketAddress> previousAddresses = this.addresses;
        this.addresses = new ArrayList<>(newAddresses.values());
        boolean changed = false;
        for (InetSocketAddress address : previousAddresses) {
            if (this.addresses.contains(address)) {
                continue;
            }
            // A dropped address may have no live connection (never connected, or
            // waiting for a reconnect), so the removal result must be null-checked.
            RpcClientConnection connection = this.connections.remove(address);
            if (connection != null) {
                connection.shutdown();
            }
            this.connectsStatsPerAddress.remove(address);
            changed = true;
        }
        if (changed) {
            this.requestSender = Utils.nonNullElseGet(this.strategy.createSender(this.pool), () -> new NoSenderAvailable());
        }
        for (InetSocketAddress address : this.addresses) {
            if (previousAddresses.contains(address)) {
                continue;
            }
            this.connectsStatsPerAddress.put(address, new RpcConnectStats(this.eventloop));
            this.connect(address);
        }
    }

    /**
     * Returns a view of this client that can be called from another eventloop.
     * Requests are handed over to this client's eventloop via {@code execute},
     * and the callback is wrapped with {@code Callback.toAnotherEventloop} so the
     * response is delivered through {@code anotherEventloop}.
     *
     * @param anotherEventloop eventloop the returned client will be called from
     * @return this client when the eventloops are the same, otherwise an adapter
     */
    public IRpcClient adaptToAnotherEventloop(final Eventloop anotherEventloop) {
        if (anotherEventloop != this.eventloop) {
            return new IRpcClient() {
                @Override
                public <I, O> void sendRequest(I request, int timeout, Callback<O> cb) {
                    if (CHECK) {
                        Checks.checkState(anotherEventloop.inEventloopThread(), "Not in eventloop thread");
                    }
                    if (timeout <= 0) {
                        cb.accept(null, new AsyncTimeoutException("RPC request has timed out"));
                        return;
                    }
                    // The callback wrapper is created inside the task, i.e. on the client's own eventloop.
                    RpcClient.this.eventloop.execute(() ->
                            RpcClient.this.requestSender.sendRequest(request, timeout, Callback.toAnotherEventloop(anotherEventloop, cb)));
                }
            };
        }
        return this;
    }

    /**
     * @return the sender currently used for outgoing requests
     */
    @VisibleForTesting
    public RpcSender getRequestSender() {
        return requestSender;
    }

    /**
     * @return {@code "RpcClient{"} followed by the string form of the connection map and {@code '}'}
     */
    @Override
    public String toString() {
        return "RpcClient{" + this.connections + '}';
    }

    /**
     * Turns monitoring on and propagates it to the connection of every known
     * address that currently has one.
     */
    @JmxOperation(description="enable monitoring [ when monitoring is enabled more stats are collected, but it causes more overhead (for example, responseTime and requestsStatsPerClass are collected only when monitoring is enabled) ]")
    public void startMonitoring() {
        monitoring = true;
        addresses.stream()
                .map(connections::get)
                .filter(connection -> connection != null)
                .forEach(RpcClientConnection::startMonitoring);
    }

    /**
     * Turns monitoring off and propagates it to the connection of every known
     * address that currently has one.
     */
    @JmxOperation(description="disable monitoring [ when monitoring is enabled more stats are collected, but it causes more overhead (for example, responseTime and requestsStatsPerClass are collected only when monitoring is enabled) ]")
    public void stopMonitoring() {
        monitoring = false;
        addresses.stream()
                .map(connections::get)
                .filter(connection -> connection != null)
                .forEach(RpcClientConnection::stopMonitoring);
    }

    /**
     * @return {@code true} while monitoring is switched on
     */
    @JmxAttribute(description="when monitoring is enabled more stats are collected, but it causes more overhead (for example, responseTime and requestsStatsPerClass are collected only when monitoring is enabled)")
    public boolean isMonitoring() {
        return monitoring;
    }

    /**
     * @return the client-wide request stats object
     */
    @JmxAttribute(name="requests", extraSubAttributes={"totalRequests"})
    public RpcRequestStats getGeneralRequestsStats() {
        return generalRequestsStats;
    }

    /**
     * @return sum of successful connects over all tracked addresses
     */
    @JmxAttribute(reducer=JmxReducers.JmxReducerSum.class)
    public long getTotalSuccessfulConnects() {
        long total = 0L;
        for (RpcConnectStats stats : connectsStatsPerAddress.values()) {
            total += stats.getSuccessfulConnects();
        }
        return total;
    }

    /**
     * @return sum of failed connects over all tracked addresses
     */
    @JmxAttribute(reducer=JmxReducers.JmxReducerSum.class)
    public long getTotalFailedConnects() {
        long total = 0L;
        for (RpcConnectStats stats : connectsStatsPerAddress.values()) {
            total += stats.getFailedConnects();
        }
        return total;
    }

    /**
     * @return the live (not copied) map of request stats keyed by request class
     */
    @JmxAttribute(description="request stats distributed by request class")
    public Map<Class<?>, RpcRequestStats> getRequestsStatsPerClass() {
        return requestStatsPerClass;
    }

    /**
     * @return the live (not copied) map of connect stats keyed by server address
     */
    @JmxAttribute
    public Map<InetSocketAddress, RpcConnectStats> getConnectsStatsPerAddress() {
        return connectsStatsPerAddress;
    }

    /**
     * @return the live (not copied) map of current connections keyed by server address
     */
    @JmxAttribute(description="request stats for current connections (when connection is closed stats are removed)")
    public Map<InetSocketAddress, RpcClientConnection> getRequestStatsPerConnection() {
        return connections;
    }

    /**
     * @return number of currently registered connections
     */
    @JmxAttribute(reducer=JmxReducers.JmxReducerSum.class)
    public int getActiveConnections() {
        return connections.size();
    }

    /**
     * @return sum of the active request counts reported by all registered connections
     */
    @JmxAttribute(reducer=JmxReducers.JmxReducerSum.class)
    public int getActiveRequests() {
        int total = 0;
        Collection<RpcClientConnection> openConnections = connections.values();
        for (RpcClientConnection openConnection : openConnections) {
            total += openConnection.getActiveRequests();
        }
        return total;
    }

    /**
     * @return stats object holding the last protocol error
     */
    @JmxAttribute(description="exception that occurred because of protocol error (serialization, deserialization, compression, decompression, etc)")
    public ExceptionStats getLastProtocolError() {
        return lastProtocolError;
    }

    /**
     * @return the socket inspector installed on every socket this client opens
     */
    @JmxAttribute
    public AsyncTcpSocketNio.JmxInspector getStatsSocket() {
        return statsSocket;
    }

    /**
     * Lists the addresses whose connect stats report them as not connected.
     *
     * @return string forms of the unconnected addresses; an empty list once the
     *         client is stopping
     */
    @JmxAttribute
    public List<String> getUnresponsiveServers() {
        if (stopPromise != null) {
            return Collections.emptyList();
        }
        List<String> unresponsive = new ArrayList<>();
        for (Map.Entry<InetSocketAddress, RpcConnectStats> entry : connectsStatsPerAddress.entrySet()) {
            if (!entry.getValue().isConnected()) {
                unresponsive.add(entry.getKey().toString());
            }
        }
        return unresponsive;
    }

    /**
     * Returns the request stats for {@code requestClass}, creating and
     * registering them on first use.
     *
     * @param requestClass request class to look up
     * @return the existing or newly created stats
     */
    RpcRequestStats ensureRequestStatsPerClass(Class<?> requestClass) {
        RpcRequestStats stats = requestStatsPerClass.get(requestClass);
        if (stats == null) {
            stats = RpcRequestStats.create(SMOOTHING_WINDOW);
            requestStatsPerClass.put(requestClass, stats);
        }
        return stats;
    }

    /**
     * Default strategy used until one is configured: discovery yields an empty
     * address map and no sender is ever created.
     */
    private static final class NoServersStrategy implements RpcStrategy {

        /** @return a discovery service built from a constant empty map */
        @Override
        public DiscoveryService getDiscoveryService() {
            return DiscoveryService.constant(Collections.emptyMap());
        }

        /** @return always {@code null}, which callers replace with {@code NoSenderAvailable} */
        @Override
        public RpcSender createSender(RpcClientConnectionPool pool) {
            return null;
        }
    }

    /**
     * Fallback sender used while the strategy cannot provide a real one: every
     * request is failed immediately with {@code NO_SENDER_AVAILABLE_EXCEPTION}.
     */
    private static final class NoSenderAvailable implements RpcSender {

        /** Completes {@code cb} exceptionally without sending anything. */
        @Override
        public <I, O> void sendRequest(I request, int timeout, @NotNull Callback<O> cb) {
            cb.accept(null, NO_SENDER_AVAILABLE_EXCEPTION);
        }
    }
}

