/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.ProcessorTopologyFactories;
import org.apache.kafka.streams.processor.internals.testutil.ConsumerRecordUtil;
import org.apache.kafka.test.GlobalStateManagerStub;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.NoOpProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class GlobalStateTaskTest {
    private final LogContext logContext = new LogContext();
    private final String topic1 = "t1";
    private final String topic2 = "t2";
    private final TopicPartition t1 = new TopicPartition("t1", 1);
    private final TopicPartition t2 = new TopicPartition("t2", 1);
    private final MockSourceNode<String, String> sourceOne = new MockSourceNode(new StringDeserializer(), new StringDeserializer());
    private final MockSourceNode<Integer, Integer> sourceTwo = new MockSourceNode(new IntegerDeserializer(), new IntegerDeserializer());
    private final MockProcessorNode<?, ?, ?, ?> processorOne = new MockProcessorNode();
    private final MockProcessorNode<?, ?, ?, ?> processorTwo = new MockProcessorNode();
    private final Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
    private final File testDirectory = TestUtils.tempDirectory((String)"global-store");
    private final NoOpProcessorContext context = new NoOpProcessorContext();
    private final MockTime time = new MockTime();
    private final long flushInterval = 1000L;
    private final long currentOffsetT1 = 50L;
    private final long currentOffsetT2 = 100L;
    private ProcessorTopology topology;
    private GlobalStateManagerStub stateMgr;
    private GlobalStateUpdateTask globalStateTask;

    @BeforeEach
    public void before() {
        Set storeNames = Utils.mkSet((Object[])new String[]{"t1-store", "t2-store"});
        HashMap sourceByTopics = new HashMap();
        sourceByTopics.put("t1", this.sourceOne);
        sourceByTopics.put("t2", this.sourceTwo);
        HashMap<String, String> storeToTopic = new HashMap<String, String>();
        storeToTopic.put("t1-store", "t1");
        storeToTopic.put("t2-store", "t2");
        this.topology = ProcessorTopologyFactories.with(Arrays.asList(new ProcessorNode[]{this.sourceOne, this.sourceTwo, this.processorOne, this.processorTwo}), sourceByTopics, Collections.emptyList(), storeToTopic);
        this.offsets.put(this.t1, 50L);
        this.offsets.put(this.t2, 100L);
        this.stateMgr = new GlobalStateManagerStub(storeNames, this.offsets, this.testDirectory);
        this.globalStateTask = new GlobalStateUpdateTask(this.logContext, this.topology, (InternalProcessorContext)this.context, (GlobalStateManager)this.stateMgr, (DeserializationExceptionHandler)new LogAndFailExceptionHandler(), (Time)this.time, 1000L);
    }

    @Test
    public void shouldInitializeStateManager() {
        Map startingOffsets = this.globalStateTask.initialize();
        Assertions.assertTrue((boolean)this.stateMgr.initialized);
        Assertions.assertEquals(this.offsets, (Object)startingOffsets);
    }

    @Test
    public void shouldInitializeContext() {
        this.globalStateTask.initialize();
        Assertions.assertTrue((boolean)this.context.initialized);
    }

    @Test
    public void shouldInitializeProcessorTopology() {
        this.globalStateTask.initialize();
        Assertions.assertTrue((boolean)this.sourceOne.initialized);
        Assertions.assertTrue((boolean)this.sourceTwo.initialized);
        Assertions.assertTrue((boolean)this.processorOne.initialized);
        Assertions.assertTrue((boolean)this.processorTwo.initialized);
    }

    @Test
    public void shouldProcessRecordsForTopic() {
        this.globalStateTask.initialize();
        this.globalStateTask.update(ConsumerRecordUtil.record("t1", 1, 1L, "foo".getBytes(), "bar".getBytes()));
        Assertions.assertEquals((int)1, (int)this.sourceOne.numReceived);
        Assertions.assertEquals((int)0, (int)this.sourceTwo.numReceived);
    }

    @Test
    public void shouldProcessRecordsForOtherTopic() {
        byte[] integerBytes = new IntegerSerializer().serialize("foo", Integer.valueOf(1));
        this.globalStateTask.initialize();
        this.globalStateTask.update(ConsumerRecordUtil.record("t2", 1, 1L, integerBytes, integerBytes));
        Assertions.assertEquals((int)1, (int)this.sourceTwo.numReceived);
        Assertions.assertEquals((int)0, (int)this.sourceOne.numReceived);
    }

    private void maybeDeserialize(GlobalStateUpdateTask globalStateTask, byte[] key, byte[] recordValue, boolean failExpected) {
        block3: {
            ConsumerRecord record = new ConsumerRecord("t2", 1, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, (Object)key, (Object)recordValue, (Headers)new RecordHeaders(), Optional.empty());
            globalStateTask.initialize();
            try {
                globalStateTask.update(record);
                if (failExpected) {
                    Assertions.fail((String)"Should have failed to deserialize.");
                }
            }
            catch (StreamsException e) {
                if (failExpected) break block3;
                Assertions.fail((String)"Shouldn't have failed to deserialize.");
            }
        }
    }

    @Test
    public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
        byte[] key = new LongSerializer().serialize("t2", Long.valueOf(1L));
        byte[] recordValue = new IntegerSerializer().serialize("t2", Integer.valueOf(10));
        this.maybeDeserialize(this.globalStateTask, key, recordValue, true);
    }

    @Test
    public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
        byte[] key = new IntegerSerializer().serialize("t2", Integer.valueOf(1));
        byte[] recordValue = new LongSerializer().serialize("t2", Long.valueOf(10L));
        this.maybeDeserialize(this.globalStateTask, key, recordValue, true);
    }

    @Test
    public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() {
        GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(this.logContext, this.topology, (InternalProcessorContext)this.context, (GlobalStateManager)this.stateMgr, (DeserializationExceptionHandler)new LogAndContinueExceptionHandler(), (Time)this.time, 1000L);
        byte[] key = new LongSerializer().serialize("t2", Long.valueOf(1L));
        byte[] recordValue = new IntegerSerializer().serialize("t2", Integer.valueOf(10));
        this.maybeDeserialize(globalStateTask2, key, recordValue, false);
    }

    @Test
    public void shouldNotThrowStreamsExceptionWhenValueDeserializationFails() {
        GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(this.logContext, this.topology, (InternalProcessorContext)this.context, (GlobalStateManager)this.stateMgr, (DeserializationExceptionHandler)new LogAndContinueExceptionHandler(), (Time)this.time, 1000L);
        byte[] key = new IntegerSerializer().serialize("t2", Integer.valueOf(1));
        byte[] recordValue = new LongSerializer().serialize("t2", Long.valueOf(10L));
        this.maybeDeserialize(globalStateTask2, key, recordValue, false);
    }

    @Test
    public void shouldFlushStateManagerWithOffsets() {
        HashMap<TopicPartition, Long> expectedOffsets = new HashMap<TopicPartition, Long>();
        expectedOffsets.put(this.t1, 52L);
        expectedOffsets.put(this.t2, 100L);
        this.globalStateTask.initialize();
        this.globalStateTask.update(ConsumerRecordUtil.record("t1", 1, 51L, "foo".getBytes(), "foo".getBytes()));
        this.globalStateTask.flushState();
        Assertions.assertEquals(expectedOffsets, this.stateMgr.changelogOffsets());
        Assertions.assertTrue((boolean)this.stateMgr.flushed);
    }

    @Test
    public void shouldCheckpointOffsetsWhenStateIsFlushed() {
        HashMap<TopicPartition, Long> expectedOffsets = new HashMap<TopicPartition, Long>();
        expectedOffsets.put(this.t1, 102L);
        expectedOffsets.put(this.t2, 100L);
        this.globalStateTask.initialize();
        this.globalStateTask.update(ConsumerRecordUtil.record("t1", 1, 101L, "foo".getBytes(), "foo".getBytes()));
        this.globalStateTask.flushState();
        Assertions.assertEquals(expectedOffsets, this.stateMgr.changelogOffsets());
        Assertions.assertTrue((boolean)this.stateMgr.checkpointWritten);
    }

    @Test
    public void shouldNotCheckpointIfNotReceivedEnoughRecords() {
        this.globalStateTask.initialize();
        this.globalStateTask.update(ConsumerRecordUtil.record("t1", 1, 9050L, "foo".getBytes(), "foo".getBytes()));
        this.time.sleep(1000L);
        this.globalStateTask.maybeCheckpoint();
        Assertions.assertEquals(this.offsets, this.stateMgr.changelogOffsets());
        Assertions.assertFalse((boolean)this.stateMgr.flushed);
        Assertions.assertFalse((boolean)this.stateMgr.checkpointWritten);
    }

    @Test
    public void shouldNotCheckpointWhenFlushIntervalHasNotLapsed() {
        this.globalStateTask.initialize();
        this.globalStateTask.update(ConsumerRecordUtil.record("t1", 1, 10050L, "foo".getBytes(), "foo".getBytes()));
        this.time.sleep(500L);
        this.globalStateTask.maybeCheckpoint();
        Assertions.assertEquals(this.offsets, this.stateMgr.changelogOffsets());
        Assertions.assertFalse((boolean)this.stateMgr.flushed);
        Assertions.assertFalse((boolean)this.stateMgr.checkpointWritten);
    }

    @Test
    public void shouldCheckpointIfReceivedEnoughRecordsAndFlushIntervalHasElapsed() {
        HashMap<TopicPartition, Long> expectedOffsets = new HashMap<TopicPartition, Long>();
        expectedOffsets.put(this.t1, 10051L);
        expectedOffsets.put(this.t2, 100L);
        this.globalStateTask.initialize();
        this.time.sleep(1000L);
        this.globalStateTask.update(ConsumerRecordUtil.record("t1", 1, 10049L, "foo".getBytes(), "foo".getBytes()));
        this.globalStateTask.maybeCheckpoint();
        Assertions.assertEquals(this.offsets, this.stateMgr.changelogOffsets());
        Assertions.assertFalse((boolean)this.stateMgr.flushed);
        Assertions.assertFalse((boolean)this.stateMgr.checkpointWritten);
        this.globalStateTask.update(ConsumerRecordUtil.record("t1", 1, 10050L, "foo".getBytes(), "foo".getBytes()));
        this.globalStateTask.maybeCheckpoint();
        Assertions.assertEquals(expectedOffsets, this.stateMgr.changelogOffsets());
        Assertions.assertTrue((boolean)this.stateMgr.flushed);
        Assertions.assertTrue((boolean)this.stateMgr.checkpointWritten);
    }

    @Test
    public void shouldCheckpointIfReceivedEnoughRecordsFromMultipleTopicsAndFlushIntervalElapsed() {
        byte[] integerBytes = new IntegerSerializer().serialize("t2", Integer.valueOf(1));
        HashMap<TopicPartition, Long> expectedOffsets = new HashMap<TopicPartition, Long>();
        expectedOffsets.put(this.t1, 9050L);
        expectedOffsets.put(this.t2, 1101L);
        this.globalStateTask.initialize();
        this.time.sleep(1000L);
        this.globalStateTask.update(ConsumerRecordUtil.record("t1", 1, 9049L, "foo".getBytes(), "foo".getBytes()));
        this.globalStateTask.update(ConsumerRecordUtil.record("t2", 1, 1100L, integerBytes, integerBytes));
        this.globalStateTask.maybeCheckpoint();
        Assertions.assertEquals(expectedOffsets, this.stateMgr.changelogOffsets());
        Assertions.assertTrue((boolean)this.stateMgr.flushed);
        Assertions.assertTrue((boolean)this.stateMgr.checkpointWritten);
    }

    @Test
    public void shouldWipeGlobalStateDirectory() throws Exception {
        Assertions.assertTrue((boolean)this.stateMgr.baseDir().exists());
        this.globalStateTask.close(true);
        Assertions.assertFalse((boolean)this.stateMgr.baseDir().exists());
    }
}

