/*
 * Decompiled with CFR 0.152.
 */
package io.activej.rpc.client;

import io.activej.async.callback.Callback;
import io.activej.async.exception.AsyncCloseException;
import io.activej.async.exception.AsyncTimeoutException;
import io.activej.common.Checks;
import io.activej.common.recycle.Recyclers;
import io.activej.common.time.Stopwatch;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.schedule.ScheduledRunnable;
import io.activej.jmx.api.JmxRefreshable;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxReducers;
import io.activej.jmx.stats.EventStats;
import io.activej.rpc.client.RpcClient;
import io.activej.rpc.client.jmx.RpcRequestStats;
import io.activej.rpc.client.sender.RpcSender;
import io.activej.rpc.protocol.RpcControlMessage;
import io.activej.rpc.protocol.RpcException;
import io.activej.rpc.protocol.RpcMandatoryData;
import io.activej.rpc.protocol.RpcMessage;
import io.activej.rpc.protocol.RpcOverloadException;
import io.activej.rpc.protocol.RpcRemoteException;
import io.activej.rpc.protocol.RpcStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RpcClientConnection
implements RpcStream.Listener,
RpcSender,
JmxRefreshable {
    private static final Logger logger = LoggerFactory.getLogger(RpcClientConnection.class);
    private static final boolean CHECK = Checks.isEnabled(RpcClientConnection.class);
    private static final RpcException CONNECTION_UNRESPONSIVE = new RpcException("Unresponsive connection");
    private static final RpcOverloadException RPC_OVERLOAD_EXCEPTION = new RpcOverloadException("RPC client is overloaded");
    private StreamDataAcceptor<RpcMessage> downstreamDataAcceptor = null;
    private boolean overloaded = false;
    private boolean closed;
    private final Eventloop eventloop;
    private final RpcClient rpcClient;
    private final RpcStream stream;
    private final InetSocketAddress address;
    private final Map<Integer, Callback<?>> activeRequests = new HashMap();
    private ArrayList<RpcMessage> initialBuffer = new ArrayList();
    private int cookie = 0;
    private boolean serverClosing;
    private boolean monitoring;
    private final RpcRequestStats connectionStats;
    private final EventStats totalRequests;
    private final EventStats connectionRequests;
    private final long keepAliveMillis;
    private boolean pongReceived;

    RpcClientConnection(Eventloop eventloop, RpcClient rpcClient, InetSocketAddress address, RpcStream stream, long keepAliveMillis) {
        this.eventloop = eventloop;
        this.rpcClient = rpcClient;
        this.stream = stream;
        this.address = address;
        this.keepAliveMillis = keepAliveMillis;
        this.monitoring = false;
        this.connectionStats = RpcRequestStats.create(RpcClient.SMOOTHING_WINDOW);
        this.connectionRequests = this.connectionStats.getTotalRequests();
        this.totalRequests = rpcClient.getGeneralRequestsStats().getTotalRequests();
    }

    @Override
    public <I, O> void sendRequest(I request, int timeout, @NotNull Callback<O> cb) {
        if (CHECK) {
            Checks.checkState((boolean)this.eventloop.inEventloopThread(), (Object)"Not in eventloop thread");
        }
        this.totalRequests.recordEvent();
        this.connectionRequests.recordEvent();
        if (!this.overloaded || request instanceof RpcMandatoryData) {
            ++this.cookie;
            if (this.monitoring) {
                cb = this.doJmxMonitoring(request, timeout, cb);
            }
            if (timeout == Integer.MAX_VALUE) {
                this.activeRequests.put(this.cookie, cb);
            } else {
                ScheduledCallback<O> scheduledCallback = new ScheduledCallback<O>(this.cookie, cb);
                scheduledCallback.scheduledRunnable = this.eventloop.delayBackground((long)timeout, scheduledCallback);
                this.activeRequests.put(this.cookie, scheduledCallback);
            }
            this.downstreamDataAcceptor.accept((Object)RpcMessage.of(this.cookie, request));
        } else {
            this.doProcessOverloaded(cb);
        }
    }

    @Override
    public <I, O> void sendRequest(I request, @NotNull Callback<O> cb) {
        if (CHECK) {
            Checks.checkState((boolean)this.eventloop.inEventloopThread(), (Object)"Not in eventloop thread");
        }
        this.totalRequests.recordEvent();
        this.connectionRequests.recordEvent();
        if (!this.overloaded || request instanceof RpcMandatoryData) {
            ++this.cookie;
            if (this.monitoring) {
                cb = this.doJmxMonitoring(request, Integer.MAX_VALUE, cb);
            }
            this.activeRequests.put(this.cookie, cb);
            this.downstreamDataAcceptor.accept((Object)RpcMessage.of(this.cookie, request));
        } else {
            this.doProcessOverloaded(cb);
        }
    }

    private <I, O> Callback<O> doJmxMonitoring(I request, int timeout, @NotNull Callback<O> cb) {
        RpcRequestStats requestStatsPerClass = this.rpcClient.ensureRequestStatsPerClass(request.getClass());
        requestStatsPerClass.getTotalRequests().recordEvent();
        return new JmxConnectionMonitoringResultCallback<O>(requestStatsPerClass, cb, timeout);
    }

    private <O> void doProcessOverloaded(@NotNull Callback<O> cb) {
        this.rpcClient.getGeneralRequestsStats().getRejectedRequests().recordEvent();
        this.connectionStats.getRejectedRequests().recordEvent();
        if (logger.isTraceEnabled()) {
            logger.trace("RPC client uplink is overloaded");
        }
        cb.accept(null, (Exception)RPC_OVERLOAD_EXCEPTION);
    }

    public void accept(RpcMessage message) {
        if (message.getData().getClass() == RpcRemoteException.class) {
            this.processErrorMessage(message);
        } else if (message.getData().getClass() == RpcControlMessage.class) {
            this.processControlMessage((RpcControlMessage)((Object)message.getData()));
        } else {
            Callback<?> cb = this.activeRequests.remove(message.getCookie());
            if (cb == null) {
                return;
            }
            cb.accept(message.getData(), null);
            if (this.serverClosing && this.activeRequests.size() == 0) {
                this.shutdown();
            }
        }
    }

    private void processErrorMessage(RpcMessage message) {
        RpcRemoteException remoteException = (RpcRemoteException)message.getData();
        this.connectionStats.getFailedRequests().recordEvent();
        this.rpcClient.getGeneralRequestsStats().getFailedRequests().recordEvent();
        this.connectionStats.getServerExceptions().recordException((Throwable)remoteException, null);
        this.rpcClient.getGeneralRequestsStats().getServerExceptions().recordException((Throwable)remoteException, null);
        Callback<?> cb = this.activeRequests.remove(message.getCookie());
        if (cb != null) {
            cb.accept(null, (Exception)remoteException);
        }
    }

    private void processControlMessage(RpcControlMessage controlMessage) {
        if (controlMessage == RpcControlMessage.CLOSE) {
            this.rpcClient.removeConnection(this.address);
            this.serverClosing = true;
            if (this.activeRequests.size() == 0) {
                this.shutdown();
            }
        } else if (controlMessage == RpcControlMessage.PONG) {
            this.pongReceived = true;
        } else {
            throw new RuntimeException("Received unknown RpcControlMessage");
        }
    }

    private void ping() {
        if (this.isClosed()) {
            return;
        }
        if (this.keepAliveMillis == 0L) {
            return;
        }
        this.pongReceived = false;
        this.downstreamDataAcceptor.accept((Object)RpcMessage.of(-1, (Object)RpcControlMessage.PING));
        this.eventloop.delayBackground(this.keepAliveMillis, () -> {
            if (this.isClosed()) {
                return;
            }
            if (!this.pongReceived) {
                this.onReceiverError(CONNECTION_UNRESPONSIVE);
            } else {
                this.ping();
            }
        });
    }

    @Override
    public void onReceiverEndOfStream() {
        if (this.isClosed()) {
            return;
        }
        logger.info("Receiver EOS: {}", (Object)this.address);
        this.stream.close();
        this.doClose();
    }

    @Override
    public void onReceiverError(@NotNull Exception e) {
        if (this.isClosed()) {
            return;
        }
        logger.error("Receiver error: {}", (Object)this.address, (Object)e);
        this.rpcClient.getLastProtocolError().recordException((Throwable)e, (Object)this.address);
        this.stream.close();
        this.doClose();
    }

    @Override
    public void onSenderError(@NotNull Exception e) {
        if (this.isClosed()) {
            return;
        }
        logger.error("Sender error: {}", (Object)this.address, (Object)e);
        this.rpcClient.getLastProtocolError().recordException((Throwable)e, (Object)this.address);
        this.stream.close();
        this.doClose();
    }

    @Override
    public void onSerializationError(RpcMessage message, @NotNull Exception e) {
        if (this.isClosed()) {
            return;
        }
        logger.error("Serialization error: {} for data {}", new Object[]{this.address, message.getData(), e});
        this.rpcClient.getLastProtocolError().recordException((Throwable)e, (Object)this.address);
        this.activeRequests.remove(message.getCookie()).accept(null, e);
    }

    @Override
    public void onSenderReady(@NotNull StreamDataAcceptor<RpcMessage> acceptor) {
        if (this.isClosed()) {
            return;
        }
        this.downstreamDataAcceptor = acceptor;
        this.overloaded = false;
        if (this.initialBuffer != null) {
            for (RpcMessage message : this.initialBuffer) {
                acceptor.accept((Object)message);
            }
            this.initialBuffer = null;
            this.ping();
        }
    }

    @Override
    public void onSenderSuspended() {
        this.overloaded = true;
    }

    private void doClose() {
        if (this.isClosed()) {
            return;
        }
        this.downstreamDataAcceptor = null;
        this.closed = true;
        this.rpcClient.removeConnection(this.address);
        while (!this.activeRequests.isEmpty()) {
            for (Integer cookie : new HashSet<Integer>(this.activeRequests.keySet())) {
                Callback<?> cb = this.activeRequests.remove(cookie);
                if (cb == null) continue;
                cb.accept(null, (Exception)new AsyncCloseException("Connection closed"));
            }
        }
    }

    public boolean isClosed() {
        return this.closed;
    }

    public void shutdown() {
        if (this.isClosed()) {
            return;
        }
        this.stream.sendEndOfStream();
    }

    public void startMonitoring() {
        this.monitoring = true;
    }

    public void stopMonitoring() {
        this.monitoring = false;
    }

    @JmxAttribute(name="")
    public RpcRequestStats getRequestStats() {
        return this.connectionStats;
    }

    @JmxAttribute(reducer=JmxReducers.JmxReducerSum.class)
    public int getActiveRequests() {
        return this.activeRequests.size();
    }

    public void refresh(long timestamp) {
        this.connectionStats.refresh(timestamp);
    }

    public String toString() {
        int active = this.activeRequests.size();
        long failed = this.connectionStats.getFailedRequests().getTotalCount();
        return "RpcClientConnection{address=" + this.address + ", active=" + active + ", successes=" + (this.connectionStats.getTotalRequests().getTotalCount() - failed - (long)active) + ", failures=" + failed + '}';
    }

    private class ScheduledCallback<O>
    implements Runnable,
    Callback<O> {
        ScheduledRunnable scheduledRunnable;
        final Callback<O> cb;
        final int cookie;

        ScheduledCallback(@NotNull int cookie, Callback<O> cb) {
            this.cookie = cookie;
            this.cb = cb;
        }

        public void accept(O result, @Nullable Exception e) {
            if (this.scheduledRunnable != null) {
                this.scheduledRunnable.cancel();
                this.cb.accept(result, e);
            } else {
                Recyclers.recycle(result);
            }
        }

        @Override
        public void run() {
            this.scheduledRunnable = null;
            Callback expiredCb = (Callback)RpcClientConnection.this.activeRequests.remove(this.cookie);
            if (expiredCb != null) {
                assert (expiredCb == this);
                RpcClientConnection.this.connectionStats.getExpiredRequests().recordEvent();
                RpcClientConnection.this.rpcClient.getGeneralRequestsStats().getExpiredRequests().recordEvent();
                this.cb.accept(null, (Exception)new AsyncTimeoutException("RPC request has timed out"));
            }
            if (RpcClientConnection.this.serverClosing && RpcClientConnection.this.activeRequests.size() == 0) {
                RpcClientConnection.this.shutdown();
            }
        }
    }

    private final class JmxConnectionMonitoringResultCallback<T>
    implements Callback<T> {
        private final Stopwatch stopwatch = Stopwatch.createStarted();
        private final Callback<T> callback;
        private final RpcRequestStats requestStatsPerClass;
        private final long dueTimestamp;

        public JmxConnectionMonitoringResultCallback(RpcRequestStats requestStatsPerClass, Callback<T> cb, long timeout) {
            this.callback = cb;
            this.requestStatsPerClass = requestStatsPerClass;
            this.dueTimestamp = RpcClientConnection.this.eventloop.currentTimeMillis() + timeout;
        }

        public void accept(T result, @Nullable Exception e) {
            if (e == null) {
                this.onResult(result);
            } else {
                this.onException(e);
            }
        }

        private void onResult(T result) {
            int responseTime = this.timeElapsed();
            RpcClientConnection.this.connectionStats.getResponseTime().recordValue(responseTime);
            this.requestStatsPerClass.getResponseTime().recordValue(responseTime);
            RpcClientConnection.this.rpcClient.getGeneralRequestsStats().getResponseTime().recordValue(responseTime);
            this.recordOverdue();
            this.callback.accept(result, null);
        }

        private void onException(@NotNull Exception e) {
            if (e instanceof RpcRemoteException) {
                int responseTime = this.timeElapsed();
                RpcClientConnection.this.connectionStats.getFailedRequests().recordEvent();
                RpcClientConnection.this.connectionStats.getResponseTime().recordValue(responseTime);
                RpcClientConnection.this.connectionStats.getServerExceptions().recordException((Throwable)e, null);
                this.requestStatsPerClass.getFailedRequests().recordEvent();
                this.requestStatsPerClass.getResponseTime().recordValue(responseTime);
                RpcClientConnection.this.rpcClient.getGeneralRequestsStats().getResponseTime().recordValue(responseTime);
                this.requestStatsPerClass.getServerExceptions().recordException((Throwable)e, null);
                this.recordOverdue();
            } else if (e instanceof AsyncTimeoutException) {
                RpcClientConnection.this.connectionStats.getExpiredRequests().recordEvent();
                this.requestStatsPerClass.getExpiredRequests().recordEvent();
            } else if (e instanceof RpcOverloadException) {
                RpcClientConnection.this.connectionStats.getRejectedRequests().recordEvent();
                this.requestStatsPerClass.getRejectedRequests().recordEvent();
            }
            this.callback.accept(null, e);
        }

        private int timeElapsed() {
            return (int)this.stopwatch.elapsed(TimeUnit.MILLISECONDS);
        }

        private void recordOverdue() {
            int overdue = (int)(System.currentTimeMillis() - this.dueTimestamp);
            if (overdue > 0) {
                RpcClientConnection.this.connectionStats.getOverdues().recordValue(overdue);
                this.requestStatsPerClass.getOverdues().recordValue(overdue);
                RpcClientConnection.this.rpcClient.getGeneralRequestsStats().getOverdues().recordValue(overdue);
            }
        }
    }
}

