/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helpers that write a small sample of the incoming records into a throw-away table under the
 * target table's base path, and use the resulting commit metadata to derive an average record
 * size for {@link HoodieCompactionConfig#COPY_ON_WRITE_RECORD_SIZE_ESTIMATE}.
 */
public class SparkSampleWritesUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);

    /**
     * Produces a write config whose record size estimate is taken from a sample write.
     *
     * <p>Nothing is overwritten (an empty option is returned) when sample writes are disabled,
     * when the target table's timeline is non-empty, when there are no records to sample, when
     * the sample write reports errors, or when an {@link IOException} occurs along the way.
     *
     * @param jsc         Spark context used to run the sample write and to resolve storage
     * @param recordsOpt  records of the current batch, if any
     * @param writeConfig write config of the target table
     * @return a new write config carrying the measured record size estimate, or empty
     */
    public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(JavaSparkContext jsc, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieWriteConfig writeConfig) {
        if (!writeConfig.getBoolean(HoodieStreamerConfig.SAMPLE_WRITES_ENABLED)) {
            LOG.debug("Skip overwriting record size estimate as it's disabled.");
            return Option.empty();
        }
        HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
        if (metaClient.isTimelineNonEmpty()) {
            LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
            return Option.empty();
        }
        try {
            String instantTime = HoodieInstantTimeGenerator.getInstantFromTemporalAccessor(Instant.now().atZone(ZoneId.systemDefault()));
            Pair<Boolean, String> sampleResult = doSampleWrites(jsc, recordsOpt, writeConfig, instantTime);
            if (sampleResult.getLeft()) {
                long avgSize = getAvgSizeFromSampleWrites(jsc, sampleResult.getRight());
                LOG.info("Overwriting record size estimate to {}", avgSize);
                // NOTE(review): getProps() may hand back the live properties of writeConfig, in
                // which case this put (and the one in doSampleWrites) also alters the caller's
                // config - confirm before relying on writeConfig being unchanged.
                TypedProperties props = writeConfig.getProps();
                props.put(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(avgSize));
                return Option.of(HoodieWriteConfig.newBuilder().withProperties(props).build());
            }
        } catch (IOException e) {
            // Best effort: a failed sample write must not fail the actual ingestion.
            LOG.error("Not overwriting record size estimate for table {} due to error when doing sample writes.", writeConfig.getTableName(), e);
        }
        return Option.empty();
    }

    /**
     * Initializes a copy-on-write sample table and bulk-inserts up to
     * {@link HoodieStreamerConfig#SAMPLE_WRITES_SIZE} records into it.
     *
     * @return {@code (true, sampleWritesBasePath)} when the sample write completed without
     *         errors; {@code (false, null)} when there were no records or the write had errors
     * @throws IOException if preparing the sample table location fails
     */
    // HoodieRecord is used raw throughout this file's signatures, so the write client is raw too.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
        final String sampleWritesBasePath = getSampleWritesBasePath(jsc, writeConfig, instantTime);
        HoodieTableMetaClient.newTableBuilder()
            .setTableType(HoodieTableType.COPY_ON_WRITE)
            .setTableName(String.format("%s_samples_%s", writeConfig.getTableName(), instantTime))
            .setCDCEnabled(false)
            .initTable(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()), sampleWritesBasePath);

        TypedProperties props = writeConfig.getProps();
        props.put(HoodieStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "false");
        final HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
            .withProps(props)
            .withTableServicesEnabled(false)
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
            .withSchemaEvolutionEnable(false)
            .withBulkInsertParallelism(1)
            .withAutoCommit(true)
            .withPath(sampleWritesBasePath)
            .build();

        final Pair<Boolean, String> emptyRes = Pair.of(false, null);
        try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
            final int size = writeConfig.getIntOrDefault(HoodieStreamerConfig.SAMPLE_WRITES_SIZE);
            return recordsOpt.map(records -> {
                List<HoodieRecord> samples = records.coalesce(1).take(size);
                if (samples.isEmpty()) {
                    return emptyRes;
                }
                sampleWriteClient.startCommitWithTime(instantTime);
                JavaRDD<WriteStatus> writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
                if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0L) {
                    LOG.error("sample writes for table {} failed with errors.", writeConfig.getTableName());
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Printing out the top 100 errors");
                        writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
                            LOG.trace("Global error :", ws.getGlobalError());
                            ws.getErrors().forEach((key, throwable) -> LOG.trace("Error for key: {}", key, throwable));
                        });
                    }
                    return emptyRes;
                }
                return Pair.of(true, sampleWritesBasePath);
            }).orElse(emptyRes);
        }
    }

    /**
     * Resolves the sample table location for {@code instantTime} under the target table's base
     * path, deleting the directory first if it already exists.
     *
     * @throws IOException if checking or deleting the existing directory fails
     */
    private static String getSampleWritesBasePath(JavaSparkContext jsc, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
        StoragePath basePath = new StoragePath(writeConfig.getBasePath(), ".hoodie/.aux/.sample_writes/" + instantTime);
        HoodieStorage storage = getMetaClient(jsc, writeConfig.getBasePath()).getStorage();
        if (storage.exists(basePath)) {
            storage.deleteDirectory(basePath);
        }
        return basePath.toString();
    }

    /**
     * Reads the last completed commit of the sample table and returns bytes written divided by
     * records written, rounded up.
     *
     * @throws IOException           if the commit metadata cannot be read, or if it reports no
     *                               written records so that no average can be derived
     * @throws IllegalStateException if the sample table has no completed commit
     */
    private static long getAvgSizeFromSampleWrites(JavaSparkContext jsc, String sampleWritesBasePath) throws IOException {
        HoodieTableMetaClient metaClient = getMetaClient(jsc, sampleWritesBasePath);
        Option<HoodieInstant> lastInstantOpt = metaClient.getCommitTimeline().filterCompletedInstants().lastInstant();
        ValidationUtils.checkState(lastInstantOpt.isPresent(), "The only completed instant should be present in sample_writes table.");
        HoodieInstant instant = lastInstantOpt.get();
        HoodieCommitMetadata commitMetadata = metaClient.getCommitMetadataSerDe().deserialize(instant, metaClient.getCommitTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
        long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
        long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
        if (totalRecordsWritten <= 0L) {
            // Dividing by zero in floating point gives NaN/Infinity, which the cast to long would
            // turn into 0 or Long.MAX_VALUE. Surface it as an IOException instead so the caller's
            // existing handler logs it and leaves the configured estimate untouched.
            throw new IOException(String.format("Sample writes at %s reported %d written records; cannot derive an average record size.", sampleWritesBasePath, totalRecordsWritten));
        }
        return (long) Math.ceil((double) totalBytesWritten / (double) totalRecordsWritten);
    }

    /** Builds a meta client for the table at {@code basePath} using the Spark context's Hadoop configuration. */
    private static HoodieTableMetaClient getMetaClient(JavaSparkContext jsc, String basePath) {
        FileSystem fs = HadoopFSUtils.getFs(basePath, jsc.hadoopConfiguration());
        return HoodieTableMetaClient.builder()
            .setConf(HadoopFSUtils.getStorageConfWithCopy(fs.getConf()))
            .setBasePath(basePath)
            .build();
    }
}

