/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

public class TestSparkSortAndSizeClustering
extends HoodieSparkClientTestHarness {
    private HoodieWriteConfig config;
    private HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0L);

    public void setup(int maxFileSize) throws IOException {
        this.setup(maxFileSize, Collections.emptyMap());
    }

    public void setup(int maxFileSize, Map<String, String> options) throws IOException {
        this.initPath();
        this.initSparkContexts();
        this.initTestDataGenerator();
        this.initHoodieStorage();
        Properties props = TestSparkSortAndSizeClustering.getPropertiesForKeyGen((boolean)true);
        props.putAll(options);
        props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
        this.metaClient = HoodieTestUtils.init((StorageConfiguration)this.storageConf, (String)this.basePath, (HoodieTableType)HoodieTableType.COPY_ON_WRITE, (Properties)props);
        this.config = this.getConfigBuilder().withProps((Map)props).withAutoCommit(false).withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize((long)maxFileSize).build()).withClusteringConfig(HoodieClusteringConfig.newBuilder().withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.RECENT_DAYS).build()).build();
        this.writeClient = this.getHoodieWriteClient(this.config);
    }

    @AfterEach
    public void tearDown() throws IOException {
        this.cleanupResources();
    }

    @Test
    public void testClusteringWithRDD() throws IOException {
        this.writeAndClustering(false);
    }

    @Test
    public void testClusteringWithRow() throws IOException {
        this.writeAndClustering(true);
    }

    public void writeAndClustering(boolean isRow) throws IOException {
        this.setup(102400);
        this.config.setValue("hoodie.datasource.write.row.writer.enable", String.valueOf(isRow));
        this.config.setValue("hoodie.metadata.enable", "false");
        this.config.setValue("hoodie.clustering.plan.strategy.daybased.lookback.partitions", "1");
        this.config.setValue("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(0x100000));
        this.config.setValue("hoodie.clustering.plan.strategy.max.bytes.per.group", String.valueOf(0x200000));
        int numRecords = 1000;
        this.writeData(this.writeClient.createNewInstantTime(), numRecords, true);
        String clusteringTime = (String)this.writeClient.scheduleClustering(Option.empty()).get();
        HoodieClusteringPlan plan = (HoodieClusteringPlan)ClusteringUtils.getClusteringPlan((HoodieTableMetaClient)this.metaClient, (HoodieInstant)HoodieTestUtils.INSTANT_GENERATOR.getClusteringCommitRequestedInstant(clusteringTime)).map(Pair::getRight).get();
        List inputGroups = plan.getInputGroups();
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)inputGroups.size(), (String)"Clustering plan will contain 1 input group");
        Integer outputFileGroups = ((HoodieClusteringGroup)plan.getInputGroups().get(0)).getNumOutputFileGroups();
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (Integer)outputFileGroups, (String)"Clustering plan will generate 2 output groups");
        HoodieWriteMetadata writeMetadata = this.writeClient.cluster(clusteringTime, true);
        List writeStats = (List)writeMetadata.getWriteStats().get();
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)writeStats.size(), (String)"Clustering should write 2 files");
        List<Row> rows = this.readRecords();
        org.junit.jupiter.api.Assertions.assertEquals((int)numRecords, (int)rows.size());
        this.validateDecimalTypeAfterClustering(writeStats);
    }

    private void validateDecimalTypeAfterClustering(List<HoodieWriteStat> writeStats) {
        writeStats.stream().map(writeStat -> new StoragePath(this.metaClient.getBasePath(), writeStat.getPath())).forEach(writtenPath -> {
            MessageType schema = ParquetUtils.readMetadata((HoodieStorage)this.storage, (StoragePath)writtenPath).getFileMetaData().getSchema();
            int index = schema.getFieldIndex("height");
            Type decimalType = (Type)schema.getFields().get(index);
            org.junit.jupiter.api.Assertions.assertEquals((Object)"DECIMAL", (Object)decimalType.getOriginalType().toString());
            org.junit.jupiter.api.Assertions.assertEquals((Object)"FIXED_LEN_BYTE_ARRAY", (Object)decimalType.asPrimitiveType().getPrimitiveTypeName().toString());
        });
    }

    private List<WriteStatus> writeData(String commitTime, int totalRecords, boolean doCommit) {
        List records = this.dataGen.generateInserts(commitTime, Integer.valueOf(totalRecords));
        JavaRDD writeRecords = this.jsc.parallelize(records);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        this.writeClient.startCommitWithTime(commitTime);
        List writeStatues = this.writeClient.insert(writeRecords, commitTime).collect();
        Assertions.assertNoWriteErrors((List)writeStatues);
        if (doCommit) {
            org.junit.jupiter.api.Assertions.assertTrue((boolean)this.writeClient.commitStats(commitTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType()));
        }
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        return writeStatues;
    }

    private List<Row> readRecords() {
        Dataset roViewDF = this.sparkSession.read().format("hudi").load(this.basePath);
        roViewDF.createOrReplaceTempView("clutering_table");
        return this.sparkSession.sqlContext().sql("select * from clutering_table").collectAsList();
    }

    public HoodieWriteConfig.Builder getConfigBuilder() {
        return HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withWriteStatusClass(MetadataMergeWriteStatus.class).forTable("clustering-table").withEmbeddedTimelineServerEnabled(true);
    }
}

