/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.multitable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.utilities.multitable.HoodieMultiTableServicesMain;
import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link HoodieMultiTableServicesMain}.
 *
 * <p>Before every test two tables ({@code table1} and {@code table2}) are created under the
 * harness temp directory and each receives one insert batch followed by one update batch. The
 * tests then run the multi-table services in batch or streaming mode and verify the number of
 * instants found on the reloaded timelines.
 */
class TestHoodieMultiTableServicesMain extends HoodieCommonTestHarness implements SparkProvider {
    private static final Logger LOG = LoggerFactory.getLogger(TestHoodieMultiTableServicesMain.class);

    /** How long the streaming tests let the services run before cancelling them. */
    private static final long CANCEL_DELAY_MS = 10000L;

    /** Number of records generated for every write batch. */
    private static final int RECORDS_PER_BATCH = 100;

    /** Avro schema handed to the write config; must stay in sync with the generated test records. */
    private static final String TRIP_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

    // Not written by this class: init() decides whether Spark must be created by looking at the
    // static session below. Kept because it is part of the protected surface of the class.
    protected boolean initialized = false;

    // Spark handles are static so that a single session is shared by all tests of the class.
    private static SparkSession spark;
    private static SQLContext sqlContext;
    private static JavaSparkContext jsc;
    private static HoodieSparkEngineContext context;

    protected transient HoodieTestDataGenerator dataGen = null;

    /**
     * Creates the shared Spark session on first use, then prepares the base path and seeds both
     * test tables.
     *
     * @throws IOException if the test tables cannot be created or written
     * @throws ExecutionException declared for compatibility with the existing signature
     * @throws InterruptedException declared for compatibility with the existing signature
     */
    @BeforeEach
    public void init() throws IOException, ExecutionException, InterruptedException {
        if (spark == null) {
            SparkConf sparkConf = conf();
            HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
            SparkRDDReadClient.addHoodieSupport(sparkConf);
            spark = SparkSession.builder().config(sparkConf).getOrCreate();
            sqlContext = spark.sqlContext();
            jsc = new JavaSparkContext(spark.sparkContext());
            context = new HoodieSparkEngineContext(jsc);
        }
        initPath();
        prepareData();
    }

    /** Runs every service once, in batch mode, against both tables discovered under the temp dir. */
    @Test
    public void testRunAllServices() throws IOException, ExecutionException, InterruptedException {
        HoodieMultiTableServicesMain.Config cfg = getHoodieMultiServiceConfig();
        cfg.batch = true;
        HoodieTableMetaClient metaClient1 = getMetaClient("table1");
        HoodieTableMetaClient metaClient2 = getMetaClient("table2");

        new HoodieMultiTableServicesMain(jsc, cfg).startServices();

        assertAllServiceInstants(metaClient1);
        assertAllServiceInstants(metaClient2);
    }

    /** Runs every service once, in batch mode, with the base path pointing at a single table. */
    @Test
    public void testRunAllServicesForSingleTable() throws IOException, ExecutionException, InterruptedException {
        HoodieMultiTableServicesMain.Config cfg = getHoodieMultiServiceConfig();
        HoodieTableMetaClient metaClient1 = getMetaClient("table1");
        cfg.batch = true;
        cfg.basePath = Collections.singletonList(metaClient1.getBasePath().toString());

        new HoodieMultiTableServicesMain(jsc, cfg).startServices();

        assertAllServiceInstants(metaClient1);
    }

    /** Runs the services in streaming mode and cancels them from a background thread. */
    @Test
    public void testStreamRunAllServices() throws IOException, ExecutionException, InterruptedException {
        HoodieMultiTableServicesMain.Config cfg = getHoodieMultiServiceConfig();
        HoodieMultiTableServicesMain main = new HoodieMultiTableServicesMain(jsc, cfg);
        // The canceller must be armed before startServices() is entered on this thread.
        cancelAfterDelay(main);
        main.startServices();

        HoodieTableMetaClient metaClient1 = getMetaClient("table1");
        HoodieTableMetaClient metaClient2 = getMetaClient("table2");
        assertCleanAndCommitInstants(metaClient1);
        assertCleanAndCommitInstants(metaClient2);
    }

    /**
     * Serves an explicit table list that contains one path which is not a Hudi table.
     *
     * <p>The try/catch blocks only inspect an exception if one is thrown; a run that completes
     * without throwing is accepted in all three phases.
     */
    @Test
    public void testRunMultiTableServicesWithOneWrongPath() throws IOException {
        HoodieMultiTableServicesMain.Config cfg = getHoodieMultiServiceConfig();
        cfg.autoDiscovery = false;
        cfg.batch = true;
        HoodieTableMetaClient metaClient1 = getMetaClient("table1");
        cfg.configs.add(configEntry("hoodie.tableservice.skipNonHudiTable", "true"));
        cfg.configs.add(configEntry("hoodie.tableservice.tablesToServe", metaClient1.getBasePath() + ",file:///fakepath"));
        HoodieMultiTableServicesMain main = new HoodieMultiTableServicesMain(jsc, cfg);

        // Batch mode, skipping non-Hudi tables.
        try {
            main.startServices();
        } catch (Exception e) {
            org.junit.jupiter.api.Assertions.assertFalse(e instanceof TableNotFoundException);
        }

        // Streaming mode, skipping non-Hudi tables.
        cfg.batch = false;
        cancelAfterDelay(main);
        try {
            main.startServices();
        } catch (Exception e) {
            org.junit.jupiter.api.Assertions.assertFalse(e instanceof TableNotFoundException);
        }

        // Batch mode again, with the skip flag appended as "false".
        cfg.batch = true;
        cfg.configs.add(configEntry("hoodie.tableservice.skipNonHudiTable", "false"));
        try {
            main.startServices();
        } catch (Exception e) {
            org.junit.jupiter.api.Assertions.assertTrue(e instanceof TableNotFoundException);
        }
    }

