/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class RecordQueueTest {
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
    private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
    private final String[] topics = new String[]{"topic"};
    private final Sensor skippedRecordsSensor = new Metrics().sensor("skipped-records");
    final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes((String)"anyName", Bytes.class, Bytes.class), (RecordCollector)new RecordCollectorImpl(null, new LogContext("record-queue-test "), (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), this.skippedRecordsSensor));
    private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<Integer, Integer>(this.topics, this.intDeserializer, this.intDeserializer);
    private final RecordQueue queue = new RecordQueue(new TopicPartition(this.topics[0], 1), (SourceNode)this.mockSourceNodeWithMetrics, this.timestampExtractor, (DeserializationExceptionHandler)new LogAndFailExceptionHandler(), (InternalProcessorContext)this.context, new LogContext());
    private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(new TopicPartition(this.topics[0], 1), (SourceNode)this.mockSourceNodeWithMetrics, this.timestampExtractor, (DeserializationExceptionHandler)new LogAndContinueExceptionHandler(), (InternalProcessorContext)this.context, new LogContext());
    private final byte[] recordValue = this.intSerializer.serialize(null, (Object)10);
    private final byte[] recordKey = this.intSerializer.serialize(null, (Object)1);

    @Before
    public void before() {
        this.mockSourceNodeWithMetrics.init((InternalProcessorContext)this.context);
    }

    @After
    public void after() {
        this.mockSourceNodeWithMetrics.close();
    }

    @Test
    public void testTimeTracking() {
        Assert.assertTrue((boolean)this.queue.isEmpty());
        Assert.assertEquals((long)0L, (long)this.queue.size());
        Assert.assertEquals((long)-1L, (long)this.queue.headRecordTimestamp());
        List<ConsumerRecord> list1 = Arrays.asList(new ConsumerRecord("topic", 1, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 3L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue));
        this.queue.addRawRecords(list1);
        Assert.assertEquals((long)3L, (long)this.queue.size());
        Assert.assertEquals((long)2L, (long)this.queue.headRecordTimestamp());
        Assert.assertEquals((long)2L, (long)this.queue.poll().timestamp);
        Assert.assertEquals((long)2L, (long)this.queue.size());
        Assert.assertEquals((long)1L, (long)this.queue.headRecordTimestamp());
        Assert.assertEquals((long)1L, (long)this.queue.poll().timestamp);
        Assert.assertEquals((long)1L, (long)this.queue.size());
        Assert.assertEquals((long)3L, (long)this.queue.headRecordTimestamp());
        List<ConsumerRecord> list2 = Arrays.asList(new ConsumerRecord("topic", 1, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue));
        this.queue.addRawRecords(list2);
        Assert.assertEquals((long)4L, (long)this.queue.size());
        Assert.assertEquals((long)3L, (long)this.queue.headRecordTimestamp());
        Assert.assertEquals((long)3L, (long)this.queue.poll().timestamp);
        Assert.assertEquals((long)3L, (long)this.queue.size());
        Assert.assertEquals((long)4L, (long)this.queue.headRecordTimestamp());
        Assert.assertEquals((long)4L, (long)this.queue.poll().timestamp);
        Assert.assertEquals((long)1L, (long)this.queue.headRecordTimestamp());
        Assert.assertEquals((long)1L, (long)this.queue.poll().timestamp);
        Assert.assertEquals((long)2L, (long)this.queue.headRecordTimestamp());
        Assert.assertEquals((long)2L, (long)this.queue.poll().timestamp);
        Assert.assertTrue((boolean)this.queue.isEmpty());
        Assert.assertEquals((long)0L, (long)this.queue.size());
        Assert.assertEquals((long)-1L, (long)this.queue.headRecordTimestamp());
        List<ConsumerRecord> list3 = Arrays.asList(new ConsumerRecord("topic", 1, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 6L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue));
        this.queue.addRawRecords(list3);
        Assert.assertEquals((long)3L, (long)this.queue.size());
        Assert.assertEquals((long)4L, (long)this.queue.headRecordTimestamp());
        Assert.assertEquals((long)4L, (long)this.queue.poll().timestamp);
        Assert.assertEquals((long)2L, (long)this.queue.size());
        Assert.assertEquals((long)5L, (long)this.queue.headRecordTimestamp());
        this.queue.clear();
        Assert.assertTrue((boolean)this.queue.isEmpty());
        Assert.assertEquals((long)0L, (long)this.queue.size());
        Assert.assertEquals((long)-1L, (long)this.queue.headRecordTimestamp());
        this.queue.addRawRecords(list3);
        Assert.assertEquals((long)3L, (long)this.queue.size());
        Assert.assertEquals((long)4L, (long)this.queue.headRecordTimestamp());
    }

    @Test
    public void shouldTrackPartitionTimeAsMaxSeenTimestamp() {
        Assert.assertTrue((boolean)this.queue.isEmpty());
        Assert.assertEquals((long)0L, (long)this.queue.size());
        Assert.assertEquals((long)-1L, (long)this.queue.headRecordTimestamp());
        List<ConsumerRecord> list1 = Arrays.asList(new ConsumerRecord("topic", 1, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 3L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue));
        Assert.assertEquals((long)this.queue.partitionTime(), (long)-1L);
        this.queue.addRawRecords(list1);
        Assert.assertEquals((long)this.queue.partitionTime(), (long)2L);
        this.queue.poll();
        Assert.assertEquals((long)this.queue.partitionTime(), (long)2L);
        this.queue.poll();
        Assert.assertEquals((long)this.queue.partitionTime(), (long)3L);
    }

    @Test
    public void shouldSetTimestampAndRespectMaxTimestampPolicy() {
        Assert.assertTrue((boolean)this.queue.isEmpty());
        Assert.assertEquals((long)0L, (long)this.queue.size());
        Assert.assertEquals((long)-1L, (long)this.queue.headRecordTimestamp());
        this.queue.setPartitionTime(150L);
        List<ConsumerRecord> list1 = Arrays.asList(new ConsumerRecord("topic", 1, 200L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 100L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 300L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 400L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue));
        Assert.assertEquals((long)150L, (long)this.queue.partitionTime());
        this.queue.addRawRecords(list1);
        Assert.assertEquals((long)200L, (long)this.queue.partitionTime());
        this.queue.setPartitionTime(500L);
        this.queue.poll();
        Assert.assertEquals((long)500L, (long)this.queue.partitionTime());
        this.queue.poll();
        Assert.assertEquals((long)500L, (long)this.queue.partitionTime());
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
        byte[] key = Serdes.Long().serializer().serialize("foo", (Object)1L);
        List<ConsumerRecord> records = Collections.singletonList(new ConsumerRecord("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)key, (Object)this.recordValue));
        this.queue.addRawRecords(records);
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
        byte[] value = Serdes.Long().serializer().serialize("foo", (Object)1L);
        List<ConsumerRecord> records = Collections.singletonList(new ConsumerRecord("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)value));
        this.queue.addRawRecords(records);
    }

    @Test
    public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() {
        byte[] key = Serdes.Long().serializer().serialize("foo", (Object)1L);
        List<ConsumerRecord> records = Collections.singletonList(new ConsumerRecord("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)key, (Object)this.recordValue));
        this.queueThatSkipsDeserializeErrors.addRawRecords(records);
        Assert.assertEquals((long)0L, (long)this.queueThatSkipsDeserializeErrors.size());
    }

    @Test
    public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() {
        byte[] value = Serdes.Long().serializer().serialize("foo", (Object)1L);
        List<ConsumerRecord> records = Collections.singletonList(new ConsumerRecord("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)value));
        this.queueThatSkipsDeserializeErrors.addRawRecords(records);
        Assert.assertEquals((long)0L, (long)this.queueThatSkipsDeserializeErrors.size());
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowOnNegativeTimestamp() {
        List<ConsumerRecord> records = Collections.singletonList(new ConsumerRecord("topic", 1, 1L, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue));
        RecordQueue queue = new RecordQueue(new TopicPartition(this.topics[0], 1), new MockSourceNode<Integer, Integer>(this.topics, this.intDeserializer, this.intDeserializer), (TimestampExtractor)new FailOnInvalidTimestamp(), (DeserializationExceptionHandler)new LogAndContinueExceptionHandler(), (InternalProcessorContext)new InternalMockProcessorContext(), new LogContext());
        queue.addRawRecords(records);
    }

    @Test
    public void shouldDropOnNegativeTimestamp() {
        List<ConsumerRecord> records = Collections.singletonList(new ConsumerRecord("topic", 1, 1L, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue));
        RecordQueue queue = new RecordQueue(new TopicPartition(this.topics[0], 1), new MockSourceNode<Integer, Integer>(this.topics, this.intDeserializer, this.intDeserializer), (TimestampExtractor)new LogAndSkipOnInvalidTimestamp(), (DeserializationExceptionHandler)new LogAndContinueExceptionHandler(), (InternalProcessorContext)new InternalMockProcessorContext(), new LogContext());
        queue.addRawRecords(records);
        Assert.assertEquals((long)0L, (long)queue.size());
    }

    @Test
    public void shouldPassPartitionTimeToTimestampExtractor() {
        PartitionTimeTrackingTimestampExtractor timestampExtractor = new PartitionTimeTrackingTimestampExtractor();
        RecordQueue queue = new RecordQueue(new TopicPartition(this.topics[0], 1), (SourceNode)this.mockSourceNodeWithMetrics, (TimestampExtractor)timestampExtractor, (DeserializationExceptionHandler)new LogAndFailExceptionHandler(), (InternalProcessorContext)this.context, new LogContext());
        Assert.assertTrue((boolean)queue.isEmpty());
        Assert.assertEquals((long)0L, (long)queue.size());
        Assert.assertEquals((long)-1L, (long)queue.headRecordTimestamp());
        List<ConsumerRecord> list1 = Arrays.asList(new ConsumerRecord("topic", 1, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 3L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue));
        Assert.assertEquals((long)-1L, (long)timestampExtractor.partitionTime);
        queue.addRawRecords(list1);
        Assert.assertEquals((long)-1L, (long)timestampExtractor.partitionTime);
        queue.poll();
        Assert.assertEquals((long)2L, (long)timestampExtractor.partitionTime);
        queue.poll();
        Assert.assertEquals((long)2L, (long)timestampExtractor.partitionTime);
        queue.poll();
        Assert.assertEquals((long)3L, (long)timestampExtractor.partitionTime);
    }

    class PartitionTimeTrackingTimestampExtractor
    implements TimestampExtractor {
        private long partitionTime = -1L;

        PartitionTimeTrackingTimestampExtractor() {
        }

        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            if (partitionTime < this.partitionTime) {
                throw new IllegalStateException("Partition time should not decrease");
            }
            this.partitionTime = partitionTime;
            return record.offset();
        }
    }
}

