/*
 * Decompiled with CFR 0.152.
 */
package org.apache.skywalking.oap.server.receiver.envoy;

import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v2.AccessLogServiceGrpc;
import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsResponse;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AccessLogServiceGRPCHandler
extends AccessLogServiceGrpc.AccessLogServiceImplBase {
    private static final Logger LOGGER = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class);
    private final List<ALSHTTPAnalysis> envoyHTTPAnalysisList;
    private final SourceReceiver sourceReceiver;
    private final CounterMetrics counter;
    private final HistogramMetrics histogram;
    private final CounterMetrics sourceDispatcherCounter;

    public AccessLogServiceGRPCHandler(ModuleManager manager, EnvoyMetricReceiverConfig config) {
        ServiceLoader<ALSHTTPAnalysis> alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class);
        this.envoyHTTPAnalysisList = new ArrayList<ALSHTTPAnalysis>();
        for (String httpAnalysisName : config.getAlsHTTPAnalysis()) {
            for (ALSHTTPAnalysis httpAnalysis : alshttpAnalyses) {
                if (!httpAnalysisName.equals(httpAnalysis.name())) continue;
                httpAnalysis.init(config);
                this.envoyHTTPAnalysisList.add(httpAnalysis);
            }
        }
        LOGGER.debug("envoy HTTP analysis: " + this.envoyHTTPAnalysisList);
        this.sourceReceiver = (SourceReceiver)manager.find("core").provider().getService(SourceReceiver.class);
        MetricsCreator metricCreator = (MetricsCreator)manager.find("telemetry").provider().getService(MetricsCreator.class);
        this.counter = metricCreator.createCounter("envoy_als_in_count", "The count of envoy ALS metric received", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
        this.histogram = metricCreator.createHistogramMetric("envoy_als_in_latency", "The process latency of service ALS metric receiver", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE, new double[0]);
        this.sourceDispatcherCounter = metricCreator.createCounter("envoy_als_source_dispatch_count", "The count of envoy ALS metric received", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
    }

    public StreamObserver<StreamAccessLogsMessage> streamAccessLogs(final StreamObserver<StreamAccessLogsResponse> responseObserver) {
        return new StreamObserver<StreamAccessLogsMessage>(){
            private volatile boolean isFirst = true;
            private Role role;
            private StreamAccessLogsMessage.Identifier identifier;

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Enabled aggressive block sorting
             * Enabled unnecessary exception pruning
             * Enabled aggressive exception aggregation
             */
            public void onNext(StreamAccessLogsMessage message) {
                AccessLogServiceGRPCHandler.this.counter.inc();
                HistogramMetrics.Timer timer = AccessLogServiceGRPCHandler.this.histogram.createTimer();
                try {
                    if (this.isFirst) {
                        this.identifier = message.getIdentifier();
                        this.isFirst = false;
                        this.role = Role.NONE;
                        for (ALSHTTPAnalysis analysis : AccessLogServiceGRPCHandler.this.envoyHTTPAnalysisList) {
                            this.role = analysis.identify(this.identifier, this.role);
                        }
                    }
                    StreamAccessLogsMessage.LogEntriesCase logCase = message.getLogEntriesCase();
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Messaged is identified from Envoy[{}], role[{}] in [{}]. Received msg {}", new Object[]{this.identifier.getNode().getId(), this.role, logCase, message});
                    }
                    switch (logCase) {
                        case HTTP_LOGS: {
                            StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs();
                            ArrayList sourceResult = new ArrayList();
                            Iterator iterator = AccessLogServiceGRPCHandler.this.envoyHTTPAnalysisList.iterator();
                            while (true) {
                                if (!iterator.hasNext()) {
                                    AccessLogServiceGRPCHandler.this.sourceDispatcherCounter.inc((double)sourceResult.size());
                                    sourceResult.forEach(arg_0 -> ((SourceReceiver)AccessLogServiceGRPCHandler.this.sourceReceiver).receive(arg_0));
                                    return;
                                }
                                ALSHTTPAnalysis analysis = (ALSHTTPAnalysis)iterator.next();
                                logs.getLogEntryList().forEach(log -> sourceResult.addAll(analysis.analysis(this.identifier, (HTTPAccessLogEntry)log, this.role)));
                            }
                        }
                    }
                    return;
                }
                finally {
                    timer.finish();
                }
            }

            public void onError(Throwable throwable) {
                LOGGER.error("Error in receiving access log from envoy", throwable);
                responseObserver.onCompleted();
            }

            public void onCompleted() {
                responseObserver.onNext((Object)StreamAccessLogsResponse.newBuilder().build());
                responseObserver.onCompleted();
            }
        };
    }
}

