/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.query.pushdown;

import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.common.exceptions.KylinTimeoutException;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.engine.spark.metadata.cube.StructField;
import org.apache.kylin.query.pushdown.SparkSqlClient;
import org.apache.kylin.query.runtime.plans.QueryToExecutionIDCache$;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.hive.utils.QueryMetricUtils$;
import org.apache.spark.sql.hive.utils.ResourceDetectUtils$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.utils.SparkTypeUtil$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Array$;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.Tuple5;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Buffer$;
import scala.math.Numeric;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;

public final class SparkSqlClient$ {
    public static SparkSqlClient$ MODULE$;
    private final Logger logger;

    static {
        new SparkSqlClient$();
    }

    public Logger logger() {
        return this.logger;
    }

    public Pair<List<List<String>>, List<StructField>> executeSql(SparkSession ss, String sql) {
        ss.sparkContext().setLocalProperty("spark.scheduler.pool", "query_pushdown");
        HadoopUtil.setCurrentConfiguration((Configuration)ss.sparkContext().hadoopConfiguration());
        String queryId = QueryContextFacade.current().getQueryId();
        ss.sparkContext().setLocalProperty(QueryToExecutionIDCache$.MODULE$.KYLIN_QUERY_ID_KEY(), queryId);
        this.logger().info("Start to run sql with SparkSQL...");
        Dataset df = ss.sql(sql);
        this.autoSetShufflePartitions(ss, (Dataset<Row>)df);
        return this.DFToList(ss, sql, (Dataset<Row>)df);
    }

    private void autoSetShufflePartitions(SparkSession ss, Dataset<Row> df) {
        block2: {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            if (!config.isAutoSetPushDownPartitions()) break block2;
            try {
                int basePartitionSize = config.getBaseShufflePartitionSize();
                Seq paths = ResourceDetectUtils$.MODULE$.getPaths(df.queryExecution().sparkPlan());
                String sourceTableSize = new StringBuilder(1).append(ResourceDetectUtils$.MODULE$.getResourceSize(paths)).append("b").toString();
                String partitions = Long.toString(Math.max(1L, JavaUtils.byteStringAsMb((String)sourceTableSize) / (long)basePartitionSize));
                df.sparkSession().conf().set("spark.sql.shuffle.partitions", partitions);
                this.logger().info(new StringBuilder(69).append("Auto set spark.sql.shuffle.partitions to ").append(partitions).append(", the total sources ").append("size is ").append(sourceTableSize).toString());
            }
            catch (Throwable e) {
                this.logger().error("Auto set spark.sql.shuffle.partitions failed.", e);
            }
        }
    }

