/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.HoodieHiveUtil;
import org.apache.hudi.hadoop.InputPathHandler;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Parquet input format that narrows the files listed for a job to those selected by the
 * Hoodie file-system view of each table.
 *
 * <p>{@link #listStatus(JobConf)} splits the job's input paths (via {@link InputPathHandler}) into
 * three groups and handles each separately:
 * <ul>
 *   <li>paths of tables configured for incremental reads: only base files written by the selected
 *       commits are returned;</li>
 *   <li>non-Hoodie paths: listed by the parent input format without any filtering;</li>
 *   <li>snapshot paths: only the latest base file of each file group is returned.</li>
 * </ul>
 */
@UseFileSplitsFromInputFormat
public class HoodieParquetInputFormat
extends MapredParquetInputFormat
implements Configurable {
    private static final Logger LOG = LogManager.getLogger(HoodieParquetInputFormat.class);
    protected Configuration conf;

    /**
     * Lists the files to read for {@code job}, applying Hoodie filtering to Hoodie table paths.
     *
     * <p>Note: this method rewrites the input paths stored on {@code job} (through
     * {@code setInputPaths}) before each delegated call to the parent listing, so the job's input
     * paths after the call reflect the last group that was processed.
     *
     * @param job job configuration holding the input paths
     * @return the file statuses selected from all three path groups, incremental first, then
     *         non-Hoodie, then snapshot
     * @throws IOException if listing or reading table metadata fails
     */
    @Override
    public FileStatus[] listStatus(JobConf job) throws IOException {
        List<String> incrementalTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(job));
        InputPathHandler inputPathHandler = new InputPathHandler(this.conf, getInputPaths(job), incrementalTables);
        List<FileStatus> returns = new ArrayList<>();
        Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();

        // Incremental tables are processed first.
        for (String table : incrementalTables) {
            HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
            if (metaClient == null) {
                // No meta client was registered for this table name; nothing to list for it.
                continue;
            }
            List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
            returns.addAll(listStatusForIncrementalMode(job, metaClient, inputPaths));
        }

        // Non-Hoodie paths are listed as-is by the parent input format.
        List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
        if (!nonHoodiePaths.isEmpty()) {
            setInputPaths(job, nonHoodiePaths.toArray(new Path[0]));
            returns.addAll(Arrays.asList(super.listStatus(job)));
        }

        // Snapshot paths are listed once, grouped per table, then filtered per table.
        List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
        if (!snapshotPaths.isEmpty()) {
            setInputPaths(job, snapshotPaths.toArray(new Path[0]));
            FileStatus[] fileStatuses = super.listStatus(job);
            Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
                    groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values());
            LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
            for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
                returns.addAll(filterFileStatusForSnapshotMode(entry.getKey(), entry.getValue()));
            }
        }
        return returns.toArray(new FileStatus[0]);
    }

    /**
     * Lists the base files written by the commits selected for an incremental read of one table.
     *
     * <p>The commits are those completed instants after the configured start commit time, capped at
     * the configured maximum number of commits. Only partitions touched by those commits that also
     * appear in {@code inputPaths} are listed.
     *
     * @param job job configuration; its input paths are overwritten with the partitions to list
     * @param tableMetaClient meta client of the table being read
     * @param inputPaths input paths of the job that belong to this table
     * @return the selected file statuses; empty (never {@code null}) when no partition qualifies
     * @throws IOException if commit metadata cannot be read or listing fails
     */
    private List<FileStatus> listStatusForIncrementalMode(JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
        String tableName = tableMetaClient.getTableConfig().getTableName();
        HoodieTimeline timeline = tableMetaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        // One Job view of the configuration is enough for both lookups.
        JobContext jobContext = Job.getInstance(job);
        String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(jobContext, tableName);
        Integer maxCommits = HoodieHiveUtil.readMaxCommits(jobContext, tableName);
        LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);

        List<HoodieInstant> commitsToCheck =
                timeline.findInstantsAfter(lastIncrementalTs, maxCommits).getInstants().collect(Collectors.toList());

        // Collect every partition written to by any of the selected commits.
        HashSet<String> partitionsToList = new HashSet<>();
        for (HoodieInstant commit : commitsToCheck) {
            HoodieCommitMetadata commitMetadata =
                    HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
            partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
        }
        if (partitionsToList.isEmpty()) {
            return new ArrayList<>();
        }

        // Keep only partitions whose full path occurs (as a substring) in one of the job's input paths.
        String basePath = tableMetaClient.getBasePath();
        String incrementalInputPaths = partitionsToList.stream()
                .map(partition -> basePath + "/" + partition)
                .filter(partitionPath -> inputPaths.stream().anyMatch(path -> path.toString().contains(partitionPath)))
                .collect(Collectors.joining(","));
        if (StringUtils.isNullOrEmpty(incrementalInputPaths)) {
            return new ArrayList<>();
        }

        setInputPaths(job, incrementalInputPaths);
        FileStatus[] fileStatuses = super.listStatus(job);
        HoodieTableFileSystemView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
        List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
        List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());

        List<FileStatus> returns = new ArrayList<>();
        for (HoodieBaseFile filteredFile : filteredFiles) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath());
            }
            returns.add(checkFileStatus(filteredFile).getFileStatus());
        }
        LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
        return returns;
    }

    /**
     * Groups listed parquet files by the table (meta client) whose base path they fall under.
     *
     * <p>Files whose name does not end in {@code .parquet} are skipped. The last matched meta
     * client is reused for subsequent files as long as their path still contains its base path, so
     * a new lookup happens only when the table changes.
     *
     * @param fileStatuses files returned by the snapshot listing
     * @param metaClientList candidate tables
     * @return file statuses keyed by the table they were matched to
     */
    private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(FileStatus[] fileStatuses, Collection<HoodieTableMetaClient> metaClientList) {
        Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
        HoodieTableMetaClient metadata = null;
        for (FileStatus status : fileStatuses) {
            Path inputPath = status.getPath();
            if (!inputPath.getName().endsWith(".parquet")) {
                continue;
            }
            String inputPathStr = inputPath.toString();
            if (metadata == null || !inputPathStr.contains(metadata.getBasePath())) {
                for (HoodieTableMetaClient metaClient : metaClientList) {
                    if (inputPathStr.contains(metaClient.getBasePath())) {
                        metadata = metaClient;
                        grouped.computeIfAbsent(metadata, key -> new ArrayList<>());
                        break;
                    }
                }
            }
            // NOTE(review): if no meta client matched, `metadata` keeps its previous value (or is
            // still null, in which case the lookup below yields null and the add fails). This relies
            // on every listed parquet file lying under one of the given base paths - confirm with
            // InputPathHandler before changing.
            grouped.get(metadata).add(status);
        }
        return grouped;
    }

    /**
     * Reduces the files of one table to the latest base file of each file group, based on the
     * table's completed commits.
     *
     * @param metadata meta client of the table
     * @param fileStatuses all parquet files listed for the table
     * @return file statuses of the latest base files; never {@code null}
     */
    private List<FileStatus> filterFileStatusForSnapshotMode(HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) {
        FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
        }
        HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
        List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
        LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());

        List<FileStatus> returns = new ArrayList<>();
        for (HoodieBaseFile filteredFile : filteredFiles) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
            }
            returns.add(checkFileStatus(filteredFile).getFileStatus());
        }
        return returns;
    }

    /**
     * Re-fetches the file status from the file system when the cached one reports a size of zero.
     *
     * @param dataFile base file whose status is to be checked
     * @return {@code dataFile} itself when its size is non-zero, otherwise a new base file built
     *         from a freshly fetched status
     * @throws HoodieIOException if the status cannot be fetched; the underlying
     *         {@link IOException} is attached as the cause
     */
    private HoodieBaseFile checkFileStatus(HoodieBaseFile dataFile) {
        Path dataPath = dataFile.getFileStatus().getPath();
        try {
            if (dataFile.getFileSize() == 0L) {
                FileSystem fs = dataPath.getFileSystem(this.conf);
                LOG.info("Refreshing file status " + dataFile.getPath());
                return new HoodieBaseFile(fs.getFileStatus(dataPath));
            }
            return dataFile;
        } catch (IOException e) {
            // Keep the original failure as the cause so the root problem is not lost.
            throw new HoodieIOException("Could not get FileStatus on path " + dataPath, e);
        }
    }

    /** Stores the configuration used for input-path handling and file-system access. */
    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    /** Returns the configuration previously supplied through {@link #setConf(Configuration)}. */
    @Override
    public Configuration getConf() {
        return this.conf;
    }

    /**
     * Delegates record reading to the parent parquet input format unchanged.
     *
     * @throws IOException if the parent reader cannot be created
     */
    @Override
    public RecordReader<NullWritable, ArrayWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        return super.getRecordReader(split, job, reporter);
    }

    /**
     * Creates a meta client for the table that contains {@code dataPath}.
     *
     * <p>The table base directory is found by walking up from {@code dataPath}: by the partition
     * depth read from the partition metadata when that metadata exists, otherwise by 3 levels.
     *
     * @param fs file system holding the table
     * @param dataPath path inside the table (where partition metadata is looked up)
     * @return a meta client rooted at the resolved base directory
     * @throws IOException declared for callers; raised by the underlying lookups if they fail
     */
    protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path dataPath) throws IOException {
        // Fallback number of parent levels when the path carries no partition metadata.
        int levels = 3;
        if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
            HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
            metadata.readFromFS();
            levels = metadata.getPartitionDepth();
        }
        Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
        LOG.info("Reading hoodie metadata from path " + baseDir.toString());
        return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
    }
}

