/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.source.kafka.hadoop;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Locale;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import org.apache.kylin.source.kafka.hadoop.KafkaInputSplit;
import org.apache.kylin.source.kafka.util.KafkaClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaInputRecordReader
extends RecordReader<LongWritable, BytesWritable> {
    static Logger log = LoggerFactory.getLogger(KafkaInputRecordReader.class);
    public static final long DEFAULT_KAFKA_CONSUMER_POLL_TIMEOUT = 60000L;
    private Configuration conf;
    private KafkaInputSplit split;
    private Consumer consumer;
    private String brokers;
    private String topic;
    private int partition;
    private long earliestOffset;
    private long watermark;
    private long latestOffset;
    private ConsumerRecords<String, String> messages;
    private Iterator<ConsumerRecord<String, String>> iterator;
    private LongWritable key;
    private BytesWritable value;
    private long timeOut = 60000L;
    private long numProcessedMessages = 0L;

    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.initialize(split, context.getConfiguration());
    }

    public void initialize(InputSplit split, Configuration conf) throws IOException, InterruptedException {
        this.conf = conf;
        this.split = (KafkaInputSplit)split;
        this.brokers = this.split.getBrokers();
        this.topic = this.split.getTopic();
        this.partition = this.split.getPartition();
        this.watermark = this.split.getOffsetStart();
        if (conf.get("kafka.connect.timeout") != null) {
            this.timeOut = Long.parseLong(conf.get("kafka.connect.timeout"));
        }
        String consumerGroup = conf.get("kafka.consumer.group");
        Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
        this.consumer = KafkaClient.getKafkaConsumer(this.brokers, consumerGroup, kafkaProperties);
        this.earliestOffset = this.split.getOffsetStart();
        this.latestOffset = this.split.getOffsetEnd();
        TopicPartition topicPartition = new TopicPartition(this.topic, this.partition);
        this.consumer.assign(Arrays.asList(topicPartition));
        log.info("Split {} Topic: {} Broker: {} Partition: {} Start: {} End: {}", new Object[]{this.split, this.topic, this.split.getBrokers(), this.partition, this.earliestOffset, this.latestOffset});
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (this.key == null) {
            this.key = new LongWritable();
        }
        if (this.value == null) {
            this.value = new BytesWritable();
        }
        if (this.watermark >= this.latestOffset) {
            log.info("Reach the end offset, stop reading.");
            return false;
        }
        if (this.messages == null) {
            log.info("{} fetching offset {} ", (Object)(this.topic + ":" + this.split.getBrokers() + ":" + this.partition), (Object)this.watermark);
            TopicPartition topicPartition = new TopicPartition(this.topic, this.partition);
            this.consumer.seek(topicPartition, this.watermark);
            this.messages = this.consumer.poll(this.timeOut);
            this.iterator = this.messages.iterator();
            if (!this.iterator.hasNext()) {
                log.info("No more messages, stop");
                throw new IOException(String.format(Locale.ROOT, "Unexpected ending of stream, expected ending offset %d, but end at %d", this.latestOffset, this.watermark));
            }
        }
        if (this.iterator.hasNext()) {
            ConsumerRecord<String, String> message = this.iterator.next();
            this.key.set(message.offset());
            byte[] valuebytes = Bytes.toBytes((String)((String)message.value()));
            this.value.set(valuebytes, 0, valuebytes.length);
            this.watermark = message.offset() + 1L;
            ++this.numProcessedMessages;
            if (!this.iterator.hasNext()) {
                this.messages = null;
                this.iterator = null;
            }
            return true;
        }
        log.error("Unexpected iterator end.");
        throw new IOException(String.format(Locale.ROOT, "Unexpected ending of stream, expected ending offset %d, but end at %d", this.latestOffset, this.watermark));
    }

    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    public float getProgress() throws IOException, InterruptedException {
        if (this.watermark >= this.latestOffset || this.earliestOffset == this.latestOffset) {
            return 1.0f;
        }
        return Math.min(1.0f, (float)(this.watermark - this.earliestOffset) / (float)(this.latestOffset - this.earliestOffset));
    }

    public void close() throws IOException {
        log.info("{} num. processed messages {} ", (Object)(this.topic + ":" + this.split.getBrokers() + ":" + this.partition), (Object)this.numProcessedMessages);
        this.consumer.close();
    }
}

