/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.functional;

import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.HoodieSnapshotExporter;
import org.apache.hudi.utilities.config.SqlTransformerConfig;
import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(value="functional")
public class TestHoodieSnapshotExporter
extends SparkClientFunctionalTestHarness {
    static final Logger LOG = LoggerFactory.getLogger(TestHoodieSnapshotExporter.class);
    static final int NUM_RECORDS = 100;
    static final String COMMIT_TIME = "20200101000000";
    static final String PARTITION_PATH = "2020";
    static final String TABLE_NAME = "testing";
    String sourcePath;
    String targetPath;
    HoodieStorage storage;

    /**
     * Creates a fresh copy-on-write source table under {@code basePath()/source} and
     * bulk-inserts {@link #NUM_RECORDS} generated records at {@link #COMMIT_TIME}.
     *
     * <p>Also resolves the {@code target} output path (without creating it) and opens the
     * {@link HoodieStorage} handle that the nested tests use for their file-system assertions.
     *
     * @throws Exception if the table cannot be initialised or the seed commit fails
     */
    @BeforeEach
    public void init() throws Exception {
        sourcePath = Paths.get(basePath(), "source").toString();
        targetPath = Paths.get(basePath(), "target").toString();
        storage = HoodieStorageUtils.getStorage(
            basePath(), HadoopFSUtils.getStorageConf(jsc().hadoopConfiguration()));

        HoodieTableMetaClient.newTableBuilder()
            .setTableType(HoodieTableType.COPY_ON_WRITE)
            .setTableName(TABLE_NAME)
            .setPayloadClass(HoodieAvroPayload.class)
            .initTable(HadoopFSUtils.getStorageConfWithCopy(jsc().hadoopConfiguration()), sourcePath);

        HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath);
        // try-with-resources closes the client on both the success and the failure path and
        // attaches any close() failure to the primary exception as a suppressed one.
        try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
            writeClient.startCommitWithTime(COMMIT_TIME);
            HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH});
            List<HoodieRecord> records = dataGen.generateInserts(COMMIT_TIME, NUM_RECORDS);
            JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
            writeClient.bulkInsert(recordsRDD, COMMIT_TIME);
        }

        // Log what was written so a failing test run shows the prepared layout.
        List<StoragePathInfo> pathInfoList = storage.listFiles(new StoragePath(sourcePath));
        for (StoragePathInfo pathInfo : pathInfoList) {
            LOG.info(">>> Prepared test file: {}", pathInfo.getPath());
        }
    }

    /**
     * Releases the storage handle opened by {@link #init()}.
     *
     * @throws IOException if closing the storage fails
     */
    @AfterEach
    public void cleanUp() throws IOException {
        // init() may have failed before assigning the field; do not pile a
        // NullPointerException on top of the real set-up failure.
        if (storage != null) {
            storage.close();
        }
    }

    /**
     * Builds the write config used to seed the source table.
     *
     * <p>The embedded timeline server is disabled, all parallelism settings are 2 and a
     * BLOOM index is configured.
     *
     * @param basePath base path of the table the config is for
     * @return the write config for {@link #TABLE_NAME} at {@code basePath}
     */
    private HoodieWriteConfig getHoodieWriteConfig(String basePath) {
        return HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withEmbeddedTimelineServerEnabled(false)
            // Use the schema constant published by the data generator that init() relies on,
            // rather than a pasted copy of the same JSON that could drift from it.
            .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
            .withParallelism(2, 2)
            .withBulkInsertParallelism(2)
            .withDeleteParallelism(2)
            .forTable(TABLE_NAME)
            .withIndexConfig(HoodieIndexConfig.newBuilder()
                .withIndexType(HoodieIndex.IndexType.BLOOM)
                .build())
            .build();
    }

    /**
     * Export tests that control how the JSON output is partitioned, either through the
     * {@code outputPartitionField} option or through a user-supplied partitioner class.
     */
    @Nested
    public class TestHoodieSnapshotExporterForRepartitioning {
        private HoodieSnapshotExporter.Config cfg;

        /** Creates a JSON export config pointing from the seeded source to the target path. */
        @BeforeEach
        public void setUp() {
            cfg = new HoodieSnapshotExporter.Config();
            cfg.sourceBasePath = sourcePath;
            cfg.targetOutputPath = targetPath;
            cfg.outputFormat = "json";
        }

        /**
         * Exports with {@code outputPartitionField = "driver"} and expects every record to be
         * readable back, a {@code _SUCCESS} marker, and more than one entry under the target.
         */
        @Test
        public void testExportWithPartitionField() throws IOException {
            cfg.outputPartitionField = "driver";
            new HoodieSnapshotExporter().export(jsc(), cfg);

            Assertions.assertEquals(NUM_RECORDS, sqlContext().read().format("json").load(targetPath).count());
            Assertions.assertTrue(storage.exists(new StoragePath(targetPath + "/_SUCCESS")));
            Assertions.assertTrue(storage.listDirectEntries(new StoragePath(targetPath)).size() > 1);
        }

        /**
         * Exports through {@link UserDefinedPartitioner} and expects the output to land in a
         * {@code <PARTITION_NAME>=<PARTITION_PATH>} directory under the target.
         */
        @Test
        public void testExportForUserDefinedPartitioner() throws IOException {
            cfg.outputPartitioner = UserDefinedPartitioner.class.getName();
            new HoodieSnapshotExporter().export(jsc(), cfg);

            Assertions.assertEquals(NUM_RECORDS, sqlContext().read().format("json").load(targetPath).count());
            Assertions.assertTrue(storage.exists(new StoragePath(targetPath + "/_SUCCESS")));
            // Tie the expected directory name to the partitioner's own constant.
            String partitionDir =
                String.format("%s/%s=%s", targetPath, UserDefinedPartitioner.PARTITION_NAME, PARTITION_PATH);
            Assertions.assertTrue(storage.exists(new StoragePath(partitionDir)));
        }
    }

    public static class UserDefinedPartitioner
    implements HoodieSnapshotExporter.Partitioner {
        public static final String PARTITION_NAME = "year";

        public DataFrameWriter<Row> partition(Dataset<Row> source) {
            return source.withColumnRenamed(HoodieRecord.PARTITION_PATH_METADATA_FIELD, PARTITION_NAME).repartition(new Column[]{new Column(PARTITION_NAME)}).write().partitionBy(new String[]{PARTITION_NAME});
        }
    }

    /**
     * Export tests for non-Hudi output formats ({@code json} and {@code parquet}), with and
     * without a transformer applied to the exported rows.
     */
    @Nested
    public class TestHoodieSnapshotExporterForNonHudi {

        /**
         * @param format output format to export in
         * @return a new config exporting from the seeded source table to the target path
         */
        private HoodieSnapshotExporter.Config createConfig(String format) {
            HoodieSnapshotExporter.Config cfg = new HoodieSnapshotExporter.Config();
            cfg.sourceBasePath = sourcePath;
            cfg.targetOutputPath = targetPath;
            cfg.outputFormat = format;
            return cfg;
        }

        /** Reads back whatever the exporter wrote to the target path in the given format. */
        private Dataset<Row> readExported(String format) {
            return sqlContext().read().format(format).load(targetPath);
        }

        /**
         * Asserts the shape of an export that keeps every source record.
         *
         * @param data              rows read back from the target path
         * @param fieldsCount       expected number of top-level fields
         * @param fieldNamesToCheck field names that must be present in the schema
         * @throws IOException if the {@code _SUCCESS} marker lookup fails
         */
        private void assertExport(Dataset<Row> data, int fieldsCount, List<String> fieldNamesToCheck) throws IOException {
            List<String> fieldNames = Arrays.asList(data.schema().fieldNames());
            Assertions.assertEquals(fieldsCount, fieldNames.size());
            Assertions.assertTrue(fieldNames.containsAll(fieldNamesToCheck));
            Assertions.assertEquals(NUM_RECORDS, data.count());
            Assertions.assertTrue(storage.exists(new StoragePath(targetPath + "/_SUCCESS")));
        }

        // NOTE: the tests below deliberately run unconditionally. createConfig() always
        // returns a new object, and guarding on it would let a test pass without asserting.

        /** Plain export: 21 fields expected, including the nested {@code fare} field. */
        @ParameterizedTest
        @ValueSource(strings={"json", "parquet"})
        public void testExportAsNonHudi(String format) throws IOException {
            HoodieSnapshotExporter.Config cfg = createConfig(format);
            new HoodieSnapshotExporter().export(jsc(), cfg);
            assertExport(readExported(format), 21, Collections.singletonList("fare"));
        }

        /** Export through the SQL-query transformer with an inline query. */
        @ParameterizedTest
        @ValueSource(strings={"json", "parquet"})
        public void testSqlTransformedExportAsNonHudi(String format) throws IOException {
            HoodieSnapshotExporter.Config cfg = createConfig(format);
            cfg.transformerClassName = "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer";
            cfg.transformerSql = "SELECT substr(rider,1,10) as rider, trip_type as tripType FROM <SRC> WHERE trip_type = 'BLACK' LIMIT 10";
            new HoodieSnapshotExporter().export(jsc(), cfg);
            assertSqlTransformedExport(format);
        }

        /** Export through the SQL-file transformer, using a query file copied under the base path. */
        @ParameterizedTest
        @ValueSource(strings={"json", "parquet"})
        public void testSqlFileTransformedExportAsNonHudi(String format) throws IOException {
            HoodieSnapshotExporter.Config cfg = createConfig(format);
            cfg.transformerClassName = "org.apache.hudi.utilities.transform.SqlFileBasedTransformer";
            cfg.transformerSqlFile = basePath() + "/sql-file-transformer.sql";
            UtilitiesTestBase.Helpers.copyToDFS("exporter/sql-file-transformer.sql", storage, cfg.transformerSqlFile);
            new HoodieSnapshotExporter().export(jsc(), cfg);
            assertSqlTransformedExport(format);
        }

        /**
         * Asserts the output of the SQL-transformed exports: exactly two string columns
         * ({@code rider}, {@code tripType}), ten rows, all with trip type {@code BLACK}.
         *
         * @param format format the export was written in
         * @throws IOException if the {@code _SUCCESS} marker lookup fails
         */
        private void assertSqlTransformedExport(String format) throws IOException {
            Dataset<Row> transformedData = readExported(format);
            // The cast selects the Java-friendly foreach overload and types the row as Row.
            transformedData.foreach((ForeachFunction<Row>) row -> {
                Assertions.assertEquals("rider-2020", row.getString(0));
                Assertions.assertEquals("BLACK", row.getString(1));
            });
            String[] fieldNames = transformedData.schema().fieldNames();
            Assertions.assertEquals(2, fieldNames.length);
            Assertions.assertEquals("rider", fieldNames[0]);
            Assertions.assertEquals("tripType", fieldNames[1]);
            Assertions.assertEquals("StructType(StructField(rider,StringType,true),StructField(tripType,StringType,true))", transformedData.schema().toString());
            Assertions.assertEquals(10L, transformedData.count());
            Assertions.assertTrue(storage.exists(new StoragePath(targetPath + "/_SUCCESS")));
        }

        /** Export through the flattening transformer: nested {@code fare} becomes two columns. */
        @ParameterizedTest
        @ValueSource(strings={"json", "parquet"})
        public void testFlattenedExportAsNonHudi(String format) throws IOException {
            HoodieSnapshotExporter.Config cfg = createConfig(format);
            cfg.transformerClassName = "org.apache.hudi.utilities.transform.FlatteningTransformer";
            new HoodieSnapshotExporter().export(jsc(), cfg);
            assertExport(readExported(format), 22, Arrays.asList("fare_amount", "fare_currency"));
        }

        /** Export through the AWS DMS transformer: an {@code Op} column is expected in the output. */
        @ParameterizedTest
        @ValueSource(strings={"json", "parquet"})
        public void testAWSDmsTransformedExportAsNonHudi(String format) throws IOException {
            HoodieSnapshotExporter.Config cfg = createConfig(format);
            cfg.transformerClassName = "org.apache.hudi.utilities.transform.AWSDmsTransformer";
            new HoodieSnapshotExporter().export(jsc(), cfg);
            assertExport(readExported(format), 22, Collections.singletonList("Op"));
        }
    }

    /**
     * Tests for the conditions under which the exporter is expected to refuse to run, plus
     * the static validation of transformer-related options.
     */
    @Nested
    public class TestHoodieSnapshotExporterForEarlyAbort {
        private HoodieSnapshotExporter.Config cfg;

        /** Creates a Hudi-format export config pointing from the seeded source to the target path. */
        @BeforeEach
        public void setUp() {
            cfg = new HoodieSnapshotExporter.Config();
            cfg.sourceBasePath = sourcePath;
            cfg.targetOutputPath = targetPath;
            cfg.outputFormat = "hudi";
        }

        /** The export must fail when the target directory already exists. */
        @Test
        public void testExportWhenTargetPathExists() throws IOException {
            storage.createDirectory(new StoragePath(targetPath));

            Throwable thrown = Assertions.assertThrows(
                HoodieSnapshotExporterException.class,
                () -> new HoodieSnapshotExporter().export(jsc(), cfg));
            Assertions.assertEquals("The target output path already exists.", thrown.getMessage());
        }

        /** The export must fail once every {@code .commit} file is removed from the source timeline. */
        @Test
        public void testExportDatasetWithNoCommit() throws IOException {
            List<StoragePath> commitFiles = storage
                .listDirectEntries(new StoragePath(sourcePath + "/.hoodie/timeline/"))
                .stream()
                .map(StoragePathInfo::getPath)
                .filter(filePath -> filePath.getName().endsWith(".commit"))
                .collect(Collectors.toList());
            for (StoragePath commitFile : commitFiles) {
                storage.deleteFile(commitFile);
            }

            Throwable thrown = Assertions.assertThrows(
                HoodieSnapshotExporterException.class,
                () -> new HoodieSnapshotExporter().export(jsc(), cfg));
            Assertions.assertEquals("No commits present. Nothing to snapshot.", thrown.getMessage());
        }

        /** The export must fail when the only data partition and the metadata directory are deleted. */
        @Test
        public void testExportDatasetWithNoPartition() throws IOException {
            storage.deleteDirectory(new StoragePath(sourcePath + "/" + PARTITION_PATH));
            storage.deleteDirectory(new StoragePath(cfg.sourceBasePath + "/.hoodie/metadata"));

            Throwable thrown = Assertions.assertThrows(
                HoodieSnapshotExporterException.class,
                () -> new HoodieSnapshotExporter().export(jsc(), cfg));
            Assertions.assertEquals("The source dataset has 0 partition to snapshot.", thrown.getMessage());
        }

        /** A SQL transformer configured without its query (or query file) must be rejected. */
        @Test
        public void testExportTransformedDatasetWithNoTransformerSql() {
            cfg.outputFormat = "json";

            cfg.transformerClassName = "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer";
            Throwable missingSql = Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> new HoodieSnapshotExporter().export(jsc(), cfg));
            Assertions.assertEquals(
                "Property " + SqlTransformerConfig.TRANSFORMER_SQL.key() + " not found",
                missingSql.getMessage());

            cfg.transformerClassName = "org.apache.hudi.utilities.transform.SqlFileBasedTransformer";
            Throwable missingSqlFile = Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> new HoodieSnapshotExporter().export(jsc(), cfg));
            Assertions.assertEquals(
                "Property " + SqlTransformerConfig.TRANSFORMER_SQL_FILE.key() + " not found",
                missingSqlFile.getMessage());
        }

        /**
         * Walks one config through successive option combinations and checks
         * {@code areTransformerOptionsValid} after each step. The steps build on each other,
         * so their order matters.
         */
        @Test
        public void testTransformerOptionsValidity() {
            HoodieSnapshotExporter.Config config = new HoodieSnapshotExporter.Config();
            // No transformer configured at all.
            Assertions.assertTrue(HoodieSnapshotExporter.areTransformerOptionsValid(config));

            // A transformer that needs no extra option.
            config.transformerClassName = "org.apache.hudi.utilities.transform.FlatteningTransformer";
            Assertions.assertTrue(HoodieSnapshotExporter.areTransformerOptionsValid(config));

            // SQL transformers without their query / query file.
            config.transformerClassName = "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer";
            Assertions.assertFalse(HoodieSnapshotExporter.areTransformerOptionsValid(config));
            config.transformerClassName = "org.apache.hudi.utilities.transform.SqlFileBasedTransformer";
            Assertions.assertFalse(HoodieSnapshotExporter.areTransformerOptionsValid(config));

            // SQL-file transformer with its file set.
            config.transformerSqlFile = "filepath";
            Assertions.assertTrue(HoodieSnapshotExporter.areTransformerOptionsValid(config));

            // SQL-query transformer with its query set.
            config.transformerClassName = "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer";
            config.transformerSql = "query";
            Assertions.assertTrue(HoodieSnapshotExporter.areTransformerOptionsValid(config));
        }
    }

    /** Export test for the {@code hudi} output format. */
    @Nested
    public class TestHoodieSnapshotExporterForHudi {
        private HoodieSnapshotExporter.Config cfg;

        /** Creates a Hudi-format export config pointing from the seeded source to the target path. */
        @BeforeEach
        public void setUp() {
            cfg = new HoodieSnapshotExporter.Config();
            cfg.sourceBasePath = sourcePath;
            cfg.targetOutputPath = targetPath;
            cfg.outputFormat = "hudi";
        }

        /**
         * Exports as Hudi and checks that the target contains the timeline files for
         * {@link #COMMIT_TIME}, {@code hoodie.properties}, the partition's parquet data with
         * all records, the partition metadata file, and the {@code _SUCCESS} marker.
         */
        @Test
        public void testExportAsHudi() throws IOException {
            new HoodieSnapshotExporter().export(jsc(), cfg);

            // Timeline: completed, requested and inflight files of the seed commit.
            StoragePath timelineDir = new StoragePath(new StoragePath(targetPath, ".hoodie"), "timeline");
            StoragePath completeInstantPath =
                HoodieTestUtils.getCompleteInstantPath(storage, timelineDir, COMMIT_TIME, "commit");
            Assertions.assertTrue(storage.exists(completeInstantPath));
            String instantPrefix = targetPath + "/.hoodie/timeline/" + COMMIT_TIME;
            Assertions.assertTrue(storage.exists(new StoragePath(instantPrefix + ".commit.requested")));
            Assertions.assertTrue(storage.exists(new StoragePath(instantPrefix + ".inflight")));
            Assertions.assertTrue(storage.exists(new StoragePath(targetPath + "/.hoodie/hoodie.properties")));

            // Data files in the single partition.
            String partition = targetPath + "/" + PARTITION_PATH;
            long numParquetFiles = storage.listDirectEntries(new StoragePath(partition)).stream()
                .filter(fileStatus -> fileStatus.getPath().toString().endsWith(".parquet"))
                .count();
            Assertions.assertTrue(numParquetFiles >= 1L, "There should exist at least 1 parquet file.");
            Assertions.assertEquals(NUM_RECORDS, sqlContext().read().parquet(partition).count());
            Assertions.assertTrue(storage.exists(new StoragePath(partition + "/.hoodie_partition_metadata")));

            Assertions.assertTrue(storage.exists(new StoragePath(targetPath + "/_SUCCESS")));
        }
    }
}

