/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestMultiFS
extends HoodieSparkClientTestHarness {
    private static final Logger LOG = LoggerFactory.getLogger(TestMultiFS.class);
    private static final String TABLE_TYPE = HoodieTableType.COPY_ON_WRITE.name();
    private static final String TABLE_NAME = "hoodie_rt";
    private static HdfsTestService hdfsTestService;
    private static FileSystem dfs;
    private String tablePath;
    private String dfsBasePath;

    @BeforeAll
    public static void setUpAll() throws IOException {
        hdfsTestService = new HdfsTestService();
        MiniDFSCluster dfsCluster = hdfsTestService.start(true);
        dfs = dfsCluster.getFileSystem();
    }

    @AfterAll
    public static void cleanUpAll() {
        hdfsTestService.stop();
    }

    @BeforeEach
    public void setUp() throws Exception {
        this.initPath();
        this.initSparkContexts();
        this.initTestDataGenerator();
        this.tablePath = this.baseUri + "/sample-table";
        this.dfsBasePath = dfs.getWorkingDirectory().toString();
        dfs.mkdirs(new Path(this.dfsBasePath));
        this.storageConf = HadoopFSUtils.getStorageConf((Configuration)dfs.getConf());
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.cleanupResources();
    }

    protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
        return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).forTable(TABLE_NAME).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
    }

    @Test
    public void readLocalWriteHDFS() throws Exception {
        HoodieTableMetaClient.withPropertyBuilder().setTableType(TABLE_TYPE).setTableName(TABLE_NAME).setPayloadClass(HoodieAvroPayload.class).initTable(this.storageConf.newInstance(), this.dfsBasePath);
        HoodieWriteConfig cfg = this.getHoodieWriteConfig(this.dfsBasePath);
        HoodieWriteConfig localConfig = this.getHoodieWriteConfig(this.tablePath);
        HoodieTableMetaClient.withPropertyBuilder().setTableType(TABLE_TYPE).setTableName(TABLE_NAME).setPayloadClass(HoodieAvroPayload.class).setRecordKeyFields(localConfig.getProps().getProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())).setPartitionFields(localConfig.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())).initTable(this.storageConf.newInstance(), this.tablePath);
        try (SparkRDDWriteClient hdfsWriteClient = this.getHoodieWriteClient(cfg);
             SparkRDDWriteClient localWriteClient = this.getHoodieWriteClient(localConfig);){
            String readCommitTime = hdfsWriteClient.startCommit();
            LOG.info("Starting commit " + readCommitTime);
            List records = this.dataGen.generateInserts(readCommitTime, Integer.valueOf(10));
            JavaRDD writeRecords = this.jsc.parallelize(records, 2);
            hdfsWriteClient.upsert(writeRecords, readCommitTime);
            FileSystem fs = HadoopFSUtils.getFs((String)this.dfsBasePath, (StorageConfiguration)HoodieTestUtils.getDefaultStorageConf());
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)HadoopFSUtils.getStorageConf((Configuration)fs.getConf()), (String)this.dfsBasePath);
            HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline();
            Dataset<Row> readRecords = HoodieClientTestUtils.readCommit(this.dfsBasePath, this.sqlContext, timeline, readCommitTime);
            Assertions.assertEquals((long)readRecords.count(), (long)records.size());
            HoodieTableMetaClient.withPropertyBuilder().setTableType(TABLE_TYPE).setTableName(TABLE_NAME).setPayloadClass(HoodieAvroPayload.class).initTable(this.storageConf.newInstance(), this.tablePath);
            String writeCommitTime = localWriteClient.startCommit();
            LOG.info("Starting write commit " + writeCommitTime);
            List localRecords = this.dataGen.generateInserts(writeCommitTime, Integer.valueOf(10));
            JavaRDD localWriteRecords = this.jsc.parallelize(localRecords, 2);
            LOG.info("Writing to path: " + this.tablePath);
            localWriteClient.upsert(localWriteRecords, writeCommitTime);
            LOG.info("Reading from path: " + this.tablePath);
            fs = HadoopFSUtils.getFs((String)this.tablePath, (StorageConfiguration)HoodieTestUtils.getDefaultStorageConf());
            metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(fs.getConf()), (String)this.tablePath);
            timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline();
            Dataset<Row> localReadRecords = HoodieClientTestUtils.readCommit(this.tablePath, this.sqlContext, timeline, writeCommitTime);
            Assertions.assertEquals((long)localReadRecords.count(), (long)localRecords.size());
        }
    }
}

