/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.nakadi.service;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.InetAddresses;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.zalando.nakadi.util.FeatureToggleService;

/**
 * Detects closed client connections by polling the TCP connection tables exposed under
 * {@code /proc/net/tcp} and {@code /proc/net/tcp6}.
 *
 * <p>Callers register a listener for a remote {@code address:port} pair. Every run of
 * {@link #refresh()} re-reads the connection tables, and listeners whose connection is either
 * missing from the tables or in one of the {@link #CLOSED_STATES} are invoked once and dropped.
 *
 * <p>The whole mechanism is a no-op unless
 * {@link FeatureToggleService.Feature#CONNECTION_CLOSE_CRUTCH} is enabled.
 *
 * <p>Threading: {@link #toAdd} is guarded by its own monitor so listeners may be registered from
 * any thread; {@link #listeners} is only touched from {@link #refresh()}.
 * NOTE(review): this assumes {@code refresh()} is never executed concurrently with itself,
 * which is what {@code fixedDelay} scheduling normally gives - confirm against the scheduler setup.
 */
@Component
public class ClosedConnectionsCrutch {
    private static final Logger LOG = LoggerFactory.getLogger(ClosedConnectionsCrutch.class);

    /** Connection states that are treated as "the client is gone". */
    private static final Set<ConnectionState> CLOSED_STATES = ImmutableSet.of(
            ConnectionState.TCP_TIME_WAIT,
            ConnectionState.TCP_CLOSE_WAIT,
            ConnectionState.TCP_LAST_ACK,
            ConnectionState.TCP_CLOSING,
            ConnectionState.TCP_CLOSE);

    /** Local port of this server; only table rows with this local port are considered. */
    private final int port;
    /** Listeners that are actively being watched; accessed only from {@link #refresh()}. */
    private final Map<ConnectionInfo, List<BooleanSupplier>> listeners = new HashMap<>();
    /** Listeners registered since the last refresh; guarded by {@code synchronized (toAdd)}. */
    private final Map<ConnectionInfo, List<BooleanSupplier>> toAdd = new HashMap<>();
    private final Meter meterClosed;
    private final FeatureToggleService featureToggleService;

    /**
     * @param port                 local server port, injected from {@code server.port}
     * @param metricRegistry       registry in which the {@code nakadi.close_crutch.closed} meter is created
     * @param featureToggleService used to check whether the crutch is enabled
     */
    @Autowired
    public ClosedConnectionsCrutch(@Value("${server.port}") int port, MetricRegistry metricRegistry,
                                   FeatureToggleService featureToggleService) {
        this.port = port;
        this.meterClosed = metricRegistry.meter("nakadi.close_crutch.closed");
        this.featureToggleService = featureToggleService;
    }

    /**
     * Starts watching the remote endpoint of the given request.
     *
     * @param request request whose remote address and port identify the connection
     * @return a flag that is initially {@code true} and is switched to {@code false} once the
     *         connection is detected as closed; it stays {@code true} forever when the feature
     *         is disabled
     * @throws UnknownHostException if the request's remote address cannot be resolved
     */
    public AtomicBoolean listenForConnectionClose(HttpServletRequest request) throws UnknownHostException {
        final AtomicBoolean connectionReady = new AtomicBoolean(true);
        final InetAddress remoteAddress = InetAddress.getByName(request.getRemoteAddr());
        listenForConnectionClose(remoteAddress, request.getRemotePort(),
                () -> connectionReady.compareAndSet(true, false));
        return connectionReady;
    }

