/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.functional;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

@Tag(value="functional")
public class TestConsistentBucketIndex
extends HoodieSparkClientTestHarness {
    private final Random random = new Random(1L);
    private HoodieIndex index;
    private HoodieWriteConfig config;

    private static Stream<Arguments> configParams() {
        Object[][] data = new Object[][]{{true, false}, {false, false}, {true, true}, {false, true}};
        return Stream.of(data).map(Arguments::of);
    }

    private void setUp(boolean populateMetaFields, boolean partitioned) throws Exception {
        this.initPath();
        this.initSparkContexts();
        if (partitioned) {
            this.initTestDataGenerator();
        } else {
            this.initTestDataGenerator(new String[]{""});
        }
        this.initHoodieStorage();
        Properties props = TestConsistentBucketIndex.getPropertiesForKeyGen((boolean)populateMetaFields);
        props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
        this.metaClient = HoodieTestUtils.init((StorageConfiguration)this.storageConf, (String)this.basePath, (HoodieTableType)HoodieTableType.MERGE_ON_READ, (Properties)props);
        this.config = this.getConfigBuilder().withProperties(props).withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withIndexKeyField("_row_key").withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING).withBucketNum("8").build()).withAutoCommit(false).build();
        this.writeClient = this.getHoodieWriteClient(this.config);
        this.index = this.writeClient.getIndex();
    }

    @AfterEach
    public void tearDown() throws IOException {
        this.cleanupResources();
    }

    @ParameterizedTest
    @MethodSource(value={"configParams"})
    public void testTagLocation(boolean populateMetaFields, boolean partitioned) throws Exception {
        this.setUp(populateMetaFields, partitioned);
        String newCommitTime = "001";
        int totalRecords = 20 + this.random.nextInt(20);
        List records = this.dataGen.generateInserts(newCommitTime, Integer.valueOf(totalRecords));
        JavaRDD writeRecords = this.jsc.parallelize(records, 2);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)this.config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        List taggedRecord = this.tagLocation(this.index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable).collect();
        org.junit.jupiter.api.Assertions.assertTrue((boolean)taggedRecord.stream().allMatch(r -> r.isCurrentLocationKnown()));
        List taggedRecord2 = this.tagLocation(this.index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable).collect();
        block0: for (HoodieRecord ref : taggedRecord) {
            for (HoodieRecord record : taggedRecord2) {
                if (!ref.getRecordKey().equals(record.getRecordKey())) continue;
                org.junit.jupiter.api.Assertions.assertEquals((Object)ref.getCurrentLocation(), (Object)record.getCurrentLocation());
                continue block0;
            }
        }
    }

    @ParameterizedTest
    @MethodSource(value={"configParams"})
    public void testWriteData(boolean populateMetaFields, boolean partitioned) throws Exception {
        this.setUp(populateMetaFields, partitioned);
        String newCommitTime = "001";
        int totalRecords = 20 + this.random.nextInt(20);
        List records = this.dataGen.generateInserts(newCommitTime, Integer.valueOf(totalRecords));
        JavaRDD writeRecords = this.jsc.parallelize(records, 2);
        List<WriteStatus> writeStatues = this.writeData((JavaRDD<HoodieRecord>)writeRecords, newCommitTime, WriteOperationType.UPSERT, true);
        org.junit.jupiter.api.Assertions.assertEquals((long)writeStatues.stream().map(WriteStatus::getFileId).distinct().count(), (long)Arrays.stream(this.dataGen.getPartitionPaths()).mapToInt(p -> Objects.requireNonNull(this.listStatus((String)p, true)).length).sum());
        org.junit.jupiter.api.Assertions.assertEquals((int)totalRecords, (int)this.readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));
        this.writeData((JavaRDD<HoodieRecord>)writeRecords, "002", WriteOperationType.UPSERT, true);
        long numberOfLogFiles = Arrays.stream(this.dataGen.getPartitionPaths()).mapToInt(p -> Arrays.stream(this.listStatus((String)p, true)).mapToInt(fs -> fs instanceof RealtimeFileStatus ? ((RealtimeFileStatus)fs).getDeltaLogFiles().size() : 1).sum()).sum();
        org.junit.jupiter.api.Assertions.assertEquals((long)(writeStatues.stream().map(WriteStatus::getFileId).distinct().count() * 2L), (long)numberOfLogFiles);
        org.junit.jupiter.api.Assertions.assertEquals((int)totalRecords, (int)this.readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));
        this.writeData("003", totalRecords, true);
        org.junit.jupiter.api.Assertions.assertEquals((int)(totalRecords * 2), (int)this.readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));
    }

    @ParameterizedTest
    @MethodSource(value={"configParams"})
    public void testWriteDataWithCompaction(boolean populateMetaFields, boolean partitioned) throws Exception {
        this.setUp(populateMetaFields, partitioned);
        this.writeData(HoodieActiveTimeline.createNewInstantTime(), 200, true);
        this.config.setValue(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS, "1");
        this.config.setValue(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY, CompactionTriggerStrategy.NUM_COMMITS.name());
        String compactionTime = (String)this.writeClient.scheduleCompaction(Option.empty()).get();
        org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)this.readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));
        this.writeData(HoodieActiveTimeline.createNewInstantTime(), 200, true);
        org.junit.jupiter.api.Assertions.assertEquals((int)400, (int)this.readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));
        HoodieWriteMetadata compactionMetadata = this.writeClient.compact(compactionTime);
        this.writeClient.commitCompaction(compactionTime, (HoodieCommitMetadata)compactionMetadata.getCommitMetadata().get(), Option.empty());
        org.junit.jupiter.api.Assertions.assertEquals((int)400, (int)this.readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));
    }

    @ParameterizedTest
    @MethodSource(value={"configParams"})
    public void testBulkInsertData(boolean populateMetaFields, boolean partitioned) throws Exception {
        this.setUp(populateMetaFields, partitioned);
        String newCommitTime = "001";
        int totalRecords = 20 + this.random.nextInt(20);
        List records = this.dataGen.generateInserts(newCommitTime, Integer.valueOf(totalRecords));
        JavaRDD writeRecords = this.jsc.parallelize(records, 2);
        List<WriteStatus> writeStatues = this.writeData((JavaRDD<HoodieRecord>)writeRecords, newCommitTime, WriteOperationType.BULK_INSERT, true);
        long numFilesCreated = writeStatues.stream().map(WriteStatus::getFileId).distinct().count();
        org.junit.jupiter.api.Assertions.assertEquals((long)numFilesCreated, (long)Arrays.stream(this.dataGen.getPartitionPaths()).mapToInt(p -> Objects.requireNonNull(this.listStatus((String)p, true)).length).sum());
        this.writeData((JavaRDD<HoodieRecord>)writeRecords, "002", WriteOperationType.UPSERT, true);
        org.junit.jupiter.api.Assertions.assertEquals((long)numFilesCreated, (long)Arrays.stream(this.dataGen.getPartitionPaths()).mapToInt(p -> Objects.requireNonNull(this.listStatus((String)p, true)).length).sum());
        long numberOfLogFiles = Arrays.stream(this.dataGen.getPartitionPaths()).mapToInt(p -> Arrays.stream(this.listStatus((String)p, true)).mapToInt(fs -> fs instanceof RealtimeFileStatus ? ((RealtimeFileStatus)fs).getDeltaLogFiles().size() : 1).sum()).sum();
        org.junit.jupiter.api.Assertions.assertEquals((long)numFilesCreated, (long)numberOfLogFiles);
        this.storageConf.set("hoodie.realtime.merge.skip", "true");
        org.junit.jupiter.api.Assertions.assertEquals((int)(totalRecords * 2), (int)this.readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));
    }

    private int readRecordsNum(String[] partitions, boolean populateMetaFields) {
        return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat((StorageConfiguration)this.storageConf, Arrays.stream(partitions).map(p -> Paths.get(this.basePath, p).toString()).collect(Collectors.toList()), (String)this.basePath, (JobConf)new JobConf((Configuration)this.storageConf.unwrap()), (boolean)true, (boolean)populateMetaFields).size();
    }

    private List<WriteStatus> writeData(String commitTime, int totalRecords, boolean doCommit) {
        List records = this.dataGen.generateInserts(commitTime, Integer.valueOf(totalRecords));
        JavaRDD writeRecords = this.jsc.parallelize(records, 2);
        return this.writeData((JavaRDD<HoodieRecord>)writeRecords, commitTime, WriteOperationType.UPSERT, doCommit);
    }

    private List<WriteStatus> writeData(JavaRDD<HoodieRecord> records, String commitTime, WriteOperationType op, boolean doCommit) {
        List writeStatues;
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        this.writeClient.startCommitWithTime(commitTime);
        switch (op) {
            case UPSERT: {
                writeStatues = this.writeClient.upsert(records, commitTime).collect();
                break;
            }
            case BULK_INSERT: {
                writeStatues = this.writeClient.bulkInsert(records, commitTime).collect();
                break;
            }
            default: {
                throw new HoodieException("Unsupported write operations: " + op);
            }
        }
        Assertions.assertNoWriteErrors((List)writeStatues);
        if (doCommit) {
            boolean success = this.writeClient.commitStats(commitTime, this.context.parallelize(writeStatues, 1), writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
            org.junit.jupiter.api.Assertions.assertTrue((boolean)success);
        }
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        return writeStatues;
    }

    private FileStatus[] listStatus(String p, boolean realtime) {
        JobConf jobConf = new JobConf((Configuration)this.storageConf.unwrap());
        FileInputFormat.setInputPaths((JobConf)jobConf, (String)Paths.get(this.basePath, p).toString());
        FileInputFormat format = HoodieInputFormatUtils.getInputFormat((HoodieFileFormat)HoodieFileFormat.PARQUET, (boolean)realtime, (Configuration)jobConf);
        try {
            if (realtime) {
                return ((HoodieParquetRealtimeInputFormat)format).listStatus(jobConf);
            }
            return ((HoodieParquetInputFormat)format).listStatus(jobConf);
        }
        catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    public HoodieWriteConfig.Builder getConfigBuilder() {
        return HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2).withWriteStatusClass(MetadataMergeWriteStatus.class).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0x100000L).build()).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(0x100000L).parquetMaxFileSize(0x100000L).build()).forTable("test-trip-table").withEmbeddedTimelineServerEnabled(true);
    }
}

