/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.hive;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.hive.HiveQueryRunner;
import com.facebook.presto.metadata.AllNodes;
import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.spi.security.Identity;
import com.facebook.presto.spi.security.SelectedRole;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.TestingSession;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.tpch.TpchTable;
import io.airlift.units.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.intellij.lang.annotations.Language;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestHiveRecoverableExecution {
    // Logger used to report how long the queries with and without recovery took.
    private static final Logger log = Logger.get(TestHiveRecoverableExecution.class);
    // Timeout in milliseconds for the data-provider-driven tests; same value as the timeOut on their @Test annotations.
    private static final int TEST_TIMEOUT = 120000;
    // Number of times each test is invoked; same value as the invocationCount on the @Test annotations.
    private static final int INVOCATION_COUNT = 1;
    // Query runner shared by all tests; created in setUp() and closed in shutdown().
    private DistributedQueryRunner queryRunner;
    // Executor used to run the query that must survive worker failures off the test thread; created in setUp().
    private ListeningExecutorService executor;

    /**
     * Creates the shared query runner first and then the listening executor
     * used to submit queries from a background thread.
     *
     * @throws Exception if the query runner cannot be created
     */
    @BeforeClass
    public void setUp() throws Exception {
        queryRunner = createQueryRunner();
        ExecutorService threadPool = Executors.newCachedThreadPool();
        executor = MoreExecutors.listeningDecorator(threadPool);
    }

    /**
     * Builds the Hive query runner used by every test in this class.
     *
     * <p>Two property maps are passed to {@code HiveQueryRunner.createQueryRunner}:
     * one with exchange settings and one, coordinator-only, with failure-detector,
     * scheduler and remote-task settings. Only the TPC-H {@code ORDERS} table is requested.
     *
     * <p>NOTE(review): the short timeouts are presumably chosen so that a worker that
     * stops responding is noticed quickly - confirm against the server configuration docs.
     *
     * @return the query runner created by {@code HiveQueryRunner}
     * @throws Exception if the query runner cannot be created
     */
    private DistributedQueryRunner createQueryRunner() throws Exception {
        Map<String, String> extraProperties = ImmutableMap.<String, String>builder()
                .put("exchange.max-error-duration", "5m")
                .put("exchange.http-client.request-timeout", "1s")
                .build();

        Map<String, String> extraCoordinatorProperties = ImmutableMap.<String, String>builder()
                .put("failure-detector.enabled", "true")
                .put("failure-detector.heartbeat-interval", "1s")
                .put("failure-detector.http-client.request-timeout", "500ms")
                .put("failure-detector.exponential-decay-seconds", "1")
                .put("failure-detector.threshold", "0.1")
                .put("max-failed-task-percentage", "0.6")
                .put("scheduler.http-client.request-timeout", "5s")
                .put("query.remote-task.max-error-duration", "1s")
                .put("use-legacy-scheduler", "false")
                .build();

        // No (Object) cast on the table: the element type must be inferred from the parameter type.
        return HiveQueryRunner.createQueryRunner(
                ImmutableList.of(TpchTable.ORDERS),
                extraProperties,
                extraCoordinatorProperties,
                Optional.empty());
    }

    /**
     * Releases the shared resources: stops the executor, then closes the query runner.
     * Each field is reset to {@code null} once its resource has been released.
     */
    @AfterClass(alwaysRun=true)
    public void shutdown() {
        if (executor != null) {
            executor.shutdownNow();
            executor = null;
        }
        if (queryRunner != null) {
            queryRunner.close();
            queryRunner = null;
        }
    }

    /**
     * Supplies the parameter combinations for the data-driven tests.
     *
     * @return rows of {@code {writerConcurrency, optimizedPartitionUpdateSerializationEnabled}}
     */
    @DataProvider(name="testSettings")
    public static Object[][] testSettings() {
        Object[] singleWriterOptimized = {1, true};
        Object[] twoWritersNotOptimized = {2, false};
        Object[] twoWritersOptimized = {2, true};
        return new Object[][]{
                singleWriterOptimized,
                twoWritersNotOptimized,
                twoWritersOptimized,
        };
    }

    /**
     * CTAS into a bucketed table: three bucketed copies of {@code orders} are joined on
     * their keys and written to a table bucketed by {@code key1}. The same statement is run
     * once without failures and once while workers stop responding; both runs are expected
     * to report 15000 written rows.
     *
     * @param writerConcurrency value used for the writer-count session properties
     * @param optimizedPartitionUpdateSerializationEnabled value of the matching Hive session property
     * @throws Exception if any step of the scenario fails
     */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT)
    public void testCreateBucketedTable(int writerConcurrency, boolean optimizedPartitionUpdateSerializationEnabled) throws Exception {
        testRecoverableGroupedExecution(
                queryRunner,
                writerConcurrency,
                optimizedPartitionUpdateSerializationEnabled,
                // pre-queries: the three bucketed join inputs
                ImmutableList.of(
                        "CREATE TABLE create_bucketed_table_1\n" +
                                "WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n" +
                                "SELECT orderkey key1, comment value1 FROM orders",
                        "CREATE TABLE create_bucketed_table_2\n" +
                                "WITH (bucket_count = 13, bucketed_by = ARRAY['key2']) AS\n" +
                                "SELECT orderkey key2, comment value2 FROM orders",
                        "CREATE TABLE create_bucketed_table_3\n" +
                                "WITH (bucket_count = 13, bucketed_by = ARRAY['key3']) AS\n" +
                                "SELECT orderkey key3, comment value3 FROM orders"),
                // query executed without failures
                "CREATE TABLE create_bucketed_table_success WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n" +
                        "SELECT key1, value1, key2, value2, key3, value3\n" +
                        "FROM create_bucketed_table_1\n" +
                        "JOIN create_bucketed_table_2\n" +
                        "ON key1 = key2\n" +
                        "JOIN create_bucketed_table_3\n" +
                        "ON key2 = key3",
                // query executed while workers are stopped
                "CREATE TABLE create_bucketed_table_failure WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n" +
                        "SELECT key1, value1, key2, value2, key3, value3\n" +
                        "FROM create_bucketed_table_1\n" +
                        "JOIN create_bucketed_table_2\n" +
                        "ON key1 = key2\n" +
                        "JOIN create_bucketed_table_3\n" +
                        "ON key2 = key3",
                15000,
                // post-queries: drop everything the scenario created
                ImmutableList.of(
                        "DROP TABLE IF EXISTS create_bucketed_table_1",
                        "DROP TABLE IF EXISTS create_bucketed_table_2",
                        "DROP TABLE IF EXISTS create_bucketed_table_3",
                        "DROP TABLE IF EXISTS create_bucketed_table_success",
                        "DROP TABLE IF EXISTS create_bucketed_table_failure"));
    }

    /**
     * INSERT into a bucketed, partitioned table: three bucketed copies of {@code orders}
     * are joined on their keys and inserted into a pre-created table bucketed by {@code key}
     * and partitioned by {@code partition_key}. The insert is run once without failures and
     * once while workers stop responding; both runs are expected to report 15000 written rows.
     *
     * @param writerConcurrency value used for the writer-count session properties
     * @param optimizedPartitionUpdateSerializationEnabled value of the matching Hive session property
     * @throws Exception if any step of the scenario fails
     */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT)
    public void testInsertBucketedTable(int writerConcurrency, boolean optimizedPartitionUpdateSerializationEnabled) throws Exception {
        testRecoverableGroupedExecution(
                queryRunner,
                writerConcurrency,
                optimizedPartitionUpdateSerializationEnabled,
                // pre-queries: the three bucketed join inputs and the two empty insert targets
                ImmutableList.of(
                        "CREATE TABLE insert_bucketed_table_1\n" +
                                "WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n" +
                                "SELECT orderkey key1, comment value1 FROM orders",
                        "CREATE TABLE insert_bucketed_table_2\n" +
                                "WITH (bucket_count = 13, bucketed_by = ARRAY['key2']) AS\n" +
                                "SELECT orderkey key2, comment value2 FROM orders",
                        "CREATE TABLE insert_bucketed_table_3\n" +
                                "WITH (bucket_count = 13, bucketed_by = ARRAY['key3']) AS\n" +
                                "SELECT orderkey key3, comment value3 FROM orders",
                        "CREATE TABLE insert_bucketed_table_success (key BIGINT, value VARCHAR, partition_key VARCHAR)\n" +
                                "WITH (bucket_count = 13, bucketed_by = ARRAY['key'], partitioned_by = ARRAY['partition_key'])",
                        "CREATE TABLE insert_bucketed_table_failure (key BIGINT, value VARCHAR, partition_key VARCHAR)\n" +
                                "WITH (bucket_count = 13, bucketed_by = ARRAY['key'], partitioned_by = ARRAY['partition_key'])"),
                // query executed without failures
                "INSERT INTO insert_bucketed_table_success\n" +
                        "SELECT key1, value1, 'foo'\n" +
                        "FROM insert_bucketed_table_1\n" +
                        "JOIN insert_bucketed_table_2\n" +
                        "ON key1 = key2\n" +
                        "JOIN insert_bucketed_table_3\n" +
                        "ON key2 = key3",
                // query executed while workers are stopped
                "INSERT INTO insert_bucketed_table_failure\n" +
                        "SELECT key1, value1, 'foo'\n" +
                        "FROM insert_bucketed_table_1\n" +
                        "JOIN insert_bucketed_table_2\n" +
                        "ON key1 = key2\n" +
                        "JOIN insert_bucketed_table_3\n" +
                        "ON key2 = key3",
                15000,
                // post-queries: drop everything the scenario created
                ImmutableList.of(
                        "DROP TABLE IF EXISTS insert_bucketed_table_1",
                        "DROP TABLE IF EXISTS insert_bucketed_table_2",
                        "DROP TABLE IF EXISTS insert_bucketed_table_3",
                        "DROP TABLE IF EXISTS insert_bucketed_table_success",
                        "DROP TABLE IF EXISTS insert_bucketed_table_failure"));
    }

    /**
     * CTAS into an unbucketed table: three bucketed copies of {@code orders} are joined on
     * their keys and written to a table created without bucketing properties. The same
     * statement is run once without failures and once while workers stop responding; both
     * runs are expected to report 15000 written rows.
     *
     * @param writerConcurrency value used for the writer-count session properties
     * @param optimizedPartitionUpdateSerializationEnabled value of the matching Hive session property
     * @throws Exception if any step of the scenario fails
     */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT)
    public void testCreateUnbucketedTableWithGroupedExecution(int writerConcurrency, boolean optimizedPartitionUpdateSerializationEnabled) throws Exception {
        testRecoverableGroupedExecution(
                queryRunner,
                writerConcurrency,
                optimizedPartitionUpdateSerializationEnabled,
                // pre-queries: the three bucketed join inputs
                ImmutableList.of(
                        "CREATE TABLE create_unbucketed_table_with_grouped_execution_1\n" +
                                "WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n" +
                                "SELECT orderkey key1, comment value1 FROM orders",
                        "CREATE TABLE create_unbucketed_table_with_grouped_execution_2\n" +
                                "WITH (bucket_count = 13, bucketed_by = ARRAY['key2']) AS\n" +
                                "SELECT orderkey key2, comment value2 FROM orders",
                        "CREATE TABLE create_unbucketed_table_with_grouped_execution_3\n" +
                                "WITH (bucket_count = 13, bucketed_by = ARRAY['key3']) AS\n" +
                                "SELECT orderkey key3, comment value3 FROM orders"),
                // query executed without failures
                "CREATE TABLE create_unbucketed_table_with_grouped_execution_success AS\n" +
                        "SELECT key1, value1, key2, value2, key3, value3\n" +
                        "FROM create_unbucketed_table_with_grouped_execution_1\n" +
                        "JOIN create_unbucketed_table_with_grouped_execution_2\n" +
                        "ON key1 = key2\n" +
                        "JOIN create_unbucketed_table_with_grouped_execution_3\n" +
                        "ON key2 = key3",
                // query executed while workers are stopped
                "CREATE TABLE create_unbucketed_table_with_grouped_execution_failure AS\n" +
                        "SELECT key1, value1, key2, value2, key3, value3\n" +
                        "FROM create_unbucketed_table_with_grouped_execution_1\n" +
                        "JOIN create_unbucketed_table_with_grouped_execution_2\n" +
                        "ON key1 = key2\n" +
                        "JOIN create_unbucketed_table_with_grouped_execution_3\n" +
                        "ON key2 = key3",
                15000,
                // post-queries: drop everything the scenario created
                ImmutableList.of(
                        "DROP TABLE IF EXISTS create_unbucketed_table_with_grouped_execution_1",
                        "DROP TABLE IF EXISTS create_unbucketed_table_with_grouped_execution_2",
                        "DROP TABLE IF EXISTS create_unbucketed_table_with_grouped_execution_3",
                        "DROP TABLE IF EXISTS create_unbucketed_table_with_grouped_execution_success",
                        "DROP TABLE IF EXISTS create_unbucketed_table_with_grouped_execution_failure"));
    }

    @Test(timeOut=120000L, dataProvider="testSettings", invocationCount=1)
    public void testInsertUnbucketedTableWithGroupedExecution(int writerConcurrency, boolean optimizedPartitionUpdateSerializationEnabled) throws Exception {
        this.testRecoverableGroupedExecution(this.queryRunner, writerConcurrency, optimizedPartitionUpdateSerializationEnabled, (List<String>)ImmutableList.of((Object)"CREATE TABLE insert_unbucketed_table_with_grouped_execution_1\nWITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\nSELECT orderkey key1, comment value1 FROM orders", (Object)"CREATE TABLE insert_unbucketed_table_with_grouped_execution_2\nWITH (bucket_count = 13, bucketed_by = ARRAY['key2']) AS\nSELECT orderkey key2, comment value2 FROM orders", (Object)"CREATE TABLE insert_unbucketed_table_with_grouped_execution_3\nWITH (bucket_count = 13, bucketed_by = ARRAY['key3']) AS\nSELECT orderkey key3, comment value3 FROM orders", (Object)"CREATE TABLE insert_unbucketed_table_with_grouped_execution_success (key BIGINT, value VARCHAR, partition_key VARCHAR)\nWITH (partitioned_by = ARRAY['partition_key'])", (Object)"CREATE TABLE insert_unbucketed_table_with_grouped_execution_failure (key BIGINT, value VARCHAR, partition_key VARCHAR)\nWITH (partitioned_by = ARRAY['partition_key'])"), "INSERT INTO insert_unbucketed_table_with_grouped_execution_success\nSELECT key1, value1, 'foo'\nFROM insert_unbucketed_table_with_grouped_execution_1\nJOIN insert_unbucketed_table_with_grouped_execution_2\nON key1 = key2\nJOIN insert_unbucketed_table_with_grouped_execution_3\nON key2 = key3", "INSERT INTO insert_unbucketed_table_with_grouped_execution_failure\nSELECT key1, value1, 'foo'\nFROM insert_unbucketed_table_with_grouped_execution_1\nJOIN insert_unbucketed_table_with_grouped_execution_2\nON key1 = key2\nJOIN insert_unbucketed_table_with_grouped_execution_3\nON key2 = key3", 15000, (List<String>)ImmutableList.of((Object)"DROP TABLE IF EXISTS insert_unbucketed_table_with_grouped_execution_1", (Object)"DROP TABLE IF EXISTS insert_unbucketed_table_with_grouped_execution_2", (Object)"DROP TABLE IF EXISTS insert_unbucketed_table_with_grouped_execution_3", (Object)"DROP TABLE IF EXISTS 
insert_unbucketed_table_with_grouped_execution_success", (Object)"DROP TABLE IF EXISTS insert_unbucketed_table_with_grouped_execution_failure"));
    }

    /**
     * Aggregation over an unbucketed table: copies {@code orderkey} and {@code comment} from
     * {@code orders} into {@code test_table}, then creates a table holding a per-comment
     * count. Runs with a writer concurrency of 4 and optimized partition update
     * serialization enabled; both the undisturbed run and the run with stopped workers are
     * expected to report 14995 written rows.
     *
     * <p>Unlike the data-provider-driven tests, this test declares no timeout.
     *
     * @throws Exception if any step of the scenario fails
     */
    @Test(invocationCount = INVOCATION_COUNT)
    public void testCountOnUnbucketedTable() throws Exception {
        testRecoverableGroupedExecution(
                queryRunner,
                4,
                true,
                // pre-queries: the unbucketed input table
                ImmutableList.of(
                        "CREATE TABLE test_table AS\n" +
                                "SELECT orderkey, comment\n" +
                                "FROM orders\n"),
                // query executed without failures
                "CREATE TABLE test_success AS\n" +
                        "SELECT count(*) as a, comment FROM test_table group by comment",
                // query executed while workers are stopped
                "create table test_failure AS\n" +
                        "SELECT count(*) as a, comment FROM test_table group by comment",
                14995,
                // post-queries: drop everything the scenario created
                ImmutableList.of(
                        "DROP TABLE IF EXISTS test_table",
                        "DROP TABLE IF EXISTS test_success",
                        "DROP TABLE IF EXISTS test_failure"));
    }

    /**
     * Runs one recoverable grouped execution scenario end to end.
     *
     * <ol>
     * <li>Waits (up to 10 seconds) until all nodes are active, then runs {@code postQueries}
     *     once up front to remove leftovers from earlier runs.</li>
     * <li>Runs {@code preQueries}, then {@code queryWithoutFailure}, and checks its update count.</li>
     * <li>Cancels all queries and tasks, submits {@code queryWithFailure} on the executor,
     *     and makes two randomly chosen workers stop responding, one second apart.</li>
     * <li>Checks that the submitted query still reports {@code expectedUpdateCount}.</li>
     * <li>Always makes every server respond again, cancels remaining queries and tasks, and
     *     runs {@code postQueries} once more.</li>
     * </ol>
     *
     * @param queryRunner cluster to run against
     * @param writerConcurrency value used for the writer-count session properties
     * @param optimizedPartitionUpdateSerializationEnabled value of the matching Hive session property
     * @param preQueries statements that set up the scenario's input tables
     * @param queryWithoutFailure statement executed while all workers are responding
     * @param queryWithFailure statement executed while two workers stop responding
     * @param expectedUpdateCount update count both statements must report
     * @param postQueries cleanup statements, executed before and after the scenario
     * @throws Exception if a query fails, the nodes do not become healthy, or the recovered
     *         query does not finish within 1000 seconds
     */
    private void testRecoverableGroupedExecution(DistributedQueryRunner queryRunner, int writerConcurrency, boolean optimizedPartitionUpdateSerializationEnabled, List<String> preQueries, @Language(value="SQL") String queryWithoutFailure, @Language(value="SQL") String queryWithFailure, int expectedUpdateCount, List<String> postQueries) throws Exception {
        waitUntilAllNodesAreHealthy(queryRunner, new Duration(10.0, TimeUnit.SECONDS));
        Session recoverableSession = createRecoverableSession(writerConcurrency, optimizedPartitionUpdateSerializationEnabled);

        // Start from a clean slate in case a previous run left tables behind.
        for (String postQuery : postQueries) {
            queryRunner.execute(recoverableSession, postQuery);
        }

        try {
            for (String preQuery : preQueries) {
                queryRunner.execute(recoverableSession, preQuery);
            }

            // Baseline: the query must succeed when nothing fails.
            Stopwatch noRecoveryStopwatch = Stopwatch.createStarted();
            Assert.assertEquals(
                    queryRunner.execute(recoverableSession, queryWithoutFailure).getUpdateCount(),
                    OptionalLong.of(expectedUpdateCount));
            log.info("Query with no recovery took %sms", noRecoveryStopwatch.elapsed(TimeUnit.MILLISECONDS));

            // Cancel all queries and tasks to make sure we are dealing only with a single running query.
            cancelAllQueries(queryRunner);
            cancelAllTasks(queryRunner);

            // Run the query on a background thread so this thread can stop workers meanwhile.
            Stopwatch recoveryStopwatch = Stopwatch.createStarted();
            ListenableFuture<MaterializedResult> result = executor.submit(() -> queryRunner.execute(recoverableSession, queryWithFailure));

            List<TestingPrestoServer> workers = queryRunner.getServers().stream()
                    .filter(server -> !server.isCoordinator())
                    .collect(Collectors.toList());
            // The scenario stops two distinct workers; fail with a clear message instead of an index error.
            Assert.assertTrue(workers.size() >= 2, "at least two workers are required, but found: " + workers.size());
            Collections.shuffle(workers);

            TestingPrestoServer worker1 = workers.get(0);
            worker1.stopResponding();

            TestingPrestoServer worker2 = workers.get(1);
            // Stagger the second failure by one second.
            Thread.sleep(1000L);
            worker2.stopResponding();

            Assert.assertEquals(
                    result.get(1000L, TimeUnit.SECONDS).getUpdateCount(),
                    OptionalLong.of(expectedUpdateCount));
            log.info("Query with recovery took %sms", recoveryStopwatch.elapsed(TimeUnit.MILLISECONDS));
        }
        finally {
            // Restore the cluster before cleaning up so the drop statements can run.
            queryRunner.getServers().forEach(TestingPrestoServer::startResponding);
            cancelAllQueries(queryRunner);
            cancelAllTasks(queryRunner);
            for (String postQuery : postQueries) {
                queryRunner.execute(recoverableSession, postQuery);
            }
        }
    }

    /**
     * Polls the coordinator until the number of active nodes equals the query runner's
     * node count, sleeping one second between attempts.
     *
     * @param queryRunner cluster whose coordinator is polled
     * @param timeout maximum time to keep polling
     * @throws TimeoutException if the node counts still differ once {@code timeout} has elapsed
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void waitUntilAllNodesAreHealthy(DistributedQueryRunner queryRunner, Duration timeout) throws TimeoutException, InterruptedException {
        TestingPrestoServer coordinator = queryRunner.getCoordinator();
        long giveUpAtMillis = System.currentTimeMillis() + timeout.toMillis();
        while (System.currentTimeMillis() < giveUpAtMillis) {
            int activeNodeCount = coordinator.refreshNodes().getActiveNodes().size();
            if (activeNodeCount != queryRunner.getNodeCount()) {
                // Not every node is active yet; wait before asking again.
                Thread.sleep(1000L);
                continue;
            }
            return;
        }
        String message = String.format("one of the nodes is still missing after: %s", timeout);
        throw new TimeoutException(message);
    }

    /**
     * Asks the coordinator's query manager to cancel every query the query runner reports.
     *
     * @param queryRunner cluster whose queries are cancelled
     */
    private static void cancelAllQueries(DistributedQueryRunner queryRunner) {
        queryRunner.getQueries().forEach(query -> {
            TestingPrestoServer coordinator = queryRunner.getCoordinator();
            coordinator.getQueryManager().cancelQuery(query.getQueryId());
        });
    }

    /**
     * Cancels the tasks of every server in the cluster, one server at a time.
     *
     * @param queryRunner cluster whose servers are visited
     */
    private static void cancelAllTasks(DistributedQueryRunner queryRunner) {
        for (TestingPrestoServer server : queryRunner.getServers()) {
            cancelAllTasks(server);
        }
    }

    /**
     * Asks the server's task manager to cancel every task it currently reports.
     *
     * @param server server whose tasks are cancelled
     */
    private static void cancelAllTasks(TestingPrestoServer server) {
        server.getTaskManager().getAllTaskInfo().forEach(task -> {
            server.getTaskManager().cancelTask(task.getTaskId());
        });
    }

    /**
     * Builds the session used by every query in the recoverable execution scenarios.
     *
     * <p>The session runs as user {@code hive} with the {@code admin} role selected for the
     * {@code hive} catalog, targets catalog {@code hive} and schema {@code tpch_bucketed},
     * and turns on colocated joins, grouped execution and recoverable grouped execution.
     *
     * <p>NOTE(review): the meaning of the remaining positional {@code Identity} constructor
     * arguments (empty optionals and empty maps) is not visible in this file - confirm
     * against the {@code Identity} class.
     *
     * @param writerConcurrency value for {@code task_writer_count} and {@code task_partitioned_writer_count}
     * @param optimizedPartitionUpdateSerializationEnabled value for the Hive
     *        {@code optimized_partition_update_serialization_enabled} session property
     * @return the configured session
     */
    private static Session createRecoverableSession(int writerConcurrency, boolean optimizedPartitionUpdateSerializationEnabled) {
        SelectedRole adminRole = new SelectedRole(SelectedRole.Type.ROLE, Optional.of("admin"));
        Identity identity = new Identity(
                "hive",
                Optional.empty(),
                ImmutableMap.of("hive", adminRole),
                ImmutableMap.of(),
                ImmutableMap.of(),
                Optional.empty(),
                Optional.empty());

        String writerCount = Integer.toString(writerConcurrency);
        return TestingSession.testSessionBuilder()
                .setIdentity(identity)
                .setSystemProperty("colocated_join", "true")
                .setSystemProperty("grouped_execution", "true")
                .setSystemProperty("concurrent_lifespans_per_task", "1")
                .setSystemProperty("recoverable_grouped_execution", "true")
                .setSystemProperty("scale_writers", "false")
                .setSystemProperty("redistribute_writes", "false")
                .setSystemProperty("task_writer_count", writerCount)
                .setSystemProperty("task_partitioned_writer_count", writerCount)
                .setSystemProperty("partitioning_provider_catalog", "hive")
                .setSystemProperty("exchange_materialization_strategy", SelectedRole.Type.ALL.name())
                .setSystemProperty("hash_partition_count", "11")
                .setSystemProperty("max_stage_retries", "4")
                .setCatalogSessionProperty("hive", "virtual_bucket_count", "16")
                .setCatalogSessionProperty("hive", "optimized_partition_update_serialization_enabled", String.valueOf(optimizedPartitionUpdateSerializationEnabled))
                .setCatalog("hive")
                .setSchema("tpch_bucketed")
                .build();
    }
}

