/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableQueryType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.HiveHoodieTableFileIndex;
import org.apache.hudi.hadoop.HoodieTableInputFormat;
import org.apache.hudi.hadoop.InputPathHandler;
import org.apache.hudi.hadoop.PathWithBootstrapFileStatus;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieCopyOnWriteTableInputFormat
extends HoodieTableInputFormat {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieCopyOnWriteTableInputFormat.class);

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return !(filename instanceof PathWithBootstrapFileStatus);
    }

    @Override
    protected FileSplit makeSplit(Path file, long start2, long length, String[] hosts) {
        FileSplit split = new FileSplit(file, start2, length, hosts);
        if (file instanceof PathWithBootstrapFileStatus) {
            return this.makeExternalFileSplit((PathWithBootstrapFileStatus)file, split);
        }
        return split;
    }

    @Override
    protected FileSplit makeSplit(Path file, long start2, long length, String[] hosts, String[] inMemoryHosts) {
        FileSplit split = new FileSplit(file, start2, length, hosts, inMemoryHosts);
        if (file instanceof PathWithBootstrapFileStatus) {
            return this.makeExternalFileSplit((PathWithBootstrapFileStatus)file, split);
        }
        return split;
    }

    @Override
    public FileStatus[] listStatus(JobConf job) throws IOException {
        List<Path> snapshotPaths;
        List<String> incrementalTables = HoodieHiveUtils.getIncrementalTableNames((JobContext)Job.getInstance((Configuration)job));
        InputPathHandler inputPathHandler = new InputPathHandler(this.conf, HoodieCopyOnWriteTableInputFormat.getInputPaths((JobConf)job), incrementalTables);
        ArrayList<FileStatus> returns = new ArrayList<FileStatus>();
        Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
        for (String table : incrementalTables) {
            List<Path> inputPaths;
            List<FileStatus> result2;
            HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
            if (metaClient == null || (result2 = this.listStatusForIncrementalMode(job, metaClient, inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient), table)) == null) continue;
            returns.addAll(result2);
        }
        List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
        if (nonHoodiePaths.size() > 0) {
            HoodieCopyOnWriteTableInputFormat.setInputPaths((JobConf)job, (Path[])nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()]));
            FileStatus[] fileStatuses = this.listStatusForNonHoodiePaths(job);
            returns.addAll(Arrays.asList(fileStatuses));
        }
        if ((snapshotPaths = inputPathHandler.getSnapshotPaths()).size() > 0) {
            returns.addAll(this.listStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths));
        }
        return returns.toArray(new FileStatus[0]);
    }

    public RecordReader<NullWritable, ArrayWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter2) throws IOException {
        throw new UnsupportedEncodingException("not implemented");
    }

    protected final FileStatus[] doListStatus(JobConf job) throws IOException {
        return super.listStatus(job);
    }

    public FileStatus[] listStatusForNonHoodiePaths(JobConf job) throws IOException {
        return this.doListStatus(job);
    }

    protected List<FileStatus> listStatusForIncrementalMode(JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths, String incrementalTable) throws IOException {
        Job jobContext = Job.getInstance((Configuration)job);
        Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline((JobContext)jobContext, tableMetaClient);
        if (!timeline.isPresent()) {
            return null;
        }
        Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, incrementalTable, timeline.get());
        if (!commitsToCheck.isPresent()) {
            return null;
        }
        Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
        if (!incrementalInputPaths.isPresent()) {
            return null;
        }
        HoodieCopyOnWriteTableInputFormat.setInputPaths((JobConf)job, (String)incrementalInputPaths.get());
        FileStatus[] fileStatuses = this.doListStatus(job);
        return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
    }

    protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, Option<HoodieInstant> latestCompletedInstantOpt, String tableBasePath, HoodieTableMetaClient metaClient) {
        Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
        if (baseFileOpt.isPresent()) {
            return HoodieCopyOnWriteTableInputFormat.getFileStatusUnchecked(baseFileOpt.get());
        }
        throw new IllegalStateException("Invalid state: base-file has to be present");
    }

    private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) {
        try {
            LOG.info("Making external data split for " + (Object)((Object)file));
            FileStatus externalFileStatus = file.getBootstrapFileStatus();
            FileSplit externalFileSplit = this.makeSplit(externalFileStatus.getPath(), 0L, externalFileStatus.getLen(), new String[0], new String[0]);
            return new BootstrapBaseFileSplit(split, externalFileSplit);
        }
        catch (IOException e) {
            throw new HoodieIOException(e.getMessage(), e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nonnull
    private List<FileStatus> listStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap, List<Path> snapshotPaths) {
        HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(HadoopFSUtils.getStorageConf((Configuration)job));
        ArrayList<FileStatus> targetFiles = new ArrayList<FileStatus>();
        TypedProperties props = new TypedProperties(new Properties());
        Map<HoodieTableMetaClient, List<Path>> groupedPaths = HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
        for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
            HoodieTableMetaClient tableMetaClient = entry.getKey();
            List<Path> partitionPaths = entry.getValue();
            Option<String> queryCommitInstant = HoodieHiveUtils.getMaxCommit(job, tableMetaClient.getTableConfig().getTableName());
            boolean shouldIncludePendingCommits = HoodieHiveUtils.shouldIncludePendingCommits(job, tableMetaClient.getTableConfig().getTableName());
            if (HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient) || this.conf.getBoolean(HoodieMetadataConfig.ENABLE.key(), HoodieMetadataConfig.ENABLE.defaultValue().booleanValue())) {
                HiveHoodieTableFileIndex fileIndex = new HiveHoodieTableFileIndex(engineContext, tableMetaClient, props, HoodieTableQueryType.SNAPSHOT, partitionPaths.stream().map(HadoopFSUtils::convertToStoragePath).collect(Collectors.toList()), queryCommitInstant, shouldIncludePendingCommits);
                Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();
                targetFiles.addAll(partitionedFileSlices.values().stream().flatMap(Collection::stream).filter(fileSlice -> this.checkIfValidFileSlice((FileSlice)fileSlice)).map(fileSlice -> this.createFileStatusUnchecked((FileSlice)fileSlice, fileIndex.getLatestCompletedInstant(), fileIndex.getBasePath().toString(), tableMetaClient)).collect(Collectors.toList()));
                continue;
            }
            String basePath = tableMetaClient.getBasePath().toString();
            HashMap<HoodieTableMetaClient, HoodieTableFileSystemView> fsViewCache = new HashMap<HoodieTableMetaClient, HoodieTableFileSystemView>();
            HoodieTimeline timeline = HoodieCopyOnWriteTableInputFormat.getActiveTimeline(tableMetaClient, shouldIncludePendingCommits);
            Option<String> queryInstant = queryCommitInstant.or(() -> timeline.lastInstant().map(HoodieInstant::requestedTime));
            HoodieCopyOnWriteTableInputFormat.validateInstant(timeline, queryInstant);
            try {
                HoodieTableFileSystemView fsView2 = fsViewCache.computeIfAbsent(tableMetaClient, hoodieTableMetaClient -> FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, hoodieTableMetaClient, HoodieInputFormatUtils.buildMetadataConfig((Configuration)job), timeline));
                ArrayList filteredFileSlices = new ArrayList();
                for (Path p : entry.getValue()) {
                    String relativePartitionPath = HadoopFSUtils.getRelativePartitionPath(new Path(basePath), p);
                    List fileSlices = queryInstant.map(instant -> fsView2.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, (String)instant)).orElseGet(() -> fsView2.getLatestFileSlices(relativePartitionPath)).collect(Collectors.toList());
                    filteredFileSlices.addAll(fileSlices);
                }
                targetFiles.addAll(filteredFileSlices.stream().filter(fileSlice -> this.checkIfValidFileSlice((FileSlice)fileSlice)).map(fileSlice -> this.createFileStatusUnchecked((FileSlice)fileSlice, timeline.filterCompletedInstants().lastInstant(), basePath, tableMetaClient)).collect(Collectors.toList()));
            }
            finally {
                fsViewCache.forEach((metaClient, fsView) -> fsView.close());
            }
        }
        return targetFiles;
    }

    private static HoodieTimeline getActiveTimeline(HoodieTableMetaClient metaClient, boolean shouldIncludePendingCommits) {
        HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline();
        if (shouldIncludePendingCommits) {
            return timeline;
        }
        return timeline.filterCompletedAndCompactionInstants();
    }

    private static void validateInstant(HoodieTimeline activeTimeline, Option<String> queryInstant) {
        if (queryInstant.isPresent() && !activeTimeline.containsInstant(queryInstant.get())) {
            throw new HoodieIOException(String.format("Query instant (%s) not found in the timeline", queryInstant.get()));
        }
    }

    protected boolean checkIfValidFileSlice(FileSlice fileSlice) {
        Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
        Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
        if (baseFileOpt.isPresent()) {
            return true;
        }
        if (latestLogFileOpt.isPresent()) {
            return false;
        }
        throw new IllegalStateException("Invalid state: base-file has to be present for " + fileSlice.getFileId());
    }

    @Nonnull
    protected static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
        try {
            return HoodieInputFormatUtils.getFileStatus(baseFile);
        }
        catch (IOException ioe) {
            throw new HoodieIOException("Failed to get file-status", ioe);
        }
    }
}