    /**
     * Starts a daemon thread that waits {@link #CANCEL_DELAY_MS} and then cancels the services.
     *
     * @param main the running services instance to cancel
     */
    private static void cancelAfterDelay(HoodieMultiTableServicesMain main) {
        Thread canceller = new Thread(() -> {
            try {
                Thread.sleep(CANCEL_DELAY_MS);
                LOG.info("Shutdown the table services");
                main.cancel();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
                LOG.warn("InterruptedException: ", e);
            }
        }, "multi-table-services-canceller");
        // Daemon so that a test failing early does not keep the JVM waiting on the sleep.
        canceller.setDaemon(true);
        canceller.start();
    }

    /**
     * Asserts the instant counts expected after a full batch run: 1 clean, 2 delta commits,
     * 1 completed replace and 4 instants on the commits timeline.
     */
    private static void assertAllServiceInstants(HoodieTableMetaClient metaClient) {
        org.junit.jupiter.api.Assertions.assertEquals(1, metaClient.reloadActiveTimeline().getCleanerTimeline().countInstants());
        org.junit.jupiter.api.Assertions.assertEquals(2, metaClient.reloadActiveTimeline().getDeltaCommitTimeline().countInstants());
        org.junit.jupiter.api.Assertions.assertEquals(1, metaClient.reloadActiveTimeline().getCompletedReplaceTimeline().countInstants());
        org.junit.jupiter.api.Assertions.assertEquals(4, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants());
    }

    /** Asserts 1 clean instant and 4 instants on the commits timeline. */
    private static void assertCleanAndCommitInstants(HoodieTableMetaClient metaClient) {
        org.junit.jupiter.api.Assertions.assertEquals(1, metaClient.reloadActiveTimeline().getCleanerTimeline().countInstants());
        org.junit.jupiter.api.Assertions.assertEquals(4, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants());
    }

    /** Formats a single {@code key=value} entry for {@code Config.configs}. */
    private static String configEntry(String key, Object value) {
        return String.format("%s=%s", key, value);
    }

    /**
     * Creates both test tables and writes an insert batch followed by an update batch to each,
     * then checks that no clean instant exists yet.
     */
    private void prepareData() throws IOException {
        initTestDataGenerator();
        HoodieTableMetaClient metaClient1 = getMetaClient("table1");
        HoodieTableMetaClient metaClient2 = getMetaClient("table2");

        String instant1 = InProcessTimeGenerator.createNewInstantTime(0L);
        writeToTable(metaClient1.getBasePath(), instant1, false);
        writeToTable(metaClient2.getBasePath(), instant1, false);

        String instant2 = InProcessTimeGenerator.createNewInstantTime(1L);
        writeToTable(metaClient1.getBasePath(), instant2, true);
        writeToTable(metaClient2.getBasePath(), instant2, true);

        org.junit.jupiter.api.Assertions.assertEquals(0, metaClient1.reloadActiveTimeline().getCleanerTimeline().countInstants());
        org.junit.jupiter.api.Assertions.assertEquals(0, metaClient2.reloadActiveTimeline().getCleanerTimeline().countInstants());
    }

