/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.internal.server.tcp;

import com.hazelcast.cluster.Address;
import com.hazelcast.config.EndpointConfig;
import com.hazelcast.instance.EndpointQualifier;
import com.hazelcast.instance.ProtocolType;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.metrics.ProbeUnit;
import com.hazelcast.internal.networking.Channel;
import com.hazelcast.internal.networking.ChannelInitializer;
import com.hazelcast.internal.networking.Networking;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.internal.nio.ConnectionLifecycleListener;
import com.hazelcast.internal.nio.ConnectionListener;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.nio.Packet;
import com.hazelcast.internal.server.NetworkStats;
import com.hazelcast.internal.server.ServerConnection;
import com.hazelcast.internal.server.ServerConnectionManager;
import com.hazelcast.internal.server.ServerContext;
import com.hazelcast.internal.server.tcp.TcpServer;
import com.hazelcast.internal.server.tcp.TcpServerConnection;
import com.hazelcast.internal.server.tcp.TcpServerConnectionErrorHandler;
import com.hazelcast.internal.server.tcp.TcpServerConnector;
import com.hazelcast.internal.server.tcp.TcpServerControl;
import com.hazelcast.internal.util.ConcurrencyUtil;
import com.hazelcast.internal.util.ConstructorFunction;
import com.hazelcast.internal.util.MutableLong;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.counters.MwCounter;
import com.hazelcast.internal.util.executor.StripedRunnable;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.properties.ClusterProperty;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;