    /**
     * Registers a listener for the connection identified by the remote {@code address:port}.
     * The listener is queued and starts being watched on the next {@link #refresh()}.
     * Does nothing when the feature is disabled.
     *
     * @param address         remote address of the connection
     * @param port            remote port of the connection
     * @param onCloseListener invoked once when the connection is detected as closed; a
     *                        {@code true} result is counted in the "closed" meter
     */
    public void listenForConnectionClose(InetAddress address, int port, BooleanSupplier onCloseListener) {
        if (!featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.CONNECTION_CLOSE_CRUTCH)) {
            return;
        }
        LOG.debug("Listening for connection to close using crutch ({}:{})", address, port);
        synchronized (toAdd) {
            toAdd.computeIfAbsent(new ConnectionInfo(address, port), key -> new ArrayList<>())
                    .add(onCloseListener);
        }
    }

    /**
     * Moves newly registered listeners into the watched set, re-reads the connection tables and
     * notifies listeners of every watched connection that is closed or no longer listed.
     * Does nothing when the feature is disabled.
     *
     * @throws IOException if reading a connection table fails
     */
    @Scheduled(fixedDelay = 1000L)
    public void refresh() throws IOException {
        if (!featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.CONNECTION_CLOSE_CRUTCH)) {
            return;
        }
        synchronized (toAdd) {
            toAdd.forEach((connection, pending) ->
                    listeners.computeIfAbsent(connection, key -> new ArrayList<>()).addAll(pending));
            toAdd.clear();
        }

        final Map<ConnectionInfo, ConnectionState> currentConnections = readAllConnectionStates();
        long closedCount = 0L;
        // Iterate over a copy of the keys because entries are removed while looping.
        for (final ConnectionInfo connection : new HashSet<>(listeners.keySet())) {
            // A connection that is absent from the tables is treated as closed.
            final ConnectionState state =
                    currentConnections.getOrDefault(connection, ConnectionState.TCP_CLOSE);
            if (!CLOSED_STATES.contains(state)) {
                continue;
            }
            LOG.info("Notifying about connection close via crutch: {}", connection);
            for (final BooleanSupplier listener : listeners.remove(connection)) {
                if (listener.getAsBoolean()) {
                    closedCount++;
                }
            }
        }
        if (closedCount > 0L) {
            meterClosed.mark(closedCount);
        }
    }

    /**
     * Reads {@code /proc/net/tcp} and {@code /proc/net/tcp6} and merges their content.
     * A missing file is logged and skipped.
     */
    private Map<ConnectionInfo, ConnectionState> readAllConnectionStates() throws IOException {
        final Map<ConnectionInfo, ConnectionState> result = new HashMap<>();
        final File[] tables = {new File("/proc/net/tcp"), new File("/proc/net/tcp6")};
        for (final File table : tables) {
            try (FileInputStream in = new FileInputStream(table)) {
                result.putAll(getCurrentConnections(in));
            } catch (FileNotFoundException e) {
                LOG.warn("Failed to find file {}, skipping", table.getName());
            }
        }
        return result;
    }

    /**
     * Parses a connection table in the {@code /proc/net/tcp} text format.
     *
     * <p>The first line is treated as a header and ignored. Blank lines are skipped, and lines
     * that cannot be parsed are logged and skipped, so one bad row never aborts the whole read.
     * The stream is closed when this method returns.
     *
     * @param in stream with the table content
     * @return state per remote endpoint, restricted to rows whose local port equals this
     *         server's port
     * @throws IOException if reading from the stream fails
     */
    @VisibleForTesting
    Map<ConnectionInfo, ConnectionState> getCurrentConnections(InputStream in) throws IOException {
        final Map<ConnectionInfo, ConnectionState> connectionToState = new HashMap<>();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            reader.readLine(); // column header
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;
                }
                try {
                    parseConnectionLine(line, connectionToState);
                } catch (RuntimeException | DecoderException | UnknownHostException ex) {
                    LOG.error("Failed to parse line, skipping: {}", line, ex);
                }
            }
        }
        return connectionToState;
    }

    /**
     * Parses one table row and, if its local port matches this server's port, records the state
     * for each remote address restored from the row. Nothing is recorded when parsing fails.
     */
    private void parseConnectionLine(String line, Map<ConnectionInfo, ConnectionState> target)
            throws DecoderException, UnknownHostException {
        final StringTokenizer tokenizer = new StringTokenizer(line);
        tokenizer.nextToken(); // first column is not used
        final String[] localAddress = tokenizer.nextToken().split(":");
        final int localPort = Integer.parseInt(localAddress[1], 16);
        if (localPort != this.port) {
            return;
        }
        final String[] remoteAddressPair = tokenizer.nextToken().split(":");
        final InetAddress[] remoteAddresses = restoreAddresses(remoteAddressPair[0]);
        final int remotePort = Integer.parseInt(remoteAddressPair[1], 16);
        final ConnectionState connectionState = ConnectionState.fromCode(tokenizer.nextToken());
        for (final InetAddress remoteAddress : remoteAddresses) {
            target.put(new ConnectionInfo(remoteAddress, remotePort), connectionState);
        }
    }

    /**
     * Converts the hex address column into one or two {@link InetAddress} instances.
     *
     * <p>16-byte values have every 4-byte group reversed; any other length is reversed as a
     * whole. NOTE(review): this matches the kernel printing 32-bit words in little-endian host
     * order - confirm if the service is ever run on a big-endian platform.
     *
     * @return the restored address, plus the embedded IPv4 client address when the restored
     *         address is an IPv6 address that carries one
     * @throws DecoderException     if {@code address} is not valid hex
     * @throws UnknownHostException if the decoded value is not a legal IP address length
     */
    private static InetAddress[] restoreAddresses(String address) throws DecoderException, UnknownHostException {
        final byte[] data = Hex.decodeHex(address.toCharArray());
        if (data.length != 16) {
            ArrayUtils.reverse(data);
            return new InetAddress[]{InetAddress.getByAddress(data)};
        }
        for (int word = 0; word < 4; ++word) {
            ArrayUtils.reverse(data, word * 4, word * 4 + 4);
        }
        final InetAddress result = InetAddress.getByAddress(data);
        if (result instanceof Inet6Address
                && InetAddresses.hasEmbeddedIPv4ClientAddress((Inet6Address) result)) {
            // Register both forms so a lookup by either address finds the connection.
            return new InetAddress[]{result, InetAddresses.getEmbeddedIPv4ClientAddress((Inet6Address) result)};
        }
        return new InetAddress[]{result};
    }

    /** Remote endpoint (address and port) identifying a connection; usable as a map key. */
    static class ConnectionInfo {
        private final InetAddress address;
        private final int port;

        ConnectionInfo(InetAddress address, int port) {
            this.address = address;
            this.port = port;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            final ConnectionInfo that = (ConnectionInfo) o;
            return this.port == that.port && this.address.equals(that.address);
        }

        @Override
        public int hashCode() {
            int result = this.address.hashCode();
            result = 31 * result + this.port;
            return result;
        }

        @Override
        public String toString() {
            return "ConnectionInfo{address=" + this.address + ", port=" + this.port + '}';
        }
    }

    /** TCP connection states, each with the numeric code parsed from the table's state column. */
    enum ConnectionState {
        TCP_ESTABLISHED(1),
        TCP_SYN_SENT(2),
        TCP_SYN_RECV(3),
        TCP_FIN_WAIT1(4),
        TCP_FIN_WAIT2(5),
        TCP_TIME_WAIT(6),
        TCP_CLOSE(7),
        TCP_CLOSE_WAIT(8),
        TCP_LAST_ACK(9),
        TCP_LISTEN(10),
        TCP_CLOSING(11),
        TCP_NEW_SYN_RECV(12);

        private final int stateCode;

        ConnectionState(int stateCode) {
            this.stateCode = stateCode;
        }

        /**
         * Looks up a state by its hexadecimal code.
         *
         * @param value state code as a hex string, e.g. {@code "0A"}
         * @return the matching state
         * @throws IllegalArgumentException if {@code value} is not valid hex or matches no state
         */
        public static ConnectionState fromCode(String value) {
            final int intCode = Integer.parseInt(value, 16);
            for (final ConnectionState candidate : ConnectionState.values()) {
                if (candidate.stateCode == intCode) {
                    return candidate;
                }
            }
            throw new IllegalArgumentException("Failed to find connection state from " + value);
        }
    }
}

