/*
 * Decompiled with CFR 0.152.
 */
package io.druid.indexer.hadoop;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper;
import io.druid.indexer.hadoop.DatasourceInputSplit;
import io.druid.indexer.hadoop.DatasourceRecordReader;
import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Hadoop {@link InputFormat} that builds {@link DatasourceInputSplit}s from the list of
 * {@link WindowedDataSegment}s stored (as JSON) under {@link #CONF_INPUT_SEGMENTS} in the job
 * configuration, and hands out {@link DatasourceRecordReader}s to read them.
 *
 * <p>Segments are packed into splits according to {@link #CONF_MAX_SPLIT_SIZE}; see
 * {@link #getSplits(JobContext)} for the exact rules.
 */
public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow> {
    private static final Logger logger = new Logger(DatasourceInputFormat.class);

    /** Configuration key holding the JSON-serialized list of segments to read. */
    public static final String CONF_INPUT_SEGMENTS = "druid.segments";
    public static final String CONF_DRUID_SCHEMA = "druid.datasource.schema";
    public static final String CONF_TRANSFORM_SPEC = "druid.datasource.transformSpec";
    /** Configuration key holding the maximum combined segment size of one split. */
    public static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.split.max.size";

    /** Upper bound on the number of locations reported for a single split. */
    private static final int MAX_SPLIT_LOCATIONS = 3;

    /**
     * Produces the old-API input format that is used only to discover where a segment's file
     * lives. The default implementation never splits a file and stats each input path directly
     * instead of listing directories.
     */
    private Supplier<org.apache.hadoop.mapred.InputFormat> supplier = new Supplier<org.apache.hadoop.mapred.InputFormat>() {
        @Override
        public org.apache.hadoop.mapred.InputFormat get() {
            return new TextInputFormat() {
                @Override
                protected boolean isSplitable(FileSystem fs, Path file) {
                    return false;
                }

                @Override
                protected FileStatus[] listStatus(JobConf job) throws IOException {
                    List<FileStatus> statusList = new ArrayList<>();
                    for (Path path : FileInputFormat.getInputPaths(job)) {
                        statusList.add(path.getFileSystem(job).getFileStatus(path));
                    }
                    return statusList.toArray(new FileStatus[0]);
                }
            };
        }
    };

    /**
     * Groups the configured segments into input splits.
     *
     * <p>The maximum split size is read from {@link #CONF_MAX_SPLIT_SIZE} (default {@code 0}):
     * <ul>
     *   <li>a negative value is replaced by {@code totalSegmentSize / numMapTasks} when the
     *       number of map tasks is positive;</li>
     *   <li>when the resulting value is positive, segments are sorted by ascending size and
     *       consecutive segments are packed into a split until adding the next one would exceed
     *       the limit;</li>
     *   <li>otherwise segments keep their configured order and a new split is started whenever
     *       the current one already holds a non-zero number of bytes.</li>
     * </ul>
     *
     * @param context job context whose configuration carries {@link #CONF_INPUT_SEGMENTS}
     * @return the splits, each carrying its segments and up to three preferred locations
     * @throws NullPointerException if {@link #CONF_INPUT_SEGMENTS} is not set
     * @throws ISE if the configured segment list is {@code null} or empty
     * @throws IOException if the segment list cannot be deserialized
     */
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        String segmentsStr = Preconditions.checkNotNull(conf.get(CONF_INPUT_SEGMENTS), "No segments found to read");
        List<WindowedDataSegment> segments = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
            segmentsStr,
            new TypeReference<List<WindowedDataSegment>>() {}
        );
        if (segments == null || segments.isEmpty()) {
            throw new ISE("No segments found to read");
        }
        logger.info("segments to read [%s]", segmentsStr);

        long maxSize = resolveMaxSplitSize(conf, segments);
        if (maxSize > 0L) {
            // Segments will be combined, so order them by size to pack small ones together.
            segments.sort(Comparator.comparingLong(segment -> segment.getSegment().getSize()));
        }

        List<InputSplit> splits = new ArrayList<>();
        List<WindowedDataSegment> current = new ArrayList<>();
        long currentSize = 0L;
        // A throwaway JobConf: it is only mutated to point the location lookup at each segment.
        JobConf dummyConf = new JobConf();
        org.apache.hadoop.mapred.InputFormat fio = this.supplier.get();
        for (WindowedDataSegment segment : segments) {
            long segmentSize = segment.getSegment().getSize();
            // "currentSize > 0" guarantees a split is never closed while it is still empty,
            // so a single oversized segment still gets a split of its own.
            if (currentSize + segmentSize > maxSize && currentSize > 0L) {
                splits.add(toDataSourceSplit(current, fio, dummyConf));
                current = new ArrayList<>();
                currentSize = 0L;
            }
            current.add(segment);
            currentSize += segmentSize;
        }
        if (!current.isEmpty()) {
            splits.add(toDataSourceSplit(current, fio, dummyConf));
        }
        logger.info("Number of splits [%d]", splits.size());
        return splits;
    }

    /**
     * Creates the reader for a split produced by {@link #getSplits(JobContext)}.
     *
     * @return a new {@link DatasourceRecordReader}; neither argument is inspected here
     */
    @Override
    public RecordReader<NullWritable, InputRow> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new DatasourceRecordReader();
    }

    /**
     * Replaces the supplier of the input format used for location lookup.
     *
     * @return this instance, for chaining
     */
    @VisibleForTesting
    DatasourceInputFormat setSupplier(Supplier<org.apache.hadoop.mapred.InputFormat> supplier) {
        this.supplier = supplier;
        return this;
    }

    /**
     * Reads the configured maximum split size; a negative value is replaced by the total segment
     * size divided by the number of map tasks when that number is positive, and returned
     * unchanged otherwise.
     */
    private static long resolveMaxSplitSize(Configuration conf, List<WindowedDataSegment> segments) {
        long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0L);
        if (maxSize >= 0L) {
            return maxSize;
        }
        long totalSize = 0L;
        for (WindowedDataSegment segment : segments) {
            totalSize += segment.getSegment().getSize();
        }
        // Wrap rather than blindly downcast so a plain Configuration does not cause a
        // ClassCastException; a JobConf instance is used as-is.
        JobConf jobConf = conf instanceof JobConf ? (JobConf) conf : new JobConf(conf);
        int mapTask = jobConf.getNumMapTasks();
        if (mapTask > 0) {
            maxSize = totalSize / (long) mapTask;
        }
        return maxSize;
    }

    /** Builds one split from {@code segments}, annotated with its most frequent locations. */
    private DatasourceInputSplit toDataSourceSplit(List<WindowedDataSegment> segments, org.apache.hadoop.mapred.InputFormat fio, JobConf conf) {
        String[] locations = getFrequentLocations(getLocations(segments, fio, conf));
        return new DatasourceInputSplit(segments, locations);
    }

    /**
     * Streams every location reported by {@code fio} for every segment, with repeats.
     *
     * <p>The stream is forced sequential because each segment is looked up by overwriting the
     * input paths of the shared {@code conf}. Lookup failures are logged and contribute no
     * locations instead of failing the caller.
     *
     * @param segments segments whose locations are wanted
     * @param fio old-API input format used to compute splits for a segment's URI
     * @param conf scratch configuration; its input paths are overwritten
     * @return a lazy stream of locations
     */
    @VisibleForTesting
    static Stream<String> getLocations(List<WindowedDataSegment> segments, org.apache.hadoop.mapred.InputFormat fio, JobConf conf) {
        return segments.stream()
            .sequential()
            .flatMap((WindowedDataSegment segment) -> locationsOfSegment(segment, fio, conf));
    }

    /** Locations of all splits that {@code fio} reports for a single segment. */
    private static Stream<String> locationsOfSegment(WindowedDataSegment segment, org.apache.hadoop.mapred.InputFormat fio, JobConf conf) {
        FileInputFormat.setInputPaths(conf, new Path(JobHelper.getURIFromSegment(segment.getSegment())));
        try {
            return Arrays.stream(fio.getSplits(conf, 1))
                .flatMap((org.apache.hadoop.mapred.InputSplit split) -> locationsOfSplit(split));
        }
        catch (IOException e) {
            // Best effort: a failed lookup only means this segment adds no locations.
            logger.error(e, "Exception getting splits");
            return Stream.empty();
        }
    }

    /** Locations of one old-API split, or an empty stream if they cannot be obtained. */
    private static Stream<String> locationsOfSplit(org.apache.hadoop.mapred.InputSplit split) {
        try {
            return Arrays.stream(split.getLocations());
        }
        catch (IOException e) {
            logger.error(e, "Exception getting locations");
            return Stream.empty();
        }
    }

    /**
     * Picks the locations that occur most often.
     *
     * @param locations locations, possibly with repeats; consumed by this call
     * @return at most three distinct locations, ordered by descending occurrence count with
     *         ties broken by the natural ordering of the location string
     */
    @VisibleForTesting
    static String[] getFrequentLocations(Stream<String> locations) {
        Map<String, Long> locationCountMap = locations.collect(
            Collectors.groupingBy(location -> location, Collectors.counting())
        );
        Comparator<Map.Entry<String, Long>> valueComparator = Map.Entry.comparingByValue(Comparator.reverseOrder());
        Comparator<Map.Entry<String, Long>> keyComparator = Map.Entry.comparingByKey();
        return locationCountMap.entrySet().stream()
            .sorted(valueComparator.thenComparing(keyComparator))
            .limit(MAX_SPLIT_LOCATIONS)
            .map(Map.Entry::getKey)
            .toArray(String[]::new);
    }
}

