/*
 * Decompiled with CFR 0.152.
 */
package io.openlineage.spark.agent.lifecycle.plan;

import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.DatasetFactory;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.QueryPlanVisitor;
import io.openlineage.spark.shaded.com.fasterxml.jackson.databind.JsonNode;
import io.openlineage.spark.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.datasources.LogicalRelation;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.CreatableRelationProvider;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;

/**
 * Query plan visitor that turns Kafka-backed {@link LogicalRelation}s into OpenLineage datasets.
 *
 * <p>The Kafka connector classes are looked up by name at runtime (no compile-time reference is
 * made to them), so every entry point first verifies that the relevant class can be loaded. One
 * dataset is produced per topic found in the relation's options; its namespace is
 * {@code kafka://<first bootstrap server>}.
 *
 * @param <D> concrete OpenLineage dataset type produced by the supplied {@link DatasetFactory}
 */
public class KafkaRelationVisitor<D extends OpenLineage.Dataset>
extends QueryPlanVisitor<LogicalRelation, D> {
    private static final Logger log = LoggerFactory.getLogger(KafkaRelationVisitor.class);
    private static final String KAFKA_SOURCE_PROVIDER_CLASS_NAME = "org.apache.spark.sql.kafka010.KafkaSourceProvider";
    private static final String KAFKA_RELATION_CLASS_NAME = "org.apache.spark.sql.kafka010.KafkaRelation";
    // Cached results of the class lookups; the *_CHECKED flag is set only after *_PRESENT is written.
    private static final AtomicBoolean KAFKA_PROVIDER_CLASS_PRESENT = new AtomicBoolean(false);
    private static final AtomicBoolean KAFKA_PROVIDER_CHECKED = new AtomicBoolean(false);
    private static final AtomicBoolean KAFKA_RELATION_CLASS_PRESENT = new AtomicBoolean(false);
    private static final AtomicBoolean KAFKA_RELATION_CHECKED = new AtomicBoolean(false);
    // Shared mapper for parsing the "assign" option, instead of building a new one per call.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final DatasetFactory<D> datasetFactory;

    /**
     * @param context OpenLineage context handed to the base visitor
     * @param datasetFactory factory used to build one dataset per Kafka topic
     */
    public KafkaRelationVisitor(OpenLineageContext context, DatasetFactory<D> datasetFactory) {
        super(context);
        this.datasetFactory = datasetFactory;
    }

    /**
     * Reports whether the Kafka source provider class can be loaded. The lookup is performed at
     * most once; later calls return the cached answer.
     *
     * @return {@code true} when {@code KafkaSourceProvider} is loadable
     */
    public static boolean hasKafkaClasses() {
        log.debug("Checking if Kafka classes are available");
        if (KAFKA_PROVIDER_CHECKED.get()) {
            return KAFKA_PROVIDER_CLASS_PRESENT.get();
        }
        log.debug("Kafka classes have not been checked yet");
        boolean available = KafkaRelationVisitor.resolveAvailability(KAFKA_SOURCE_PROVIDER_CLASS_NAME, KAFKA_PROVIDER_CHECKED, KAFKA_PROVIDER_CLASS_PRESENT);
        log.debug("Kafka classes availability: {}", available);
        return available;
    }

    /**
     * Reports whether the {@code KafkaRelation} class can be loaded. The lookup is performed at
     * most once; later calls return the cached answer.
     *
     * @return {@code true} when {@code KafkaRelation} is loadable
     */
    private static boolean hasKafkaRelationClass() {
        log.debug("Checking if KafkaRelation class is available");
        if (KAFKA_RELATION_CHECKED.get()) {
            return KAFKA_RELATION_CLASS_PRESENT.get();
        }
        log.debug("KafkaRelation class has not been checked yet");
        boolean available = KafkaRelationVisitor.resolveAvailability(KAFKA_RELATION_CLASS_NAME, KAFKA_RELATION_CHECKED, KAFKA_RELATION_CLASS_PRESENT);
        log.debug("KafkaRelation class availability: {}", available);
        return available;
    }

    /**
     * Performs the class lookup under the class lock unless another thread already did, and
     * records the outcome in the supplied flags.
     *
     * @param className fully qualified name of the class to look for
     * @param checked flag recording that the lookup has been done
     * @param present flag recording the lookup result
     * @return the recorded availability
     */
    private static boolean resolveAvailability(String className, AtomicBoolean checked, AtomicBoolean present) {
        synchronized (KafkaRelationVisitor.class) {
            if (!checked.get()) {
                present.set(KafkaRelationVisitor.loadKafkaClass(className) != null);
                // Publish "checked" only after the result is stored so lock-free readers never
                // observe checked == true paired with a stale result.
                checked.set(true);
            }
            return present.get();
        }
    }

    /**
     * Loads a class using the thread context class loader first and this class's own class loader
     * second, so the lookup agrees with the availability check, which accepts either loader.
     *
     * @param className fully qualified class name
     * @return the loaded class, or {@code null} when neither loader can provide it
     */
    @Nullable
    private static Class<?> loadKafkaClass(String className) {
        Class<?> fromContext = KafkaRelationVisitor.loadClassWithCurrentThreadContextClassLoader(className);
        if (fromContext != null) {
            return fromContext;
        }
        return KafkaRelationVisitor.loadClassWithCurrentClassClassLoader(className);
    }

    /**
     * @param className fully qualified class name
     * @return the class as seen by this class's own class loader, or {@code null} if unavailable
     */
    @Nullable
    private static Class<?> loadClassWithCurrentClassClassLoader(String className) {
        try {
            ClassLoader loader = KafkaRelationVisitor.class.getClassLoader();
            return loader == null ? null : loader.loadClass(className);
        }
        catch (Exception | LinkageError ignored) {
            // Best-effort probe: a missing or unlinkable class simply means "not available".
            return null;
        }
    }

    /**
     * @param className fully qualified class name
     * @return the class as seen by the thread context class loader, or {@code null} if unavailable
     */
    @Nullable
    private static Class<?> loadClassWithCurrentThreadContextClassLoader(String className) {
        try {
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            return loader == null ? null : loader.loadClass(className);
        }
        catch (Exception | LinkageError ignored) {
            // Best-effort probe: a missing or unlinkable class simply means "not available".
            return null;
        }
    }

    /**
     * Tells whether the given provider is a {@code KafkaSourceProvider} (or a subclass of it).
     *
     * @param provider relation provider to inspect; {@code null} yields {@code false}
     * @return {@code true} only when the Kafka provider class is loadable and assignable from the
     *     provider's class
     */
    public static boolean isKafkaSource(CreatableRelationProvider provider) {
        log.debug("Checking if provider is KafkaSourceProvider");
        if (!KafkaRelationVisitor.hasKafkaClasses()) {
            log.debug("Kafka classes are not available to check whether provider is KafkaSourceProvider");
            return false;
        }
        if (provider == null) {
            return false;
        }
        Class<?> providerClass = KafkaRelationVisitor.loadKafkaClass(KAFKA_SOURCE_PROVIDER_CLASS_NAME);
        return providerClass != null && providerClass.isAssignableFrom(provider.getClass());
    }

    /**
     * Builds datasets for a Kafka write described by its options.
     *
     * @param datasetFactory factory used to build the datasets
     * @param relationProvider not used; kept for interface compatibility
     * @param options Kafka options (bootstrap servers, topic selection)
     * @param mode not used; kept for interface compatibility
     * @param schema schema attached to every produced dataset
     * @param <D> dataset type
     * @return one dataset per topic found in {@code options}
     */
    public static <D extends OpenLineage.Dataset> List<D> createKafkaDatasets(DatasetFactory<D> datasetFactory, CreatableRelationProvider relationProvider, Map<String, String> options, SaveMode mode, StructType schema) {
        return KafkaRelationVisitor.createDatasetsFromOptions(datasetFactory, options, schema);
    }

    /**
     * Accepts only {@link LogicalRelation}s whose underlying relation is a {@code KafkaRelation}.
     *
     * @param x plan node to test
     * @return {@code true} when this visitor can handle {@code x}
     */
    @Override
    public boolean isDefinedAt(LogicalPlan x) {
        if (!KafkaRelationVisitor.hasKafkaRelationClass()) {
            return false;
        }
        if (!(x instanceof LogicalRelation)) {
            return false;
        }
        Class<?> kafkaRelationClass = KafkaRelationVisitor.loadKafkaClass(KAFKA_RELATION_CLASS_NAME);
        if (kafkaRelationClass == null) {
            return false;
        }
        BaseRelation baseRelation = ((LogicalRelation)x).relation();
        if (log.isDebugEnabled()) {
            log.debug("Checking if {} is assignable from {}", kafkaRelationClass.getCanonicalName(), baseRelation.getClass().getCanonicalName());
        }
        return kafkaRelationClass.isAssignableFrom(baseRelation.getClass());
    }

    /**
     * Produces the datasets for a Kafka relation. The relation's options are read reflectively
     * from its {@code sourceOptions} field; if that fails, the error is logged and an empty option
     * map is used instead.
     *
     * @param x plan node, expected to be a {@link LogicalRelation}
     * @return one dataset per topic found in the relation's options
     */
    public List<D> apply(LogicalPlan x) {
        BaseRelation relation = ((LogicalRelation)x).relation();
        Map<String, String> sourceOptions = KafkaRelationVisitor.extractSourceOptions(relation);
        return KafkaRelationVisitor.createDatasetsFromOptions(this.datasetFactory, sourceOptions, relation.schema());
    }

    /** Reads the {@code sourceOptions} field of the relation via reflection; empty map on failure. */
    @SuppressWarnings("unchecked")
    private static Map<String, String> extractSourceOptions(BaseRelation relation) {
        try {
            Field sourceOptionsField = relation.getClass().getDeclaredField("sourceOptions");
            sourceOptionsField.setAccessible(true);
            // Unchecked: the field's generic type is not visible through reflection.
            return (Map<String, String>)sourceOptionsField.get(relation);
        }
        catch (Exception e) {
            log.error("Can't extract kafka server options", e);
            return Map$.MODULE$.empty();
        }
    }

    /**
     * Builds one dataset per topic named in the options, all sharing the namespace
     * {@code kafka://<first bootstrap server>}.
     *
     * @param datasetFactory factory used to build the datasets
     * @param sourceOptions Kafka options; keys read are {@code kafka.bootstrap.servers},
     *     {@code subscribe}, {@code topic} and {@code assign}
     * @param schema schema attached to every produced dataset
     * @param <D> dataset type
     * @return the datasets, possibly empty
     */
    private static <D extends OpenLineage.Dataset> List<D> createDatasetsFromOptions(DatasetFactory<D> datasetFactory, Map<String, String> sourceOptions, StructType schema) {
        String namespace = "kafka://" + KafkaRelationVisitor.resolveBootstrapServer(sourceOptions);
        return KafkaRelationVisitor.resolveTopics(sourceOptions).stream()
            .map(topic -> datasetFactory.getDataset(topic, namespace, schema))
            .collect(Collectors.toList());
    }

    /** Collects topic names from the {@code subscribe}, {@code topic} and {@code assign} options. */
    private static List<String> resolveTopics(Map<String, String> sourceOptions) {
        Stream<String> named = Stream.of("subscribe", "topic")
            .map(key -> ScalaConversionUtils.asJavaOptional(sourceOptions.get(key)))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .flatMap(KafkaRelationVisitor::splitTopicList);
        Stream<String> assigned = ScalaConversionUtils.asJavaOptional(sourceOptions.get("assign"))
            .map(KafkaRelationVisitor::topicsFromAssignment)
            .orElseGet(Stream::empty);
        return Stream.concat(named, assigned).collect(Collectors.toList());
    }

    /**
     * Splits a comma-separated topic list into trimmed, non-empty names. The {@code subscribe}
     * option is a comma-separated list, so a value such as {@code "a,b"} names two topics.
     */
    private static Stream<String> splitTopicList(String topicList) {
        return Stream.of(topicList.split(","))
            .map(String::trim)
            .filter(topic -> !topic.isEmpty());
    }

    /**
     * Extracts topic names from the JSON value of the {@code assign} option; the top-level field
     * names of the JSON document are taken as topics. Unparseable JSON is logged and yields no
     * topics.
     */
    private static Stream<String> topicsFromAssignment(String assignJson) {
        try {
            JsonNode jsonNode = OBJECT_MAPPER.readTree(assignJson);
            if (jsonNode == null) {
                return Stream.empty();
            }
            return StreamSupport.stream(Spliterators.spliterator(jsonNode.fieldNames(), jsonNode.size(), 0), false);
        }
        catch (IOException e) {
            log.warn("Unable to find topics from Kafka source configuration {}", assignJson, e);
            return Stream.empty();
        }
    }

    /**
     * Returns the first entry of {@code kafka.bootstrap.servers} reduced to {@code host:port}, or
     * an empty string when the option is absent or blank.
     */
    private static String resolveBootstrapServer(Map<String, String> sourceOptions) {
        return ScalaConversionUtils.asJavaOptional(sourceOptions.get("kafka.bootstrap.servers"))
            // limit=2 keeps a leading empty entry, so "," cannot produce an empty array.
            .map(servers -> servers.split(",", 2)[0].trim())
            .filter(first -> !first.isEmpty())
            .map(KafkaRelationVisitor::toHostAndPort)
            .orElse("");
    }

    /**
     * Reduces a single bootstrap server entry (optionally prefixed with a {@code scheme://}) to
     * {@code host:port}. Falls back to the raw authority, or to the entry without its scheme,
     * when the entry cannot be parsed into a host.
     */
    private static String toHostAndPort(String server) {
        boolean hasScheme = server.matches("\\w+://.*");
        String withoutScheme = hasScheme ? server.substring(server.indexOf("://") + 3) : server;
        try {
            // URI needs a scheme to recognise the authority part.
            URI uri = URI.create(hasScheme ? server : "PLAINTEXT://" + server);
            String host = uri.getHost();
            if (host != null) {
                int port = uri.getPort();
                // getPort() is -1 when no port is given; do not leak that into the namespace.
                return port < 0 ? host : host + ":" + port;
            }
            // getHost() is null when the authority is not a valid server-based one
            // (for example a host name containing '_'); keep the authority as written.
            if (uri.getAuthority() != null) {
                return uri.getAuthority();
            }
        }
        catch (IllegalArgumentException e) {
            log.warn("Unable to parse Kafka bootstrap server {}", server, e);
        }
        return withoutScheme;
    }
}

