/*
 * Decompiled with CFR 0.152.
 */
package io.activej.rpc.server;

import io.activej.common.exception.MalformedDataException;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.jmx.api.JmxRefreshable;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.stats.EventStats;
import io.activej.jmx.stats.ExceptionStats;
import io.activej.jmx.stats.ValueStats;
import io.activej.promise.Promise;
import io.activej.rpc.protocol.RpcControlMessage;
import io.activej.rpc.protocol.RpcMessage;
import io.activej.rpc.protocol.RpcRemoteException;
import io.activej.rpc.protocol.RpcStream;
import io.activej.rpc.server.RpcRequestHandler;
import io.activej.rpc.server.RpcServer;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Map;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Server-side state for a single RPC connection.
 *
 * <p>Incoming {@link RpcMessage}s are dispatched to the {@link RpcRequestHandler}
 * registered for the runtime class of the request payload. The handler's result
 * (or an {@link RpcRemoteException} wrapping its failure) is written back through
 * the downstream acceptor using the cookie of the originating request.
 *
 * <p>The class also keeps per-connection statistics (successful/failed request
 * counters, request handling time, last handling exception) and mirrors every
 * recorded value into the aggregate statistics of the owning {@link RpcServer}.
 */
public final class RpcServerConnection implements RpcStream.Listener, JmxRefreshable {
    private static final Logger logger = LoggerFactory.getLogger(RpcServerConnection.class);

    /**
     * Sink for outgoing messages. It is {@code null} until {@link #onSenderReady}
     * is called and is replaced by a no-op acceptor once the connection is closed.
     */
    private StreamDataAcceptor<RpcMessage> downstreamDataAcceptor;

    private final RpcServer rpcServer;
    private final RpcStream stream;
    private final Map<Class<?>, RpcRequestHandler<?, ?>> handlers;

    /**
     * Number of requests still being processed, plus one. The extra unit is
     * released by {@link #onReceiverEndOfStream()}, so the counter reaches zero
     * only after the receiver has ended AND every accepted request has completed.
     */
    private int activeRequests = 1;

    // JMX
    private final InetAddress remoteAddress;
    private final ExceptionStats lastRequestHandlingException = ExceptionStats.create();
    private final ValueStats requestHandlingTime = ValueStats.create((Duration)RpcServer.SMOOTHING_WINDOW).withUnit("milliseconds");
    private final EventStats successfulRequests = EventStats.create((Duration)RpcServer.SMOOTHING_WINDOW);
    private final EventStats failedRequests = EventStats.create((Duration)RpcServer.SMOOTHING_WINDOW);
    private boolean monitoring = false;

    /**
     * Creates a connection bound to the given server and stream.
     *
     * @param rpcServer     owning server; receives aggregate stats and is notified on close
     * @param remoteAddress address of the peer, used in logs, stats context and JMX
     * @param handlers      request handlers keyed by the request payload class
     * @param stream        RPC stream used to resume/suspend receiving and to end or close the connection
     */
    RpcServerConnection(RpcServer rpcServer, InetAddress remoteAddress, Map<Class<?>, RpcRequestHandler<?, ?>> handlers, RpcStream stream) {
        this.rpcServer = rpcServer;
        this.stream = stream;
        this.handlers = handlers;
        this.remoteAddress = remoteAddress;
    }

    /**
     * Looks up the handler registered for the exact runtime class of {@code request}
     * and runs it.
     *
     * @param request request payload
     * @return the handler's promise, or a promise failed with
     *         {@link MalformedDataException} when no handler is registered
     */
    @SuppressWarnings("unchecked")
    private Promise<Object> serve(Object request) {
        // The handler is selected by request.getClass(), which is what justifies
        // treating the wildcard-typed handler as accepting this request object.
        RpcRequestHandler<Object, Object> requestHandler =
                (RpcRequestHandler<Object, Object>) handlers.get(request.getClass());
        if (requestHandler == null) {
            return Promise.ofException((Exception)new MalformedDataException("Failed to process request " + request));
        }
        return requestHandler.run(request).promise();
    }

    /**
     * Processes one incoming request message and replies with the same cookie.
     *
     * <p>On success the result is sent downstream and the success counters are
     * incremented; on failure an {@link RpcRemoteException} is sent instead and the
     * failure is recorded. Handling time is recorded only if monitoring was enabled
     * at the moment the request was accepted.
     *
     * @param message request message carrying the cookie and the payload
     */
    public void accept(RpcMessage message) {
        ++activeRequests;
        int cookie = message.getCookie();
        // 0 doubles as the "monitoring was off" marker checked in the callback.
        long startTime = monitoring ? System.currentTimeMillis() : 0L;
        Object messageData = message.getData();
        serve(messageData).run((result, e) -> {
            if (startTime != 0L) {
                int value = (int)(System.currentTimeMillis() - startTime);
                requestHandlingTime.recordValue(value);
                rpcServer.getRequestHandlingTime().recordValue(value);
            }
            if (e == null) {
                downstreamDataAcceptor.accept(RpcMessage.of(cookie, result));
                successfulRequests.recordEvent();
                rpcServer.getSuccessfulRequests().recordEvent();
            } else {
                logger.warn("Exception while processing request ID {}", cookie, e);
                RpcMessage errorMessage = RpcMessage.of(cookie, new RpcRemoteException(e));
                sendError(errorMessage, messageData, e);
            }
            // Last outstanding request after the receiver has ended: finish the connection.
            if (--activeRequests == 0) {
                doClose();
                stream.sendEndOfStream();
            }
        });
    }

