/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.hive.utils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.hive.HiveConnectorOptions;
import org.apache.paimon.hive.mapred.PaimonInputSplit;
import org.apache.paimon.hive.utils.HiveUtils;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FallbackReadFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BinPacking;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveSplitGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(HiveSplitGenerator.class);

    public static InputSplit[] generateSplits(FileStoreTable table, JobConf jobConf, int numSplits) {
        ArrayList<Predicate> predicates = new ArrayList<Predicate>();
        HiveUtils.createPredicate(table.schema(), jobConf, false).ifPresent(predicates::add);
        String locations = jobConf.get("mapreduce.input.fileinputformat.inputdir");
        String tagToPartField = table.coreOptions().tagToPartitionField();
        TagPreview tagPreview = TagPreview.create(table.coreOptions());
        ArrayList splits = new ArrayList();
        for (String location : locations.split(",")) {
            List<DataSplit> dataSplits;
            DataTableScan scan;
            if (!location.startsWith(table.location().toUri().toString())) continue;
            if (tagToPartField != null) {
                String tag = HiveUtils.extractTagName(location, tagToPartField);
                Map<String, String> dynamicOptions = tagPreview == null ? Collections.singletonMap(CoreOptions.SCAN_TAG_NAME.key(), tag) : tagPreview.timeTravel(table, tag);
                scan = table.copy((Map)dynamicOptions).newScan();
                if (predicates.size() > 0) {
                    scan.withFilter(PredicateBuilder.and(predicates));
                }
            } else {
                ArrayList<Predicate> predicatePerPartition = new ArrayList<Predicate>(predicates);
                HiveSplitGenerator.createPartitionPredicate(table.schema().logicalRowType(), table.schema().partitionKeys(), location, table.coreOptions().partitionDefaultName()).ifPresent(predicatePerPartition::add);
                scan = table.newScan();
                if (predicatePerPartition.size() > 0) {
                    scan.withFilter(PredicateBuilder.and(predicatePerPartition));
                }
            }
            List<DataSplit> packed = dataSplits = scan.dropStats().plan().splits().stream().map(s -> (DataSplit)s).collect(Collectors.toList());
            if (jobConf.getBoolean(HiveConnectorOptions.HIVE_PAIMON_RESPECT_MINMAXSPLITSIZE_ENABLED.key(), false)) {
                packed = HiveSplitGenerator.packSplits(table, jobConf, dataSplits, numSplits);
            }
            packed.forEach(ss -> splits.add(new PaimonInputSplit(location, (DataSplit)ss, table)));
        }
        return splits.toArray(new InputSplit[0]);
    }

    private static Optional<Predicate> createPartitionPredicate(RowType rowType, List<String> partitionKeys, String partitionDir, String defaultPartName) {
        HashSet<String> partitionKeySet = new HashSet<String>(partitionKeys);
        LinkedHashMap<String, String> partition = new LinkedHashMap<String, String>();
        for (String s : partitionDir.split("/")) {
            String[] kv;
            if ((s = s.trim()).isEmpty() || (kv = s.split("=")).length != 2 || !partitionKeySet.contains(kv[0])) continue;
            partition.put(kv[0], kv[1]);
        }
        if (partition.isEmpty() || partition.size() != partitionKeys.size()) {
            return Optional.empty();
        }
        return Optional.ofNullable(PartitionPredicate.createPartitionPredicate(partition, rowType, defaultPartName));
    }

    private static List<DataSplit> packSplits(FileStoreTable table, JobConf jobConf, List<DataSplit> splits, int numSplits) {
        if (table.coreOptions().deletionVectorsEnabled()) {
            return splits;
        }
        long openCostInBytes = jobConf.getLong(HiveConnectorOptions.HIVE_PAIMON_SPLIT_OPENFILECOST.key(), table.coreOptions().splitOpenFileCost());
        long splitSize = HiveSplitGenerator.computeSplitSize(jobConf, splits, numSplits, openCostInBytes);
        ArrayList<DataSplit> dataSplits = new ArrayList<DataSplit>();
        ArrayList<DataSplit> toPack = new ArrayList<DataSplit>();
        int numFiles = 0;
        for (DataSplit split : splits) {
            if (split instanceof FallbackReadFileStoreTable.FallbackDataSplit) {
                dataSplits.add(split);
                continue;
            }
            if (split.beforeFiles().isEmpty() && split.rawConvertible()) {
                numFiles += split.dataFiles().size();
                toPack.add(split);
                continue;
            }
            dataSplits.add(split);
        }
        Function<DataFileMeta, Long> weightFunc = file -> Math.max(file.fileSize(), openCostInBytes);
        DataSplit current = null;
        ArrayList<DataFileMeta> bin = new ArrayList<DataFileMeta>();
        int numFilesAfterPacked = 0;
        for (DataSplit split : toPack) {
            if (current == null || current.partition().equals(split.partition()) && current.bucket() == split.bucket()) {
                current = split;
                bin.addAll(split.dataFiles());
                continue;
            }
            List<List<DataFileMeta>> splitGroups = BinPacking.packForOrdered(bin, weightFunc, splitSize);
            for (List<DataFileMeta> fileGroups : splitGroups) {
                DataSplit newSplit = HiveSplitGenerator.buildDataSplit(current, fileGroups);
                numFilesAfterPacked += newSplit.dataFiles().size();
                dataSplits.add(newSplit);
            }
            current = split;
            bin.clear();
        }
        if (!bin.isEmpty()) {
            List<List<DataFileMeta>> splitGroups = BinPacking.packForOrdered(bin, weightFunc, splitSize);
            for (List<DataFileMeta> fileGroups : splitGroups) {
                DataSplit newSplit = HiveSplitGenerator.buildDataSplit(current, fileGroups);
                numFilesAfterPacked += newSplit.dataFiles().size();
                dataSplits.add(newSplit);
            }
        }
        LOG.info("The origin number of data files before pack: {}", (Object)numFiles);
        LOG.info("The current number of data files after pack: {}", (Object)numFilesAfterPacked);
        return dataSplits;
    }

    private static DataSplit buildDataSplit(DataSplit current, List<DataFileMeta> fileGroups) {
        return DataSplit.builder().withSnapshot(current.snapshotId()).withPartition(current.partition()).withBucket(current.bucket()).withTotalBuckets(current.totalBuckets()).withDataFiles(fileGroups).rawConvertible(current.rawConvertible()).withBucketPath(current.bucketPath()).build();
    }

    private static Long computeSplitSize(JobConf jobConf, List<DataSplit> splits, int numSplits, long openCostInBytes) {
        long maxSize = HiveConf.getLongVar((Configuration)jobConf, (HiveConf.ConfVars)HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
        long minSize = HiveConf.getLongVar((Configuration)jobConf, (HiveConf.ConfVars)HiveConf.ConfVars.MAPREDMINSPLITSIZE);
        long totalSize = 0L;
        for (DataSplit split : splits) {
            totalSize += split.dataFiles().stream().map(f -> Math.max(f.fileSize(), openCostInBytes)).reduce(Long::sum).orElse(0L).longValue();
        }
        long avgSize = totalSize / (long)numSplits;
        long splitSize = Math.min(maxSize, Math.max(avgSize, minSize));
        LOG.info("Currently, minSplitSize: {}, maxSplitSize: {}, avgSize: {}, finalSplitSize: {}.", new Object[]{minSize, maxSize, avgSize, splitSize});
        return splitSize;
    }
}

