/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.functional;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

@Tag(value="functional")
public class TestHoodieSparkMergeOnReadTableCompaction
extends SparkClientFunctionalTestHarness {
    private HoodieTestDataGenerator dataGen;
    private SparkRDDWriteClient client;
    private HoodieTableMetaClient metaClient;

    private static Stream<Arguments> writeLogTest() {
        Object[][] data = new Object[][]{{true, true}, {true, false}, {false, true}, {false, false}};
        return Stream.of(data).map(Arguments::of);
    }

    private static Stream<Arguments> writePayloadTest() {
        return Stream.of(DefaultHoodieRecordPayload.class.getName(), PartialUpdateAvroPayload.class.getName()).map(xva$0 -> Arguments.of((Object[])new Object[]{xva$0}));
    }

    @BeforeEach
    public void setup() {
        this.dataGen = new HoodieTestDataGenerator();
    }

    @AfterEach
    public void teardown() throws IOException {
        if (this.client != null) {
            this.client.close();
        }
    }

    @ParameterizedTest
    @MethodSource(value={"writePayloadTest"})
    public void testWriteDuringCompaction(String payloadClass) throws IOException {
        Properties props = this.getPropertiesForKeyGen(true);
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder().forTable("test-trip-table").withPath(this.basePath()).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withAutoCommit(false).withWritePayLoad(payloadClass).withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()).withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(1024L).build()).withLayoutConfig(HoodieLayoutConfig.newBuilder().withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()).withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()).withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
        props.putAll((Map<?, ?>)config.getProps());
        this.metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
        this.client = this.getHoodieWriteClient(config);
        this.writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true);
        this.writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true);
        org.junit.jupiter.api.Assertions.assertEquals((long)200L, (long)this.readTableTotalRecordsNum());
        String compactionTime = (String)this.client.scheduleCompaction(Option.empty()).get();
        String insertTime = HoodieActiveTimeline.createNewInstantTime();
        List<WriteStatus> writeStatuses = this.writeData(insertTime, 100, false);
        org.junit.jupiter.api.Assertions.assertEquals((long)200L, (long)this.readTableTotalRecordsNum());
        this.client.commitStats(insertTime, this.context().parallelize(writeStatuses, 1), writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
        org.junit.jupiter.api.Assertions.assertEquals((long)300L, (long)this.readTableTotalRecordsNum());
        config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE, "true");
        this.client.compact(compactionTime);
        org.junit.jupiter.api.Assertions.assertEquals((long)300L, (long)this.readTableTotalRecordsNum());
    }

    @ParameterizedTest
    @MethodSource(value={"writeLogTest"})
    public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
        Properties props = this.getPropertiesForKeyGen(true);
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder().forTable("test-trip-table").withPath(this.basePath()).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withAutoCommit(true).withEmbeddedTimelineServerEnabled(enableTimelineServer).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()).withLayoutConfig(HoodieLayoutConfig.newBuilder().withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()).withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()).withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
        props.putAll((Map<?, ?>)config.getProps());
        this.metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
        this.client = this.getHoodieWriteClient(config);
        List records = this.dataGen.generateInserts("001", Integer.valueOf(100));
        JavaRDD writeRecords = this.jsc().parallelize(records, 2);
        this.client.upsert(writeRecords, this.client.startCommit());
        this.client.upsert(writeRecords, this.client.startCommit());
        this.client.scheduleCompaction(Option.empty());
        List toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50L).collect(Collectors.toList());
        JavaRDD deleteRecords = this.jsc().parallelize(toBeDeleted, 2);
        this.client.delete(deleteRecords, this.client.startCommit());
        this.client.upsert(writeRecords, this.client.startCommit());
        org.junit.jupiter.api.Assertions.assertEquals((long)100L, (long)this.readTableTotalRecordsNum());
    }

    private long readTableTotalRecordsNum() {
        return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(this.storageConf(), Arrays.stream(this.dataGen.getPartitionPaths()).map(p -> Paths.get(this.basePath(), p).toString()).collect(Collectors.toList()), (String)this.basePath()).size();
    }

    private List<WriteStatus> writeData(String instant, int numRecords, boolean doCommit) {
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        JavaRDD records = this.jsc().parallelize(this.dataGen.generateInserts(instant, Integer.valueOf(numRecords)), 2);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        this.client.startCommitWithTime(instant);
        List writeStatuses = this.client.upsert(records, instant).collect();
        Assertions.assertNoWriteErrors((List)writeStatuses);
        if (doCommit) {
            List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
            boolean committed = this.client.commitStats(instant, this.context().parallelize(writeStatuses, 1), writeStats, Option.empty(), this.metaClient.getCommitActionType());
            org.junit.jupiter.api.Assertions.assertTrue((boolean)committed);
        }
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        return writeStatuses;
    }
}