    /**
     * Releases the initial unit of {@link #activeRequests}. If no requests are
     * still in flight, the connection is closed and end-of-stream is sent.
     */
    @Override
    public void onReceiverEndOfStream() {
        --activeRequests;
        if (activeRequests == 0) {
            doClose();
            stream.sendEndOfStream();
        }
    }

    /**
     * Logs and records a receiver-side error as a protocol error on the server,
     * then closes the connection and the stream.
     *
     * @param e the receiver error
     */
    @Override
    public void onReceiverError(@NotNull Exception e) {
        logger.error("Receiver error {}", remoteAddress, e);
        rpcServer.getLastProtocolError().recordException((Throwable)e, (Object)remoteAddress);
        doClose();
        stream.close();
    }

    /**
     * Logs and records a sender-side error as a protocol error on the server,
     * then closes the connection and the stream.
     *
     * @param e the sender error
     */
    @Override
    public void onSenderError(@NotNull Exception e) {
        logger.error("Sender error: {}", remoteAddress, e);
        rpcServer.getLastProtocolError().recordException((Throwable)e, (Object)remoteAddress);
        doClose();
        stream.close();
    }

    /**
     * Reports a serialization failure for {@code message} by sending an
     * {@link RpcRemoteException} with the same cookie and recording the failure.
     *
     * @param message the message that failed to serialize
     * @param e       the serialization error
     */
    @Override
    public void onSerializationError(RpcMessage message, @NotNull Exception e) {
        logger.error("Serialization error: {} for data {}", remoteAddress, message.getData(), e);
        RpcMessage errorMessage = RpcMessage.of(message.getCookie(), new RpcRemoteException(e));
        sendError(errorMessage, message.getData(), e);
    }

    /**
     * Stores the acceptor used for outgoing messages and resumes receiving.
     *
     * @param acceptor sink for outgoing messages
     */
    @Override
    public void onSenderReady(@NotNull StreamDataAcceptor<RpcMessage> acceptor) {
        downstreamDataAcceptor = acceptor;
        stream.receiverResume();
    }

    /**
     * Suspends receiving while the sender is suspended.
     */
    @Override
    public void onSenderSuspended() {
        stream.receiverSuspend();
    }

    /**
     * Sends an error response downstream and records the failure in both the
     * per-connection and the server-wide statistics.
     *
     * @param errorMessage message to send to the peer
     * @param messageData  payload of the failed request, stored as exception context
     * @param e            the failure being reported
     */
    private void sendError(RpcMessage errorMessage, Object messageData, @Nullable Exception e) {
        downstreamDataAcceptor.accept(errorMessage);
        lastRequestHandlingException.recordException((Throwable)e, messageData);
        rpcServer.getLastRequestHandlingException().recordException((Throwable)e, messageData);
        failedRequests.recordEvent();
        rpcServer.getFailedRequests().recordEvent();
    }

    /**
     * Unregisters this connection from the server and swaps the downstream
     * acceptor for a no-op, so later responses are silently dropped.
     */
    private void doClose() {
        rpcServer.remove(this);
        downstreamDataAcceptor = ignored -> {};
    }

    /**
     * Sends a {@link RpcControlMessage#CLOSE} control message (cookie {@code -1})
     * to the peer, if a downstream acceptor has been set.
     */
    public void shutdown() {
        if (downstreamDataAcceptor != null) {
            downstreamDataAcceptor.accept(RpcMessage.of(-1, (Object)RpcControlMessage.CLOSE));
        }
    }

    /**
     * Enables recording of request handling time for requests accepted from now on.
     */
    public void startMonitoring() {
        monitoring = true;
    }

    /**
     * Disables recording of request handling time for requests accepted from now on.
     */
    public void stopMonitoring() {
        monitoring = false;
    }

    /** @return counter of successfully handled requests on this connection */
    @JmxAttribute
    public EventStats getSuccessfulRequests() {
        return successfulRequests;
    }

    /** @return counter of failed requests on this connection */
    @JmxAttribute
    public EventStats getFailedRequests() {
        return failedRequests;
    }

    /** @return request handling time statistics, in milliseconds */
    @JmxAttribute
    public ValueStats getRequestHandlingTime() {
        return requestHandlingTime;
    }

    /** @return statistics about the most recent request handling exception */
    @JmxAttribute
    public ExceptionStats getLastRequestHandlingException() {
        return lastRequestHandlingException;
    }

    /** @return string form of the peer address */
    @JmxAttribute
    public String getRemoteAddress() {
        return remoteAddress.toString();
    }

    /**
     * Refreshes the request counters and the handling-time statistics.
     *
     * @param timestamp timestamp passed through to each statistic's {@code refresh}
     */
    public void refresh(long timestamp) {
        successfulRequests.refresh(timestamp);
        failedRequests.refresh(timestamp);
        requestHandlingTime.refresh(timestamp);
    }

    /**
     * @return a summary with the peer address, the raw {@link #activeRequests}
     *         counter and the total success/failure counts
     */
    @Override
    public String toString() {
        return "RpcServerConnection{address=" + remoteAddress + ", active=" + activeRequests + ", successes=" + successfulRequests.getTotalCount() + ", failures=" + failedRequests.getTotalCount() + '}';
    }
}