public class TcpServerConnectionManager
implements ServerConnectionManager,
Consumer<Packet>,
DynamicMetricsProvider {
    /**
     * NOTE(review): presumably the retry limit for deferred packet sends; {@code send}
     * compares the task's retry count against the same value (5) - confirm.
     */
    private static final int RETRY_NUMBER = 5;
    /**
     * NOTE(review): presumably the per-retry delay step in milliseconds; {@code send}
     * schedules with {@code (retries + 1) * 100} ms - confirm.
     */
    private static final long DELAY_FACTOR = 100L;
    /** One {@link Plane} per plane index; allocated and filled in the constructor. */
    final Plane[] planes;
    /** Number of planes, read from {@link ClusterProperty#CHANNEL_COUNT} in the constructor. */
    final int planeCount;
    /** Connections created by {@code newConnection} that have not been closed or reset yet. */
    @Probe(name="activeCount", level=ProbeLevel.MANDATORY)
    final Set<TcpServerConnection> connections = Collections.newSetFromMap(new ConcurrentHashMap<>());
    /** Channels added by {@code newChannel}; removed by {@code newConnection}, {@code removeAcceptedChannel} or {@code reset}. */
    @Probe(name="acceptedSocketCount", level=ProbeLevel.MANDATORY)
    final Set<Channel> acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
    /** Listeners notified through the event service when a connection is registered or removed. */
    @Probe(name="connectionListenerCount")
    final Set<ConnectionListener> connectionListeners = new CopyOnWriteArraySet<ConnectionListener>();
    /** Lifecycle listener handed to every connection created by {@code newConnection}. */
    final ConnectionLifecycleListenerImpl connectionLifecycleListener = new ConnectionLifecycleListenerImpl();
    private final ILogger logger;
    private final ServerContext serverContext;
    /** Endpoint configuration; may be {@code null} (the code null-checks it before use). */
    private final EndpointConfig endpointConfig;
    /** Qualifier taken from {@link #endpointConfig}; {@code null} when no endpoint configuration was given. */
    private final EndpointQualifier endpointQualifier;
    private final Function<EndpointQualifier, ChannelInitializer> channelInitializerFn;
    private final TcpServer server;
    private final TcpServerConnector connector;
    private final TcpServerControl serverControl;
    /** Byte counters for this endpoint; {@code null} when {@link #endpointQualifier} is {@code null}. */
    private final NetworkStatsImpl networkStats;
    /** Factory used by {@code getErrorHandler} to lazily create the error handler of an address. */
    private final ConstructorFunction<Address, TcpServerConnectionErrorHandler> errorHandlerConstructor = endpoint -> new TcpServerConnectionErrorHandler(this, endpoint);
    /** Source of connection ids; incremented once per {@code newConnection} call. */
    private final AtomicInteger connectionIdGen = new AtomicInteger();
    /** Incremented in {@code newConnection}. */
    @Probe(name="openedCount")
    private final MwCounter openedCount = MwCounter.newMwCounter();
    /** Incremented by the lifecycle listener whenever a connection is closed. */
    @Probe(name="closedCount")
    private final MwCounter closedCount = MwCounter.newMwCounter();
    /**
     * Creates the connection manager of one endpoint of the given server.
     *
     * @param server                 the owning TCP server
     * @param endpointConfig         endpoint configuration; when {@code null} the endpoint qualifier
     *                               is {@code null} as well and no network statistics are kept
     * @param channelInitializerFn   supplies the channel initializer for an endpoint qualifier
     * @param serverContext          provides properties, logging and the services used by this manager
     * @param supportedProtocolTypes passed through to the {@link TcpServerControl}
     */
    TcpServerConnectionManager(TcpServer server, EndpointConfig endpointConfig, Function<EndpointQualifier, ChannelInitializer> channelInitializerFn, ServerContext serverContext, Set<ProtocolType> supportedProtocolTypes) {
        this.server = server;
        this.endpointConfig = endpointConfig;
        if (endpointConfig == null) {
            this.endpointQualifier = null;
        } else {
            this.endpointQualifier = endpointConfig.getQualifier();
        }
        this.channelInitializerFn = channelInitializerFn;
        this.planeCount = serverContext.properties().getInteger(ClusterProperty.CHANNEL_COUNT);
        this.serverContext = serverContext;
        this.logger = serverContext.getLoggingService().getLogger(TcpServerConnectionManager.class);

        // Both collaborators are handed this manager while it is still under construction;
        // keep them below the field assignments above.
        this.connector = new TcpServerConnector(this);
        this.serverControl = new TcpServerControl(this, serverContext, logger, supportedProtocolTypes);

        if (endpointQualifier == null) {
            this.networkStats = null;
        } else {
            this.networkStats = new NetworkStatsImpl();
        }

        this.planes = new Plane[planeCount];
        for (int i = 0; i < planes.length; i++) {
            planes[i] = new Plane(i);
        }
    }

    /** Returns the TCP server this manager was created for. */
    @Override
    public TcpServer getServer() {
        return server;
    }

    /** Returns the endpoint qualifier, or {@code null} when the manager was built without an endpoint configuration. */
    public EndpointQualifier getEndpointQualifier() {
        return endpointQualifier;
    }

    /** Returns an unmodifiable view of the set of connections currently tracked by this manager. */
    @Override
    public Collection<ServerConnection> getConnections() {
        return Collections.unmodifiableSet(connections);
    }

    /**
     * Adds a listener to be told about registered and removed connections.
     *
     * @param listener the listener to add; checked with {@code Preconditions.checkNotNull}
     */
    @Override
    public void addConnectionListener(ConnectionListener listener) {
        Preconditions.checkNotNull(listener, "listener can't be null");
        connectionListeners.add(listener);
    }

    /**
     * Hands the packet to the {@link TcpServerControl} for processing.
     * Calls are serialized on this manager's monitor.
     */
    @Override
    public synchronized void accept(Packet packet) {
        serverControl.process(packet);
    }

    /**
     * Looks up the registered connection to {@code address} in the plane selected by {@code streamId}.
     *
     * @return the connection, or {@code null} if none is registered for that address in that plane
     */
    @Override
    public ServerConnection get(Address address, int streamId) {
        Plane plane = getPlane(streamId);
        return plane.connectionMap.get(address);
    }

    /** Same as {@link #getOrConnect(Address, boolean, int)} with {@code silent} set to {@code false}. */
    @Override
    public ServerConnection getOrConnect(Address address, int streamId) {
        final boolean silent = false;
        return getOrConnect(address, silent, streamId);
    }

    /**
     * Returns the registered connection to {@code address} in the plane selected by
     * {@code streamId}. When there is none and the server is live, an asynchronous
     * connect is started, unless one is already in progress for that address in that plane.
     *
     * @param address  the remote address
     * @param silent   forwarded to the connector's {@code asyncConnect}
     * @param streamId selects the plane, see {@link #getPlane(int)}
     * @return the existing connection, or {@code null} when none is registered yet
     */
    @Override
    public ServerConnection getOrConnect(Address address, boolean silent, int streamId) {
        Plane plane = getPlane(streamId);
        TcpServerConnection connection = plane.connectionMap.get(address);
        if (connection != null || !server.isLive()) {
            return connection;
        }

        // add() returns true only when the address was not yet marked as in progress,
        // so at most one connect attempt per address and plane is started from here.
        if (plane.connectionsInProgress.add(address)) {
            if (logger.isFineEnabled()) {
                logger.fine("Connection to: " + address + " streamId:" + streamId + " is not yet in progress");
            }
            connector.asyncConnect(address, silent, plane.index);
        } else if (logger.isFineEnabled()) {
            logger.fine("Connection to: " + address + " streamId:" + streamId + " is already in progress");
        }
        // Nothing is registered yet; the caller has to look the connection up again later.
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers {@code c} as the connection to {@code remoteAddress} in the given plane
     * and notifies the connection listeners through the event service.
     * <p>
     * Whatever the outcome, the address is removed from the plane's in-progress set.
     *
     * @param remoteAddress address the connection is registered under
     * @param c             the connection; cast to {@link TcpServerConnection}
     * @param planeIndex    index into {@code planes}
     * @return {@code false} when {@code remoteAddress} is this member's own address or the
     *         connection is not alive, {@code true} once the connection has been registered
     * @throws IllegalArgumentException if the connection already has a different remote address
     */
    @Override
    public synchronized boolean register(final Address remoteAddress, ServerConnection c, int planeIndex) {
        final Plane plane = planes[planeIndex];
        final TcpServerConnection connection = (TcpServerConnection) c;
        try {
            if (remoteAddress.equals(serverContext.getThisAddress())) {
                return false;
            }

            if (!connection.isAlive()) {
                if (logger.isFinestEnabled()) {
                    logger.finest(connection + " to " + remoteAddress + " is not registered since connection is not active.");
                }
                return false;
            }

            Address knownAddress = connection.getRemoteAddress();
            boolean addressConflict = knownAddress != null && !knownAddress.equals(remoteAddress);
            if (addressConflict) {
                throw new IllegalArgumentException(connection + " has already a different remoteAddress than: " + remoteAddress);
            }
            connection.setRemoteAddress(remoteAddress);

            if (!connection.isClient()) {
                // reset == true: the handler of this address starts over for the new connection
                TcpServerConnectionErrorHandler handler = getErrorHandler(remoteAddress, plane.index, true);
                connection.setErrorHandler(handler);
            }
            plane.connectionMap.put(remoteAddress, connection);

            serverContext.getEventService().executeEventCallback(new StripedRunnable() {
                @Override
                public int getKey() {
                    return remoteAddress.hashCode();
                }

                @Override
                public void run() {
                    for (ConnectionListener listener : TcpServerConnectionManager.this.connectionListeners) {
                        listener.connectionAdded(connection);
                    }
                }
            });
            return true;
        } finally {
            plane.connectionsInProgress.remove(remoteAddress);
        }
    }

    /**
     * Maps a stream id to a plane.
     * <p>
     * {@code -1} and {@code Integer.MIN_VALUE} always select plane 0; any other id selects
     * {@code Math.abs(streamId) % planeCount}. ({@code Math.abs(Integer.MIN_VALUE)} is
     * negative, so that value cannot go through the modulo path.)
     */
    public Plane getPlane(int streamId) {
        if (streamId == -1 || streamId == Integer.MIN_VALUE) {
            return planes[0];
        }
        return planes[Math.abs(streamId) % planeCount];
    }

    /**
     * Tells every connection listener, through the event service, that {@code connection}
     * was removed. Does nothing when the server is not live.
     *
     * @param connection the removed connection
     * @param endPoint   its remote address; the hash code is used as the stripe key
     */
    private void fireConnectionRemovedEvent(final Connection connection, final Address endPoint) {
        if (!server.isLive()) {
            return;
        }
        StripedRunnable notification = new StripedRunnable() {
            @Override
            public int getKey() {
                return endPoint.hashCode();
            }

            @Override
            public void run() {
                for (ConnectionListener listener : TcpServerConnectionManager.this.connectionListeners) {
                    listener.connectionRemoved(connection);
                }
            }
        };
        serverContext.getEventService().executeEventCallback(notification);
    }

    /**
     * Closes all accepted channels and all connections and empties the bookkeeping
     * collections of this manager and of every plane.
     *
     * @param cleanListeners when {@code true} the connection listeners are removed too
     */
    public synchronized void reset(boolean cleanListeners) {
        for (Channel channel : acceptedChannels) {
            IOUtil.closeResource(channel);
        }

        for (Plane plane : planes) {
            for (TcpServerConnection registered : plane.connectionMap.values()) {
                IOUtil.close(registered, "TcpServer is stopping");
            }
            plane.connectionMap.clear();
        }

        for (TcpServerConnection tracked : connections) {
            IOUtil.close(tracked, "TcpServer is stopping");
        }

        acceptedChannels.clear();
        for (Plane plane : planes) {
            plane.connectionsInProgress.clear();
        }
        for (Plane plane : planes) {
            plane.errorHandlers.clear();
        }
        connections.clear();

        if (cleanListeners) {
            connectionListeners.clear();
        }
    }

    /**
     * Sends {@code packet} to {@code target} over the plane selected by {@code streamId}.
     * Both {@code packet} and {@code target} are checked with {@code Preconditions.checkNotNull}.
     *
     * @return the result of the internal {@code send} call
     */
    @Override
    public boolean transmit(Packet packet, Address target, int streamId) {
        Preconditions.checkNotNull(packet, "packet can't be null");
        Preconditions.checkNotNull(target, "target can't be null");
        // No send task yet: send() creates one itself if the packet has to be deferred.
        final SendTask noTaskYet = null;
        return send(packet, target, noTaskYet, streamId);
    }

    /** Returns the network statistics of this endpoint, or {@code null} when none are kept. */
    @Override
    public NetworkStats getNetworkStats() {
        return networkStats;
    }

    /** Recomputes the network statistics; a no-op when this manager keeps none. */
    void refreshNetworkStats() {
        if (networkStats == null) {
            return;
        }
        networkStats.refresh();
    }

    /**
     * Returns the error handler of {@code endpoint} in the given plane, creating it on first use.
     *
     * @param endpoint   remote address the handler belongs to
     * @param planeIndex index into {@code planes}
     * @param reset      when {@code true} the handler's {@code reset()} is called before it is returned
     */
    private TcpServerConnectionErrorHandler getErrorHandler(Address endpoint, int planeIndex, boolean reset) {
        Plane plane = planes[planeIndex];
        TcpServerConnectionErrorHandler errorHandler =
                ConcurrencyUtil.getOrPutIfAbsent(plane.errorHandlers, endpoint, errorHandlerConstructor);
        if (reset) {
            errorHandler.reset();
        }
        return errorHandler;
    }

    /**
     * Registers {@code socketChannel} with the server's networking and remembers the
     * resulting channel in {@code acceptedChannels}.
     *
     * @param socketChannel the socket channel to wrap
     * @param clientMode    forwarded to {@code Networking.register}
     * @return the newly registered channel
     * @throws IOException declared by this method; may be raised by the calls it makes
     */
    Channel newChannel(SocketChannel socketChannel, boolean clientMode) throws IOException {
        Networking networking = server.getNetworking();
        ChannelInitializer initializer = channelInitializerFn.apply(endpointQualifier);
        assert (initializer != null) : "Found NULL channel initializer for endpoint-qualifier " + endpointQualifier;

        Channel registered = networking.register(initializer, socketChannel, clientMode);
        // Channel options come from the endpoint configuration, when there is one.
        if (endpointConfig != null) {
            IOUtil.setChannelOptions(registered, endpointConfig);
        }
        acceptedChannels.add(registered);
        return registered;
    }

    /** Drops {@code channel} from the set of accepted channels. */
    void removeAcceptedChannel(Channel channel) {
        acceptedChannels.remove(channel);
    }

    /**
     * Records a failed connection attempt: clears the in-progress mark of {@code address}
     * in the given plane, informs the server context and, unless {@code silent}, passes
     * {@code t} to the error handler of that address.
     *
     * @param address    the address that could not be connected
     * @param planeIndex index into {@code planes}
     * @param t          the failure
     * @param silent     when {@code true} the error handler is not invoked
     */
    void failedConnection(Address address, int planeIndex, Throwable t, boolean silent) {
        Plane plane = planes[planeIndex];
        plane.connectionsInProgress.remove(address);
        serverContext.onFailedConnection(address);
        if (silent) {
            return;
        }
        // reset == false: keep whatever state the handler has accumulated so far
        TcpServerConnectionErrorHandler errorHandler = getErrorHandler(address, planeIndex, false);
        errorHandler.onError(t);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates a connection on top of {@code channel}, starts the channel and tracks the
     * connection in {@code connections}. The channel is removed from {@code acceptedChannels}
     * whether or not this succeeds.
     *
     * @param channel       the channel the connection runs on
     * @param remoteAddress set as the connection's remote address
     * @return the new connection
     * @throws IllegalStateException if the server is not live
     */
    synchronized TcpServerConnection newConnection(Channel channel, Address remoteAddress) {
        try {
            if (!server.isLive()) {
                throw new IllegalStateException("connection manager is not live!");
            }

            int connectionId = connectionIdGen.incrementAndGet();
            TcpServerConnection created = new TcpServerConnection(this, connectionLifecycleListener, connectionId, channel);
            created.setRemoteAddress(remoteAddress);
            connections.add(created);

            if (logger.isFineEnabled()) {
                logger.fine("Established socket connection between " + channel.localSocketAddress() + " and " + channel.remoteSocketAddress());
            }
            openedCount.inc();
            channel.start();
            return created;
        } finally {
            acceptedChannels.remove(channel);
        }
    }

    /**
     * Writes {@code packet} to the registered connection to {@code target}, or, when there
     * is none, triggers a connect and schedules a deferred retry.
     *
     * @param packet   the packet to send
     * @param target   the destination address
     * @param sendTask the retry task when called from a retry, {@code null} on the first attempt
     * @param streamId selects the plane
     * @return the result of {@code write} when a connection exists; {@code true} when a retry
     *         was scheduled; {@code false} when the retry limit is reached, the node is not
     *         active, or scheduling was rejected while the server is not live
     * @throws RejectedExecutionException if scheduling is rejected while the server is live
     */
    private boolean send(Packet packet, Address target, SendTask sendTask, int streamId) {
        ServerConnection connection = get(target, streamId);
        if (connection != null) {
            return connection.write(packet);
        }

        SendTask task = sendTask != null ? sendTask : new SendTask(packet, target, streamId);
        int attempts = task.retries;
        if (attempts >= RETRY_NUMBER || !serverContext.isNodeActive()) {
            return false;
        }

        getOrConnect(target, true, streamId);
        try {
            // the delay grows with every retry
            server.scheduleDeferred(task, (attempts + 1) * DELAY_FACTOR, TimeUnit.MILLISECONDS);
            return true;
        } catch (RejectedExecutionException e) {
            if (server.isLive()) {
                throw e;
            }
            if (logger.isFinestEnabled()) {
                logger.finest("Packet send task is rejected. Packet cannot be sent to " + target);
            }
            return false;
        }
    }

    /**
     * Describes this manager by its endpoint qualifier and the registered connections of
     * every plane (one map per plane, in plane order).
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("TcpServerConnectionManager{endpointQualifier=")
                .append(endpointQualifier)
                .append(", connectionsMap=");
        if (planes == null) {
            // Only possible when called while the constructor is still running
            // (this manager is handed to collaborators before the planes are created).
            text.append("null");
        } else {
            text.append('[');
            for (int i = 0; i < planes.length; i++) {
                if (i > 0) {
                    text.append(", ");
                }
                Plane plane = planes[i];
                if (plane == null) {
                    text.append("null");
                } else {
                    text.append(plane.connectionMap);
                }
            }
            text.append(']');
        }
        return text.append('}').toString();
    }

    /** Returns the number of addresses marked as connection-in-progress, summed over all planes. */
    @Probe(name="inProgressCount")
    private int connectionsInProgress() {
        return Arrays.stream(planes)
                .mapToInt(plane -> plane.connectionsInProgress.size())
                .sum();
    }

    /** Returns the number of registered connections, summed over all planes. */
    @Override
    @Probe(name="count", level=ProbeLevel.MANDATORY)
    public int connectionCount() {
        return Arrays.stream(planes)
                .mapToInt(plane -> plane.connectionMap.size())
                .sum();
    }

    /**
     * Publishes the metrics of this manager and of its connections under the
     * {@code tcp.connection} prefix.
     * <ul>
     *   <li>the manager itself, discriminated by endpoint when an endpoint qualifier is set;</li>
     *   <li>every tracked connection that has a remote address;</li>
     *   <li>every registered connection of every plane that has a remote address, discriminated
     *       by the address it is registered under;</li>
     *   <li>{@code clientCount} and {@code textCount}, only when there is no endpoint configuration.</li>
     * </ul>
     */
    @Override
    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        final MetricDescriptor rootDescriptor = descriptor.withPrefix("tcp.connection");

        if (endpointQualifier != null) {
            context.collect(rootDescriptor.copy().withDiscriminator("endpoint", endpointQualifier.toMetricsPrefixString()), this);
        } else {
            context.collect(rootDescriptor.copy(), this);
        }

        for (TcpServerConnection tracked : connections) {
            if (tracked.getRemoteAddress() != null) {
                context.collect(rootDescriptor.copy().withDiscriminator("endpoint", tracked.getRemoteAddress().toString()), tracked);
            }
        }

        int clientCount = 0;
        int textCount = 0;
        for (Plane plane : planes) {
            for (Map.Entry<Address, TcpServerConnection> registration : plane.connectionMap.entrySet()) {
                Address bindAddress = registration.getKey();
                TcpServerConnection registered = registration.getValue();

                if (registered.isClient()) {
                    clientCount++;
                    String type = registered.getConnectionType();
                    boolean textProtocol = "REST".equals(type) || "MEMCACHE".equals(type);
                    if (textProtocol) {
                        textCount++;
                    }
                }

                if (registered.getRemoteAddress() != null) {
                    context.collect(rootDescriptor.copy().withDiscriminator("bindAddress", bindAddress.toString()).withTag("endpoint", registered.getRemoteAddress().toString()), registered);
                }
            }
        }

        if (endpointConfig == null) {
            context.collect(rootDescriptor.copy(), "clientCount", ProbeLevel.MANDATORY, ProbeUnit.COUNT, clientCount);
            context.collect(rootDescriptor.copy(), "textCount", ProbeLevel.MANDATORY, ProbeUnit.COUNT, textCount);
        }
    }

    /**
     * Byte counters of this manager. The reported totals are the values computed by the
     * last {@link #refresh()}: bytes of already closed connections plus bytes of the
     * connections tracked at that moment.
     */
    private class NetworkStatsImpl
    implements NetworkStats {
        /** Total received bytes as of the last refresh. */
        private final AtomicLong bytesReceivedLastCalc = new AtomicLong();
        /** Received bytes of connections that have been closed. */
        private final MwCounter bytesReceivedOnClosed = MwCounter.newMwCounter();
        /** Total sent bytes as of the last refresh. */
        private final AtomicLong bytesSentLastCalc = new AtomicLong();
        /** Sent bytes of connections that have been closed. */
        private final MwCounter bytesSentOnClosed = MwCounter.newMwCounter();

        private NetworkStatsImpl() {
        }

        @Override
        public long getBytesReceived() {
            return bytesReceivedLastCalc.get();
        }

        @Override
        public long getBytesSent() {
            return bytesSentLastCalc.get();
        }

        /** Recomputes both totals; a stored total is only ever replaced by a larger value. */
        void refresh() {
            long received = bytesReceivedOnClosed.get();
            long sent = bytesSentOnClosed.get();
            for (TcpServerConnection conn : TcpServerConnectionManager.this.connections) {
                received += conn.getChannel().bytesRead();
                sent += conn.getChannel().bytesWritten();
            }
            bytesReceivedLastCalc.accumulateAndGet(received, Math::max);
            bytesSentLastCalc.accumulateAndGet(sent, Math::max);
        }

        /** Adds the byte counts of a closing connection's channel to the closed-connection counters. */
        void onConnectionClose(TcpServerConnection connection) {
            Channel closing = connection.getChannel();
            bytesReceivedOnClosed.inc(closing.bytesRead());
            bytesSentOnClosed.inc(connection.getChannel().bytesWritten());
        }
    }

    /**
     * Lifecycle listener handed to every connection of this manager; removes a closed
     * connection from the manager's bookkeeping and reports the failure, if any.
     */
    private final class ConnectionLifecycleListenerImpl
    implements ConnectionLifecycleListener<TcpServerConnection> {
        private ConnectionLifecycleListenerImpl() {
        }

        /**
         * Cleans up after a closed connection.
         *
         * @param connection the connection that was closed
         * @param cause      the failure that closed it, or {@code null}
         * @param silent     when {@code true} the address's error handler is not invoked
         */
        @Override
        public void onConnectionClose(TcpServerConnection connection, Throwable cause, boolean silent) {
            final TcpServerConnectionManager manager = TcpServerConnectionManager.this;

            manager.closedCount.inc();
            manager.connections.remove(connection);
            if (manager.networkStats != null) {
                manager.networkStats.onConnectionClose(connection);
            }

            final Address remoteAddress = connection.getRemoteAddress();
            final int planeIndex = connection.getPlaneIndex();

            if (remoteAddress != null) {
                if (planeIndex > -1) {
                    Plane plane = manager.planes[planeIndex];
                    plane.connectionsInProgress.remove(remoteAddress);
                    // NOTE(review): this removes whatever connection is mapped to the address,
                    // even if it is not the one being closed - confirm that is intended.
                    plane.connectionMap.remove(remoteAddress);
                    manager.fireConnectionRemovedEvent(connection, remoteAddress);
                } else {
                    // The plane of the connection is not known: search every plane for it by connection id.
                    boolean removed = false;
                    for (Plane plane : manager.planes) {
                        plane.connectionsInProgress.remove(remoteAddress);
                        removed |= plane.connectionMap.values()
                                .removeIf(candidate -> candidate.getConnectionId() == connection.getConnectionId());
                    }
                    if (removed) {
                        manager.fireConnectionRemovedEvent(connection, remoteAddress);
                    }
                }
            }

            if (cause == null) {
                return;
            }
            manager.serverContext.onFailedConnection(remoteAddress);
            if (silent) {
                return;
            }

            // Error handlers are kept per plane and per address. Without a remote address or a
            // valid plane index the lookup would fail (null map key / planes[-1]), so the
            // failure is only logged in that case.
            boolean planeKnown = planeIndex > -1 && planeIndex < manager.planes.length;
            if (remoteAddress == null || !planeKnown) {
                if (manager.logger.isFineEnabled()) {
                    manager.logger.fine("Cannot notify error handler for " + connection + " (remoteAddress=" + remoteAddress
                            + ", planeIndex=" + planeIndex + "): " + cause);
                }
                return;
            }
            manager.getErrorHandler(remoteAddress, planeIndex, false).onError(cause);
        }
    }

    /**
     * Deferred retry of a packet send: every run increments the retry counter and calls
     * the manager's {@code send} again with this task.
     */
    private final class SendTask
    implements Runnable {
        private final Packet packet;
        private final Address target;
        private final int streamId;
        /** Number of times this task has run so far; read by {@code send}. */
        private volatile int retries;

        private SendTask(Packet packet, Address target, int streamId) {
            this.packet = packet;
            this.target = target;
            this.streamId = streamId;
        }

        @Override
        @SuppressFBWarnings(value={"VO_VOLATILE_INCREMENT"}, justification="single-writer, many-reader")
        public void run() {
            retries++;
            final ILogger log = TcpServerConnectionManager.this.logger;
            if (log.isFinestEnabled()) {
                log.finest("Retrying[" + retries + "] packet send operation to: " + target);
            }
            TcpServerConnectionManager.this.send(packet, target, this, streamId);
        }
    }

    /**
     * Bookkeeping of one plane: its registered connections, the addresses a connect is in
     * progress for, and the error handlers, each keyed by remote address.
     */
    static class Plane {
        /** Registered connections by remote address; filled by {@code register}. */
        final ConcurrentHashMap<Address, TcpServerConnection> connectionMap = new ConcurrentHashMap<>(100);
        /** Addresses for which {@code getOrConnect} has started a connect that is not finished yet. */
        final Set<Address> connectionsInProgress = Collections.newSetFromMap(new ConcurrentHashMap<>());
        /** Error handlers by remote address; created lazily by {@code getErrorHandler}. */
        final ConcurrentHashMap<Address, TcpServerConnectionErrorHandler> errorHandlers = new ConcurrentHashMap<>(100);
        /** Position of this plane in the manager's {@code planes} array. */
        final int index;

        Plane(int index) {
            this.index = index;
        }
    }
}

