/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayDeque;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.MinTimestampTracker;
import org.apache.kafka.streams.processor.internals.RecordDeserializer;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StampedRecord;
import org.apache.kafka.streams.processor.internals.TimestampTracker;
import org.slf4j.Logger;

/**
 * FIFO buffer of {@link StampedRecord}s for a single {@link TopicPartition}.
 *
 * <p>Raw consumer records are deserialized, stamped with a timestamp obtained from the
 * configured {@link TimestampExtractor}, and appended to the queue. The queue also maintains
 * a "partition time" that is only ever moved forward while records are added or polled, and
 * is reset by {@link #clear()}.
 *
 * <p>NOTE(review): no synchronization is performed in this class; callers are presumably
 * expected to confine an instance to one thread — confirm against the caller.
 */
public class RecordQueue {
    /** Value of the partition time before any timestamp has been observed and after {@link #clear()}. */
    private static final long UNKNOWN_TIMESTAMP = -1L;

    private final SourceNode source;
    private final TimestampExtractor timestampExtractor;
    private final TopicPartition partition;
    private final ArrayDeque<StampedRecord> fifoQueue;
    private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
    private final RecordDeserializer recordDeserializer;
    private final ProcessorContext processorContext;
    private final Logger log;
    private long partitionTime = UNKNOWN_TIMESTAMP;

    /**
     * Creates an empty queue for the given partition.
     *
     * @param partition                       the topic partition this queue buffers records for
     * @param source                          the source node associated with this queue; also handed to the record deserializer
     * @param timestampExtractor              extractor used to assign a timestamp to every deserialized record
     * @param deserializationExceptionHandler handler passed on to the {@link RecordDeserializer}
     * @param processorContext                context passed to the deserializer for every record
     * @param logContext                      used to create this queue's logger and passed to the deserializer
     */
    RecordQueue(TopicPartition partition, SourceNode source, TimestampExtractor timestampExtractor, DeserializationExceptionHandler deserializationExceptionHandler, ProcessorContext processorContext, LogContext logContext) {
        this.partition = partition;
        this.source = source;
        this.timestampExtractor = timestampExtractor;
        this.fifoQueue = new ArrayDeque<>();
        this.timeTracker = new MinTimestampTracker<ConsumerRecord<Object, Object>>();
        this.recordDeserializer = new RecordDeserializer(source, deserializationExceptionHandler, logContext);
        this.processorContext = processorContext;
        this.log = logContext.logger(RecordQueue.class);
    }

    /**
     * @return the source node this queue was created with
     */
    public SourceNode source() {
        return source;
    }

    /**
     * @return the topic partition this queue was created with
     */
    public TopicPartition partition() {
        return partition;
    }

    /**
     * Deserializes the given raw records, extracts a timestamp for each, and appends the
     * resulting stamped records to the tail of the queue in iteration order.
     *
     * <p>A record is not enqueued when the deserializer returns {@code null} for it, or when
     * the extracted timestamp is negative; the latter case is reported at WARN level.
     *
     * @param rawRecords the raw records to add
     * @return the number of records buffered in the queue after the call
     * @throws StreamsException if the timestamp extractor throws; a {@code StreamsException}
     *                          is rethrown as-is, any other exception is wrapped
     */
    int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
        for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
            ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext, rawRecord);
            if (record == null) {
                // Nothing to enqueue; presumably the deserializer decided to skip this record.
                continue;
            }

            long timestamp = extractTimestamp(record);
            log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record);

            if (timestamp < 0L) {
                // A negative timestamp makes the record unusable; make the drop visible instead of silent.
                log.warn("Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]",
                        record.topic(), record.partition(), record.offset(), timestamp, timestampExtractor.getClass().getCanonicalName());
                continue;
            }

            StampedRecord stampedRecord = new StampedRecord(record, timestamp);
            fifoQueue.addLast(stampedRecord);
            timeTracker.addElement(stampedRecord);
        }

        advancePartitionTime();
        return size();
    }

    /**
     * Removes and returns the record at the head of the queue, then advances the partition
     * time if the tracker now reports a larger timestamp.
     *
     * @return the head record, or {@code null} if the queue is empty
     */
    public StampedRecord poll() {
        StampedRecord head = fifoQueue.pollFirst();
        if (head != null) {
            timeTracker.removeElement(head);
            advancePartitionTime();
        }
        return head;
    }

    /**
     * @return the number of records currently buffered
     */
    public int size() {
        return fifoQueue.size();
    }

    /**
     * @return {@code true} if no records are currently buffered
     */
    public boolean isEmpty() {
        return fifoQueue.isEmpty();
    }

    /**
     * @return the current partition time; {@code -1} if it has not been advanced since
     *         construction or the last {@link #clear()}
     */
    public long timestamp() {
        return partitionTime;
    }

    /**
     * Discards all buffered records, clears the timestamp tracker, and resets the partition
     * time to {@code -1}.
     */
    public void clear() {
        fifoQueue.clear();
        timeTracker.clear();
        partitionTime = UNKNOWN_TIMESTAMP;
    }

    /**
     * @return the timestamp tracker backing this queue (package-private accessor)
     */
    TimestampTracker<ConsumerRecord<Object, Object>> timeTracker() {
        return timeTracker;
    }

    /**
     * Runs the timestamp extractor on a record, passing the tracker's current value as the
     * second argument, and normalizes failures into {@link StreamsException}.
     */
    private long extractTimestamp(ConsumerRecord<Object, Object> record) {
        try {
            return timestampExtractor.extract(record, timeTracker.get());
        } catch (StreamsException internalFatalExtractorException) {
            // Already the right exception type; do not wrap it a second time.
            throw internalFatalExtractorException;
        } catch (Exception fatalUserException) {
            throw new StreamsException(String.format("Fatal user code error in TimestampExtractor callback for record %s.", record), fatalUserException);
        }
    }

    /**
     * Moves the partition time forward to the tracker's current value if that value is
     * larger; the partition time is never moved backwards here.
     */
    private void advancePartitionTime() {
        long trackedTimestamp = timeTracker.get();
        if (trackedTimestamp > partitionTime) {
            partitionTime = trackedTimestamp;
        }
    }
}