    private Pair<List<List<String>>, List<StructField>> DFToList(SparkSession ss, String sql, Dataset<Row> df) {
        Pair pair;
        String jobGroup = Thread.currentThread().getName();
        ss.sparkContext().setJobGroup(jobGroup, new StringBuilder(19).append("Pushdown Query Id: ").append(QueryContextFacade.current().getQueryId()).toString(), true);
        try {
            try {
                String[] temporarySchema = (String[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])df.schema().fields())).zipWithIndex(Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).map((Function1 & Serializable & scala.Serializable)x0$1 -> {
                    Tuple2 tuple2 = x0$1;
                    if (tuple2 == null) {
                        throw new MatchError((Object)tuple2);
                    }
                    int index = tuple2._2$mcI$sp();
                    String string = new StringBuilder(10).append("temporary_").append(index).toString();
                    return string;
                }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)));
                Dataset tempDF = df.toDF((Seq)Predef$.MODULE$.wrapRefArray((Object[])temporarySchema));
                Seq columns = (Seq)tempDF.schema().map((Function1 & Serializable & scala.Serializable)tp -> functions$.MODULE$.col(new StringBuilder(2).append("`").append(tp.name()).append("`").toString()).cast((DataType)StringType$.MODULE$), Seq$.MODULE$.canBuildFrom());
                Dataset frame = tempDF.select(columns);
                List rowList = (List)JavaConverters$.MODULE$.seqAsJavaListConverter(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])frame.collect())).map((Function1 & Serializable & scala.Serializable)x$1 -> (List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)x$1.toSeq().map((Function1 & Serializable & scala.Serializable)x$2 -> (String)x$2, Seq$.MODULE$.canBuildFrom())).asJava(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(List.class))))).toSeq()).asJava();
                List fieldList = (List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)df.schema().map((Function1 & Serializable & scala.Serializable)field -> SparkTypeUtil$.MODULE$.convertSparkFieldToJavaField(field), Seq$.MODULE$.canBuildFrom())).asJava();
                Tuple5 tuple5 = QueryMetricUtils$.MODULE$.collectScanMetrics(frame.queryExecution().executedPlan());
                if (tuple5 == null) {
                    throw new MatchError((Object)tuple5);
                }
                List scanRows = (List)tuple5._1();
                List scanFiles = (List)tuple5._2();
                List metadataTime = (List)tuple5._3();
                List scanTime = (List)tuple5._4();
                List scanBytes = (List)tuple5._5();
                Tuple5 tuple52 = new Tuple5((Object)scanRows, (Object)scanFiles, (Object)metadataTime, (Object)scanTime, (Object)scanBytes);
                Tuple5 tuple53 = tuple52;
                List scanRows2 = (List)tuple53._1();
                List scanFiles2 = (List)tuple53._2();
                List metadataTime2 = (List)tuple53._3();
                List scanTime2 = (List)tuple53._4();
                List scanBytes2 = (List)tuple53._5();
                QueryContextFacade.current().addAndGetScannedRows(BoxesRunTime.unboxToLong((Object)((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(scanRows2).asScala()).map((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToLong((long)Predef$.MODULE$.Long2long(x$4)), Buffer$.MODULE$.canBuildFrom())).sum((Numeric)Numeric.LongIsIntegral$.MODULE$)));
                QueryContextFacade.current().addAndGetScanFiles(BoxesRunTime.unboxToLong((Object)((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(scanFiles2).asScala()).map((Function1 & Serializable & scala.Serializable)x$5 -> BoxesRunTime.boxToLong((long)Predef$.MODULE$.Long2long(x$5)), Buffer$.MODULE$.canBuildFrom())).sum((Numeric)Numeric.LongIsIntegral$.MODULE$)));
                QueryContextFacade.current().addAndGetScannedBytes(BoxesRunTime.unboxToLong((Object)((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(scanBytes2).asScala()).map((Function1 & Serializable & scala.Serializable)x$6 -> BoxesRunTime.boxToLong((long)Predef$.MODULE$.Long2long(x$6)), Buffer$.MODULE$.canBuildFrom())).sum((Numeric)Numeric.LongIsIntegral$.MODULE$)));
                QueryContextFacade.current().addAndGetMetadataTime(BoxesRunTime.unboxToLong((Object)((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(metadataTime2).asScala()).map((Function1 & Serializable & scala.Serializable)x$7 -> BoxesRunTime.boxToLong((long)Predef$.MODULE$.Long2long(x$7)), Buffer$.MODULE$.canBuildFrom())).sum((Numeric)Numeric.LongIsIntegral$.MODULE$)));
                QueryContextFacade.current().addAndGetScanTime(BoxesRunTime.unboxToLong((Object)((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(scanTime2).asScala()).map((Function1 & Serializable & scala.Serializable)x$8 -> BoxesRunTime.boxToLong((long)Predef$.MODULE$.Long2long(x$8)), Buffer$.MODULE$.canBuildFrom())).sum((Numeric)Numeric.LongIsIntegral$.MODULE$)));
                pair = Pair.newPair((Object)rowList, (Object)fieldList);
            }
            catch (Throwable e) {
                if (e instanceof InterruptedException) {
                    ss.sparkContext().cancelJobGroup(jobGroup);
                    this.logger().info("Query timeout ", e);
                    Thread.currentThread().interrupt();
                    throw new KylinTimeoutException(new StringBuilder(22).append("Query timeout after: ").append(KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds()).append("s").toString());
                }
                throw e;
            }
        }
        finally {
            HadoopUtil.setCurrentConfiguration(null);
        }
        return pair;
    }

    private SparkSqlClient$() {
        MODULE$ = this;
        this.logger = LoggerFactory.getLogger(SparkSqlClient.class);
    }
}

