/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark;

import com.facebook.presto.Session;
import com.facebook.presto.cost.PlanNodeStatsEstimate;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.spark.PrestoSparkQueryRunner;
import com.facebook.presto.spark.RddAndMore;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkQueryExecution;
import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskRdd;
import com.facebook.presto.spark.execution.FragmentExecutionResult;
import com.facebook.presto.spark.execution.PrestoSparkAdaptiveQueryExecution;
import com.facebook.presto.spark.execution.PrestoSparkStaticQueryExecution;
import com.facebook.presto.spark.execution.RuntimeStatistics;
import com.facebook.presto.sql.planner.SubPlan;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.QueryAssertions;
import com.google.common.base.Throwables;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.spark.Dependency;
import org.apache.spark.MapOutputStatistics;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rdd.ShuffledRDD;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestPrestoSparkQueryExecution
extends AbstractTestQueryFramework {
    PrestoSparkQueryRunner prestoSparkQueryRunner;

    protected QueryRunner createQueryRunner() {
        this.prestoSparkQueryRunner = PrestoSparkQueryRunner.createHivePrestoSparkQueryRunner();
        return this.prestoSparkQueryRunner;
    }

    private IPrestoSparkQueryExecution getPrestoSparkQueryExecution(Session session, String sql) {
        return this.prestoSparkQueryRunner.createPrestoSparkQueryExecution(session, sql, Optional.empty());
    }

    @Test
    public void testQueryExecutionCreation() {
        String sqlText = "select * from lineitem";
        Session session = Session.builder((Session)this.getSession()).setSystemProperty("spark_adaptive_query_execution_enabled", "false").build();
        IPrestoSparkQueryExecution psQueryExecution = this.getPrestoSparkQueryExecution(session, sqlText);
        Assert.assertTrue((boolean)(psQueryExecution instanceof PrestoSparkStaticQueryExecution));
        session = Session.builder((Session)this.getSession()).setSystemProperty("spark_adaptive_query_execution_enabled", "true").build();
        psQueryExecution = this.getPrestoSparkQueryExecution(session, sqlText);
        Assert.assertTrue((boolean)(psQueryExecution instanceof PrestoSparkAdaptiveQueryExecution));
    }

    @Test
    public void testSingleFragmentQueryAdaptiveExecution() {
        String sqlText = "select * from lineitem";
        Session session = Session.builder((Session)this.getSession()).setSystemProperty("spark_adaptive_query_execution_enabled", "false").build();
        MaterializedResult staticResults = this.prestoSparkQueryRunner.execute(session, sqlText);
        session = Session.builder((Session)this.getSession()).setSystemProperty("spark_adaptive_query_execution_enabled", "true").build();
        MaterializedResult dynamicResults = this.prestoSparkQueryRunner.execute(session, sqlText);
        QueryAssertions.assertEqualsIgnoreOrder((Iterable)staticResults, (Iterable)dynamicResults);
    }

    @Test
    public void testJoinQueryAdaptiveExecution() {
        String sqlText = "select * from lineitem l join orders o on l.orderkey = o.orderkey";
        Session session = Session.builder((Session)this.getSession()).setSystemProperty("spark_adaptive_query_execution_enabled", "false").setSystemProperty("join_distribution_type", "partitioned").setSystemProperty("spark_retry_on_out_of_memory_with_increased_memory_settings_enabled", "false").build();
        MaterializedResult staticResults = this.prestoSparkQueryRunner.execute(session, sqlText);
        session = Session.builder((Session)this.getSession()).setSystemProperty("spark_adaptive_query_execution_enabled", "true").setSystemProperty("join_distribution_type", "partitioned").setSystemProperty("spark_retry_on_out_of_memory_with_increased_memory_settings_enabled", "false").build();
        MaterializedResult dynamicResults = this.prestoSparkQueryRunner.execute(session, sqlText);
        QueryAssertions.assertEqualsIgnoreOrder((Iterable)staticResults, (Iterable)dynamicResults);
    }

    @Test
    public void testGroupByAdaptiveExecution() {
        String sqlText = "SELECT custkey, orderstatus FROM orders ORDER BY orderkey DESC LIMIT 10";
        Session session = Session.builder((Session)this.getSession()).setSystemProperty("spark_adaptive_query_execution_enabled", "false").setSystemProperty("join_distribution_type", "partitioned").setSystemProperty("spark_retry_on_out_of_memory_with_increased_memory_settings_enabled", "false").build();
        MaterializedResult staticResults = this.prestoSparkQueryRunner.execute(session, sqlText);
        session = Session.builder((Session)this.getSession()).setSystemProperty("spark_adaptive_query_execution_enabled", "true").setSystemProperty("join_distribution_type", "partitioned").setSystemProperty("spark_retry_on_out_of_memory_with_increased_memory_settings_enabled", "false").build();
        MaterializedResult dynamicResults = this.prestoSparkQueryRunner.execute(session, sqlText);
        QueryAssertions.assertEqualsIgnoreOrder((Iterable)staticResults, (Iterable)dynamicResults);
    }

    @Test
    public void testRddCreationForPartitionedJoinWithoutShuffle() {
        Session session = Session.builder((Session)this.getSession()).setSystemProperty("join_distribution_type", "partitioned").setSystemProperty("spark_retry_on_out_of_memory_with_increased_memory_settings_enabled", "false").build();
        String sql = "select * from lineitem l join orders o on l.orderkey = o.orderkey";
        this.validateFragmentedRddCreation(session, sql);
    }

    @Test
    public void testRddCreationForPartitionedJoinWithShuffle() {
        Session session = Session.builder((Session)this.getSession()).setSystemProperty("join_distribution_type", "partitioned").setSystemProperty("spark_retry_on_out_of_memory_with_increased_memory_settings_enabled", "false").build();
        String sql = "select * from lineitem l join orders o on l.orderkey = o.orderkey UNION ALL select * from lineitem l join orders o on l.orderkey = o.orderkey";
        this.validateFragmentedRddCreation(session, sql);
    }

    @Test
    public void testRddCreationForMemoryBasedBroadcastJoin() {
        Session session = Session.builder((Session)this.getSession()).setSystemProperty("join_distribution_type", "broadcast").setSystemProperty("spark_retry_on_out_of_memory_with_increased_memory_settings_enabled", "false").setSystemProperty("storage_based_broadcast_join_enabled", "false").build();
        String sql = "select * from lineitem l join orders o on l.orderkey = o.orderkey";
        this.validateFragmentedRddCreation(session, sql);
    }

    @Test
    public void testRddCreationForStorageBasedBroadcastJoin() {
        Session session = Session.builder((Session)this.getSession()).setSystemProperty("join_distribution_type", "broadcast").setSystemProperty("spark_retry_on_out_of_memory_with_increased_memory_settings_enabled", "false").setSystemProperty("storage_based_broadcast_join_enabled", "true").build();
        String sql = "select * from lineitem l join orders o on l.orderkey = o.orderkey";
        this.validateFragmentedRddCreation(session, sql);
    }

    @Test
    public void testMapOutputStatsExtraction() {
        Session session = Session.builder((Session)this.getSession()).setSystemProperty("join_distribution_type", "broadcast").setSystemProperty("spark_retry_on_out_of_memory_with_increased_memory_settings_enabled", "false").build();
        String sql = "select * from lineitem l join orders o on l.orderkey = o.orderkey";
        PrestoSparkStaticQueryExecution execution = (PrestoSparkStaticQueryExecution)this.getPrestoSparkQueryExecution(session, sql);
        Optional planNodeStatsEstimate = RuntimeStatistics.createRuntimeStats(Optional.empty());
        Assert.assertFalse((boolean)planNodeStatsEstimate.isPresent());
        planNodeStatsEstimate = RuntimeStatistics.createRuntimeStats(Optional.of(new MapOutputStatistics(0, new long[0])));
        Assert.assertEquals((double)((PlanNodeStatsEstimate)planNodeStatsEstimate.get()).getOutputSizeInBytes(), (double)0.0);
        planNodeStatsEstimate = RuntimeStatistics.createRuntimeStats(Optional.of(new MapOutputStatistics(0, new long[]{23L})));
        Assert.assertEquals((double)((PlanNodeStatsEstimate)planNodeStatsEstimate.get()).getOutputSizeInBytes(), (double)23.0);
        planNodeStatsEstimate = RuntimeStatistics.createRuntimeStats(Optional.of(new MapOutputStatistics(0, new long[]{23L, 520L, 190L})));
        Assert.assertEquals((double)((PlanNodeStatsEstimate)planNodeStatsEstimate.get()).getOutputSizeInBytes(), (double)733.0);
    }

    private void validateFragmentedRddCreation(Session session, String sql) {
        PrestoSparkStaticQueryExecution execution = (PrestoSparkStaticQueryExecution)this.getPrestoSparkQueryExecution(session, sql);
        RddAndMore rddAndMoreStatic = null;
        RddAndMore rddAndMoreFromFragmentedExecution = null;
        try {
            SubPlan rootFragmentedPlan = execution.createFragmentedPlan();
            TableWriteInfo tableWriteInfo = execution.getTableWriteInfo(session, rootFragmentedPlan);
            rddAndMoreStatic = execution.createRdd(rootFragmentedPlan, PrestoSparkSerializedPage.class, tableWriteInfo);
            FragmentExecutionResult fragmentExecutionResult = this.executeInStages(execution, rootFragmentedPlan, tableWriteInfo);
            rddAndMoreFromFragmentedExecution = fragmentExecutionResult.getRddAndMore();
        }
        catch (Exception e) {
            Assert.fail((String)"Failed while creating RDD", (Throwable)e);
        }
        this.assertRddAndMoreEquals(rddAndMoreStatic, rddAndMoreFromFragmentedExecution);
    }

    private FragmentExecutionResult executeInStages(PrestoSparkStaticQueryExecution execution, SubPlan rootFragmentedPlan, TableWriteInfo tableWriteInfo) {
        rootFragmentedPlan.getChildren().stream().map(SubPlan::getChildren).flatMap(Collection::stream).forEach(subPlan -> this.excecuteSubPlanWithUncheckedException(execution, (SubPlan)subPlan, tableWriteInfo, Optional.empty()));
        rootFragmentedPlan.getChildren().stream().forEach(subPlan -> this.excecuteSubPlanWithUncheckedException(execution, (SubPlan)subPlan, tableWriteInfo, Optional.empty()));
        return this.excecuteSubPlanWithUncheckedException(execution, rootFragmentedPlan, tableWriteInfo, Optional.empty());
    }

    private FragmentExecutionResult excecuteSubPlanWithUncheckedException(PrestoSparkStaticQueryExecution execution, SubPlan subPlan, TableWriteInfo tableWriteInfo, Optional<Class<?>> outputType) {
        try {
            return execution.executeFragment(subPlan, tableWriteInfo, outputType);
        }
        catch (Exception e) {
            Throwables.throwIfUnchecked((Throwable)e);
            throw new RuntimeException(e);
        }
    }

    private void assertRddAndMoreEquals(RddAndMore rddAndMore1, RddAndMore rddAndMore2) {
        Assert.assertEquals((int)rddAndMore1.getBroadcastDependencies().size(), (int)rddAndMore2.getBroadcastDependencies().size());
        this.assertRddEquals(rddAndMore1.getRdd().rdd(), rddAndMore2.getRdd().rdd());
    }

    private void assertRddEquals(RDD rdd1, RDD rdd2) {
        Assert.assertEquals((String)rdd1.name(), (String)rdd2.name());
        Assert.assertEquals(rdd1.getClass(), rdd2.getClass());
        Assert.assertEquals((int)rdd1.getDependencies().size(), (int)rdd2.getDependencies().size());
        Assert.assertEquals((int)rdd1.getNumPartitions(), (int)rdd2.getNumPartitions());
        if (rdd1 instanceof PrestoSparkTaskRdd) {
            this.assertPrestoSparkTaskRddEquals((PrestoSparkTaskRdd)rdd1, (PrestoSparkTaskRdd)rdd2);
        } else if (rdd1 instanceof ShuffledRDD) {
            this.assertShuffledRddEquals((ShuffledRDD)rdd1, (ShuffledRDD)rdd2);
        }
    }

    private void assertShuffledRddEquals(ShuffledRDD shuffledRDD1, ShuffledRDD shuffledRDD2) {
        Assert.assertEquals((int)shuffledRDD1.getNumPartitions(), (int)shuffledRDD2.getNumPartitions());
        this.assertRddEquals(shuffledRDD1.prev(), shuffledRDD2.prev());
        for (int i = 0; i < shuffledRDD1.getDependencies().size(); ++i) {
            this.assertRddEquals(((Dependency)shuffledRDD1.getDependencies().apply(i)).rdd(), ((Dependency)shuffledRDD2.getDependencies().apply(i)).rdd());
        }
    }

    private void assertPrestoSparkTaskRddEquals(PrestoSparkTaskRdd prestoSparkTaskRdd1, PrestoSparkTaskRdd prestoSparkTaskRdd2) {
        int i;
        Assert.assertEquals((int)prestoSparkTaskRdd1.getShuffleInputRdds().size(), (int)prestoSparkTaskRdd2.getShuffleInputRdds().size(), (String)"Expected same number of shuffle inputs");
        Assert.assertEquals(prestoSparkTaskRdd1.getShuffleInputFragmentIds().stream().collect(Collectors.toSet()), prestoSparkTaskRdd2.getShuffleInputFragmentIds().stream().collect(Collectors.toSet()), (String)"Expected same input fragment ids");
        Assert.assertEquals((prestoSparkTaskRdd1.getTaskSourceRdd() == null ? 1 : 0) != 0, (prestoSparkTaskRdd2.getTaskSourceRdd() == null ? 1 : 0) != 0, (String)"Expected both RDDs to either contain TaskSourceRdd or not contain it");
        for (i = 0; i < prestoSparkTaskRdd1.getShuffleInputRdds().size(); ++i) {
            this.assertRddEquals((RDD)prestoSparkTaskRdd1.getShuffleInputRdds().get(i), (RDD)prestoSparkTaskRdd2.getShuffleInputRdds().get(i));
        }
        for (i = 0; i < prestoSparkTaskRdd1.getDependencies().size(); ++i) {
            this.assertRddEquals(((Dependency)prestoSparkTaskRdd1.getDependencies().apply(i)).rdd(), ((Dependency)prestoSparkTaskRdd2.getDependencies().apply(i)).rdd());
        }
    }
}

