/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class GlobalStreamThreadTest {
    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
    private final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer(OffsetResetStrategy.NONE);
    private final MockTime time = new MockTime();
    private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
    private GlobalStreamThread globalStreamThread;
    private StreamsConfig config;
    private static final String GLOBAL_STORE_TOPIC_NAME = "foo";
    private static final String GLOBAL_STORE_NAME = "bar";
    private final TopicPartition topicPartition = new TopicPartition("foo", 0);

    @Before
    public void before() {
        MaterializedInternal materialized = new MaterializedInternal(Materialized.with(null, null), new InternalNameProvider(){

            @Override
            public String newProcessorName(String prefix) {
                return "processorName";
            }

            @Override
            public String newStoreName(String prefix) {
                return GlobalStreamThreadTest.GLOBAL_STORE_NAME;
            }
        }, "store-");
        this.builder.addGlobalStore(new KeyValueStoreMaterializer(materialized).materialize().withLoggingDisabled(), "sourceName", null, null, null, GLOBAL_STORE_TOPIC_NAME, "processorName", new KTableSource(GLOBAL_STORE_NAME));
        HashMap<String, String> properties = new HashMap<String, String>();
        properties.put("bootstrap.servers", "blah");
        properties.put("application.id", "blah");
        properties.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        this.config = new StreamsConfig(properties);
        this.globalStreamThread = new GlobalStreamThread(this.builder.buildGlobalStateTopology(), this.config, this.mockConsumer, new StateDirectory(this.config, (Time)this.time), 0L, new Metrics(), (Time)new MockTime(), "clientId", this.stateRestoreListener);
    }

    @Test
    public void shouldThrowStreamsExceptionOnStartupIfThereIsAStreamsException() {
        try {
            this.globalStreamThread.start();
            Assert.fail((String)"Should have thrown StreamsException if start up failed");
        }
        catch (StreamsException streamsException) {
            // empty catch block
        }
        Assert.assertFalse((boolean)this.globalStreamThread.stillRunning());
    }

    @Test
    public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() {
        MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST){

            @Override
            public List<PartitionInfo> partitionsFor(String topic) {
                throw new RuntimeException("KABOOM!");
            }
        };
        this.globalStreamThread = new GlobalStreamThread(this.builder.buildGlobalStateTopology(), this.config, mockConsumer, new StateDirectory(this.config, (Time)this.time), 0L, new Metrics(), (Time)new MockTime(), "clientId", this.stateRestoreListener);
        try {
            this.globalStreamThread.start();
            Assert.fail((String)"Should have thrown StreamsException if start up failed");
        }
        catch (StreamsException e) {
            MatcherAssert.assertThat((Object)e.getCause(), (Matcher)IsInstanceOf.instanceOf(RuntimeException.class));
            MatcherAssert.assertThat((Object)e.getCause().getMessage(), (Matcher)CoreMatchers.equalTo((Object)"KABOOM!"));
        }
        Assert.assertFalse((boolean)this.globalStreamThread.stillRunning());
    }

    @Test
    public void shouldBeRunningAfterSuccessfulStart() {
        this.initializeConsumer();
        this.globalStreamThread.start();
        Assert.assertTrue((boolean)this.globalStreamThread.stillRunning());
    }

    @Test(timeout=30000L)
    public void shouldStopRunningWhenClosedByUser() throws InterruptedException {
        this.initializeConsumer();
        this.globalStreamThread.start();
        this.globalStreamThread.shutdown();
        this.globalStreamThread.join();
        Assert.assertEquals((Object)GlobalStreamThread.State.DEAD, (Object)this.globalStreamThread.state());
    }

    @Test
    public void shouldCloseStateStoresOnClose() throws InterruptedException {
        this.initializeConsumer();
        this.globalStreamThread.start();
        StateStore globalStore = this.builder.globalStateStores().get(GLOBAL_STORE_NAME);
        Assert.assertTrue((boolean)globalStore.isOpen());
        this.globalStreamThread.shutdown();
        this.globalStreamThread.join();
        Assert.assertFalse((boolean)globalStore.isOpen());
    }

    @Test
    public void shouldTransitionToDeadOnClose() throws InterruptedException {
        this.initializeConsumer();
        this.globalStreamThread.start();
        this.globalStreamThread.shutdown();
        this.globalStreamThread.join();
        Assert.assertEquals((Object)GlobalStreamThread.State.DEAD, (Object)this.globalStreamThread.state());
    }

    @Test
    public void shouldStayDeadAfterTwoCloses() throws InterruptedException {
        this.initializeConsumer();
        this.globalStreamThread.start();
        this.globalStreamThread.shutdown();
        this.globalStreamThread.join();
        this.globalStreamThread.shutdown();
        Assert.assertEquals((Object)GlobalStreamThread.State.DEAD, (Object)this.globalStreamThread.state());
    }

    @Test
    public void shouldTransitionToRunningOnStart() throws InterruptedException {
        this.initializeConsumer();
        this.globalStreamThread.start();
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return GlobalStreamThreadTest.this.globalStreamThread.state() == GlobalStreamThread.State.RUNNING;
            }
        }, (long)10000L, (String)"Thread never started.");
        this.globalStreamThread.shutdown();
    }

    @Test
    public void shouldDieOnInvalidOffsetException() throws Exception {
        this.initializeConsumer();
        this.globalStreamThread.start();
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return GlobalStreamThreadTest.this.globalStreamThread.state() == GlobalStreamThread.State.RUNNING;
            }
        }, (long)10000L, (String)"Thread never started.");
        this.mockConsumer.updateEndOffsets(Collections.singletonMap(this.topicPartition, 1L));
        this.mockConsumer.addRecord(new ConsumerRecord<byte[], byte[]>(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return GlobalStreamThreadTest.this.mockConsumer.position(GlobalStreamThreadTest.this.topicPartition) == 1L;
            }
        }, (long)10000L, (String)"Input record never consumed");
        this.mockConsumer.setException(new InvalidOffsetException("Try Again!"){

            @Override
            public Set<TopicPartition> partitions() {
                return Collections.singleton(GlobalStreamThreadTest.this.topicPartition);
            }
        });
        this.mockConsumer.addRecord(new ConsumerRecord<byte[], byte[]>(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return GlobalStreamThreadTest.this.globalStreamThread.state() == GlobalStreamThread.State.DEAD;
            }
        }, (long)10000L, (String)"GlobalStreamThread should have died.");
    }

    private void initializeConsumer() {
        this.mockConsumer.updatePartitions(GLOBAL_STORE_TOPIC_NAME, Collections.singletonList(new PartitionInfo(GLOBAL_STORE_TOPIC_NAME, 0, null, new Node[0], new Node[0])));
        this.mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.topicPartition, 0L));
        this.mockConsumer.updateEndOffsets(Collections.singletonMap(this.topicPartition, 0L));
        this.mockConsumer.assign(Collections.singleton(this.topicPartition));
    }
}

