/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.partitioner.profile;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.SmallFile;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Caches write statistics derived from the completed commits of a Hudi table:
 * the average number of bytes per record, the number of records per bucket and
 * the small base files of each requested partition.
 *
 * <p>The cached state is refreshed by {@link #reload(long)}. {@link #getSmallFiles(String)}
 * and {@link #reload(long)} are {@code synchronized} on this instance; the plain getters are not.
 */
public class WriteProfile {
    private static final Logger LOG = LoggerFactory.getLogger(WriteProfile.class);

    /** Write config the profile reads its thresholds and size limits from. */
    protected final HoodieWriteConfig config;

    /** Table base path, built from {@code config.getBasePath()}. */
    private final Path basePath;

    /** Meta client used to read the commit timeline. */
    protected final HoodieTableMetaClient metaClient;

    /** Average bytes per record; {@code -1} until the first profile has been recorded. */
    private long avgSize = -1L;

    /** Records per bucket; starts from {@code config.getCopyOnWriteInsertSplitSize()}. */
    private long recordsPerBucket;

    /** Small files per partition path, cleared on every effective {@link #reload(long)}. */
    private final Map<String, List<SmallFile>> smallFilesMap;

    /** Checkpoint id of the last effective {@link #reload(long)}. */
    private long reloadedCheckpointId;

    /** File system view used to look up the latest base files. */
    protected SyncableFileSystemView fsView;

    /** Commit metadata keyed by instant timestamp, so each commit is read at most once while cached. */
    private final Map<String, HoodieCommitMetadata> metadataCache;

    /** Engine context used to create the table and the meta client. */
    private final HoodieFlinkEngineContext context;

    /**
     * Creates the profile and records the initial statistics.
     *
     * @param config  the write config
     * @param context the Flink engine context
     */
    public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
        this.config = config;
        this.context = context;
        this.basePath = new Path(config.getBasePath());
        this.smallFilesMap = new HashMap<>();
        this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
        this.metaClient = StreamerUtil.createMetaClient(config.getBasePath(), context.getHadoopConf().get());
        this.metadataCache = new HashMap<>();
        // NOTE: getFileSystemView() is overridable and runs before any subclass
        // constructor body, so overrides must not depend on subclass fields.
        this.fsView = this.getFileSystemView();
        this.recordProfile();
    }

    /**
     * Returns the average bytes per record as computed by the last profile refresh.
     *
     * @return the average record size in bytes
     */
    public long getAvgSize() {
        return this.avgSize;
    }

    /**
     * Returns the number of records per bucket.
     *
     * @return the records per bucket
     */
    public long getRecordsPerBucket() {
        return this.recordsPerBucket;
    }

    /**
     * Returns the meta client backing this profile.
     *
     * @return the table meta client
     */
    public HoodieTableMetaClient getMetaClient() {
        return this.metaClient;
    }

    /**
     * Creates a table instance from the current config and engine context.
     *
     * @return a newly created table
     */
    protected HoodieTable<?, ?, ?, ?> getTable() {
        return HoodieFlinkTable.create(this.config, this.context);
    }

    /**
     * Computes the average bytes per record from the most recent completed commit
     * that wrote more bytes than the size threshold and at least one record.
     * Falls back to {@code config.getCopyOnWriteRecordSizeEstimate()} when no
     * commit qualifies.
     *
     * @return the average record size in bytes
     */
    private long averageBytesPerRecord() {
        long avgSize = this.config.getCopyOnWriteRecordSizeEstimate();
        long fileSizeThreshold =
            (long) (this.config.getRecordSizeEstimationThreshold() * (double) this.config.getParquetSmallFileLimit());
        HoodieTimeline commitTimeline = this.metaClient.getCommitsTimeline().filterCompletedInstants();
        if (!commitTimeline.empty()) {
            // Walk the instants newest first and stop at the first usable commit.
            Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
            while (instants.hasNext()) {
                HoodieInstant instant = instants.next();
                // A null mapping result is not stored by computeIfAbsent, so a commit
                // whose metadata could not be fetched is retried on the next refresh.
                HoodieCommitMetadata commitMetadata = this.metadataCache.computeIfAbsent(
                    instant.getTimestamp(),
                    k -> WriteProfiles.getCommitMetadataSafely(this.config.getTableName(), this.basePath, instant, commitTimeline)
                        .orElse(null));
                if (commitMetadata == null) {
                    continue;
                }
                long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
                long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
                if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0L) {
                    avgSize = (long) Math.ceil(1.0 * (double) totalBytesWritten / (double) totalRecordsWritten);
                    break;
                }
            }
        }
        LOG.info("Refresh average bytes per record => " + avgSize);
        return avgSize;
    }

    /**
     * Returns the small files of the given partition, computing and caching them
     * on first access. When {@code config.getParquetSmallFileLimit()} is not
     * positive, an empty list is cached and returned.
     *
     * @param partitionPath the partition path
     * @return the cached list of small files for the partition
     */
    public synchronized List<SmallFile> getSmallFiles(String partitionPath) {
        if (this.smallFilesMap.containsKey(partitionPath)) {
            return this.smallFilesMap.get(partitionPath);
        }
        List<SmallFile> smallFiles = this.config.getParquetSmallFileLimit() <= 0
            ? new ArrayList<>()
            : this.smallFilesProfile(partitionPath);
        this.smallFilesMap.put(partitionPath, smallFiles);
        return smallFiles;
    }

    /**
     * Collects the latest base files of the partition, up to the last completed
     * commit, whose size is positive and below {@code config.getParquetSmallFileLimit()}.
     *
     * @param partitionPath the partition path
     * @return the small files found; empty when there is no completed commit
     */
    protected List<SmallFile> smallFilesProfile(String partitionPath) {
        List<SmallFile> smallFileLocations = new ArrayList<>();
        HoodieTimeline commitTimeline = this.metaClient.getCommitsTimeline().filterCompletedInstants();
        if (commitTimeline.empty()) {
            return smallFileLocations;
        }
        HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
        List<HoodieBaseFile> allFiles = this.fsView
            .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp())
            .collect(Collectors.toList());
        long smallFileLimit = this.config.getParquetSmallFileLimit();
        for (HoodieBaseFile file : allFiles) {
            long fileSize = file.getFileSize();
            if (fileSize <= 0L || fileSize >= smallFileLimit) {
                continue;
            }
            SmallFile sf = new SmallFile();
            sf.location = new HoodieRecordLocation(file.getCommitTime(), file.getFileId());
            sf.sizeBytes = fileSize;
            smallFileLocations.add(sf);
        }
        return smallFileLocations;
    }

    /**
     * Creates the file system view from a newly created table's base-file-only view.
     *
     * <p>This method is invoked from the constructor and from {@link #reload(long)}.
     *
     * @return the file system view
     */
    protected SyncableFileSystemView getFileSystemView() {
        return (SyncableFileSystemView) this.getTable().getBaseFileOnlyView();
    }

    /**
     * Drops every cached commit metadata entry whose timestamp is not among the given instants.
     *
     * @param instants the instants whose metadata may stay cached
     */
    private void cleanMetadataCache(Stream<HoodieInstant> instants) {
        Set<String> timestampSet = instants.map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
        this.metadataCache.keySet().retainAll(timestampSet);
    }

    /**
     * Refreshes the average record size and, when
     * {@code config.shouldAllowMultiWriteOnSameInstant()} is set, the records per bucket.
     */
    private void recordProfile() {
        this.avgSize = this.averageBytesPerRecord();
        if (!this.config.shouldAllowMultiWriteOnSameInstant()) {
            return;
        }
        if (this.avgSize <= 0L) {
            // Dividing by a non-positive average would throw ArithmeticException (zero)
            // or yield a meaningless bucket size; keep the previous value instead.
            LOG.warn("Skip refreshing insert records per bucket, invalid average bytes per record => " + this.avgSize);
            return;
        }
        this.recordsPerBucket = this.config.getParquetMaxFileSize() / this.avgSize;
        LOG.info("Refresh insert records per bucket => " + this.recordsPerBucket);
    }

    /**
     * Reloads the timeline, the file system view and the cached statistics.
     * Does nothing when a reload for this or a later checkpoint id has already happened.
     *
     * @param checkpointId the checkpoint id that triggers the reload
     */
    public synchronized void reload(long checkpointId) {
        if (this.reloadedCheckpointId >= checkpointId) {
            return;
        }
        this.metaClient.reloadActiveTimeline();
        // Swap in the new view before closing the old one so fsView never refers to a closed view.
        SyncableFileSystemView oldFsView = this.fsView;
        this.fsView = this.getFileSystemView();
        oldFsView.close();
        this.recordProfile();
        this.cleanMetadataCache(this.metaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream());
        this.smallFilesMap.clear();
        this.reloadedCheckpointId = checkpointId;
    }

    /**
     * Exposes the live (not copied) commit metadata cache for tests.
     *
     * @return the internal metadata cache
     */
    @VisibleForTesting
    public Map<String, HoodieCommitMetadata> getMetadataCache() {
        return this.metadataCache;
    }
}

