/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;

public class HoodieRealtimeInputFormatUtils
extends HoodieInputFormatUtils {
    private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);

    public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
        Map<Path, List<FileSplit>> partitionsToParquetSplits = fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
        Map<Path, HoodieTableMetaClient> partitionsToMetaClient = HoodieRealtimeInputFormatUtils.getTableMetaClientByPartitionPath(conf, partitionsToParquetSplits.keySet());
        HashMap<HoodieTableMetaClient, HoodieTableFileSystemView> fsCache = new HashMap<HoodieTableMetaClient, HoodieTableFileSystemView>();
        ArrayList rtSplits = new ArrayList();
        try {
            HoodieTableMetaClient metaClient;
            HoodieTableConfig tableConfig;
            Option<Object> hoodieVirtualKeyInfo = Option.empty();
            if (partitionsToParquetSplits.size() > 0 && !(tableConfig = (metaClient = partitionsToMetaClient.get(partitionsToParquetSplits.keySet().iterator().next())).getTableConfig()).populateMetaFields()) {
                TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
                try {
                    MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema();
                    hoodieVirtualKeyInfo = Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()), parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp())));
                }
                catch (Exception exception) {
                    throw new HoodieException("Fetching table schema failed with exception ", exception);
                }
            }
            Option finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
            partitionsToParquetSplits.keySet().forEach(partitionPath -> {
                HoodieTableMetaClient metaClient = (HoodieTableMetaClient)partitionsToMetaClient.get(partitionPath);
                if (!fsCache.containsKey(metaClient)) {
                    HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(conf);
                    HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf));
                    fsCache.put(metaClient, fsView);
                }
                HoodieTableFileSystemView fsView = (HoodieTableFileSystemView)fsCache.get(metaClient);
                String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
                Option<HoodieInstant> latestCompletedInstant = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
                Stream<FileSlice> latestFileSlices = latestCompletedInstant.map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp())).orElse(Stream.empty());
                Map<String, List<FileSplit>> groupedInputSplits = ((List)partitionsToParquetSplits.get(partitionPath)).stream().collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
                String maxCommitTime = metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet("commit", "rollback", "deltacommit", "replacecommit")).filterCompletedInstants().lastInstant().get().getTimestamp();
                latestFileSlices.forEach(fileSlice -> {
                    List dataFileSplits = (List)groupedInputSplits.get(fileSlice.getFileId());
                    dataFileSplits.forEach(split -> {
                        try {
                            List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
                            if (split instanceof BootstrapBaseFileSplit) {
                                BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit)((Object)((Object)((Object)split)));
                                String[] hosts = split.getLocationInfo() != null ? (String[])Arrays.stream(split.getLocationInfo()).filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[]{};
                                String[] inMemoryHosts = split.getLocationInfo() != null ? (String[])Arrays.stream(split.getLocationInfo()).filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[]{};
                                FileSplit baseSplit = new FileSplit(eSplit.getPath(), eSplit.getStart(), eSplit.getLength(), hosts, inMemoryHosts);
                                rtSplits.add(new RealtimeBootstrapBaseFileSplit(baseSplit, metaClient.getBasePath(), logFilePaths, maxCommitTime, eSplit.getBootstrapFileSplit()));
                            } else {
                                rtSplits.add(new HoodieRealtimeFileSplit((FileSplit)split, metaClient.getBasePath(), logFilePaths, maxCommitTime, finalHoodieVirtualKeyInfo));
                            }
                        }
                        catch (IOException e) {
                            throw new HoodieIOException("Error creating hoodie real time split ", e);
                        }
                    });
                });
            });
        }
        catch (Exception e) {
            throw new HoodieException("Error obtaining data file/log file grouping ", e);
        }
        finally {
            fsCache.forEach((k, view) -> view.close());
        }
        LOG.info((Object)("Returning a total splits of " + rtSplits.size()));
        return rtSplits.toArray(new InputSplit[0]);
    }

    public static List<Pair<Option<HoodieBaseFile>, List<String>>> groupLogsByBaseFile(Configuration conf, List<Path> partitionPaths) {
        HashSet<Path> partitionSet = new HashSet<Path>(partitionPaths);
        Map<Path, HoodieTableMetaClient> partitionsToMetaClient = HoodieRealtimeInputFormatUtils.getTableMetaClientByPartitionPath(conf, partitionSet);
        ArrayList<Pair<Option<HoodieBaseFile>, List<String>>> baseAndLogsList = new ArrayList<Pair<Option<HoodieBaseFile>, List<String>>>();
        partitionSet.forEach(partitionPath -> {
            HoodieTableMetaClient metaClient = (HoodieTableMetaClient)partitionsToMetaClient.get(partitionPath);
            HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
            String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
            try {
                Option<HoodieInstant> latestCompletedInstant = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
                Stream<FileSlice> latestFileSlices = latestCompletedInstant.map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp())).orElse(Stream.empty());
                latestFileSlices.forEach(fileSlice -> {
                    List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
                    baseAndLogsList.add(Pair.of(fileSlice.getBaseFile(), logFilePaths));
                });
            }
            catch (Exception e) {
                throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
            }
        });
        return baseAndLogsList;
    }

    private static Configuration addProjectionField(Configuration conf, String fieldName, int fieldIndex) {
        String readColNames = conf.get("hive.io.file.readcolumn.names", "");
        String readColIds = conf.get("hive.io.file.readcolumn.ids", "");
        String readColNamesPrefix = readColNames + ",";
        if (readColNames == null || readColNames.isEmpty()) {
            readColNamesPrefix = "";
        }
        String readColIdsPrefix = readColIds + ",";
        if (readColIds == null || readColIds.isEmpty()) {
            readColIdsPrefix = "";
        }
        if (!readColNames.contains(fieldName)) {
            conf.set("hive.io.file.readcolumn.names", readColNamesPrefix + fieldName);
            conf.set("hive.io.file.readcolumn.ids", readColIdsPrefix + fieldIndex);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)String.format("Adding extra column " + fieldName + ", to enable log merging cols (%s) ids (%s) ", conf.get("hive.io.file.readcolumn.names"), conf.get("hive.io.file.readcolumn.ids")));
            }
        }
        return conf;
    }

    public static void addRequiredProjectionFields(Configuration configuration, Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {
        if (!hoodieVirtualKeyInfo.isPresent()) {
            HoodieRealtimeInputFormatUtils.addProjectionField(configuration, "_hoodie_record_key", 2);
            HoodieRealtimeInputFormatUtils.addProjectionField(configuration, "_hoodie_commit_time", 0);
            HoodieRealtimeInputFormatUtils.addProjectionField(configuration, "_hoodie_partition_path", 3);
        } else {
            HoodieVirtualKeyInfo hoodieVirtualKey = hoodieVirtualKeyInfo.get();
            HoodieRealtimeInputFormatUtils.addProjectionField(configuration, hoodieVirtualKey.getRecordKeyField(), hoodieVirtualKey.getRecordKeyFieldIndex());
            HoodieRealtimeInputFormatUtils.addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField(), hoodieVirtualKey.getPartitionPathFieldIndex());
        }
    }

    public static boolean requiredProjectionFieldsExistInConf(Configuration configuration, Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {
        String readColNames = configuration.get("hive.io.file.readcolumn.names", "");
        if (!hoodieVirtualKeyInfo.isPresent()) {
            return readColNames.contains("_hoodie_record_key") && readColNames.contains("_hoodie_commit_time") && readColNames.contains("_hoodie_partition_path");
        }
        return readColNames.contains(hoodieVirtualKeyInfo.get().getRecordKeyField()) && readColNames.contains(hoodieVirtualKeyInfo.get().getPartitionPathField());
    }

    public static boolean canAddProjectionToJobConf(RealtimeSplit realtimeSplit, JobConf jobConf) {
        return jobConf.get("hoodie.read.columns.set") == null || !realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf((Configuration)jobConf, realtimeSplit.getHoodieVirtualKeyInfo());
    }

    public static void cleanProjectionColumnIds(Configuration conf) {
        String columnIds = conf.get("hive.io.file.readcolumn.ids");
        if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
            conf.set("hive.io.file.readcolumn.ids", columnIds.substring(1));
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("The projection Ids: {" + columnIds + "} start with ','. First comma is removed"));
            }
        }
    }
}

