/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.source.kafka.hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kylin.shaded.com.google.common.base.Preconditions;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import org.apache.kylin.source.kafka.hadoop.KafkaInputRecordReader;
import org.apache.kylin.source.kafka.hadoop.KafkaInputSplit;
import org.apache.kylin.source.kafka.util.KafkaClient;

public class KafkaInputFormat
extends InputFormat<LongWritable, BytesWritable> {
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        String brokers = conf.get("kafka.brokers");
        String inputTopic = conf.get("kafka.topic");
        String consumerGroup = conf.get("kafka.consumer.group");
        Integer partitionMin = Integer.valueOf(conf.get("kafka.partition.min"));
        Integer partitionMax = Integer.valueOf(conf.get("kafka.partition.max"));
        Integer spiltRows = Integer.valueOf(conf.get("kafka.split.rows"));
        HashMap startOffsetMap = Maps.newHashMap();
        HashMap endOffsetMap = Maps.newHashMap();
        for (int i = partitionMin.intValue(); i <= partitionMax; ++i) {
            String start = conf.get("kafka.partition.start." + i);
            String end = conf.get("kafka.partition.end." + i);
            if (start == null || end == null) continue;
            startOffsetMap.put(i, Long.valueOf(start));
            endOffsetMap.put(i, Long.valueOf(end));
        }
        Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
        ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
        try (KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties);){
            List partitionInfos = consumer.partitionsFor(inputTopic);
            Preconditions.checkArgument((partitionInfos.size() == startOffsetMap.size() ? 1 : 0) != 0, (Object)"partition number mismatch with server side");
            block10: for (int i = 0; i < partitionInfos.size(); ++i) {
                PartitionInfo partition = (PartitionInfo)partitionInfos.get(i);
                int partitionId = partition.partition();
                if (!startOffsetMap.containsKey(partitionId)) {
                    throw new IllegalStateException("Partition '" + partitionId + "' not exists.");
                }
                long end = (Long)endOffsetMap.get(partitionId);
                for (long new_start = ((Long)startOffsetMap.get(partitionId)).longValue(); end > new_start; new_start += (long)spiltRows.intValue()) {
                    KafkaInputSplit split;
                    if (end - new_start <= (long)spiltRows.intValue() && end > new_start) {
                        split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, end);
                        splits.add(split);
                        continue block10;
                    }
                    split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, new_start + (long)spiltRows.intValue());
                    splits.add(split);
                }
            }
        }
        return splits;
    }

    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
        return new KafkaInputRecordReader();
    }
}

