/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.ChainedSchemaPostProcessor;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry;
import org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UtilHelpers {
    public static final String EXECUTE = "execute";
    public static final String SCHEDULE = "schedule";
    public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
    private static final Logger LOG = LoggerFactory.getLogger(UtilHelpers.class);

    public static HoodieRecordMerger createRecordMerger(Properties props) {
        List<String> recordMergerImplClasses = ConfigUtils.split2List(props.getProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue()));
        HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(null, EngineType.SPARK, recordMergerImplClasses, props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()));
        return recordMerger;
    }

    public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) throws IOException {
        try {
            try {
                return (Source)ReflectionUtils.loadClass(sourceClass, new Class[]{TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class, HoodieIngestionMetrics.class}, new Object[]{cfg, jssc, sparkSession, schemaProvider, metrics});
            }
            catch (HoodieException e) {
                return (Source)ReflectionUtils.loadClass(sourceClass, new Class[]{TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class}, new Object[]{cfg, jssc, sparkSession, schemaProvider});
            }
        }
        catch (Throwable e) {
            throw new IOException("Could not load source class " + sourceClass, e);
        }
    }

    public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(String postProcessorClassNames, TypedProperties props) throws IOException {
        if (StringUtils.isNullOrEmpty(postProcessorClassNames)) {
            return null;
        }
        try {
            ArrayList<JsonKafkaSourcePostProcessor> processors = new ArrayList<JsonKafkaSourcePostProcessor>();
            for (String className : postProcessorClassNames.split(",")) {
                processors.add((JsonKafkaSourcePostProcessor)ReflectionUtils.loadClass(className, props));
            }
            return new ChainedJsonKafkaSourcePostProcessor(processors, props);
        }
        catch (Throwable e) {
            throw new HoodieSourcePostProcessException("Could not load postProcessorClassNames class(es) " + postProcessorClassNames, e);
        }
    }

    public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg, JavaSparkContext jssc) throws IOException {
        try {
            return StringUtils.isNullOrEmpty(schemaProviderClass) ? null : (SchemaProvider)ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc);
        }
        catch (Throwable e) {
            throw new IOException("Could not load schema provider class " + schemaProviderClass, e);
        }
    }

    public static SchemaPostProcessor createSchemaPostProcessor(String schemaPostProcessorClassNames, TypedProperties cfg, JavaSparkContext jssc) {
        if (StringUtils.isNullOrEmpty(schemaPostProcessorClassNames)) {
            return null;
        }
        try {
            ArrayList<SchemaPostProcessor> processors = new ArrayList<SchemaPostProcessor>();
            for (String className : schemaPostProcessorClassNames.split(",")) {
                processors.add((SchemaPostProcessor)ReflectionUtils.loadClass(className, cfg, jssc));
            }
            return new ChainedSchemaPostProcessor(cfg, jssc, processors);
        }
        catch (Throwable e) {
            throw new HoodieSchemaPostProcessException("Could not load schemaPostProcessorClassNames class(es) " + schemaPostProcessorClassNames, e);
        }
    }

    public static StructType getSourceSchema(SchemaProvider schemaProvider) {
        if (schemaProvider != null && schemaProvider.getSourceSchema() != null && schemaProvider.getSourceSchema() != InputBatch.NULL_SCHEMA) {
            return AvroConversionUtils.convertAvroSchemaToStructType(schemaProvider.getSourceSchema());
        }
        return null;
    }

    public static Option<Transformer> createTransformer(Option<List<String>> classNamesOpt, Option<Schema> sourceSchema, boolean isErrorTableWriterEnabled) throws IOException {
        try {
            Function<List, Transformer> chainedTransformerFunction = classNames -> isErrorTableWriterEnabled ? new ErrorTableAwareChainedTransformer((List<String>)classNames, sourceSchema) : new ChainedTransformer((List<String>)classNames, sourceSchema);
            return classNamesOpt.map(classNames -> classNames.isEmpty() ? null : (Transformer)chainedTransformerFunction.apply((List)classNames));
        }
        catch (Throwable e) {
            throw new IOException("Could not load transformer class(es) " + classNamesOpt.get(), e);
        }
    }

    public static InitialCheckPointProvider createInitialCheckpointProvider(String className, TypedProperties props) throws IOException {
        try {
            return (InitialCheckPointProvider)ReflectionUtils.loadClass(className, new Class[]{TypedProperties.class}, new Object[]{props});
        }
        catch (Throwable e) {
            throw new IOException("Could not load initial checkpoint provider class " + className, e);
        }
    }

    public static DFSPropertiesConfiguration readConfig(Configuration hadoopConfig, Path cfgPath, List<String> overriddenProps) {
        DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, cfgPath);
        try {
            if (!overriddenProps.isEmpty()) {
                LOG.info("Adding overridden properties to file properties.");
                conf.addPropsFromStream(new BufferedReader(new StringReader(String.join((CharSequence)"\n", overriddenProps))), cfgPath);
            }
        }
        catch (IOException ioe) {
            throw new HoodieIOException("Unexpected error adding config overrides", ioe);
        }
        return conf;
    }

    public static DFSPropertiesConfiguration getConfig(List<String> overriddenProps) {
        DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration();
        try {
            if (!overriddenProps.isEmpty()) {
                LOG.info("Adding overridden properties to file properties.");
                conf.addPropsFromStream(new BufferedReader(new StringReader(String.join((CharSequence)"\n", overriddenProps))), null);
            }
        }
        catch (IOException ioe) {
            throw new HoodieIOException("Unexpected error adding config overrides", ioe);
        }
        return conf;
    }

    public static TypedProperties buildProperties(List<String> props) {
        TypedProperties properties = DFSPropertiesConfiguration.getGlobalProps();
        props.forEach(x -> {
            String[] kv = x.split("=", 2);
            ValidationUtils.checkArgument(kv.length == 2);
            properties.setProperty(kv[0], kv[1]);
        });
        return properties;
    }

    public static void validateAndAddProperties(String[] configs, SparkLauncher sparkLauncher) {
        Arrays.stream(configs).filter(config -> config.contains("=") && config.split("=", 2).length == 2).forEach(xva$0 -> sparkLauncher.addAppArgs(new String[]{xva$0}));
    }

    public static String parseSchema(FileSystem fs, String schemaFile) throws Exception {
        Path p = new Path(schemaFile);
        if (!fs.exists(p)) {
            throw new Exception(String.format("Could not find - %s - schema file.", schemaFile));
        }
        long len = fs.getFileStatus(p).getLen();
        ByteBuffer buf = ByteBuffer.allocate((int)len);
        try (FSDataInputStream inputStream = fs.open(p);){
            inputStream.readFully(0L, buf.array(), 0, buf.array().length);
        }
        return new String(buf.array());
    }

    public static SparkConf buildSparkConf(String appName, String defaultMaster) {
        return UtilHelpers.buildSparkConf(appName, defaultMaster, new HashMap<String, String>());
    }

    private static SparkConf buildSparkConf(String appName, String defaultMaster, Map<String, String> additionalConfigs) {
        SparkConf sparkConf = new SparkConf().setAppName(appName);
        String master = sparkConf.get("spark.master", defaultMaster);
        sparkConf.setMaster(master);
        if (master.startsWith("yarn")) {
            sparkConf.set("spark.eventLog.overwrite", "true");
            sparkConf.set("spark.eventLog.enabled", "true");
        }
        sparkConf.set("spark.ui.port", "8090");
        sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
        sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
        sparkConf.set("spark.hadoop.mapred.output.compress", "true");
        sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
        sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
        sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
        sparkConf.set("spark.driver.allowMultipleContexts", "true");
        additionalConfigs.forEach((arg_0, arg_1) -> ((SparkConf)sparkConf).set(arg_0, arg_1));
        return sparkConf;
    }

    private static SparkConf buildSparkConf(String appName, Map<String, String> additionalConfigs) {
        SparkConf sparkConf = new SparkConf().setAppName(appName);
        sparkConf.set("spark.ui.port", "8090");
        sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
        sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
        sparkConf.set("spark.hadoop.mapred.output.compress", "true");
        sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
        sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
        sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
        additionalConfigs.forEach((arg_0, arg_1) -> ((SparkConf)sparkConf).set(arg_0, arg_1));
        return sparkConf;
    }

    public static JavaSparkContext buildSparkContext(String appName, Map<String, String> configs) {
        return new JavaSparkContext(UtilHelpers.buildSparkConf(appName, configs));
    }

    public static JavaSparkContext buildSparkContext(String appName, String defaultMaster, Map<String, String> configs) {
        return new JavaSparkContext(UtilHelpers.buildSparkConf(appName, defaultMaster, configs));
    }

    public static JavaSparkContext buildSparkContext(String appName, String defaultMaster) {
        return new JavaSparkContext(UtilHelpers.buildSparkConf(appName, defaultMaster));
    }

    public static JavaSparkContext buildSparkContext(String appName, String sparkMaster, String sparkMemory) {
        SparkConf sparkConf = UtilHelpers.buildSparkConf(appName, sparkMaster);
        if (sparkMemory != null) {
            sparkConf.set("spark.executor.memory", sparkMemory);
        }
        return new JavaSparkContext(sparkConf);
    }

    public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr, int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
        HoodieCompactionConfig compactionConfig = compactionStrategyClass.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false).withCompactionStrategy((CompactionStrategy)ReflectionUtils.loadClass(strategy)).build()).orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withParallelism(parallelism, parallelism).withBulkInsertParallelism(parallelism).withDeleteParallelism(parallelism).withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).withProps(properties).build();
        return new SparkRDDWriteClient<HoodieRecordPayload>(new HoodieSparkEngineContext(jsc), config);
    }

    public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD<WriteStatus> writeResponse) {
        LongAccumulator errors2 = jsc.sc().longAccumulator();
        writeResponse.foreach((VoidFunction & Serializable)writeStatus -> {
            if (writeStatus.hasErrors()) {
                errors2.add(1L);
                LOG.error(String.format("Error processing records :writeStatus:%s", writeStatus.getStat().toString()));
            }
        });
        if (errors2.value() == 0L) {
            LOG.info(String.format("Table imported into hoodie with %s instant time.", instantTime));
            return 0;
        }
        LOG.error(String.format("Import failed with %d errors.", errors2.value()));
        return -1;
    }

    public static int handleErrors(HoodieCommitMetadata metadata, String instantTime) {
        List<HoodieWriteStat> writeStats = metadata.getWriteStats();
        long errorsCount = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
        if (errorsCount == 0L) {
            LOG.info(String.format("Finish job with %s instant time.", instantTime));
            return 0;
        }
        LOG.error(String.format("Job failed with %d errors.", errorsCount));
        return -1;
    }

    private static Connection createConnectionFactory(Map<String, String> options) throws SQLException {
        String driverClass = options.get(JDBCOptions.JDBC_DRIVER_CLASS());
        DriverRegistry.register((String)driverClass);
        Enumeration<Driver> drivers = DriverManager.getDrivers();
        Driver driver = null;
        while (drivers.hasMoreElements()) {
            Driver d = drivers.nextElement();
            if (d instanceof DriverWrapper) {
                if (((DriverWrapper)d).wrapped().getClass().getCanonicalName().equals(driverClass)) {
                    driver = d;
                }
            } else if (d.getClass().getCanonicalName().equals(driverClass)) {
                driver = d;
            }
            if (driver == null) continue;
            break;
        }
        Objects.requireNonNull(driver, String.format("Did not find registered driver with class %s", driverClass));
        Properties properties = new Properties();
        properties.putAll(options);
        String url2 = options.get(JDBCOptions.JDBC_URL());
        Connection connect = driver.connect(url2, properties);
        Objects.requireNonNull(connect, String.format("The driver could not open a JDBC connection. Check the URL: %s", url2));
        return connect;
    }

    private static Boolean tableExists(Connection conn, Map<String, String> options) throws SQLException {
        JdbcDialect dialect = JdbcDialects.get((String)options.get(JDBCOptions.JDBC_URL()));
        try (PreparedStatement statement = conn.prepareStatement(dialect.getTableExistsQuery(options.get(JDBCOptions.JDBC_TABLE_NAME())));){
            statement.setQueryTimeout(Integer.parseInt(options.get(JDBCOptions.JDBC_QUERY_TIMEOUT())));
            statement.executeQuery();
        }
        return true;
    }

    /*
     * Exception decompiling
     */
    public static Schema getJDBCSchema(Map<String, String> options) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 3 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    public static SchemaProvider getOriginalSchemaProvider(SchemaProvider schemaProvider) {
        SchemaProvider originalProvider = schemaProvider;
        if (schemaProvider instanceof SchemaProviderWithPostProcessor) {
            originalProvider = ((SchemaProviderWithPostProcessor)schemaProvider).getOriginalSchemaProvider();
        } else if (schemaProvider instanceof DelegatingSchemaProvider) {
            originalProvider = ((DelegatingSchemaProvider)schemaProvider).getSourceSchemaProvider();
        }
        return originalProvider;
    }

    public static SchemaProvider wrapSchemaProviderWithPostProcessor(SchemaProvider provider, TypedProperties cfg, JavaSparkContext jssc, List<String> transformerClassNames) {
        if (provider == null) {
            return null;
        }
        if (provider instanceof SchemaProviderWithPostProcessor) {
            return provider;
        }
        String schemaPostProcessorClass = ConfigUtils.getStringWithAltKeys(cfg, SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR, true);
        boolean enableSparkAvroPostProcessor = ConfigUtils.getBooleanWithAltKeys(cfg, HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE);
        if (transformerClassNames != null && !transformerClassNames.isEmpty() && enableSparkAvroPostProcessor && StringUtils.isNullOrEmpty(schemaPostProcessorClass)) {
            schemaPostProcessorClass = SparkAvroPostProcessor.class.getName();
        }
        if (schemaPostProcessorClass == null || schemaPostProcessorClass.isEmpty()) {
            return provider;
        }
        return new SchemaProviderWithPostProcessor(provider, Option.ofNullable(UtilHelpers.createSchemaPostProcessor(schemaPostProcessorClass, cfg, jssc)));
    }

    public static SchemaProvider getSchemaProviderForKafkaSource(SchemaProvider provider, TypedProperties cfg, JavaSparkContext jssc) {
        if (KafkaOffsetPostProcessor.Config.shouldAddOffsets(cfg)) {
            return new SchemaProviderWithPostProcessor(provider, Option.ofNullable(new KafkaOffsetPostProcessor(cfg, jssc)));
        }
        return provider;
    }

    public static SchemaProvider createRowBasedSchemaProvider(StructType structType, TypedProperties cfg, JavaSparkContext jssc) {
        RowBasedSchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
        return UtilHelpers.wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, null);
    }

    public static Option<Schema> getLatestTableSchema(JavaSparkContext jssc, FileSystem fs, String basePath, HoodieTableMetaClient tableMetaClient) {
        try {
            if (FSUtils.isTableExists(basePath, fs)) {
                TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(tableMetaClient);
                return tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false);
            }
        }
        catch (Exception e) {
            LOG.warn("Failed to fetch latest table's schema", (Throwable)e);
        }
        return Option.empty();
    }

    public static HoodieTableMetaClient createMetaClient(JavaSparkContext jsc, String basePath, boolean shouldLoadActiveTimelineOnLoad) {
        return HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).setLoadActiveTimelineOnLoad(shouldLoadActiveTimelineOnLoad).build();
    }

    public static void addLockOptions(String basePath, TypedProperties props) {
        if (!props.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
            props.putAll((Map<?, ?>)FileSystemBasedLockProvider.getLockConfig(basePath));
        }
    }

    public static int retry(int maxRetryCount, CheckedSupplier<Integer> supplier, String errorMessage) {
        int ret = -1;
        try {
            while ((ret = supplier.get().intValue()) != 0 && maxRetryCount-- > 0) {
            }
        }
        catch (Throwable t) {
            LOG.error(errorMessage, t);
        }
        return ret;
    }

    public static String getSchemaFromLatestInstant(HoodieTableMetaClient metaClient) throws Exception {
        TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
        Schema schema = schemaResolver.getTableAvroSchema(false);
        return schema.toString();
    }

    @FunctionalInterface
    public static interface CheckedSupplier<T> {
        public T get() throws Throwable;
    }
}

