/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag(value="functional")
public class TestDataSourceReadWithDeletes
extends SparkClientFunctionalTestHarness {
    String jsonSchema = "{\n  \"type\": \"record\",\n  \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n  \"fields\": [\n    {\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_commit_seqno\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_record_key\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_partition_path\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_file_name\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_operation\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"name\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"age\", \"type\": [\"null\", \"int\"]},\n    {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n    {\"name\": \"part\", \"type\": [\"null\", \"string\"]}\n  ]\n}";
    private Schema schema;
    private HoodieTableMetaClient metaClient;

    @BeforeEach
    public void setUp() {
        this.schema = new Schema.Parser().parse(this.jsonSchema);
    }

    @Test
    public void test() throws Exception {
        HoodieWriteConfig config = this.createHoodieWriteConfig();
        this.metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties)config.getProps());
        String[] dataset1 = new String[]{"I,id1,Danny,23,1,par1", "I,id2,Tony,20,1,par1"};
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        String insertTime1 = client.createNewInstantTime();
        List<WriteStatus> writeStatuses1 = this.writeData(client, insertTime1, dataset1);
        client.commit(insertTime1, (Object)this.jsc().parallelize(writeStatuses1));
        String[] dataset2 = new String[]{"I,id1,Danny,30,2,par1", "D,id2,Tony,20,2,par1", "I,id3,Julian,40,2,par1", "D,id4,Stephan,35,2,par1"};
        String insertTime2 = client.createNewInstantTime();
        List<WriteStatus> writeStatuses2 = this.writeData(client, insertTime2, dataset2);
        client.commit(insertTime2, (Object)this.jsc().parallelize(writeStatuses2));
        List rows = this.spark().read().format("org.apache.hudi").option("hoodie.datasource.query.type", "snapshot").load(config.getBasePath()).select("id", new String[]{"name", "age", "ts", "part"}).collectAsList();
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)rows.size());
        Object[] expected = new String[]{"[id1,Danny,30,2,par1]", "[id3,Julian,40,2,par1]"};
        org.junit.jupiter.api.Assertions.assertArrayEquals((Object[])expected, (Object[])rows.stream().map(Row::toString).sorted().toArray(String[]::new));
    }

    private HoodieWriteConfig createHoodieWriteConfig() {
        Properties props = this.getPropertiesForKeyGen(true);
        props.put(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        String basePath = this.basePath();
        return HoodieWriteConfig.newBuilder().forTable("test").withPath(basePath).withSchema(this.jsonSchema).withParallelism(2, 2).withAutoCommit(false).withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()).withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(1024L).build()).withLayoutConfig(HoodieLayoutConfig.newBuilder().withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()).withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()).withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).withPopulateMetaFields(true).withAllowOperationMetadataField(true).withMarkersType(MarkerType.DIRECT.name()).build();
    }

    private List<WriteStatus> writeData(SparkRDDWriteClient client, String instant, String[] records) {
        List<HoodieRecord> recordList = this.str2HoodieRecord(records);
        JavaRDD writeRecords = this.jsc().parallelize(recordList, 2);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        client.startCommitWithTime(instant);
        List writeStatuses = client.upsert(writeRecords, instant).collect();
        Assertions.assertNoWriteErrors((List)writeStatuses);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        return writeStatuses;
    }

    private List<HoodieRecord> str2HoodieRecord(String[] records) {
        return Stream.of(records).map(rawRecordStr -> {
            String[] parts = rawRecordStr.split(",");
            boolean isDelete = parts[0].equalsIgnoreCase("D");
            GenericData.Record record = new GenericData.Record(this.schema);
            record.put("id", (Object)parts[1]);
            record.put("name", (Object)parts[2]);
            record.put("age", (Object)Integer.parseInt(parts[3]));
            record.put("ts", (Object)Long.parseLong(parts[4]));
            record.put("part", (Object)parts[5]);
            OverwriteWithLatestAvroPayload payload = new OverwriteWithLatestAvroPayload((GenericRecord)record, (Comparable)((Long)record.get("ts")));
            return new HoodieAvroRecord(new HoodieKey((String)record.get("id"), (String)record.get("part")), (HoodieRecordPayload)payload, isDelete ? HoodieOperation.DELETE : HoodieOperation.INSERT);
        }).collect(Collectors.toList());
    }
}

