/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.kafka;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.kafka.KafkaInputSplit;
import org.apache.hadoop.hive.kafka.KafkaRecordIterator;
import org.apache.hadoop.hive.kafka.KafkaRecordReader;
import org.apache.hadoop.hive.kafka.KafkaSerDe;
import org.apache.hadoop.hive.kafka.KafkaTableProperties;
import org.apache.hadoop.hive.kafka.KafkaUtils;
import org.apache.hadoop.hive.kafka.KafkaWritable;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.kafkaesqueesque.clients.consumer.ConsumerRecord;
import org.apache.kafkaesqueesque.clients.consumer.KafkaConsumer;
import org.apache.kafkaesqueesque.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class VectorizedKafkaRecordReader
implements RecordReader<NullWritable, VectorizedRowBatch> {
    private static final Logger LOG = LoggerFactory.getLogger(VectorizedKafkaRecordReader.class);
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final Iterator<ConsumerRecord<byte[], byte[]>> recordsCursor;
    private long totalNumberRecords = 0L;
    private long consumedRecords = 0L;
    private long readBytes = 0L;
    private final VectorizedRowBatchCtx rbCtx;
    private final int[] projectedColumns;
    private final KafkaSerDe serDe;
    private final VectorAssignRow vectorAssignRow = new VectorAssignRow();
    private final KafkaWritable kafkaWritable = new KafkaWritable();
    final Object[] row;

    VectorizedKafkaRecordReader(KafkaInputSplit inputSplit, Configuration jobConf) {
        this.rbCtx = Utilities.getVectorizedRowBatchCtx((Configuration)jobConf);
        if (this.rbCtx.getDataColumnNums() != null) {
            this.projectedColumns = this.rbCtx.getDataColumnNums();
        } else {
            this.projectedColumns = new int[this.rbCtx.getRowColumnTypeInfos().length];
            for (int i = 0; i < this.projectedColumns.length; ++i) {
                this.projectedColumns[i] = i;
            }
        }
        this.serDe = VectorizedKafkaRecordReader.createAndInitializeSerde(jobConf);
        try {
            this.vectorAssignRow.init((StructObjectInspector)this.serDe.getObjectInspector());
        }
        catch (HiveException e) {
            throw new RuntimeException(e);
        }
        long startOffset = inputSplit.getStartOffset();
        long endOffset = inputSplit.getEndOffset();
        TopicPartition topicPartition = new TopicPartition(inputSplit.getTopic(), inputSplit.getPartition());
        Preconditions.checkState((startOffset >= 0L && startOffset <= endOffset ? 1 : 0) != 0, (String)"Start [%s] has to be positive and Less than or equal to End [%s]", (Object[])new Object[]{startOffset, endOffset});
        this.totalNumberRecords += endOffset - startOffset;
        Properties properties = KafkaUtils.consumerProperties(jobConf);
        this.consumer = new KafkaConsumer(properties);
        long pollTimeout = jobConf.getLong(KafkaTableProperties.KAFKA_POLL_TIMEOUT.getName(), -1L);
        LOG.debug("Consumer poll timeout [{}] ms", (Object)pollTimeout);
        this.recordsCursor = startOffset == endOffset ? new KafkaRecordReader.EmptyIterator() : new KafkaRecordIterator(this.consumer, topicPartition, startOffset, endOffset, pollTimeout);
        this.row = new Object[((StructObjectInspector)this.serDe.getObjectInspector()).getAllStructFieldRefs().size()];
    }

    public boolean next(NullWritable nullWritable, VectorizedRowBatch vectorizedRowBatch) throws IOException {
        vectorizedRowBatch.reset();
        try {
            return this.readNextBatch(vectorizedRowBatch, this.recordsCursor) > 0;
        }
        catch (SerDeException e) {
            throw new IOException("Serde exception", e);
        }
    }

    private void cleanRowBoat() {
        for (int i = 0; i < this.row.length; ++i) {
            this.row[i] = null;
        }
    }

    public NullWritable createKey() {
        return NullWritable.get();
    }

    public VectorizedRowBatch createValue() {
        return this.rbCtx.createVectorizedRowBatch();
    }

    public long getPos() throws IOException {
        return -1L;
    }

    public float getProgress() {
        if (this.consumedRecords >= this.totalNumberRecords) {
            return 1.0f;
        }
        if (this.consumedRecords == 0L) {
            return 0.0f;
        }
        return (float)this.consumedRecords * 1.0f / (float)this.totalNumberRecords;
    }

    public void close() {
        LOG.trace("total read bytes [{}]", (Object)this.readBytes);
        if (this.consumer != null) {
            this.consumer.wakeup();
            this.consumer.close();
        }
    }

    private int readNextBatch(VectorizedRowBatch vectorizedRowBatch, Iterator<ConsumerRecord<byte[], byte[]>> recordIterator) throws SerDeException {
        int rowsCount;
        for (rowsCount = 0; recordIterator.hasNext() && rowsCount < vectorizedRowBatch.getMaxSize(); ++rowsCount) {
            ConsumerRecord<byte[], byte[]> kRecord = recordIterator.next();
            this.kafkaWritable.set(kRecord);
            this.readBytes += (long)(kRecord.serializedKeySize() + kRecord.serializedValueSize());
            if (this.projectedColumns.length <= 0) continue;
            this.serDe.deserializeKWritable(this.kafkaWritable, this.row);
            for (int i : this.projectedColumns) {
                this.vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, this.row[i]);
            }
        }
        vectorizedRowBatch.size = rowsCount;
        this.consumedRecords += (long)rowsCount;
        this.cleanRowBoat();
        return rowsCount;
    }

    private static KafkaSerDe createAndInitializeSerde(Configuration jobConf) {
        KafkaSerDe serDe = new KafkaSerDe();
        MapWork mapWork = (MapWork)Preconditions.checkNotNull((Object)Utilities.getMapWork((Configuration)jobConf), (Object)"Map work is null");
        Properties properties = mapWork.getPartitionDescs().stream().map(partitionDesc -> partitionDesc.getTableDesc().getProperties()).findAny().orElseThrow(() -> new RuntimeException("Can not find table property at the map work"));
        try {
            serDe.initialize(jobConf, properties, null);
        }
        catch (SerDeException e) {
            throw new RuntimeException("Can not initialized the serde", e);
        }
        return serDe;
    }
}