    /**
     * Upserts one generated batch into the table at {@code basePath} and asserts that no write
     * errors were reported.
     *
     * @param basePath base path of the target table
     * @param instant instant time used for the commit and for the generated records
     * @param update {@code true} to generate updates, {@code false} to generate inserts
     * @throws IOException if record generation fails
     */
    // Raw types: the record and write-status element types are not imported by this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void writeToTable(StoragePath basePath, String instant, boolean update) throws IOException {
        HoodieWriteConfig writeConfig = getWriteConfigBuilder(basePath, "test").build();
        // A client is created per write, so it has to be closed here or it is leaked.
        try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig)) {
            writeClient.startCommitWithTime(instant);
            List records = update
                ? dataGen.generateUpdates(instant, RECORDS_PER_BATCH)
                : dataGen.generateInserts(instant, RECORDS_PER_BATCH);
            JavaRDD result = writeClient.upsert(jsc.parallelize(records, 8), instant);
            Assertions.assertNoWriteErrors(result.collect());
        }
    }

    /**
     * Builds the write config used to seed the test tables: bucket index and bucket layout, with
     * table services and the metadata table switched off.
     *
     * @param basePath base path of the target table
     * @param tableName table name recorded in the write config
     * @return a builder that callers may customise further before calling {@code build()}
     */
    private HoodieWriteConfig.Builder getWriteConfigBuilder(StoragePath basePath, String tableName) {
        return HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withSchema(TRIP_SCHEMA)
            .withParallelism(4, 4)
            .withBulkInsertParallelism(4)
            .withFinalizeWriteParallelism(2)
            .withProps(makeIndexConfig(HoodieIndex.IndexType.BUCKET))
            .withTableServicesEnabled(false)
            .withLayoutConfig(HoodieLayoutConfig.newBuilder()
                .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
                .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName())
                .build())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
            .forTable(tableName);
    }

    /**
     * Creates the directory for {@code tableName} under the temp dir and initialises a table of
     * type {@link #getTableType()} there, using {@code _row_key} as record key and precombine field.
     *
     * @param tableName directory name of the table below the temp dir
     * @return the meta client returned by the table initialisation
     * @throws IOException if the directory or the table cannot be created
     */
    protected HoodieTableMetaClient getMetaClient(String tableName) throws IOException {
        String rootPathStr = "file://" + this.tempDir.toAbsolutePath() + "/" + tableName;
        Path rootPath = new Path(rootPathStr);
        rootPath.getFileSystem(jsc.hadoopConfiguration()).mkdirs(rootPath);

        Properties props = new Properties();
        props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
        props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "_row_key");
        return HoodieTestUtils.init(HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()), rootPathStr, getTableType(), props);
    }

    /**
     * Builds index-related properties. Only {@link HoodieIndex.IndexType#BUCKET} adds settings;
     * for any other type the returned properties are empty.
     *
     * @param indexType index type to configure
     * @return the resulting properties
     */
    private Properties makeIndexConfig(HoodieIndex.IndexType indexType) {
        Properties props = new Properties();
        HoodieIndexConfig.Builder indexConfig = HoodieIndexConfig.newBuilder().withIndexType(indexType);
        if (HoodieIndex.IndexType.BUCKET.equals(indexType)) {
            props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
            indexConfig.fromProperties(props)
                .withIndexKeyField("_row_key")
                .withBucketNum("1")
                .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE);
            props.putAll(indexConfig.build().getProps());
            props.putAll(HoodieLayoutConfig.newBuilder()
                .fromProperties(props)
                .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
                .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName())
                .build()
                .getProps());
        }
        return props;
    }

    /** Returns the table type used for the test tables. */
    protected HoodieTableType getTableType() {
        return HoodieTableType.MERGE_ON_READ;
    }

    /**
     * Builds a services config with auto discovery under the temp dir and compaction, clustering,
     * clean and archive all enabled.
     *
     * @return a fresh, mutable config; tests adjust {@code batch}, {@code basePath} and
     *     {@code configs} as needed
     */
    private HoodieMultiTableServicesMain.Config getHoodieMultiServiceConfig() {
        HoodieMultiTableServicesMain.Config cfg = new HoodieMultiTableServicesMain.Config();
        cfg.autoDiscovery = true;
        cfg.enableCompaction = true;
        cfg.enableClustering = true;
        cfg.enableClean = true;
        cfg.enableArchive = true;

        // Must stay mutable: individual tests append further entries.
        cfg.configs = new ArrayList<>();
        cfg.configs.add(configEntry(HoodieCleanConfig.CLEANER_POLICY.key(), HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS));
        cfg.configs.add(configEntry(HoodieCleanConfig.AUTO_CLEAN.key(), "false"));
        cfg.configs.add(configEntry(HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key(), "1"));
        cfg.configs.add(configEntry(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "0"));

        cfg.compactionRunningMode = "scheduleandexecute";
        cfg.compactionStrategyClassName = LogFileSizeBasedCompactionStrategy.class.getName();
        cfg.clusteringRunningMode = "scheduleandexecute";
        cfg.basePath = Collections.singletonList(this.tempDir.toAbsolutePath().toString());
        cfg.scheduleDelay = 50000;
        return cfg;
    }

    /** Replaces {@link #dataGen} with a new generator. */
    protected void initTestDataGenerator() {
        this.dataGen = new HoodieTestDataGenerator();
    }

    /** Returns the shared engine context, or {@code null} if Spark is not initialised. */
    public HoodieEngineContext context() {
        return context;
    }

    /** Returns the shared Spark session, or {@code null} if Spark is not initialised. */
    public SparkSession spark() {
        return spark;
    }

    /** Returns the shared SQL context, or {@code null} if Spark is not initialised. */
    public SQLContext sqlContext() {
        return sqlContext;
    }

    /** Returns the shared Java Spark context, or {@code null} if Spark is not initialised. */
    public JavaSparkContext jsc() {
        return jsc;
    }

    /** Closes the shared Spark session and drops every handle that was derived from it. */
    @AfterAll
    public static synchronized void cleanUpAfterAll() {
        if (spark != null) {
            spark.close();
            spark = null;
            // These wrap the session that was just closed; clear them so nothing reuses them.
            sqlContext = null;
            jsc = null;
            context = null;
        }
    }
}

