/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.internals.GlobalStateMaintainer;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link GlobalStreamThread.StateConsumer}.
 *
 * <p>The state consumer under test is wired to a {@link MockConsumer}, a {@link MockTime}
 * clock and a {@link StateMaintainerStub} that records every call it receives, so each test
 * can assert on what the state consumer did to its collaborators.
 */
public class StateConsumerTest {
    /** Flush interval handed to the state consumer built in {@link #setUp()}. */
    private static final long FLUSH_INTERVAL = 1000L;

    private final TopicPartition topicOne = new TopicPartition("topic-one", 1);
    private final TopicPartition topicTwo = new TopicPartition("topic-two", 1);
    private final MockTime time = new MockTime();
    // Explicit type arguments: a raw MockConsumer here would be an unchecked conversion.
    private final MockConsumer<byte[], byte[]> consumer =
        new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST);
    private final Map<TopicPartition, Long> partitionOffsets = new HashMap<TopicPartition, Long>();
    private final LogContext logContext = new LogContext("test ");
    private GlobalStreamThread.StateConsumer stateConsumer;
    private StateMaintainerStub stateMaintainer;

    /**
     * Registers the starting offsets (20 for topic-one, 30 for topic-two) that the stub
     * returns from {@code initialize()} and builds the state consumer under test.
     */
    @Before
    public void setUp() {
        partitionOffsets.put(topicOne, 20L);
        partitionOffsets.put(topicTwo, 30L);
        stateMaintainer = new StateMaintainerStub(partitionOffsets);
        stateConsumer = new GlobalStreamThread.StateConsumer(
            logContext, consumer, stateMaintainer, time, 10L, FLUSH_INTERVAL);
    }

    /** After {@code initialize()} the consumer's assignment is exactly the stub's partitions. */
    @Test
    public void shouldAssignPartitionsToConsumer() {
        stateConsumer.initialize();
        Assert.assertEquals(Utils.mkSet(topicOne, topicTwo), consumer.assignment());
    }

    /** After {@code initialize()} each partition is positioned at the offset the stub returned. */
    @Test
    public void shouldSeekToInitialOffsets() {
        stateConsumer.initialize();
        Assert.assertEquals(20L, consumer.position(topicOne));
        Assert.assertEquals(30L, consumer.position(topicTwo));
    }

    /** Two records on one partition result in two {@code update} calls for that partition. */
    @Test
    public void shouldUpdateStateWithReceivedRecordsForPartition() {
        stateConsumer.initialize();
        consumer.addRecord(recordFor(topicOne, 20L));
        consumer.addRecord(recordFor(topicOne, 21L));
        stateConsumer.pollAndUpdate();
        Assert.assertEquals(2, stateMaintainer.updateCount(topicOne));
    }

    /** Records spread over both partitions are counted per partition. */
    @Test
    public void shouldUpdateStateWithReceivedRecordsForAllTopicPartition() {
        stateConsumer.initialize();
        consumer.addRecord(recordFor(topicOne, 20L));
        consumer.addRecord(recordFor(topicTwo, 31L));
        consumer.addRecord(recordFor(topicTwo, 32L));
        stateConsumer.pollAndUpdate();
        Assert.assertEquals(1, stateMaintainer.updateCount(topicOne));
        Assert.assertEquals(2, stateMaintainer.updateCount(topicTwo));
    }

    /** Advancing the clock by the full flush interval before polling triggers a flush. */
    @Test
    public void shouldFlushStoreWhenFlushIntervalHasLapsed() {
        stateConsumer.initialize();
        consumer.addRecord(recordFor(topicOne, 20L));
        time.sleep(FLUSH_INTERVAL);
        stateConsumer.pollAndUpdate();
        Assert.assertTrue(stateMaintainer.flushed);
    }

    /** Advancing the clock by only half the flush interval before polling does not flush. */
    @Test
    public void shouldNotFlushOffsetsWhenFlushIntervalHasNotLapsed() {
        stateConsumer.initialize();
        consumer.addRecord(recordFor(topicOne, 20L));
        time.sleep(FLUSH_INTERVAL / 2);
        stateConsumer.pollAndUpdate();
        Assert.assertFalse(stateMaintainer.flushed);
    }

    /**
     * A state consumer built with a flush interval of {@code -1} does not flush on poll.
     *
     * <p>NOTE(review): the method name says "zero" but the interval passed is {@code -1};
     * presumably a negative interval is what disables flushing -- confirm against
     * {@code StateConsumer} before renaming the test or changing the value.
     */
    @Test
    public void shouldNotFlushWhenFlushIntervalIsZero() {
        stateConsumer = new GlobalStreamThread.StateConsumer(
            logContext, consumer, stateMaintainer, time, 10L, -1L);
        stateConsumer.initialize();
        time.sleep(100L);
        stateConsumer.pollAndUpdate();
        Assert.assertFalse(stateMaintainer.flushed);
    }

    /** Closing the state consumer closes the underlying consumer. */
    @Test
    public void shouldCloseConsumer() throws IOException {
        stateConsumer.close();
        Assert.assertTrue(consumer.closed());
    }

    /** Closing the state consumer closes the state maintainer. */
    @Test
    public void shouldCloseStateMaintainer() throws IOException {
        stateConsumer.close();
        Assert.assertTrue(stateMaintainer.closed);
    }

    /** Builds a record with empty key and value for the given partition and offset. */
    private static ConsumerRecord<byte[], byte[]> recordFor(TopicPartition partition, long offset) {
        return new ConsumerRecord<byte[], byte[]>(
            partition.topic(), partition.partition(), offset, new byte[0], new byte[0]);
    }

    /**
     * {@link GlobalStateMaintainer} stub that hands back a fixed offset map and records
     * which calls it received.
     */
    private static class StateMaintainerStub
    implements GlobalStateMaintainer {
        private final Map<TopicPartition, Long> partitionOffsets;
        /** Number of {@link #update} calls seen so far, keyed by the record's partition. */
        private final Map<TopicPartition, Integer> updatedPartitions = new HashMap<TopicPartition, Integer>();
        private boolean flushed;
        private boolean closed;

        StateMaintainerStub(Map<TopicPartition, Long> partitionOffsets) {
            this.partitionOffsets = partitionOffsets;
        }

        /** Returns the offset map supplied at construction. */
        @Override
        public Map<TopicPartition, Long> initialize() {
            return partitionOffsets;
        }

        /** Records that a flush was requested. */
        @Override
        public void flushState() {
            flushed = true;
        }

        /** Records that the maintainer was closed. */
        @Override
        public void close() {
            closed = true;
        }

        /** Increments the update count of the partition the record belongs to. */
        @Override
        public void update(ConsumerRecord<byte[], byte[]> record) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            updatedPartitions.put(tp, updateCount(tp) + 1);
        }

        /**
         * Returns how many records were passed to {@link #update} for the partition,
         * or 0 if none were, so a missing partition fails an assertion instead of
         * throwing a {@link NullPointerException} on unboxing.
         */
        int updateCount(TopicPartition partition) {
            Integer count = updatedPartitions.get(partition);
            return count == null ? 0 : count;
        }
    }
}

