/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.bootstrap;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.bootstrap.HoodieSparkBootstrapSchemaProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkValidatorUtils;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.BootstrapUtils;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.bootstrap.MetadataBootstrapHandlerFactory;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;

/**
 * Commit action executor that bootstraps a table from the source location returned by
 * {@code config.getBootstrapSourceBasePath()}.
 *
 * <p>{@link #execute()} lists the source partitions, lets the configured
 * {@link BootstrapModeSelector} assign each partition a {@link BootstrapMode}, and then runs
 * {@link #metadataBootstrap(List)} for the {@code METADATA_ONLY} partitions followed by
 * {@link #fullBootstrap(List)} for the {@code FULL_RECORD} partitions.
 *
 * @param <T> payload type of the records handled by this executor
 */
public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>>
    extends BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>>> {
    private static final Logger LOG = LogManager.getLogger(SparkBootstrapCommitActionExecutor.class);

    // Fixed instant times used for the two bootstrap phases.
    // NOTE(review): these values may also be declared as shared timeline constants elsewhere in
    // the project - if so, prefer referencing those instead of the local copies.
    private static final String METADATA_BOOTSTRAP_INSTANT_TS = "00000000000001";
    private static final String FULL_BOOTSTRAP_INSTANT_TS = "00000000000002";

    /** String form of the source schema; populated by {@link #listAndProcessSourcePartitions()}. */
    protected String bootstrapSchema = null;
    /** File system resolved from the bootstrap source base path. */
    private transient FileSystem bootstrapSourceFileSystem;

    /**
     * Creates the executor with a write config derived from {@code config}: auto-commit enabled,
     * {@link BootstrapWriteStatus} as the write status class and the bulk-insert parallelism set
     * to the bootstrap parallelism.
     *
     * @param context       Spark engine context
     * @param config        user supplied write config
     * @param table         table being bootstrapped
     * @param extraMetadata optional extra key/value pairs to store in the commit metadata
     */
    public SparkBootstrapCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table, Option<Map<String, String>> extraMetadata) {
        super(context, new HoodieWriteConfig.Builder().withProps(config.getProps()).withAutoCommit(true).withWriteStatusClass(BootstrapWriteStatus.class).withBulkInsertParallelism(config.getBootstrapParallelism()).build(), table, METADATA_BOOTSTRAP_INSTANT_TS, WriteOperationType.BOOTSTRAP, extraMetadata);
        // NOTE(review): this reads the source base path before validate() has had a chance to
        // reject a missing value - confirm how FSUtils.getFs behaves for a null path.
        this.bootstrapSourceFileSystem = FSUtils.getFs(config.getBootstrapSourceBasePath(), this.hadoopConf);
    }

    /** Checks that the bootstrap source path and the mode selector class are configured. */
    private void validate() {
        ValidationUtils.checkArgument(this.config.getBootstrapSourceBasePath() != null, "Ensure Bootstrap Source Path is set");
        ValidationUtils.checkArgument(this.config.getBootstrapModeSelectorClass() != null, "Ensure Bootstrap Partition Selector is set");
    }

    /**
     * Runs the bootstrap: validates the configuration, requires that no completed commit exists
     * on the active timeline, runs the metadata-only and full-record phases and finally deletes
     * the marker directory for this executor's instant.
     *
     * @return the results of both phases; a phase with no partitions yields an empty option
     * @throws HoodieIOException if listing or processing the source partitions fails with an
     *                           {@link IOException}
     */
    @Override
    public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> execute() {
        this.validate();
        try {
            HoodieTableMetaClient metaClient = this.table.getMetaClient();
            Option<HoodieInstant> completedInstant = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
            ValidationUtils.checkArgument(!completedInstant.isPresent(), "Active Timeline is expected to be empty for bootstrap to be performed. If you want to re-bootstrap, please rollback bootstrap first !!");

            Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = this.listAndProcessSourcePartitions();
            Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = this.metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
            Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = this.fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));

            // Remove the marker directory of this instant once both phases are done.
            WriteMarkersFactory.get(this.config.getMarkersType(), this.table, this.instantTime).quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());
            return new HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>>(metadataResult, fullBootstrapResult);
        } catch (IOException ioe) {
            throw new HoodieIOException(ioe.getMessage(), ioe);
        }
    }

    /** @return the bootstrap source schema captured while listing the source partitions */
    @Override
    protected String getSchemaToStoreInCommit() {
        return this.bootstrapSchema;
    }

    /**
     * Runs the metadata-only bootstrap for the given partitions: creates the requested instant,
     * moves it to inflight, runs the per-file metadata bootstrap and then updates the index and
     * commits.
     *
     * @param partitionFilesList partitions (with their files) selected for metadata-only bootstrap
     * @return the write metadata, or an empty option when the list is {@code null} or empty
     */
    protected Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
        if (null == partitionFilesList || partitionFilesList.isEmpty()) {
            return Option.empty();
        }
        HoodieTableMetaClient metaClient = this.table.getMetaClient();
        // The same requested instant is first created and then transitioned to inflight.
        HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, metaClient.getCommitActionType(), METADATA_BOOTSTRAP_INSTANT_TS);
        metaClient.getActiveTimeline().createNewInstant(requested);
        this.table.getActiveTimeline().transitionRequestedToInflight(requested, Option.empty());

        HoodieData<BootstrapWriteStatus> bootstrapWriteStatuses = this.runMetadataBootstrap(partitionFilesList);
        HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
        // The identity map only widens the element type from BootstrapWriteStatus to WriteStatus.
        this.updateIndexAndCommitIfNeeded(bootstrapWriteStatuses.map(w -> w), result);
        return Option.of(result);
    }

    /**
     * Persists the write statuses, updates the index with them, records the index update
     * duration and the resulting statuses on {@code result} and hands over to
     * {@code commitOnAutoCommit}.
     */
    private void updateIndexAndCommitIfNeeded(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
        writeStatuses.persist(this.config.getString(HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE));
        Instant indexStartTime = Instant.now();
        HoodieData<WriteStatus> statuses = this.table.getIndex().updateLocation(writeStatuses, this.context, this.table);
        result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
        result.setWriteStatuses(statuses);
        this.commitOnAutoCommit(result);
    }

    /**
     * Record-driven execution is not implemented by this executor; use {@link #execute()}.
     *
     * @return always {@code null}
     */
    @Override
    public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
        return null;
    }

    /** Stores a fresh, empty {@link HoodieCommitMetadata} on {@code result}. */
    @Override
    protected void setCommitMetadata(HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
        result.setCommitMetadata(Option.of(new HoodieCommitMetadata()));
    }

    /**
     * Writes the bootstrap index from the collected write statuses (grouped by the partition
     * path of their source file mapping) and then commits the associated write stats.
     *
     * @param extraMetadata optional extra key/value pairs for the commit metadata
     * @param result        write metadata whose statuses are all {@link BootstrapWriteStatus}
     */
    @Override
    protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
        Map<String, List<Pair<BootstrapFileMapping, HoodieWriteStat>>> bootstrapSourceAndStats = result.getWriteStatuses().collectAsList().stream()
            .map(w -> {
                BootstrapWriteStatus ws = (BootstrapWriteStatus) w;
                return Pair.of(ws.getBootstrapSourceFileMapping(), ws.getStat());
            })
            .collect(Collectors.groupingBy(w -> w.getKey().getPartitionPath()));

        HoodieTableMetaClient metaClient = this.table.getMetaClient();
        try (BootstrapIndex.IndexWriter indexWriter = BootstrapIndex.getBootstrapIndex(metaClient).createWriter(metaClient.getTableConfig().getBootstrapBasePath().get())) {
            LOG.info("Starting to write bootstrap index for source " + this.config.getBootstrapSourceBasePath() + " in table " + this.config.getBasePath());
            indexWriter.begin();
            bootstrapSourceAndStats.forEach((partitionPath, mappingsAndStats) ->
                indexWriter.appendNextPartition(partitionPath, mappingsAndStats.stream().map(Pair::getKey).collect(Collectors.toList())));
            indexWriter.finish();
            LOG.info("Finished writing bootstrap index for source " + this.config.getBootstrapSourceBasePath() + " in table " + this.config.getBasePath());
        }

        List<HoodieWriteStat> stats = bootstrapSourceAndStats.values().stream()
            .flatMap(mappingsAndStats -> mappingsAndStats.stream().map(Pair::getValue))
            .collect(Collectors.toList());
        // Announce the commit before performing it so the log reads in execution order.
        LOG.info("Committing metadata bootstrap !!");
        this.commit(extraMetadata, result, stats);
    }

    /**
     * Builds the commit metadata from {@code stats}, finalizes the write, writes the table
     * metadata and completes the instant on the active timeline.
     *
     * @param extraMetadata optional extra key/value pairs added to the commit metadata
     * @param result        write metadata updated with the committed flag, stats and metadata
     * @param stats         write stats to record in the commit
     * @throws HoodieCommitException if saving the completed instant fails with an
     *                               {@link IOException}
     */
    protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result, List<HoodieWriteStat> stats) {
        String actionType = this.table.getMetaClient().getCommitActionType();
        LOG.info("Committing " + this.instantTime + ", action Type " + actionType);
        // The timeline is taken from a newly created table instance rather than from this.table.
        HoodieActiveTimeline activeTimeline = HoodieSparkTable.create(this.config, this.context).getActiveTimeline();
        HoodieCommitMetadata metadata = new HoodieCommitMetadata();
        result.setCommitted(true);
        stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
        result.setWriteStats(stats);
        this.finalizeWrite(this.instantTime, stats, result);
        if (extraMetadata.isPresent()) {
            extraMetadata.get().forEach(metadata::addMetadata);
        }
        metadata.addMetadata("schema", this.getSchemaToStoreInCommit());
        metadata.setOperationType(this.operationType);
        this.writeTableMetadata(metadata, actionType);
        try {
            activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, this.instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
            LOG.info("Committed " + this.instantTime);
        } catch (IOException e) {
            throw new HoodieCommitException("Failed to complete commit " + this.config.getBasePath() + " at time " + this.instantTime, e);
        }
        result.setCommitMetadata(Option.of(metadata));
    }

    /**
     * Runs the full-record bootstrap for the given partitions: loads the configured input
     * provider, generates the input records, creates the requested instant and delegates to the
     * bulk insert executor.
     *
     * @param partitionFilesList partitions (with their files) selected for full-record bootstrap
     * @return the write metadata, or an empty option when the list is {@code null} or empty
     */
    protected Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
        if (null == partitionFilesList || partitionFilesList.isEmpty()) {
            return Option.empty();
        }
        TypedProperties properties = this.copyWriteConfigProps();
        FullRecordBootstrapDataProvider inputProvider = (FullRecordBootstrapDataProvider) ReflectionUtils.loadClass(this.config.getFullBootstrapInputProvider(), properties, this.context);
        // The provider is loaded reflectively and used raw, so the element type cannot be checked.
        @SuppressWarnings("unchecked")
        JavaRDD<HoodieRecord> inputRecordsRDD = (JavaRDD<HoodieRecord>) inputProvider.generateInputRecords("bootstrap_source", this.config.getBootstrapSourceBasePath(), partitionFilesList);

        HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, this.table.getMetaClient().getCommitActionType(), FULL_BOOTSTRAP_INSTANT_TS);
        this.table.getActiveTimeline().createNewInstant(requested);
        return Option.of(this.getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());
    }

    /**
     * Creates the bulk insert executor used by the full-record bootstrap. Its write config is
     * this executor's config with the bootstrap schema applied.
     *
     * @param inputRecordsRDD records to bulk insert
     * @return the executor to run
     */
    protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
        return new SparkBulkInsertCommitActionExecutor((HoodieSparkEngineContext) this.context, new HoodieWriteConfig.Builder().withProps(this.config.getProps()).withSchema(this.bootstrapSchema).build(), this.table, FULL_BOOTSTRAP_INSTANT_TS, inputRecordsRDD, Option.empty(), this.extraMetadata);
    }

    /**
     * Lists the leaf folders of the bootstrap source, captures the source schema into
     * {@link #bootstrapSchema} and groups the partitions by the mode chosen by the configured
     * {@link BootstrapModeSelector}.
     *
     * @return for each bootstrap mode, the selected partitions together with their files
     * @throws IOException if listing the source fails
     */
    private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndProcessSourcePartitions() throws IOException {
        List<Pair<String, List<HoodieFileStatus>>> folders = BootstrapUtils.getAllLeafFoldersWithFiles(this.table.getMetaClient(), this.bootstrapSourceFileSystem, this.config.getBootstrapSourceBasePath(), this.context);

        LOG.info("Fetching Bootstrap Schema !!");
        HoodieSparkBootstrapSchemaProvider sourceSchemaProvider = new HoodieSparkBootstrapSchemaProvider(this.config);
        this.bootstrapSchema = sourceSchemaProvider.getBootstrapSchema(this.context, folders).toString();
        LOG.info("Bootstrap Schema :" + this.bootstrapSchema);

        BootstrapModeSelector selector = (BootstrapModeSelector) ReflectionUtils.loadClass(this.config.getBootstrapModeSelectorClass(), this.config);
        Map<BootstrapMode, List<String>> result = selector.select(folders);
        Map<String, List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue));

        // The set of selected partitions must equal the set of listed partitions.
        ValidationUtils.checkArgument(
            partitionToFiles.keySet().equals(result.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())),
            "Partitions chosen by the bootstrap mode selector do not match the partitions found under the bootstrap source path");

        return result.entrySet().stream()
            .map(e -> Pair.of(e.getKey(), e.getValue().stream().map(p -> Pair.of(p, partitionToFiles.get(p))).collect(Collectors.toList())))
            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
    }

    /**
     * Runs the metadata bootstrap handler for every file of the given partitions, in parallel
     * through the engine context.
     *
     * @param partitions partitions (with their files) to bootstrap
     * @return one bootstrap write status per source file, or empty data when there is nothing to do
     * @throws HoodieKeyGeneratorException if the key generator cannot be created
     */
    private HoodieData<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitions) {
        if (null == partitions || partitions.isEmpty()) {
            return this.context.emptyHoodieData();
        }
        TypedProperties properties = this.copyWriteConfigProps();
        KeyGenerator keyGenerator;
        try {
            keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(properties);
        } catch (IOException e) {
            throw new HoodieKeyGeneratorException("Init keyGenerator failed ", e);
        }
        BootstrapPartitionPathTranslator translator = (BootstrapPartitionPathTranslator) ReflectionUtils.loadClass(this.config.getBootstrapPartitionPathTranslatorClass(), properties);

        // Each element is (source partition path, (translated partition path, source file)).
        List<Pair<String, Pair<String, HoodieFileStatus>>> bootstrapPaths = partitions.stream()
            .flatMap(p -> {
                String translatedPartitionPath = translator.getBootstrapTranslatedPath(p.getKey());
                return p.getValue().stream().map(f -> Pair.of(p.getKey(), Pair.of(translatedPartitionPath, f)));
            })
            .collect(Collectors.toList());

        this.context.setJobStatus(this.getClass().getSimpleName(), "Bootstrap metadata table.");
        return this.context.parallelize(bootstrapPaths, this.config.getBootstrapParallelism())
            .map(partitionFsPair -> MetadataBootstrapHandlerFactory.getMetadataHandler(this.config, this.table, partitionFsPair.getRight().getRight())
                .runMetadataBootstrap(partitionFsPair.getLeft(), partitionFsPair.getRight().getLeft(), keyGenerator));
    }

    /** Returns a new {@link TypedProperties} holding a copy of the write config's properties. */
    private TypedProperties copyWriteConfigProps() {
        TypedProperties properties = new TypedProperties();
        properties.putAll(this.config.getProps());
        return properties;
    }

    /**
     * Not supported on the bootstrap code path.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
        throw new UnsupportedOperationException("Should not called in bootstrap code path");
    }

    /**
     * Not supported on the bootstrap code path.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
        throw new UnsupportedOperationException("Should not called in bootstrap code path");
    }

    /** Runs the configured pre-commit validators through {@link SparkValidatorUtils}. */
    @Override
    protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
        SparkValidatorUtils.runValidators(this.config, writeMetadata, this.context, this.table, this.instantTime);
    }
}

