/*
 * Decompiled with CFR 0.152.
 */
package org.apache.skywalking.oap.server.receiver.envoy.als;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Duration;
import com.google.protobuf.Timestamp;
import io.envoyproxy.envoy.api.v2.core.Address;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties;
import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties;
import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.apis.ExtensionsV1beta1Api;
import io.kubernetes.client.models.V1ObjectMeta;
import io.kubernetes.client.models.V1OwnerReference;
import io.kubernetes.client.models.V1Pod;
import io.kubernetes.client.models.V1PodList;
import io.kubernetes.client.util.Config;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
import org.apache.skywalking.apm.network.common.DetectPoint;
import org.apache.skywalking.apm.network.servicemesh.Protocol;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
import org.apache.skywalking.oap.server.core.source.Source;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.DependencyResource;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class K8sALSServiceMeshHTTPAnalysis
implements ALSHTTPAnalysis {
    private static final Logger logger = LoggerFactory.getLogger(K8sALSServiceMeshHTTPAnalysis.class);
    private static final String ADDRESS_TYPE_INTERNAL_IP = "InternalIP";
    private static final String VALID_PHASE = "Running";
    private final AtomicReference<Map<String, ServiceMetaInfo>> ipServiceMap = new AtomicReference();
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("load-pod-%d").setDaemon(true).build());

    @Override
    public String name() {
        return "k8s-mesh";
    }

    @Override
    public void init(EnvoyMetricReceiverConfig config) {
        this.executorService.scheduleAtFixedRate(this::loadPodInfo, 0L, 15L, TimeUnit.SECONDS);
    }

    private boolean invalidPodList() {
        Map<String, ServiceMetaInfo> map = this.ipServiceMap.get();
        return map == null || map.isEmpty();
    }

    private void loadPodInfo() {
        try {
            ApiClient client = Config.defaultClient();
            client.getHttpClient().setReadTimeout(20L, TimeUnit.SECONDS);
            Configuration.setDefaultApiClient((ApiClient)client);
            CoreV1Api api = new CoreV1Api();
            V1PodList list = api.listPodForAllNamespaces(null, null, null, null, null, null, null, null, null);
            HashMap<String, ServiceMetaInfo> ipMap = new HashMap<String, ServiceMetaInfo>(list.getItems().size());
            long startTime = System.nanoTime();
            for (V1Pod item : list.getItems()) {
                if (!item.getStatus().getPhase().equals(VALID_PHASE)) {
                    logger.debug("Invalid pod {} is not in a valid phase {}", (Object)item.getMetadata().getName(), (Object)item.getStatus().getPhase());
                    continue;
                }
                if (item.getStatus().getPodIP().equals(item.getStatus().getHostIP())) {
                    logger.debug("Pod {}.{} is removed because hostIP and podIP are identical ", (Object)item.getMetadata().getName(), (Object)item.getMetadata().getNamespace());
                    continue;
                }
                ipMap.put(item.getStatus().getPodIP(), this.createServiceMetaInfo(item.getMetadata()));
            }
            logger.info("Load {} pods in {}ms", (Object)ipMap.size(), (Object)((System.nanoTime() - startTime) / 1000000L));
            this.ipServiceMap.set(ipMap);
        }
        catch (Throwable th) {
            logger.error("run load pod error", th);
        }
    }

    private ServiceMetaInfo createServiceMetaInfo(V1ObjectMeta podMeta) {
        ExtensionsV1beta1Api extensionsApi = new ExtensionsV1beta1Api();
        DependencyResource dr = new DependencyResource(podMeta);
        DependencyResource meta = dr.getOwnerResource("ReplicaSet", ownerReference -> extensionsApi.readNamespacedReplicaSet(ownerReference.getName(), podMeta.getNamespace(), "", Boolean.valueOf(true), Boolean.valueOf(true)).getMetadata());
        ServiceMetaInfo result = new ServiceMetaInfo();
        if (meta.getMetadata().getOwnerReferences() != null && meta.getMetadata().getOwnerReferences().size() > 0) {
            V1OwnerReference owner = (V1OwnerReference)meta.getMetadata().getOwnerReferences().get(0);
            result.setServiceName(String.format("%s.%s", owner.getName(), meta.getMetadata().getNamespace()));
        } else {
            result.setServiceName(String.format("%s.%s", meta.getMetadata().getName(), meta.getMetadata().getNamespace()));
        }
        result.setServiceInstanceName(String.format("%s.%s", podMeta.getName(), podMeta.getNamespace()));
        result.setTags(this.transformLabelsToTags(podMeta.getLabels()));
        return result;
    }

    private List<ServiceMetaInfo.KeyValue> transformLabelsToTags(Map<String, String> labels) {
        if (labels == null || labels.size() < 1) {
            return Collections.emptyList();
        }
        ArrayList<ServiceMetaInfo.KeyValue> result = new ArrayList<ServiceMetaInfo.KeyValue>(labels.size());
        for (Map.Entry<String, String> each : labels.entrySet()) {
            result.add(new ServiceMetaInfo.KeyValue(each.getKey(), each.getValue()));
        }
        return result;
    }

    @Override
    public List<Source> analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) {
        if (this.invalidPodList()) {
            return Collections.emptyList();
        }
        switch (role) {
            case PROXY: {
                this.analysisProxy(identifier, entry);
                break;
            }
            case SIDECAR: {
                return this.analysisSideCar(identifier, entry);
            }
        }
        return Collections.emptyList();
    }

    protected List<Source> analysisSideCar(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry) {
        String cluster;
        ArrayList<Source> sources = new ArrayList<Source>();
        AccessLogCommon properties = entry.getCommonProperties();
        if (properties != null && (cluster = properties.getUpstreamCluster()) != null) {
            long startTime = this.formatAsLong(properties.getStartTime());
            long duration = this.formatAsLong(properties.getTimeToLastDownstreamTxByte());
            HTTPRequestProperties request = entry.getRequest();
            String endpoint = "/";
            Protocol protocol = Protocol.HTTP;
            if (request != null) {
                endpoint = request.getPath();
                String schema = request.getScheme();
                protocol = "http".equals(schema) || "https".equals(schema) ? Protocol.HTTP : Protocol.gRPC;
            }
            HTTPResponseProperties response = entry.getResponse();
            int responseCode = 200;
            if (response != null) {
                responseCode = response.getResponseCode().getValue();
            }
            boolean status = responseCode >= 200 && responseCode < 400;
            Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress();
            ServiceMetaInfo downstreamService = this.find(downstreamRemoteAddress.getSocketAddress().getAddress(), downstreamRemoteAddress.getSocketAddress().getPortValue());
            Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
            ServiceMetaInfo localService = this.find(downstreamLocalAddress.getSocketAddress().getAddress(), downstreamLocalAddress.getSocketAddress().getPortValue());
            if (cluster.startsWith("inbound|")) {
                if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
                    ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime).setEndTime(startTime + duration).setDestServiceName(localService.getServiceName()).setDestServiceInstance(localService.getServiceInstanceName()).setEndpoint(endpoint).setLatency((int)duration).setResponseCode(Math.toIntExact(responseCode)).setStatus(status).setProtocol(protocol).setDetectPoint(DetectPoint.server).build();
                    logger.debug("Transformed ingress->sidecar inbound mesh metric {}", (Object)metric);
                    this.forward(metric);
                } else {
                    ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime).setEndTime(startTime + duration).setSourceServiceName(downstreamService.getServiceName()).setSourceServiceInstance(downstreamService.getServiceInstanceName()).setDestServiceName(localService.getServiceName()).setDestServiceInstance(localService.getServiceInstanceName()).setEndpoint(endpoint).setLatency((int)duration).setResponseCode(Math.toIntExact(responseCode)).setStatus(status).setProtocol(protocol).setDetectPoint(DetectPoint.server).build();
                    logger.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", (Object)metric);
                    this.forward(metric);
                }
            } else if (cluster.startsWith("outbound|")) {
                Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
                ServiceMetaInfo destService = this.find(upstreamRemoteAddress.getSocketAddress().getAddress(), upstreamRemoteAddress.getSocketAddress().getPortValue());
                ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime).setEndTime(startTime + duration).setSourceServiceName(downstreamService.getServiceName()).setSourceServiceInstance(downstreamService.getServiceInstanceName()).setDestServiceName(destService.getServiceName()).setDestServiceInstance(destService.getServiceInstanceName()).setEndpoint(endpoint).setLatency((int)duration).setResponseCode(Math.toIntExact(responseCode)).setStatus(status).setProtocol(protocol).setDetectPoint(DetectPoint.client).build();
                logger.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", (Object)metric);
                this.forward(metric);
            }
        }
        return sources;
    }

    protected void analysisProxy(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry) {
        AccessLogCommon properties = entry.getCommonProperties();
        if (properties != null) {
            Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
            Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress();
            Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
            if (downstreamLocalAddress != null && downstreamRemoteAddress != null && upstreamRemoteAddress != null) {
                SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress();
                ServiceMetaInfo outside = this.find(downstreamRemoteAddressSocketAddress.getAddress(), downstreamRemoteAddressSocketAddress.getPortValue());
                SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress();
                ServiceMetaInfo ingress = this.find(downstreamLocalAddressSocketAddress.getAddress(), downstreamLocalAddressSocketAddress.getPortValue());
                long startTime = this.formatAsLong(properties.getStartTime());
                long duration = this.formatAsLong(properties.getTimeToLastDownstreamTxByte());
                HTTPRequestProperties request = entry.getRequest();
                String endpoint = "/";
                Protocol protocol = Protocol.HTTP;
                if (request != null) {
                    endpoint = request.getPath();
                    String schema = request.getScheme();
                    protocol = "http".equals(schema) || "https".equals(schema) ? Protocol.HTTP : Protocol.gRPC;
                }
                HTTPResponseProperties response = entry.getResponse();
                int responseCode = 200;
                if (response != null) {
                    responseCode = response.getResponseCode().getValue();
                }
                boolean status = responseCode >= 200 && responseCode < 400;
                ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime).setEndTime(startTime + duration).setSourceServiceName(outside.getServiceName()).setSourceServiceInstance(outside.getServiceInstanceName()).setDestServiceName(ingress.getServiceName()).setDestServiceInstance(ingress.getServiceInstanceName()).setEndpoint(endpoint).setLatency((int)duration).setResponseCode(Math.toIntExact(responseCode)).setStatus(status).setProtocol(protocol).setDetectPoint(DetectPoint.server).build();
                logger.debug("Transformed ingress inbound mesh metric {}", (Object)metric);
                this.forward(metric);
                SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress();
                ServiceMetaInfo targetService = this.find(upstreamRemoteAddressSocketAddress.getAddress(), upstreamRemoteAddressSocketAddress.getPortValue());
                long outboundStartTime = startTime + this.formatAsLong(properties.getTimeToFirstUpstreamTxByte());
                long outboundEndTime = startTime + this.formatAsLong(properties.getTimeToLastUpstreamRxByte());
                ServiceMeshMetric outboundMetric = ServiceMeshMetric.newBuilder().setStartTime(outboundStartTime).setEndTime(outboundEndTime).setSourceServiceName(ingress.getServiceName()).setSourceServiceInstance(ingress.getServiceInstanceName()).setDestServiceName(targetService.getServiceName()).setDestServiceInstance(targetService.getServiceInstanceName()).setEndpoint(endpoint).setLatency((int)(outboundEndTime - outboundStartTime)).setResponseCode(Math.toIntExact(responseCode)).setStatus(status).setProtocol(protocol).setDetectPoint(DetectPoint.client).build();
                logger.debug("Transformed ingress outbound mesh metric {}", (Object)outboundMetric);
                this.forward(outboundMetric);
            }
        }
    }

    @Override
    public Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev) {
        Node node;
        if (alsIdentifier != null && (node = alsIdentifier.getNode()) != null) {
            String id = node.getId();
            if (id.startsWith("router~")) {
                return Role.PROXY;
            }
            if (id.startsWith("sidecar~")) {
                return Role.SIDECAR;
            }
        }
        return prev;
    }

    protected ServiceMetaInfo find(String ip, int port) {
        Map<String, ServiceMetaInfo> map = this.ipServiceMap.get();
        if (map == null) {
            logger.debug("Unknown ip {}, ip -> service is null", (Object)ip);
            return ServiceMetaInfo.UNKNOWN;
        }
        if (map.containsKey(ip)) {
            return map.get(ip);
        }
        logger.debug("Unknown ip {}, ip -> service is {}", (Object)ip, map);
        return ServiceMetaInfo.UNKNOWN;
    }

    protected void forward(ServiceMeshMetric metric) {
        TelemetryDataDispatcher.preProcess((ServiceMeshMetric)metric);
    }

    private long formatAsLong(Timestamp timestamp) {
        return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli();
    }

    private long formatAsLong(Duration duration) {
        return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli();
    }

    protected AtomicReference<Map<String, ServiceMetaInfo>> getIpServiceMap() {
        return this.ipServiceMap;
    }
}

