/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.offlinejob;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.offlinejob.HoodieOfflineJobTestBase;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestHoodieClusteringJob
extends HoodieOfflineJobTestBase {
    @Test
    public void testHoodieClusteringJobWithClean() throws Exception {
        String tableBasePath = basePath + "/asyncClustering";
        Properties props = this.getPropertiesForKeyGen(true);
        HoodieWriteConfig config = this.getWriteConfig(tableBasePath);
        props.putAll((Map<?, ?>)config.getProps());
        Properties metaClientProps = HoodieTableMetaClient.withPropertyBuilder().setTableType(HoodieTableType.COPY_ON_WRITE).setPayloadClass(HoodieAvroPayload.class).fromProperties(props).build();
        this.metaClient = HoodieTableMetaClient.initTableAndGetMetaClient((StorageConfiguration)HadoopFSUtils.getStorageConfWithCopy((Configuration)jsc.hadoopConfiguration()), (String)tableBasePath, (Properties)metaClientProps);
        this.client = new SparkRDDWriteClient((HoodieEngineContext)context, config);
        this.writeData(false, HoodieActiveTimeline.createNewInstantTime(), 100, true);
        this.writeData(false, HoodieActiveTimeline.createNewInstantTime(), 100, true);
        HoodieClusteringJob hoodieCluster = this.init(tableBasePath, true, "scheduleAndExecute", false);
        hoodieCluster.cluster(0);
        HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, tableBasePath);
        HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath);
        this.writeData(false, HoodieActiveTimeline.createNewInstantTime(), 100, true);
        this.writeData(false, HoodieActiveTimeline.createNewInstantTime(), 100, true);
        hoodieCluster = this.init(tableBasePath, true, "scheduleAndExecute", true);
        hoodieCluster.cluster(0);
        HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(2, tableBasePath);
        HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(1, tableBasePath);
    }

    @Test
    public void testPurgePendingInstants() throws Exception {
        String tableBasePath = basePath + "/purgePendingClustering";
        Properties props = this.getPropertiesForKeyGen(true);
        HoodieWriteConfig config = this.getWriteConfig(tableBasePath);
        props.putAll((Map<?, ?>)config.getProps());
        Properties metaClientProps = HoodieTableMetaClient.withPropertyBuilder().setTableType(HoodieTableType.COPY_ON_WRITE).setPayloadClass(HoodieAvroPayload.class).fromProperties(props).build();
        this.metaClient = HoodieTableMetaClient.initTableAndGetMetaClient((StorageConfiguration)HadoopFSUtils.getStorageConfWithCopy((Configuration)jsc.hadoopConfiguration()), (String)tableBasePath, (Properties)metaClientProps);
        this.client = new SparkRDDWriteClient((HoodieEngineContext)context, config);
        this.writeData(false, HoodieActiveTimeline.createNewInstantTime(), 100, true);
        this.writeData(false, HoodieActiveTimeline.createNewInstantTime(), 100, true);
        HoodieClusteringJob hoodieCluster = this.init(tableBasePath, true, "scheduleAndExecute", false);
        hoodieCluster.cluster(0);
        HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, tableBasePath);
        HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath);
        HoodieInstant latestClusteringInstant = (HoodieInstant)this.metaClient.getActiveTimeline().filterCompletedInstantsOrRewriteTimeline().getCompletedReplaceTimeline().getInstants().get(0);
        String completedFilePath = tableBasePath + "/" + ".hoodie" + "/" + latestClusteringInstant.getFileName();
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, completedFilePath);
        hoodieCluster = this.getClusteringConfigForPurge(tableBasePath, true, "purge_pending_instant", false, latestClusteringInstant.getTimestamp());
        hoodieCluster.cluster(0);
        HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(0, tableBasePath);
        String[] fullPartitionPaths = new String[this.dataGen.getPartitionPaths().length];
        for (int i = 0; i < fullPartitionPaths.length; ++i) {
            fullPartitionPaths[i] = String.format("%s/%s/*", tableBasePath, this.dataGen.getPartitionPaths()[i]);
        }
        Assertions.assertEquals((long)0L, (long)HoodieClientTestUtils.read((JavaSparkContext)jsc, (String)tableBasePath, (SQLContext)sqlContext, (HoodieStorage)storage, (String[])fullPartitionPaths).filter("_hoodie_commit_time = " + latestClusteringInstant.getTimestamp()).count(), (String)"Must not contain any records w/ clustering instant time");
    }

    private void deleteCommitMetaFile(String instantTime, String suffix) throws IOException {
        String targetPath = basePath + "/" + ".hoodie" + "/" + instantTime + suffix;
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetPath);
    }

    private HoodieClusteringJob init(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean isAutoClean) {
        HoodieClusteringJob.Config clusterConfig = this.buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute, isAutoClean);
        clusterConfig.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
        return new HoodieClusteringJob(jsc, clusterConfig);
    }

    private HoodieClusteringJob getClusteringConfigForPurge(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean isAutoClean, String pendingInstant) {
        HoodieClusteringJob.Config clusterConfig = this.buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute, isAutoClean);
        clusterConfig.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
        clusterConfig.clusteringInstantTime = pendingInstant;
        return new HoodieClusteringJob(jsc, clusterConfig);
    }

    private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, boolean runSchedule, String runningMode, boolean isAutoClean) {
        HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
        config.basePath = basePath;
        config.runSchedule = runSchedule;
        config.runningMode = runningMode;
        config.configs.add("hoodie.metadata.enable=false");
        config.configs.add(String.format("%s=%s", HoodieCleanConfig.AUTO_CLEAN.key(), isAutoClean));
        config.configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), 1));
        config.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), 1));
        return config;
    }

    private HoodieWriteConfig getWriteConfig(String tableBasePath) {
        return HoodieWriteConfig.newBuilder().forTable("asyncClustering").withPath(tableBasePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).withAutoCommit(false).withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(Boolean.valueOf(false)).withScheduleInlineClustering(Boolean.valueOf(false)).withAsyncClustering(Boolean.valueOf(false)).build()).withStorageConfig(HoodieStorageConfig.newBuilder().logFileMaxSize(1024L).build()).withCleanConfig(HoodieCleanConfig.newBuilder().withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAutoClean(Boolean.valueOf(false)).withAsyncClean(Boolean.valueOf(false)).build()).build();
    }
}

