/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.testutils;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.providers.HoodieMetaClientProvider;
import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;

/**
 * Base harness for functional tests that need a Spark runtime and a Hudi table location.
 *
 * <p>The Spark session, SQL context, Java Spark context, Hudi engine context and the timeline
 * service are held in static fields and are therefore shared by every test instance of the
 * class hierarchy. They are created lazily by {@link #runBeforeEach()} and released by
 * {@link #resetSpark()}. The storage and file system handles are per instance and are bound to
 * the JUnit-managed temporary directory {@link #tempDir}.
 */
public class SparkClientFunctionalTestHarness
        implements SparkProvider, HoodieMetaClientProvider, HoodieWriteClientProvider {

    /** 1 GiB; used for the small-file threshold default and the max base file sizes. */
    private static final long ONE_GB_IN_BYTES = 1024L * 1024L * 1024L;

    /** Table name used for every meta client created by this harness. */
    private static final String RAW_TRIPS_TABLE_NAME = "raw_trips";

    /** Avro schema (JSON) that the write configs built by this harness are created with. */
    private static final String TRIP_SCHEMA_JSON =
            "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
            + "{\"name\": \"timestamp\",\"type\": \"long\"},"
            + "{\"name\": \"_row_key\", \"type\": \"string\"},"
            + "{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },"
            + "{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},"
            + "{\"name\": \"rider\", \"type\": \"string\"},"
            + "{\"name\": \"driver\", \"type\": \"string\"},"
            + "{\"name\": \"begin_lat\", \"type\": \"double\"},"
            + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
            + "{\"name\": \"end_lat\", \"type\": \"double\"},"
            + "{\"name\": \"end_lon\", \"type\": \"double\"},"
            + "{\"name\": \"distance_in_meters\", \"type\": \"int\"},"
            + "{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},"
            + "{\"name\": \"weight\", \"type\": \"float\"},"
            + "{\"name\": \"nation\", \"type\": \"bytes\"},"
            + "{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},"
            + "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},"
            + "{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},"
            + "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},"
            + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
            + "{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},"
            + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

    /**
     * Port of the shared timeline service. Starts at the configured default and is replaced by
     * the actual server port once the service has been started in {@link #runBeforeEach()}.
     */
    protected static int timelineServicePort = (Integer) FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue();

    // Shared runtime, created once by runBeforeEach() and torn down by resetSpark().
    private static SparkSession spark;
    private static SQLContext sqlContext;
    private static JavaSparkContext jsc;
    private static HoodieSparkEngineContext context;
    private static TimelineService timelineService;

    // Per-instance handles, created lazily and dropped by closeFileSystem().
    private HoodieStorage storage;
    private FileSystem fileSystem;

    /**
     * Whether the shared Spark runtime already existed when the most recent
     * {@link #runBeforeEach()} call on this instance started.
     */
    protected boolean initialized = false;

    @TempDir
    protected Path tempDir;

    /**
     * Returns the Spark SQL settings that register the Hudi session extension and catalog.
     *
     * @return a new mutable map of Spark configuration keys to values
     */
    public static Map<String, String> getSparkSqlConf() {
        Map<String, String> sqlConf = new HashMap<>();
        sqlConf.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
        sqlConf.put("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog");
        return sqlConf;
    }

    /**
     * Returns the table base path for this test.
     *
     * @return the absolute URI string of {@link #tempDir}
     */
    public String basePath() {
        return tempDir.toAbsolutePath().toUri().toString();
    }

    /** Returns the shared Spark session, or {@code null} if it has not been created yet. */
    @Override
    public SparkSession spark() {
        return spark;
    }

    /** Returns the shared SQL context, or {@code null} if it has not been created yet. */
    @Override
    public SQLContext sqlContext() {
        return sqlContext;
    }

    /** Returns the shared Java Spark context, or {@code null} if it has not been created yet. */
    @Override
    public JavaSparkContext jsc() {
        return jsc;
    }

    /**
     * Returns a storage configuration derived from the shared Spark context's Hadoop
     * configuration.
     */
    public StorageConfiguration<Configuration> storageConf() {
        return HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration());
    }

    /**
     * Returns the storage for {@link #basePath()}, creating and caching it on first use.
     */
    public HoodieStorage hoodieStorage() {
        if (storage == null) {
            storage = HoodieStorageUtils.getStorage(basePath(), storageConf());
        }
        return storage;
    }

    /**
     * Returns the file system backing {@link #hoodieStorage()}, caching it on first use. The
     * cached instance is closed by {@link #closeFileSystem()}.
     */
    public FileSystem fs() {
        if (fileSystem == null) {
            fileSystem = (FileSystem) hoodieStorage().getFileSystem();
        }
        return fileSystem;
    }

    /** Returns the shared Hudi engine context, or {@code null} if it has not been created yet. */
    public HoodieSparkEngineContext context() {
        return context;
    }

    /**
     * Initializes a table of the given type at {@link #basePath()} with no extra properties.
     *
     * @param tableType table type to create
     * @return the meta client of the initialized table
     * @throws IOException declared by the underlying table initialization
     */
    public HoodieTableMetaClient getHoodieMetaClient(HoodieTableType tableType) throws IOException {
        return getHoodieMetaClient(tableType, new Properties());
    }

    /**
     * Initializes a table of the given type at {@link #basePath()}.
     *
     * @param tableType table type to create
     * @param props     additional table properties
     * @return the meta client of the initialized table
     * @throws IOException declared by the underlying table initialization
     */
    public HoodieTableMetaClient getHoodieMetaClient(HoodieTableType tableType, Properties props) throws IOException {
        return getHoodieMetaClient(storageConf(), basePath(), tableType, props);
    }

    /**
     * Initializes a table named {@code raw_trips} at {@code basePath}. Unlike the
     * {@code (storageConf, basePath, props, tableType)} overload, this one sets neither a payload
     * class nor a table version explicitly.
     *
     * @param storageConf storage configuration; a new instance of it is handed to the table
     * @param basePath    table base path
     * @param tableType   table type to create
     * @param props       additional table properties
     * @return the meta client of the initialized table
     * @throws IOException declared by the underlying table initialization
     */
    public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath, HoodieTableType tableType, Properties props) throws IOException {
        return HoodieTableMetaClient.newTableBuilder()
                .setTableName(RAW_TRIPS_TABLE_NAME)
                .setTableType(tableType)
                .fromProperties(props)
                .initTable(storageConf.newInstance(), basePath);
    }

    /**
     * Initializes a copy-on-write table at {@code basePath} using the key generator properties
     * of {@link #getPropertiesForKeyGen(boolean)} with meta fields populated.
     *
     * @param storageConf storage configuration
     * @param basePath    table base path
     * @return the meta client of the initialized table
     * @throws IOException declared by the underlying table initialization
     */
    public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath) throws IOException {
        return getHoodieMetaClient(storageConf, basePath, getPropertiesForKeyGen(true));
    }

    /**
     * Same as {@link #getHoodieMetaClient(StorageConfiguration, String)} but additionally sets
     * the write table version property to {@code tableVersion}.
     *
     * @param storageConf  storage configuration
     * @param basePath     table base path
     * @param tableVersion value stored under the write table version key
     * @return the meta client of the initialized table
     * @throws IOException declared by the underlying table initialization
     */
    public HoodieTableMetaClient getHoodieMetaClientWithTableVersion(StorageConfiguration<?> storageConf, String basePath, String tableVersion) throws IOException {
        Properties props = getPropertiesForKeyGen(true);
        props.put(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), tableVersion);
        return getHoodieMetaClient(storageConf, basePath, props);
    }

    /**
     * Initializes a copy-on-write table at {@code basePath} with the given properties.
     *
     * @param storageConf storage configuration
     * @param basePath    table base path
     * @param props       table properties
     * @return the meta client of the initialized table
     * @throws IOException declared by the underlying table initialization
     */
    public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath, Properties props) throws IOException {
        return getHoodieMetaClient(storageConf, basePath, props, HoodieTableType.COPY_ON_WRITE);
    }

    /**
     * Initializes a table named {@code raw_trips} with {@link HoodieAvroPayload} as payload
     * class and the table version resolved from {@code props} via the write table version
     * config.
     *
     * @param storageConf storage configuration; a new instance of it is handed to the table
     * @param basePath    table base path
     * @param props       table properties
     * @param tableType   table type to create
     * @return the meta client of the initialized table
     * @throws IOException declared by the underlying table initialization
     */
    public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath, Properties props, HoodieTableType tableType) throws IOException {
        int tableVersion = ConfigUtils.getIntWithAltKeys(new TypedProperties(props), HoodieWriteConfig.WRITE_TABLE_VERSION);
        return HoodieTableMetaClient.newTableBuilder()
                .setTableName(RAW_TRIPS_TABLE_NAME)
                .setTableType(tableType)
                .setPayloadClass(HoodieAvroPayload.class)
                .setTableVersion(tableVersion)
                .fromProperties(props)
                .initTable(storageConf.newInstance(), basePath);
    }

    /**
     * Creates a write client on the shared engine context.
     *
     * @param cfg write configuration for the client
     * @return a new write client; the caller owns it
     * @throws IOException declared for implementers; not thrown by the visible code
     */
    public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException {
        return new SparkRDDWriteClient(context(), cfg);
    }

    /**
     * Creates the shared Spark runtime and timeline service if they do not exist yet, and records
     * in {@link #initialized} whether they already existed.
     */
    @BeforeEach
    public synchronized void runBeforeEach() {
        // The state being initialized is static, so it has to be guarded by the class monitor
        // (the same one resetSpark() uses); the instance monitor alone differs per test instance.
        synchronized (SparkClientFunctionalTestHarness.class) {
            this.initialized = spark != null;
            if (this.initialized) {
                return;
            }
            SparkConf sparkConf = conf();
            HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
            SparkRDDReadClient.addHoodieSupport(sparkConf);
            spark = SparkSession.builder().config(sparkConf).getOrCreate();
            sqlContext = spark.sqlContext();
            HoodieClientTestUtils.overrideSparkHadoopConfiguration(spark.sparkContext());
            jsc = new JavaSparkContext(spark.sparkContext());
            context = new HoodieSparkEngineContext(jsc);
            timelineService = HoodieClientTestUtils.initTimelineService(context, basePath(), incrementTimelineServicePortToUse());
            // Remember the port the service actually reports, for the write configs built later.
            timelineServicePort = timelineService.getServerPort();
        }
    }

    /**
     * Closes the shared Spark session and timeline service and clears every shared handle, so
     * that the next {@link #runBeforeEach()} rebuilds the runtime from scratch.
     */
    @AfterAll
    public static synchronized void resetSpark() {
        try {
            if (spark != null) {
                spark.close();
            }
        } finally {
            // Drop the handles derived from the session too; they are unusable once it is closed.
            spark = null;
            sqlContext = null;
            jsc = null;
            context = null;
            if (timelineService != null) {
                try {
                    timelineService.close();
                } finally {
                    // Cleared so that a repeated call does not close the service a second time.
                    timelineService = null;
                }
            }
        }
    }

    /**
     * Closes the file system handed out by {@link #fs()}, if any.
     *
     * @throws IOException if closing the file system fails
     */
    @AfterEach
    public void closeFileSystem() throws IOException {
        if (fileSystem == null) {
            return;
        }
        try {
            fileSystem.close();
        } finally {
            fileSystem = null;
            // The cached storage is where the closed file system came from; drop it as well so
            // a later hoodieStorage()/fs() call builds a fresh one instead of returning it again.
            storage = null;
        }
    }

    /**
     * Tags the given records with their location using {@code index}.
     *
     * @param index   index to look the records up in
     * @param records records to tag
     * @param table   table the index operates on
     * @return the records returned by the index
     */
    protected JavaRDD<HoodieRecord> tagLocation(HoodieIndex index, JavaRDD<HoodieRecord> records, HoodieTable table) {
        return HoodieJavaRDD.getJavaRDD(index.tagLocation(HoodieJavaRDD.of(records), context, table));
    }

    /**
     * Updates the index with the locations carried by the given write statuses.
     *
     * @param index       index to update
     * @param writeStatus write statuses to feed into the index
     * @param table       table the index operates on
     * @return the write statuses returned by the index
     */
    protected JavaRDD<WriteStatus> updateLocation(HoodieIndex index, JavaRDD<WriteStatus> writeStatus, HoodieTable table) {
        return HoodieJavaRDD.getJavaRDD(index.updateLocation(HoodieJavaRDD.of(writeStatus), context, table));
    }

    /**
     * Same as the six-argument overload with {@code doExplicitCommit} set to {@code false}.
     */
    protected Stream<HoodieBaseFile> insertRecordsToMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
        return insertRecordsToMORTable(metaClient, records, client, cfg, commitTime, false);
    }

    /**
     * Inserts {@code records} at {@code commitTime} and verifies the resulting timeline: the
     * latest delta commit must be {@code commitTime}, no commit/replace instant may exist, and
     * a view restricted to completed commit instants must expose no base files.
     *
     * @param metaClient       meta client of the target table; a reloaded copy is used
     * @param records          records to insert
     * @param client           write client performing the insert
     * @param cfg              write config used to build the table handle
     * @param commitTime       instant time of the insert
     * @param doExplicitCommit whether to call {@code client.commit} after the insert
     * @return the latest base files according to the table's completed commits timeline
     * @throws IOException if listing base files fails
     */
    protected Stream<HoodieBaseFile> insertRecordsToMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime, boolean doExplicitCommit) throws IOException {
        HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);

        JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
        JavaRDD<WriteStatus> statusesRdd = client.insert(writeRecords, commitTime);
        List<WriteStatus> statuses = statusesRdd.collect();
        Assertions.assertNoWriteErrors(statuses);
        if (doExplicitCommit) {
            client.commit(commitTime, statusesRdd);
        }
        Assertions.assertFileSizesEqual(statuses, status -> FSUtils.getFileSize(
                reloadedMetaClient.getStorage(),
                new StoragePath(reloadedMetaClient.getBasePath(), status.getStat().getPath())));

        HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), reloadedMetaClient);

        Option<HoodieInstant> deltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
        org.junit.jupiter.api.Assertions.assertTrue(deltaCommit.isPresent());
        org.junit.jupiter.api.Assertions.assertEquals(commitTime, deltaCommit.get().requestedTime(), "Delta commit should be specified value");

        Option<HoodieInstant> commit = reloadedMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant();
        org.junit.jupiter.api.Assertions.assertFalse(commit.isPresent());

        List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable);
        HoodieTableFileSystemView roView = getHoodieTableFileSystemView(
                reloadedMetaClient, reloadedMetaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
        Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
        org.junit.jupiter.api.Assertions.assertFalse(dataFilesToRead.findAny().isPresent());

        roView = getHoodieTableFileSystemView(reloadedMetaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
        return roView.getLatestBaseFiles();
    }

    /**
     * Same as the six-argument overload with {@code doExplicitCommit} set to {@code true}.
     */
    protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
        updateRecordsInMORTable(metaClient, records, client, cfg, commitTime, true);
    }

    /**
     * Upserts {@code records} at {@code commitTime} and verifies that the latest delta commit is
     * {@code commitTime} and that no commit/replace instant exists.
     *
     * @param metaClient       meta client of the target table; a reloaded copy is used
     * @param records          records to upsert
     * @param client           write client performing the upsert
     * @param cfg              write config (not read by this method)
     * @param commitTime       instant time of the upsert
     * @param doExplicitCommit whether to call {@code client.commit} after the upsert
     * @throws IOException declared for callers; kept for signature compatibility
     */
    protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime, boolean doExplicitCommit) throws IOException {
        HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);

        JavaRDD<WriteStatus> statusesRdd = client.upsert(jsc().parallelize(records, 1), commitTime);
        List<WriteStatus> statuses = statusesRdd.collect();
        Assertions.assertNoWriteErrors(statuses);
        if (doExplicitCommit) {
            client.commit(commitTime, statusesRdd);
        }
        Assertions.assertFileSizesEqual(statuses, status -> FSUtils.getFileSize(
                reloadedMetaClient.getStorage(),
                new StoragePath(reloadedMetaClient.getBasePath(), status.getStat().getPath())));

        Option<HoodieInstant> deltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
        org.junit.jupiter.api.Assertions.assertTrue(deltaCommit.isPresent());
        org.junit.jupiter.api.Assertions.assertEquals(commitTime, deltaCommit.get().requestedTime(), "Latest Delta commit should match specified time");

        Option<HoodieInstant> commit = reloadedMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
        org.junit.jupiter.api.Assertions.assertFalse(commit.isPresent());
    }

    /**
     * Lists all files of the table that carry the table's base file extension.
     *
     * @param table table to list
     * @return path infos of the base files found
     * @throws IOException if listing fails
     */
    protected List<StoragePathInfo> listAllBaseFilesInPath(HoodieTable table) throws IOException {
        return HoodieTestTable.of(table.getMetaClient()).listAllBaseFiles(table.getBaseFileExtension());
    }

    /** Returns {@link #getPropertiesForKeyGen(boolean)} with meta fields not populated. */
    protected Properties getPropertiesForKeyGen() {
        return getPropertiesForKeyGen(false);
    }

    /**
     * Builds the record key / partition path properties used by the harness: record key field
     * {@code _row_key} and partition field {@code partition_path}, set under both the datasource
     * write keys and the table config keys.
     *
     * @param populateMetaFields value stored under the populate-meta-fields table config key
     * @return a new mutable {@link Properties} instance
     */
    protected Properties getPropertiesForKeyGen(boolean populateMetaFields) {
        Properties properties = new Properties();
        properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields));
        properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
        properties.put("hoodie.datasource.write.partitionpath.field", "partition_path");
        properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
        properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path");
        return properties;
    }

    /**
     * Applies the key generator properties to {@code configBuilder}; when meta fields are not
     * populated the index type is additionally switched to {@code SIMPLE}.
     *
     * @param configBuilder      builder to modify in place
     * @param populateMetaFields whether meta fields are populated
     */
    protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) {
        configBuilder.withProperties(getPropertiesForKeyGen(populateMetaFields));
        if (!populateMetaFields) {
            configBuilder.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
        }
    }

    /** Builds the default write config with the given auto-commit setting. */
    protected HoodieWriteConfig getConfig(Boolean autoCommit) {
        return getConfigBuilder(autoCommit).build();
    }

    /** Builds a write config with a BLOOM index and the given auto-commit / rollback settings. */
    protected HoodieWriteConfig getConfig(Boolean autoCommit, Boolean rollbackUsingMarkers) {
        return getConfigBuilder(autoCommit, rollbackUsingMarkers, HoodieIndex.IndexType.BLOOM).build();
    }

    /** Returns the default config builder with a BLOOM index. */
    protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
        return getConfigBuilder(autoCommit, HoodieIndex.IndexType.BLOOM);
    }

    /** Returns the default config builder with the given index type and marker rollback off. */
    protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, HoodieIndex.IndexType indexType) {
        return getConfigBuilder(autoCommit, false, indexType);
    }

    /**
     * Returns a config builder with a BLOOM index, marker rollback off, and the given small file
     * size and clustering config.
     */
    protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
        return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig);
    }

    /**
     * Returns a config builder with a 1 GiB small file size and a default clustering config.
     */
    protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
        return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, ONE_GB_IN_BYTES, HoodieClusteringConfig.newBuilder().build());
    }

    /**
     * Builds the write config builder all other {@code getConfig*} overloads delegate to. The
     * config targets {@link #basePath()}, uses the trip schema, and points the remote file system
     * view at the current {@link #timelineServicePort}.
     *
     * @param autoCommit              auto-commit flag; must not be {@code null}
     * @param rollbackUsingMarkers    marker-based rollback flag; must not be {@code null}
     * @param indexType               index type to configure
     * @param compactionSmallFileSize small file size passed to the compaction config
     * @param clusteringConfig        clustering config to apply
     * @return a builder the caller may customize further before calling {@code build()}
     */
    protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
                .compactionSmallFileSize(compactionSmallFileSize)
                .withInlineCompaction(false)
                .withMaxNumDeltaCommitsBeforeCompaction(1)
                .build();
        HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
                .hfileMaxFileSize(ONE_GB_IN_BYTES)
                .parquetMaxFileSize(ONE_GB_IN_BYTES)
                .build();
        FileSystemViewStorageConfig viewStorageConfig = new FileSystemViewStorageConfig.Builder()
                .withRemoteServerPort(timelineServicePort)
                .withEnableBackupForRemoteFileSystemView(false)
                .build();
        HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder().withIndexType(indexType).build();

        return HoodieWriteConfig.newBuilder()
                .withPath(basePath())
                .withSchema(TRIP_SCHEMA_JSON)
                .withParallelism(2, 2)
                .withDeleteParallelism(2)
                .withAutoCommit(autoCommit)
                .withCompactionConfig(compactionConfig)
                .withStorageConfig(storageConfig)
                .withEmbeddedTimelineServerEnabled(true)
                .forTable("test-trip-table")
                .withFileSystemViewConfig(viewStorageConfig)
                .withIndexConfig(indexConfig)
                .withClusteringConfig(clusteringConfig)
                .withRollbackUsingMarkers(rollbackUsingMarkers);
    }

    /**
     * Converts Hudi records to a Spark data frame by extracting each record's Avro insert value.
     *
     * @param records records whose data is a {@link HoodieRecordPayload}
     * @param schema  Avro schema used both to extract the payloads and to build the data frame
     * @return a data frame over the extracted Avro records (parallelized into 2 partitions)
     * @throws HoodieIOException if a payload cannot be extracted
     */
    protected Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema) {
        List<GenericRecord> avroRecords = records.stream()
                .map(record -> {
                    HoodieRecordPayload payload = (HoodieRecordPayload) record.getData();
                    try {
                        return (GenericRecord) payload.getInsertValue(schema).get();
                    } catch (IOException e) {
                        throw new HoodieIOException("Failed to extract Avro payload", e);
                    }
                })
                .collect(Collectors.toList());
        JavaRDD<GenericRecord> avroRdd = jsc.parallelize(avroRecords, 2);
        return AvroConversionUtils.createDataFrame(avroRdd.rdd(), schema.toString(), spark);
    }

    /**
     * Advances {@link #timelineServicePort} to the next port, wrapping around inside the range
     * 1024..65535, and returns it.
     *
     * @return the new value of {@link #timelineServicePort}
     */
    protected int incrementTimelineServicePortToUse() {
        timelineServicePort = (timelineServicePort + 1 - 1024) % 64512 + 1024;
        return timelineServicePort;
    }

    /**
     * Compares two data frames on the columns of {@code expectedDf} that are contained in
     * {@code validateColumns}.
     *
     * <p>The comparison is a two-way {@code except}, which Spark documents as
     * {@code EXCEPT DISTINCT}: the frames are considered equal when they contain the same set of
     * distinct rows, so differing duplicate counts are not detected. Both projections are sorted
     * by {@code _row_key}, so that column has to be among the validated columns.
     *
     * @param expectedDf      expected rows; also determines the candidate column names
     * @param actualDf        actual rows
     * @param validateColumns names of the columns to compare
     * @return {@code true} if neither frame has a distinct row the other lacks
     */
    public static boolean areDataframesEqual(Dataset<Row> expectedDf, Dataset<Row> actualDf, Set<String> validateColumns) {
        // Project both sides onto the same columns in the same (alphabetical) order.
        String[] sortedColumnNames = Arrays.stream(expectedDf.columns())
                .filter(validateColumns::contains)
                .sorted()
                .toArray(String[]::new);

        Dataset<Row> expectedSorted = expectedDf.selectExpr(sortedColumnNames).sort("_row_key");
        Dataset<Row> actualSorted = actualDf.selectExpr(sortedColumnNames).sort("_row_key");

        return expectedSorted.except(actualSorted).isEmpty()
                && actualSorted.except(expectedSorted).isEmpty();
    }
}

