/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.internals.GlobalStateMaintainer;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class StateConsumerTest {
    private final TopicPartition topicOne = new TopicPartition("topic-one", 1);
    private final TopicPartition topicTwo = new TopicPartition("topic-two", 1);
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private final Map<TopicPartition, Long> partitionOffsets = new HashMap<TopicPartition, Long>();
    private final LogContext logContext = new LogContext("test ");
    private GlobalStreamThread.StateConsumer stateConsumer;
    private TaskStub stateMaintainer;

    @BeforeEach
    public void setUp() {
        this.partitionOffsets.put(this.topicOne, 20L);
        this.partitionOffsets.put(this.topicTwo, 30L);
        this.stateMaintainer = new TaskStub(this.partitionOffsets);
        this.stateConsumer = new GlobalStreamThread.StateConsumer(this.logContext, this.consumer, (GlobalStateMaintainer)this.stateMaintainer, Duration.ofMillis(10L));
    }

    @Test
    public void shouldAssignPartitionsToConsumer() {
        this.stateConsumer.initialize();
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.topicOne, this.topicTwo}), (Object)this.consumer.assignment());
    }

    @Test
    public void shouldSeekToInitialOffsets() {
        this.stateConsumer.initialize();
        Assertions.assertEquals((long)20L, (long)this.consumer.position(this.topicOne));
        Assertions.assertEquals((long)30L, (long)this.consumer.position(this.topicTwo));
    }

    @Test
    public void shouldUpdateStateWithReceivedRecordsForPartition() {
        this.stateConsumer.initialize();
        this.consumer.addRecord(new ConsumerRecord("topic-one", 1, 20L, (Object)new byte[0], (Object)new byte[0]));
        this.consumer.addRecord(new ConsumerRecord("topic-one", 1, 21L, (Object)new byte[0], (Object)new byte[0]));
        this.stateConsumer.pollAndUpdate();
        Assertions.assertEquals((int)2, (int)((Integer)this.stateMaintainer.updatedPartitions.get(this.topicOne)));
        Assertions.assertTrue((boolean)this.stateMaintainer.flushed);
    }

    @Test
    public void shouldUpdateStateWithReceivedRecordsForAllTopicPartition() {
        this.stateConsumer.initialize();
        this.consumer.addRecord(new ConsumerRecord("topic-one", 1, 20L, (Object)new byte[0], (Object)new byte[0]));
        this.consumer.addRecord(new ConsumerRecord("topic-two", 1, 31L, (Object)new byte[0], (Object)new byte[0]));
        this.consumer.addRecord(new ConsumerRecord("topic-two", 1, 32L, (Object)new byte[0], (Object)new byte[0]));
        this.stateConsumer.pollAndUpdate();
        Assertions.assertEquals((int)1, (int)((Integer)this.stateMaintainer.updatedPartitions.get(this.topicOne)));
        Assertions.assertEquals((int)2, (int)((Integer)this.stateMaintainer.updatedPartitions.get(this.topicTwo)));
        Assertions.assertTrue((boolean)this.stateMaintainer.flushed);
    }

    @Test
    public void shouldCloseConsumer() throws IOException {
        this.stateConsumer.close(false);
        Assertions.assertTrue((boolean)this.consumer.closed());
    }

    @Test
    public void shouldCloseStateMaintainer() throws IOException {
        this.stateConsumer.close(false);
        Assertions.assertTrue((boolean)this.stateMaintainer.closed);
    }

    @Test
    public void shouldWipeStoreOnClose() throws IOException {
        this.stateConsumer.close(true);
        Assertions.assertTrue((boolean)this.stateMaintainer.wipeStore);
    }

    private static class TaskStub
    implements GlobalStateMaintainer {
        private final Map<TopicPartition, Long> partitionOffsets;
        private final Map<TopicPartition, Integer> updatedPartitions = new HashMap<TopicPartition, Integer>();
        private boolean flushed;
        private boolean wipeStore;
        private boolean closed;

        TaskStub(Map<TopicPartition, Long> partitionOffsets) {
            this.partitionOffsets = partitionOffsets;
        }

        public Map<TopicPartition, Long> initialize() {
            return this.partitionOffsets;
        }

        public void flushState() {
            this.flushed = true;
        }

        public void close(boolean wipeStateStore) {
            this.closed = true;
            this.wipeStore = wipeStateStore;
        }

        public void update(ConsumerRecord<byte[], byte[]> record) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            if (!this.updatedPartitions.containsKey(tp)) {
                this.updatedPartitions.put(tp, 0);
            }
            this.updatedPartitions.put(tp, this.updatedPartitions.get(tp) + 1);
        }

        public void maybeCheckpoint() {
            this.flushState();
        }
    }
}

