/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMetadataWriter {
    /** Logger shared by all instances of this writer. */
    private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadataWriter.class);
    /** Write config of the metadata table; only assigned (via createMetadataWriteConfig) when the writer is enabled. */
    protected HoodieWriteConfig metadataWriteConfig;
    /** Write config of the dataset this metadata table describes, as passed to the constructor. */
    protected HoodieWriteConfig datasetWriteConfig;
    /** Name of the metadata table: the dataset table name with "_metadata" appended. Only assigned when enabled. */
    protected String tableName;
    /** Reader over the metadata table; (re)created by initTableMetadata() and closed by close(). */
    protected HoodieBackedTableMetadata metadata;
    /** Meta client taken from {@link #metadata} each time initTableMetadata() runs. */
    protected HoodieTableMetaClient metaClient;
    /**
     * Optional metrics sink. Set to empty when the writer is disabled.
     * NOTE(review): on the enabled path this class never assigns it - presumably initRegistry() does; confirm in subclasses.
     */
    protected Option<HoodieMetadataMetrics> metrics;
    /** Whether the metadata table is in use; the update(...) methods are no-ops when false. */
    protected boolean enabled;
    /** Serializable wrapper around the Hadoop configuration passed to the constructor. */
    protected SerializableConfiguration hadoopConf;
    /** Engine context used for file listing during bootstrap; transient, so it is not serialized with the writer. */
    protected final transient HoodieEngineContext engineContext;

    /**
     * Creates the writer for the dataset described by {@code writeConfig}.
     *
     * <p>When file-listing metadata is switched off in {@code writeConfig} the writer is marked
     * disabled and nothing else happens. Otherwise the metadata write config is derived and
     * validated, {@link #initRegistry()} and {@link #initialize} are invoked, and - if the writer
     * is still enabled afterwards - instants not yet present in the metadata table are synced.
     *
     * @param hadoopConf    Hadoop configuration used to reach the dataset
     * @param writeConfig   write config of the dataset
     * @param engineContext engine context handed to {@link #initialize}
     */
    protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
        this.datasetWriteConfig = writeConfig;
        this.engineContext = engineContext;
        this.hadoopConf = new SerializableConfiguration(hadoopConf);

        // Metadata table not requested: leave the writer inert.
        if (!writeConfig.useFileListingMetadata()) {
            this.enabled = false;
            this.metrics = Option.empty();
            return;
        }

        this.tableName = writeConfig.getTableName() + "_metadata";
        this.metadataWriteConfig = this.createMetadataWriteConfig(writeConfig);
        this.enabled = true;

        // The derived config must keep table services and auto-commit under this writer's control.
        ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(), "Cleaning is controlled internally for Metadata table.");
        ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(), "Compaction is controlled internally for metadata table.");
        ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(), "Auto commit is required for Metadata Table");
        ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(), "File listing cannot be used for Metadata Table");

        this.initRegistry();
        HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder()
                .setConf(hadoopConf)
                .setBasePath(this.datasetWriteConfig.getBasePath())
                .build();
        this.initialize(engineContext, datasetMetaClient);

        // enabled is re-read here because initialize() is implemented by subclasses and may have changed it.
        if (!this.enabled) {
            return;
        }
        HoodieTimer syncTimer = new HoodieTimer().startTimer();
        this.syncFromInstants(datasetMetaClient);
        this.metrics.ifPresent(m -> m.updateMetrics("sync", syncTimer.endTimer()));
    }

    /**
     * Hook invoked by the constructor once the metadata write config has been validated and
     * before {@link #initialize} is called.
     *
     * <p>NOTE(review): presumably this is where {@code metrics} gets set up - the constructor
     * dereferences {@code metrics} on the enabled path without assigning it; confirm in subclasses.
     */
    protected abstract void initRegistry();

    /**
     * Derives the write config used for the metadata table from the dataset's write config.
     *
     * <p>Consistency-guard settings and, when metrics are on, the metrics reporter settings are
     * copied from {@code writeConfig}. Cleaning, compaction and auto-commit behaviour are fixed
     * here rather than inherited.
     *
     * @param writeConfig write config of the dataset
     * @return the write config for the metadata table
     * @throws HoodieMetadataException if metrics are on and the reporter type is not handled here
     */
    private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig) {
        final int parallelism = writeConfig.getMetadataInsertParallelism();

        // Mirror the dataset's consistency-guard settings.
        ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder()
                .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
                .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
                .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
                .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
                .build();

        // Auto-clean and inline compaction are off; the constructor asserts both.
        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
                .withAsyncClean(writeConfig.isMetadataAsyncClean())
                .withAutoClean(false)
                .withCleanerParallelism(parallelism)
                .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
                .retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
                .archiveCommitsWith(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMetadataMaxCommitsToKeep())
                .withInlineCompaction(false)
                .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
                .build();

        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
                .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
                .withConsistencyGuardConfig(consistencyGuardConfig)
                .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
                // The metadata table must not itself use a metadata table.
                .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
                .withAutoCommit(true)
                .withAvroSchemaValidate(true)
                .withEmbeddedTimelineServerEnabled(false)
                .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
                .withSchema(HoodieMetadataRecord.getClassSchema().toString())
                .forTable(this.tableName)
                .withCompactionConfig(compactionConfig)
                .withParallelism(parallelism, parallelism)
                .withDeleteParallelism(parallelism)
                .withRollbackParallelism(parallelism)
                .withFinalizeWriteParallelism(parallelism);

        if (!writeConfig.isMetricsOn()) {
            return builder.build();
        }

        HoodieMetricsConfig.Builder metricsConfig = HoodieMetricsConfig.newBuilder()
                .withReporterType(writeConfig.getMetricsReporterType().toString())
                .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
                .on(true);
        switch (writeConfig.getMetricsReporterType()) {
            case GRAPHITE:
                metricsConfig.onGraphitePort(writeConfig.getGraphiteServerPort())
                        .toGraphiteHost(writeConfig.getGraphiteServerHost())
                        .usePrefix(writeConfig.getGraphiteMetricPrefix());
                break;
            case JMX:
                metricsConfig.onJmxPort(writeConfig.getJmxPort())
                        .toJmxHost(writeConfig.getJmxHost());
                break;
            case DATADOG:
            case CONSOLE:
            case INMEMORY:
                // Accepted reporter types for which no extra settings are copied.
                break;
            default:
                throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType());
        }
        builder.withMetricsConfig(metricsConfig.build());
        return builder.build();
    }

    /**
     * Returns the write config of the metadata table.
     *
     * @return the metadata table write config; never assigned when the writer was constructed disabled
     */
    public HoodieWriteConfig getWriteConfig() {
        return metadataWriteConfig;
    }

    /**
     * Returns the current reader over the metadata table.
     *
     * @return the reader last created by {@link #initTableMetadata()}, or {@code null} if none was created yet
     */
    public HoodieBackedTableMetadata metadata() {
        return metadata;
    }

    /**
     * Engine-specific initialisation, invoked by the constructor after {@link #initRegistry()}.
     *
     * <p>The constructor re-checks {@code enabled} after this call and only syncs instants when it
     * is still {@code true}, so an implementation can disable the writer here.
     * NOTE(review): presumably implementations call bootstrapIfNeeded(...) from here - confirm.
     *
     * @param var1 the engine context passed to the constructor
     * @param var2 meta client built by the constructor for the dataset's base path
     */
    protected abstract void initialize(HoodieEngineContext var1, HoodieTableMetaClient var2);

    /**
     * Replaces the metadata table reader with a freshly opened one and refreshes
     * {@code metaClient} from it. Any previously opened reader is closed first.
     *
     * @throws HoodieException if closing the old reader or opening the new one fails
     */
    protected void initTableMetadata() {
        try {
            HoodieBackedTableMetadata previous = this.metadata;
            if (previous != null) {
                previous.close();
            }
            this.metadata = new HoodieBackedTableMetadata(
                    this.engineContext,
                    this.datasetWriteConfig.getMetadataConfig(),
                    this.datasetWriteConfig.getBasePath(),
                    this.datasetWriteConfig.getSpillableMapBasePath());
            this.metaClient = this.metadata.getMetaClient();
        } catch (Exception e) {
            throw new HoodieException("Error initializing metadata table for reads", e);
        }
    }

    /**
     * Bootstraps the metadata table from a file listing when it is missing or unusable.
     *
     * <p>An existing metadata table is deleted and rebuilt when it has no completed instant, or
     * when its latest completed instant is before the start of the dataset's active timeline.
     * If the table exists and is usable, nothing is done.
     *
     * @param engineContext     engine context forwarded to the bootstrap
     * @param datasetMetaClient meta client of the dataset
     * @throws IOException if a filesystem check, the delete, or the bootstrap fails
     */
    protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
        HoodieTimer timer = new HoodieTimer().startTimer();
        String metadataBasePath = this.metadataWriteConfig.getBasePath();
        boolean tableExists = datasetMetaClient.getFs().exists(new Path(metadataBasePath, ".hoodie"));

        boolean mustRebootstrap = false;
        if (tableExists) {
            HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder()
                    .setConf(this.hadoopConf.get())
                    .setBasePath(metadataBasePath)
                    .build();
            Option<HoodieInstant> lastCompleted = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
            if (!lastCompleted.isPresent()) {
                LOG.warn("Metadata Table will need to be re-bootstrapped as no instants were found");
                mustRebootstrap = true;
            } else {
                String lastCompletedTime = lastCompleted.get().getTimestamp();
                if (datasetMetaClient.getActiveTimeline().isBeforeTimelineStarts(lastCompletedTime)) {
                    LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived.latestMetadataInstant=" + lastCompletedTime + ", latestDatasetInstant=" + datasetMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
                    mustRebootstrap = true;
                }
            }
        }

        if (mustRebootstrap) {
            LOG.info("Deleting Metadata Table directory so that it can be re-bootstrapped");
            datasetMetaClient.getFs().delete(new Path(metadataBasePath), true);
            tableExists = false;
        }

        if (tableExists) {
            return;
        }
        this.bootstrapFromFilesystem(engineContext, datasetMetaClient);
        this.metrics.ifPresent(m -> m.updateMetrics("initialize", timer.endTimer()));
    }

    /**
     * Creates the metadata table and populates it from a listing of the dataset's files.
     *
     * <p>The table is created at the timestamp of the last completed dataset instant that
     * precedes the first non-completed one ("0000000000000" if there is none). Files whose commit
     * time is greater than that timestamp are left out. Partitions that end up without files still
     * get a single write stat carrying only the partition path.
     *
     * @param engineContext     engine context (unused here; listing uses the writer's own context)
     * @param datasetMetaClient meta client of the dataset
     * @throws IOException if creating the metadata table fails
     */
    private void bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
        ValidationUtils.checkState(this.enabled, "Metadata table cannot be initialized as it is not enabled");

        // Walk the active timeline and stop at the first instant that is not completed.
        Option<HoodieInstant> lastCompletedBeforePending = Option.empty();
        List<HoodieInstant> datasetInstants = datasetMetaClient.getActiveTimeline().getInstants().collect(Collectors.toList());
        for (HoodieInstant instant : datasetInstants) {
            if (!instant.isCompleted()) {
                break;
            }
            lastCompletedBeforePending = Option.of(instant);
        }
        String createInstantTime = lastCompletedBeforePending.isPresent()
                ? lastCompletedBeforePending.get().getTimestamp()
                : "0000000000000";

        LOG.info("Creating a new metadata table in " + this.metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
        HoodieTableMetaClient.withPropertyBuilder()
                .setTableType(HoodieTableType.MERGE_ON_READ)
                .setTableName(this.tableName)
                .setArchiveLogFolder("archived")
                .setPayloadClassName(HoodieMetadataPayload.class.getName())
                .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
                .initTable(this.hadoopConf.get(), this.metadataWriteConfig.getBasePath());
        this.initTableMetadata();

        LOG.info("Initializing metadata table by using file listings in " + this.datasetWriteConfig.getBasePath());
        Map<String, List<FileStatus>> partitionToFileStatus = this.getPartitionsToFilesMapping(datasetMetaClient);

        int fileCount = 0;
        HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
        for (Map.Entry<String, List<FileStatus>> entry : partitionToFileStatus.entrySet()) {
            String partition = entry.getKey();
            for (FileStatus status : entry.getValue()) {
                String filename = status.getPath().getName();
                // Skip files written after the instant the metadata table is created at.
                if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN, createInstantTime)) {
                    continue;
                }
                HoodieWriteStat fileStat = new HoodieWriteStat();
                fileStat.setPath(partition + "/" + filename);
                fileStat.setPartitionPath(partition);
                fileStat.setTotalWriteBytes(status.getLen());
                commitMetadata.addWriteStat(partition, fileStat);
                fileCount++;
            }
            // Record the partition even when none of its files qualified.
            if (commitMetadata.getWriteStats(partition) == null) {
                HoodieWriteStat emptyPartitionStat = new HoodieWriteStat();
                emptyPartitionStat.setPartitionPath(partition);
                commitMetadata.addWriteStat(partition, emptyPartitionStat);
            }
        }

        LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + fileCount + " files to metadata");
        this.update(commitMetadata, createInstantTime);
    }

    /**
     * Walks the dataset directory tree level by level and returns, for every partition found,
     * the entries listed in that partition's directory.
     *
     * <p>A directory counts as a partition when its listing contains an entry named
     * ".hoodie_partition_metadata"; that marker is not part of the returned entries and
     * partitions are not descended into. Any other directory has its sub-directories (except
     * ".hoodie") queued for the next round. Directories whose name matches the configured
     * directory filter regex are skipped entirely.
     *
     * <p>The ".hoodie" metafolder is also excluded from a partition's entries. The descent branch
     * already skips it; without the same exclusion here, a dataset whose base path is itself a
     * partition would report the metafolder as a data file.
     *
     * @param datasetMetaClient meta client of the dataset; supplies the Hadoop configuration and
     *                          the base path that partition names are made relative to
     * @return map from relative partition path to the entries listed in that partition
     */
    private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient datasetMetaClient) {
        List<Path> pathsToList = new LinkedList<>();
        pathsToList.add(new Path(this.datasetWriteConfig.getBasePath()));
        Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
        int fileListingParallelism = this.metadataWriteConfig.getFileListingParallelism();
        SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
        String dirFilterRegex = this.datasetWriteConfig.getMetadataConfig().getDirectoryFilterRegex();

        while (!pathsToList.isEmpty()) {
            // List every directory queued for this level through the engine context.
            int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
            List<Pair<Path, FileStatus[]>> dirToFileListing = this.engineContext.map(pathsToList, path -> {
                FileSystem fs = path.getFileSystem(conf.get());
                return Pair.of(path, fs.listStatus(path));
            }, listingParallelism);
            pathsToList.clear();

            for (Pair<Path, FileStatus[]> dirListing : dirToFileListing) {
                Path dir = dirListing.getLeft();
                FileStatus[] entries = dirListing.getRight();

                if (!dirFilterRegex.isEmpty() && dir.getName().matches(dirFilterRegex)) {
                    LOG.info("Ignoring directory " + dir + " which matches the filter regex " + dirFilterRegex);
                    continue;
                }

                boolean isPartition = Arrays.stream(entries)
                        .anyMatch(entry -> entry.getPath().getName().equals(".hoodie_partition_metadata"));
                if (isPartition) {
                    // Keep everything except the partition marker and the table metafolder.
                    List<FileStatus> filesInDir = Arrays.stream(entries)
                            .filter(entry -> {
                                String name = entry.getPath().getName();
                                return !name.equals(".hoodie_partition_metadata") && !name.equals(".hoodie");
                            })
                            .collect(Collectors.toList());
                    String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetMetaClient.getBasePath()), dir);
                    partitionToFileStatus.put(partitionName, filesInDir);
                } else {
                    // Not a partition: descend into its sub-directories on the next round.
                    for (FileStatus entry : entries) {
                        if (entry.isDirectory() && !entry.getPath().getName().equals(".hoodie")) {
                            pathsToList.add(entry.getPath());
                        }
                    }
                }
            }
        }
        return partitionToFileStatus;
    }

    /**
     * Applies to the metadata table every dataset instant that the metadata reader reports as
     * not yet synced, then re-opens the reader. Instants that convert to no records are skipped.
     *
     * @param datasetMetaClient meta client of the dataset whose instants are converted
     * @throws HoodieIOException if an {@link IOException} occurs while syncing
     */
    private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
        ValidationUtils.checkState(this.enabled, "Metadata table cannot be synced as it is not enabled");
        // Open a fresh reader before asking which instants are missing.
        this.initTableMetadata();
        try {
            List<HoodieInstant> pendingInstants = this.metadata.findInstantsToSync();
            if (pendingInstants.isEmpty()) {
                return;
            }
            LOG.info("Syncing " + pendingInstants.size() + " instants to metadata table: " + pendingInstants);
            String filesPartition;
            for (HoodieInstant pending : pendingInstants) {
                LOG.info("Syncing instant " + pending + " to metadata table");
                Option<List<HoodieRecord>> converted = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, pending, this.metadata.getSyncedInstantTime());
                if (converted.isPresent()) {
                    List<HoodieRecord> records = converted.get();
                    filesPartition = MetadataPartitionType.FILES.partitionPath();
                    this.commit(records, filesPartition, pending.getTimestamp());
                }
            }
            // Re-open the reader now that new instants have been committed.
            this.initTableMetadata();
        } catch (IOException ioe) {
            throw new HoodieIOException("Unable to sync instants from data to metadata table.", ioe);
        }
    }

    /**
     * Converts the commit metadata into metadata-table records and commits them to the files
     * partition at {@code instantTime}. Does nothing when the writer is disabled.
     *
     * @param commitMetadata commit metadata of the dataset
     * @param instantTime    instant time to commit the records at
     */
    @Override
    public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
        if (!this.enabled) {
            return;
        }
        this.commit(
                HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime),
                MetadataPartitionType.FILES.partitionPath(),
                instantTime);
    }

    /**
     * Converts the cleaner plan into metadata-table records and commits them to the files
     * partition at {@code instantTime}. Does nothing when the writer is disabled.
     *
     * @param cleanerPlan cleaner plan of the dataset
     * @param instantTime instant time to commit the records at
     */
    @Override
    public void update(HoodieCleanerPlan cleanerPlan, String instantTime) {
        if (!this.enabled) {
            return;
        }
        this.commit(
                HoodieTableMetadataUtil.convertMetadataToRecords(cleanerPlan, instantTime),
                MetadataPartitionType.FILES.partitionPath(),
                instantTime);
    }

    /**
     * Converts the clean metadata into metadata-table records and commits them to the files
     * partition at {@code instantTime}. Does nothing when the writer is disabled.
     *
     * @param cleanMetadata clean metadata of the dataset
     * @param instantTime   instant time to commit the records at
     */
    @Override
    public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
        if (!this.enabled) {
            return;
        }
        this.commit(
                HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime),
                MetadataPartitionType.FILES.partitionPath(),
                instantTime);
    }

    /**
     * Converts the restore metadata into metadata-table records - taking the metadata table's
     * last synced instant time into account - and commits them to the files partition at
     * {@code instantTime}. Does nothing when the writer is disabled.
     *
     * @param restoreMetadata restore metadata of the dataset
     * @param instantTime     instant time to commit the records at
     */
    @Override
    public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
        if (!this.enabled) {
            return;
        }
        this.commit(
                HoodieTableMetadataUtil.convertMetadataToRecords(restoreMetadata, instantTime, this.metadata.getSyncedInstantTime()),
                MetadataPartitionType.FILES.partitionPath(),
                instantTime);
    }

    /**
     * Converts the rollback metadata into metadata-table records - taking the metadata table's
     * last synced instant time into account - and commits them to the files partition at
     * {@code instantTime}. Does nothing when the writer is disabled.
     *
     * @param rollbackMetadata rollback metadata of the dataset
     * @param instantTime      instant time to commit the records at
     */
    @Override
    public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
        if (!this.enabled) {
            return;
        }
        this.commit(
                HoodieTableMetadataUtil.convertMetadataToRecords(rollbackMetadata, instantTime, this.metadata.getSyncedInstantTime()),
                MetadataPartitionType.FILES.partitionPath(),
                instantTime);
    }

    /**
     * Closes the metadata table reader if one has been opened.
     *
     * @throws Exception if closing the reader fails
     */
    @Override
    public void close() throws Exception {
        if (this.metadata == null) {
            return;
        }
        this.metadata.close();
    }

    /**
     * Commits records to the metadata table; implemented per engine.
     *
     * @param var1 the records to commit
     * @param var2 the metadata-table partition to write to; every caller in this class passes
     *             {@code MetadataPartitionType.FILES.partitionPath()}
     * @param var3 the instant time to commit at
     */
    protected abstract void commit(List<HoodieRecord> var1, String var2, String var3);
}

