/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop.realtime;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@UseFileSplitsFromInputFormat
public class HoodieParquetRealtimeInputFormat
extends HoodieParquetInputFormat
implements Configurable {
    private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
    public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
    public static final int HOODIE_RECORD_KEY_COL_POS = 2;
    public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
    public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set";

    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit)is);
        Map<Path, List<FileSplit>> partitionsToParquetSplits = fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
        HashMap metaClientMap = new HashMap();
        Map partitionsToMetaClient = partitionsToParquetSplits.keySet().stream().collect(Collectors.toMap(Function.identity(), p -> {
            Option<String> matchingBasePath = Option.fromJavaOptional(metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith((String)basePath)).findFirst());
            if (matchingBasePath.isPresent()) {
                return (HoodieTableMetaClient)metaClientMap.get(matchingBasePath.get());
            }
            try {
                HoodieTableMetaClient metaClient = HoodieParquetRealtimeInputFormat.getTableMetaClient(p.getFileSystem(this.conf), p);
                metaClientMap.put(metaClient.getBasePath(), metaClient);
                return metaClient;
            }
            catch (IOException e) {
                throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
            }
        }));
        ArrayList rtSplits = new ArrayList();
        partitionsToParquetSplits.keySet().stream().forEach(partitionPath -> {
            HoodieTableMetaClient metaClient = (HoodieTableMetaClient)partitionsToMetaClient.get(partitionPath);
            HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
            String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
            try {
                Option<HoodieInstant> latestCompletedInstant = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
                Stream<FileSlice> latestFileSlices = latestCompletedInstant.map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp())).orElse(Stream.empty());
                Map<String, List<FileSplit>> groupedInputSplits = ((List)partitionsToParquetSplits.get(partitionPath)).stream().collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
                latestFileSlices.forEach(fileSlice -> {
                    List dataFileSplits = (List)groupedInputSplits.get(fileSlice.getFileId());
                    dataFileSplits.forEach(split -> {
                        try {
                            List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
                            String maxCommitTime = metaClient.getActiveTimeline().getTimelineOfActions(Sets.newHashSet((Object[])new String[]{"commit", "rollback", "deltacommit"})).filterCompletedInstants().lastInstant().get().getTimestamp();
                            rtSplits.add(new HoodieRealtimeFileSplit((FileSplit)split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
                        }
                        catch (IOException e) {
                            throw new HoodieIOException("Error creating hoodie real time split ", e);
                        }
                    });
                });
            }
            catch (Exception e) {
                throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
            }
        });
        LOG.info((Object)("Returning a total splits of " + rtSplits.size()));
        return rtSplits.toArray(new InputSplit[rtSplits.size()]);
    }

    @Override
    public FileStatus[] listStatus(JobConf job) throws IOException {
        return super.listStatus(job);
    }

    private static Configuration addProjectionField(Configuration conf, String fieldName, int fieldIndex) {
        String readColNames = conf.get("hive.io.file.readcolumn.names", "");
        String readColIds = conf.get("hive.io.file.readcolumn.ids", "");
        String readColNamesPrefix = readColNames + ",";
        if (readColNames == null || readColNames.isEmpty()) {
            readColNamesPrefix = "";
        }
        String readColIdsPrefix = readColIds + ",";
        if (readColIds == null || readColIds.isEmpty()) {
            readColIdsPrefix = "";
        }
        if (!readColNames.contains(fieldName)) {
            conf.set("hive.io.file.readcolumn.names", readColNamesPrefix + fieldName);
            conf.set("hive.io.file.readcolumn.ids", readColIdsPrefix + fieldIndex);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)String.format("Adding extra column " + fieldName + ", to enable log merging cols (%s) ids (%s) ", conf.get("hive.io.file.readcolumn.names"), conf.get("hive.io.file.readcolumn.ids")));
            }
        }
        return conf;
    }

    private static Configuration addRequiredProjectionFields(Configuration configuration) {
        configuration = HoodieParquetRealtimeInputFormat.addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, 2);
        configuration = HoodieParquetRealtimeInputFormat.addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, 0);
        configuration = HoodieParquetRealtimeInputFormat.addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, 3);
        return configuration;
    }

    private static Configuration cleanProjectionColumnIds(Configuration conf) {
        String columnIds = conf.get("hive.io.file.readcolumn.ids");
        if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
            conf.set("hive.io.file.readcolumn.ids", columnIds.substring(1));
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("The projection Ids: {" + columnIds + "} start with ','. First comma is removed"));
            }
        }
        return conf;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public RecordReader<NullWritable, ArrayWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
            JobConf jobConf = job;
            synchronized (jobConf) {
                LOG.info((Object)("Before adding Hoodie columns, Projections :" + job.get("hive.io.file.readcolumn.names") + ", Ids :" + job.get("hive.io.file.readcolumn.ids")));
                if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
                    this.conf = HoodieParquetRealtimeInputFormat.cleanProjectionColumnIds((Configuration)job);
                    this.conf = HoodieParquetRealtimeInputFormat.addRequiredProjectionFields((Configuration)job);
                    this.conf.set(HOODIE_READ_COLUMNS_PROP, "true");
                }
            }
        }
        LOG.info((Object)("Creating record reader with readCols :" + job.get("hive.io.file.readcolumn.names") + ", Ids :" + job.get("hive.io.file.readcolumn.ids")));
        Preconditions.checkArgument((boolean)(split instanceof HoodieRealtimeFileSplit), (Object)("HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split));
        return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit)split, job, super.getRecordReader(split, job, reporter));
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }
}

