/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.io.Serializable;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class DatePartitionPathSelector
extends DFSPathSelector {
    private static volatile Logger LOG = LogManager.getLogger(DatePartitionPathSelector.class);
    private final String dateFormat;
    private final int datePartitionDepth;
    private final int numPrevDaysToList;
    private final int partitionsListParallelism;

    public DatePartitionPathSelector(TypedProperties props, Configuration hadoopConf) {
        super(props, hadoopConf);
        this.dateFormat = props.getString("hoodie.deltastreamer.source.dfs.datepartitioned.date.format", "yyyy-MM-dd");
        this.datePartitionDepth = props.getInteger("hoodie.deltastreamer.source.dfs.datepartitioned.selector.depth", 0);
        this.numPrevDaysToList = props.getInteger("hoodie.deltastreamer.source.dfs.datepartitioned.selector.lookback.days", 2);
        this.partitionsListParallelism = props.getInteger("hoodie.deltastreamer.source.dfs.datepartitioned.selector.parallelism", 20);
    }

    @Override
    public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext, Option<String> lastCheckpointStr, long sourceLimit) {
        FileStatus f2;
        LocalDate currentDate = LocalDate.parse(this.props.getString("hoodie.deltastreamer.source.dfs.datepartitioned.selector.currentdate", LocalDate.now().toString()));
        LOG.info((Object)("Root path => " + this.props.getString("hoodie.deltastreamer.source.dfs.root") + " source limit => " + sourceLimit + " depth of day partition => " + this.datePartitionDepth + " num prev days to list => " + this.numPrevDaysToList + " from current date => " + currentDate));
        long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
        HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext);
        SerializableConfiguration serializedConf = new SerializableConfiguration(this.fs.getConf());
        List<String> prunedPartitionPaths = this.pruneDatePartitionPaths(context, this.fs, this.props.getString("hoodie.deltastreamer.source.dfs.root"), currentDate);
        List eligibleFiles = context.flatMap(prunedPartitionPaths, path -> {
            FileSystem fs = new Path(path).getFileSystem(serializedConf.get());
            return this.listEligibleFiles(fs, new Path(path), lastCheckpointTime).stream();
        }, this.partitionsListParallelism);
        List sortedEligibleFiles = eligibleFiles.stream().sorted(Comparator.comparingLong(FileStatus::getModificationTime)).collect(Collectors.toList());
        long currentBytes = 0L;
        long newCheckpointTime = lastCheckpointTime;
        ArrayList<FileStatus> filteredFiles = new ArrayList<FileStatus>();
        Iterator iterator2 = sortedEligibleFiles.iterator();
        while (iterator2.hasNext() && (currentBytes + (f2 = (FileStatus)iterator2.next()).getLen() < sourceLimit || f2.getModificationTime() <= newCheckpointTime)) {
            newCheckpointTime = f2.getModificationTime();
            currentBytes += f2.getLen();
            filteredFiles.add(f2);
        }
        if (filteredFiles.isEmpty()) {
            return new ImmutablePair<Option<String>, String>(Option.empty(), String.valueOf(newCheckpointTime));
        }
        String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
        return new ImmutablePair<Option<String>, String>(Option.ofNullable(pathStr), String.valueOf(newCheckpointTime));
    }

    public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath, LocalDate currentDate) {
        List<String> partitionPaths = new ArrayList<String>();
        partitionPaths.add(rootPath);
        if (this.datePartitionDepth <= 0) {
            return partitionPaths;
        }
        SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
        for (int i = 0; i < this.datePartitionDepth; ++i) {
            partitionPaths = context.flatMap(partitionPaths, path -> {
                Path subDir = new Path(path);
                FileSystem fileSystem2 = subDir.getFileSystem(serializedConf.get());
                FileStatus[] statuses = fileSystem2.listStatus(subDir, file -> IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> file.getName().startsWith((String)pfx)));
                ArrayList<String> res = new ArrayList<String>();
                for (FileStatus status : statuses) {
                    res.add(status.getPath().toString());
                }
                return res.stream();
            }, this.partitionsListParallelism);
        }
        return context.getJavaSparkContext().parallelize(partitionPaths, this.partitionsListParallelism).filter((Function & Serializable)s -> {
            LocalDate partitionDate;
            LocalDate fromDate = currentDate.minusDays(this.numPrevDaysToList);
            String[] splits = s.split("/");
            String datePartition = splits[splits.length - 1];
            DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern(this.dateFormat);
            if (datePartition.contains("=")) {
                String[] moreSplit = datePartition.split("=");
                ValidationUtils.checkArgument(moreSplit.length == 2, "Partition Field (" + datePartition + ") not in expected format");
                partitionDate = LocalDate.parse(moreSplit[1], dateFormatter);
            } else {
                partitionDate = LocalDate.parse(datePartition, dateFormatter);
            }
            return !(!partitionDate.isEqual(fromDate) && !partitionDate.isAfter(fromDate) || !partitionDate.isEqual(currentDate) && !partitionDate.isBefore(currentDate));
        }).collect();
    }

    public static class Config {
        public static final String DATE_FORMAT = "hoodie.deltastreamer.source.dfs.datepartitioned.date.format";
        public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
        public static final String DATE_PARTITION_DEPTH = "hoodie.deltastreamer.source.dfs.datepartitioned.selector.depth";
        public static final int DEFAULT_DATE_PARTITION_DEPTH = 0;
        public static final String LOOKBACK_DAYS = "hoodie.deltastreamer.source.dfs.datepartitioned.selector.lookback.days";
        public static final int DEFAULT_LOOKBACK_DAYS = 2;
        public static final String CURRENT_DATE = "hoodie.deltastreamer.source.dfs.datepartitioned.selector.currentdate";
        public static final String PARTITIONS_LIST_PARALLELISM = "hoodie.deltastreamer.source.dfs.datepartitioned.selector.parallelism";
        public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
    }
}

