/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.Handover;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
@Internal
public class KafkaFetcher<T>
extends AbstractFetcher<T, TopicPartition> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaFetcher.class);
    private final KafkaDeserializationSchema<T> deserializer;
    private final KafkaCollector kafkaCollector;
    final Handover handover;
    final KafkaConsumerThread consumerThread;
    volatile boolean running = true;
    private long fetchTimestamp;

    public KafkaFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<WatermarkStrategy<T>> watermarkStrategy, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, String taskNameWithSubtasks, KafkaDeserializationSchema<T> deserializer, Properties kafkaProperties, long pollTimeout, MetricGroup subtaskMetricGroup, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
        super(sourceContext, assignedPartitionsWithInitialOffsets, watermarkStrategy, processingTimeProvider, autoWatermarkInterval, userCodeClassLoader, subtaskMetricGroup, consumerMetricGroup, useMetrics);
        this.deserializer = deserializer;
        this.handover = new Handover();
        this.consumerThread = new KafkaConsumerThread(LOG, this.handover, kafkaProperties, this.unassignedPartitionsQueue, this.getFetcherName() + " for " + taskNameWithSubtasks, pollTimeout, useMetrics, consumerMetricGroup, subtaskMetricGroup, this.kafkaMetricRecorder);
        this.kafkaCollector = new KafkaCollector();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void runFetchLoop() throws Exception {
        try {
            this.consumerThread.start();
            while (this.running) {
                Handover.ConsumerRecordsWithFetchTime recordsWithFetchTime = this.handover.pollNext();
                ConsumerRecords<byte[], byte[]> records = recordsWithFetchTime.getRecords();
                this.kafkaMetricRecorder.getNumRecordsInCounter().inc((long)records.count());
                this.fetchTimestamp = recordsWithFetchTime.getFetchTimestamp();
                for (KafkaTopicPartitionState partition : this.subscribedPartitionStates()) {
                    List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records((TopicPartition)partition.getKafkaPartitionHandle());
                    this.partitionConsumerRecordsHandler(partitionRecords, partition);
                }
            }
        }
        catch (Handover.ClosedException ex) {
            if (this.running) {
                ExceptionUtils.rethrowException((Throwable)ex);
            }
        }
        finally {
            this.consumerThread.shutdown();
        }
        try {
            this.consumerThread.join();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void cancel() {
        this.running = false;
        this.handover.close();
        this.consumerThread.shutdown();
    }

    protected String getFetcherName() {
        return "Kafka Fetcher";
    }

    protected void partitionConsumerRecordsHandler(List<ConsumerRecord<byte[], byte[]>> partitionRecords, KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {
        ConsumerRecord<byte[], byte[]> lastRecord = null;
        for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
            this.deserializer.deserialize(record, this.kafkaCollector);
            this.kafkaMetricRecorder.getNumBytesInCounter().inc((long)(record.serializedKeySize() + record.serializedValueSize()));
            this.emitRecordsWithTimestamps(this.kafkaCollector.getRecords(), partition, record.offset(), record.timestamp());
            if (this.kafkaCollector.isEndOfStreamSignalled()) {
                this.running = false;
                break;
            }
            lastRecord = record;
        }
        if (lastRecord != null) {
            this.kafkaMetricRecorder.getCurrentFetchEventTimeLag().update(this.fetchTimestamp - lastRecord.timestamp());
            long currentTimestampMs = this.currentTimestampMs();
            this.kafkaMetricRecorder.getCurrentEmitEventTimeLag().update(currentTimestampMs - lastRecord.timestamp());
            this.kafkaMetricRecorder.getSourceIdleTime().update(currentTimestampMs);
        }
    }

    @Override
    public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
        return new TopicPartition(partition.getTopic(), partition.getPartition());
    }

    @Override
    protected void doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception {
        List partitions = this.subscribedPartitionStates();
        HashMap<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<TopicPartition, OffsetAndMetadata>(partitions.size());
        for (KafkaTopicPartitionState partition : partitions) {
            Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
            if (lastProcessedOffset == null) continue;
            Preconditions.checkState((lastProcessedOffset >= 0L ? 1 : 0) != 0, (Object)"Illegal offset value to commit");
            long offsetToCommit = lastProcessedOffset + 1L;
            offsetsToCommit.put((TopicPartition)partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
            partition.setCommittedOffset(offsetToCommit);
        }
        this.consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
    }

    private class KafkaCollector
    implements Collector<T> {
        private final Queue<T> records = new ArrayDeque();
        private boolean endOfStreamSignalled = false;

        private KafkaCollector() {
        }

        public void collect(T record) {
            if (this.endOfStreamSignalled || KafkaFetcher.this.deserializer.isEndOfStream(record)) {
                this.endOfStreamSignalled = true;
                return;
            }
            this.records.add(record);
        }

        public Queue<T> getRecords() {
            return this.records;
        }

        public boolean isEndOfStreamSignalled() {
            return this.endOfStreamSignalled;
        }

        public void close() {
        }
    }
}

