/*
 * Decompiled with CFR 0.152.
 */
package org.apache.skywalking.oap.server.receiver.envoy;

import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v2.AccessLogServiceGrpc;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsResponse;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.AccessLogAnalyzer;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AccessLogServiceGRPCHandler
extends AccessLogServiceGrpc.AccessLogServiceImplBase {
    private static final Logger LOGGER = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class);
    private final List<ALSHTTPAnalysis> envoyHTTPAnalysisList;
    private final List<TCPAccessLogAnalyzer> envoyTCPAnalysisList;
    private final CounterMetrics counter;
    private final HistogramMetrics histogram;
    private final CounterMetrics sourceDispatcherCounter;

    public AccessLogServiceGRPCHandler(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException {
        ServiceLoader<ALSHTTPAnalysis> alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class);
        ServiceLoader<TCPAccessLogAnalyzer> alsTcpAnalyzers = ServiceLoader.load(TCPAccessLogAnalyzer.class);
        this.envoyHTTPAnalysisList = new ArrayList<ALSHTTPAnalysis>();
        for (String httpAnalysisName : config.getAlsHTTPAnalysis()) {
            for (ALSHTTPAnalysis httpAnalysis : alshttpAnalyses) {
                if (!httpAnalysisName.equals(httpAnalysis.name())) continue;
                httpAnalysis.init(manager, config);
                this.envoyHTTPAnalysisList.add(httpAnalysis);
            }
        }
        this.envoyTCPAnalysisList = new ArrayList<TCPAccessLogAnalyzer>();
        for (String analyzerName : config.getAlsTCPAnalysis()) {
            for (TCPAccessLogAnalyzer tcpAnalyzer : alsTcpAnalyzers) {
                if (!analyzerName.equals(tcpAnalyzer.name())) continue;
                tcpAnalyzer.init(manager, config);
                this.envoyTCPAnalysisList.add(tcpAnalyzer);
            }
        }
        LOGGER.debug("envoy HTTP analysis: {}, envoy TCP analysis: {}", this.envoyHTTPAnalysisList, this.envoyTCPAnalysisList);
        MetricsCreator metricCreator = (MetricsCreator)manager.find("telemetry").provider().getService(MetricsCreator.class);
        this.counter = metricCreator.createCounter("envoy_als_in_count", "The count of envoy ALS message received", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
        this.histogram = metricCreator.createHistogramMetric("envoy_als_in_latency", "The process latency of service ALS metric receiver", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE, new double[0]);
        this.sourceDispatcherCounter = metricCreator.createCounter("envoy_als_source_dispatch_count", "The count of envoy ALS metric received", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
    }

    public StreamObserver<StreamAccessLogsMessage> streamAccessLogs(StreamObserver<StreamAccessLogsResponse> responseObserver) {
        return this.streamAccessLogs(responseObserver, false);
    }

    public StreamObserver<StreamAccessLogsMessage> streamAccessLogs(final StreamObserver<StreamAccessLogsResponse> responseObserver, final boolean alwaysAnalyzeIdentity) {
        return new StreamObserver<StreamAccessLogsMessage>(){
            private volatile boolean isFirst = true;
            private Role role;
            private StreamAccessLogsMessage.Identifier identifier;

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onNext(StreamAccessLogsMessage message) {
                HistogramMetrics.Timer timer = AccessLogServiceGRPCHandler.this.histogram.createTimer();
                try {
                    if (this.isFirst || alwaysAnalyzeIdentity && message.hasIdentifier()) {
                        this.identifier = message.getIdentifier();
                        this.isFirst = false;
                        this.role = Role.NONE;
                        for (ALSHTTPAnalysis analysis : AccessLogServiceGRPCHandler.this.envoyHTTPAnalysisList) {
                            this.role = analysis.identify(this.identifier, this.role);
                        }
                    }
                    StreamAccessLogsMessage.LogEntriesCase logCase = message.getLogEntriesCase();
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Messaged is identified from Envoy[{}], role[{}] in [{}]. Received msg {}", new Object[]{this.identifier.getNode().getId(), this.role, logCase, message});
                    }
                    ArrayList<ServiceMeshMetric.Builder> sourceResult = new ArrayList<ServiceMeshMetric.Builder>();
                    switch (logCase) {
                        case HTTP_LOGS: {
                            StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs();
                            AccessLogServiceGRPCHandler.this.counter.inc((double)logs.getLogEntryCount());
                            for (HTTPAccessLogEntry log : logs.getLogEntryList()) {
                                AccessLogAnalyzer.Result result = AccessLogAnalyzer.Result.builder().build();
                                for (ALSHTTPAnalysis analysis : AccessLogServiceGRPCHandler.this.envoyHTTPAnalysisList) {
                                    result = analysis.analysis(result, this.identifier, log, this.role);
                                }
                                if (!CollectionUtils.isNotEmpty(result.getMetrics())) continue;
                                sourceResult.addAll(result.getMetrics());
                            }
                            break;
                        }
                        case TCP_LOGS: {
                            StreamAccessLogsMessage.TCPAccessLogEntries tcpLogs = message.getTcpLogs();
                            AccessLogServiceGRPCHandler.this.counter.inc((double)tcpLogs.getLogEntryCount());
                            for (TCPAccessLogEntry tcpLog : tcpLogs.getLogEntryList()) {
                                AccessLogAnalyzer.Result result = AccessLogAnalyzer.Result.builder().build();
                                for (TCPAccessLogAnalyzer analyzer : AccessLogServiceGRPCHandler.this.envoyTCPAnalysisList) {
                                    result = analyzer.analysis(result, this.identifier, tcpLog, this.role);
                                }
                                if (!CollectionUtils.isNotEmpty(result.getMetrics())) continue;
                                sourceResult.addAll(result.getMetrics());
                            }
                            break;
                        }
                    }
                    AccessLogServiceGRPCHandler.this.sourceDispatcherCounter.inc((double)sourceResult.size());
                    sourceResult.forEach(TelemetryDataDispatcher::process);
                }
                finally {
                    timer.finish();
                }
            }

            public void onError(Throwable throwable) {
                LOGGER.error("Error in receiving access log from envoy", throwable);
                responseObserver.onCompleted();
            }

            public void onCompleted() {
                responseObserver.onNext((Object)StreamAccessLogsResponse.newBuilder().build());
                responseObserver.onCompleted();
            }
        };
    }
}

