/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop.realtime;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.HiveHoodieTableFileIndex;
import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimePath;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;

public class HoodieMergeOnReadTableInputFormat
extends HoodieCopyOnWriteTableInputFormat
implements Configurable {
    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit)is).collect(Collectors.toList());
        return (InputSplit[])(HoodieMergeOnReadTableInputFormat.containsIncrementalQuerySplits(fileSplits) ? HoodieMergeOnReadTableInputFormat.filterIncrementalQueryFileSplits(fileSplits) : fileSplits).toArray(new FileSplit[0]);
    }

    @Override
    protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
        Option baseFileOpt = fileSlice.getBaseFile();
        Option latestLogFileOpt = fileSlice.getLatestLogFile();
        Stream logFiles = fileSlice.getLogFiles();
        Option latestCompletedInstantOpt = fileIndex.getLatestCompletedInstant();
        String tableBasePath = fileIndex.getBasePath();
        if (baseFileOpt.isPresent()) {
            return HoodieMergeOnReadTableInputFormat.createRealtimeFileStatusUnchecked((HoodieBaseFile)baseFileOpt.get(), (Stream<HoodieLogFile>)logFiles, tableBasePath, (Option<HoodieInstant>)latestCompletedInstantOpt, virtualKeyInfoOpt);
        }
        if (latestLogFileOpt.isPresent()) {
            return HoodieMergeOnReadTableInputFormat.createRealtimeFileStatusUnchecked((HoodieLogFile)latestLogFileOpt.get(), (Stream<HoodieLogFile>)logFiles, tableBasePath, (Option<HoodieInstant>)latestCompletedInstantOpt, virtualKeyInfoOpt);
        }
        throw new IllegalStateException("Invalid state: either base-file or log-file has to be present");
    }

    @Override
    protected List<FileStatus> listStatusForIncrementalMode(JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths, String incrementalTableName) throws IOException {
        ArrayList<FileStatus> result = new ArrayList<FileStatus>();
        Job jobContext = Job.getInstance((Configuration)job);
        Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline((JobContext)jobContext, tableMetaClient);
        if (!timeline.isPresent()) {
            return result;
        }
        HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery((JobContext)jobContext, incrementalTableName, (HoodieTimeline)timeline.get());
        Option commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
        if (!commitsToCheck.isPresent()) {
            return result;
        }
        ((List)commitsToCheck.get()).sort(HoodieInstant::compareTo);
        List<HoodieCommitMetadata> metadataList = ((List)commitsToCheck.get()).stream().map(instant -> {
            try {
                return HoodieInputFormatUtils.getCommitMetadata(instant, commitsTimelineToReturn);
            }
            catch (IOException e) {
                throw new HoodieException(String.format("cannot get metadata for instant: %s", instant));
            }
        }).collect(Collectors.toList());
        List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils.listAffectedFilesForCommits((Configuration)job, new Path(tableMetaClient.getBasePath()), metadataList));
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
        Path basePath = new Path(tableMetaClient.getBasePath());
        List affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream().filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
        if (affectedPartition.isEmpty()) {
            return result;
        }
        List<HoodieFileGroup> fileGroups = affectedPartition.stream().flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
        HoodieMergeOnReadTableInputFormat.setInputPaths((JobConf)job, (String)affectedPartition.stream().map(p -> p.isEmpty() ? basePath.toString() : new Path(basePath, p).toString()).collect(Collectors.joining(",")));
        FileStatus[] fileStatuses = this.doListStatus(job);
        HashMap<String, FileStatus> candidateFileStatus = new HashMap<String, FileStatus>();
        for (int i = 0; i < fileStatuses.length; ++i) {
            String key = fileStatuses[i].getPath().toString();
            candidateFileStatus.put(key, fileStatuses[i]);
        }
        Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = HoodieMergeOnReadTableInputFormat.getHoodieVirtualKeyInfo(tableMetaClient);
        String maxCommitTime = ((HoodieInstant)fsView.getLastInstant().get()).getTimestamp();
        result.addAll(HoodieMergeOnReadTableInputFormat.collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus, virtualKeyInfoOpt));
        return result;
    }

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        if (filename instanceof HoodieRealtimePath) {
            return ((HoodieRealtimePath)filename).isSplitable();
        }
        return super.isSplitable(fs, filename);
    }

    @Override
    protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
        if (file instanceof HoodieRealtimePath) {
            return this.doMakeSplitForRealtimePath((HoodieRealtimePath)file, start, length, hosts, null);
        }
        return super.makeSplit(file, start, length, hosts);
    }

    @Override
    protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
        if (file instanceof HoodieRealtimePath) {
            return this.doMakeSplitForRealtimePath((HoodieRealtimePath)file, start, length, hosts, inMemoryHosts);
        }
        return super.makeSplit(file, start, length, hosts, inMemoryHosts);
    }

    private static List<FileStatus> collectAllIncrementalFiles(List<HoodieFileGroup> fileGroups, String maxCommitTime, String basePath, Map<String, FileStatus> candidateFileStatus, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
        ArrayList<FileStatus> result = new ArrayList<FileStatus>();
        fileGroups.stream().forEach(f -> {
            try {
                List logFileStatus;
                List baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList());
                if (!baseFiles.isEmpty()) {
                    FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus((HoodieBaseFile)((FileSlice)baseFiles.get(0)).getBaseFile().get());
                    String baseFilePath = baseFileStatus.getPath().toUri().toString();
                    if (!candidateFileStatus.containsKey(baseFilePath)) {
                        throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath);
                    }
                    List<HoodieLogFile> deltaLogFiles = ((FileSlice)f.getLatestFileSlice().get()).getLogFiles().collect(Collectors.toList());
                    RealtimeFileStatus fileStatus = new RealtimeFileStatus((FileStatus)candidateFileStatus.get(baseFilePath), basePath, deltaLogFiles, true, virtualKeyInfoOpt);
                    fileStatus.setMaxCommitTime(maxCommitTime);
                    if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
                        fileStatus.setBootStrapFileStatus(baseFileStatus);
                    }
                    result.add(fileStatus);
                }
                if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty() && (logFileStatus = ((FileSlice)f.getLatestFileSlice().get()).getLogFiles().map(logFile -> logFile.getFileStatus()).collect(Collectors.toList())).size() > 0) {
                    List<HoodieLogFile> deltaLogFiles = logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), Long.valueOf(l.getLen()))).collect(Collectors.toList());
                    RealtimeFileStatus fileStatus = new RealtimeFileStatus((FileStatus)logFileStatus.get(0), basePath, deltaLogFiles, true, virtualKeyInfoOpt);
                    fileStatus.setMaxCommitTime(maxCommitTime);
                    result.add(fileStatus);
                }
            }
            catch (IOException e) {
                throw new HoodieException("Error obtaining data file/log file grouping ", (Throwable)e);
            }
        });
        return result;
    }

    private FileSplit doMakeSplitForRealtimePath(HoodieRealtimePath path, long start, long length, String[] hosts, String[] inMemoryHosts) {
        if (path.includeBootstrapFilePath()) {
            FileSplit bf = inMemoryHosts == null ? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts) : super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts);
            return HoodieMergeOnReadTableInputFormat.createRealtimeBoostrapBaseFileSplit((BootstrapBaseFileSplit)bf, path.getBasePath(), path.getDeltaLogFiles(), path.getMaxCommitTime(), path.getBelongsToIncrementalQuery(), path.getVirtualKeyInfo());
        }
        return HoodieMergeOnReadTableInputFormat.createRealtimeFileSplit(path, start, length, hosts);
    }

    private static boolean containsIncrementalQuerySplits(List<FileSplit> fileSplits) {
        return fileSplits.stream().anyMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery);
    }

    private static List<FileSplit> filterIncrementalQueryFileSplits(List<FileSplit> fileSplits) {
        return fileSplits.stream().filter(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery).collect(Collectors.toList());
    }

    private static HoodieRealtimeFileSplit createRealtimeFileSplit(HoodieRealtimePath path, long start, long length, String[] hosts) {
        try {
            return new HoodieRealtimeFileSplit(new FileSplit((Path)path, start, length, hosts), path);
        }
        catch (IOException e) {
            throw new HoodieIOException(String.format("Failed to create instance of %s", HoodieRealtimeFileSplit.class.getName()), e);
        }
    }

    private static HoodieRealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit(BootstrapBaseFileSplit split, String basePath, List<HoodieLogFile> logFiles, String maxInstantTime, boolean belongsToIncrementalQuery, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
        try {
            String[] hosts = split.getLocationInfo() != null ? (String[])Arrays.stream(split.getLocationInfo()).filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[]{};
            String[] inMemoryHosts = split.getLocationInfo() != null ? (String[])Arrays.stream(split.getLocationInfo()).filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[]{};
            FileSplit baseSplit = new FileSplit(split.getPath(), split.getStart(), split.getLength(), hosts, inMemoryHosts);
            return new HoodieRealtimeBootstrapBaseFileSplit(baseSplit, basePath, logFiles, maxInstantTime, split.getBootstrapFileSplit(), belongsToIncrementalQuery, virtualKeyInfoOpt);
        }
        catch (IOException e) {
            throw new HoodieIOException("Error creating hoodie real time split ", e);
        }
    }

    private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile, Stream<HoodieLogFile> logFiles, String basePath, Option<HoodieInstant> latestCompletedInstantOpt, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
        FileStatus baseFileStatus = HoodieMergeOnReadTableInputFormat.getFileStatusUnchecked(baseFile);
        List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
        try {
            RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus, basePath, sortedLogFiles, false, virtualKeyInfoOpt);
            if (latestCompletedInstantOpt.isPresent()) {
                HoodieInstant latestCompletedInstant = (HoodieInstant)latestCompletedInstantOpt.get();
                ValidationUtils.checkState((boolean)latestCompletedInstant.isCompleted());
                rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
            }
            if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
                rtFileStatus.setBootStrapFileStatus(baseFileStatus);
            }
            return rtFileStatus;
        }
        catch (IOException e) {
            throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
        }
    }

    private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile, Stream<HoodieLogFile> logFiles, String basePath, Option<HoodieInstant> latestCompletedInstantOpt, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
        List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
        try {
            RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus(), basePath, sortedLogFiles, false, virtualKeyInfoOpt);
            if (latestCompletedInstantOpt.isPresent()) {
                HoodieInstant latestCompletedInstant = (HoodieInstant)latestCompletedInstantOpt.get();
                ValidationUtils.checkState((boolean)latestCompletedInstant.isCompleted());
                rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
            }
            return rtFileStatus;
        }
        catch (IOException e) {
            throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
        }
    }
}

