/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class RestoreIntegrationTest {
    private static final int NUM_BROKERS = 1;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final String INPUT_STREAM = "input-stream";
    private static final String INPUT_STREAM_2 = "input-stream-2";
    private final int numberOfKeys = 10000;
    private KafkaStreams kafkaStreams;
    private String applicationId = "restore-test";

    @BeforeClass
    public static void createTopics() throws InterruptedException {
        CLUSTER.createTopic(INPUT_STREAM, 2, 1);
        CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
    }

    private Properties props(String applicationId) {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", applicationId);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory((String)applicationId).getPath());
        streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("commit.interval.ms", (Object)1000);
        return streamsConfiguration;
    }

    @After
    public void shutdown() throws IOException {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close(30L, TimeUnit.SECONDS);
        }
    }

    @Test
    public void shouldRestoreState() throws ExecutionException, InterruptedException {
        final AtomicInteger numReceived = new AtomicInteger(0);
        StreamsBuilder builder = new StreamsBuilder();
        this.createStateForRestoration();
        builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), Serdes.Integer())).toStream().foreach(new ForeachAction<Integer, Integer>(){

            @Override
            public void apply(Integer key, Integer value) {
                numReceived.incrementAndGet();
            }
        });
        final CountDownLatch startupLatch = new CountDownLatch(1);
        this.kafkaStreams = new KafkaStreams(builder.build(), this.props(this.applicationId));
        this.kafkaStreams.setStateListener(new KafkaStreams.StateListener(){

            @Override
            public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                    startupLatch.countDown();
                }
            }
        });
        final AtomicLong restored = new AtomicLong(0L);
        this.kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener(){

            @Override
            public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
            }

            @Override
            public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
            }

            @Override
            public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
                restored.addAndGet(totalRestored);
            }
        });
        this.kafkaStreams.start();
        Assert.assertTrue((boolean)startupLatch.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat((Object)restored.get(), (Matcher)IsEqual.equalTo((Object)10000L));
        MatcherAssert.assertThat((Object)numReceived.get(), (Matcher)IsEqual.equalTo((Object)0));
    }

    @Test
    public void shouldSuccessfullyStartWhenLoggingDisabled() throws InterruptedException {
        StreamsBuilder builder = new StreamsBuilder();
        KStream stream = builder.stream(INPUT_STREAM);
        stream.groupByKey().reduce(new Reducer<Integer>(){

            @Override
            public Integer apply(Integer value1, Integer value2) {
                return value1 + value2;
            }
        }, Materialized.as("reduce-store").withLoggingDisabled());
        final CountDownLatch startupLatch = new CountDownLatch(1);
        this.kafkaStreams = new KafkaStreams(builder.build(), this.props(this.applicationId));
        this.kafkaStreams.setStateListener(new KafkaStreams.StateListener(){

            @Override
            public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                    startupLatch.countDown();
                }
            }
        });
        this.kafkaStreams.start();
        Assert.assertTrue((boolean)startupLatch.await(30L, TimeUnit.SECONDS));
    }

    @Test
    public void shouldProcessDataFromStoresWithLoggingDisabled() throws InterruptedException, ExecutionException {
        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_STREAM_2, Arrays.asList(KeyValue.pair(1, 1), KeyValue.pair(2, 2), KeyValue.pair(3, 3)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class), (Time)RestoreIntegrationTest.CLUSTER.time);
        KeyValueBytesStoreSupplier lruMapSupplier = Stores.lruMap(INPUT_STREAM_2, 10);
        StoreBuilder storeBuilder = new KeyValueStoreBuilder<Integer, Integer>(lruMapSupplier, Serdes.Integer(), Serdes.Integer(), (Time)RestoreIntegrationTest.CLUSTER.time).withLoggingDisabled();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(storeBuilder);
        KStream<Integer, Integer> stream = streamsBuilder.stream(INPUT_STREAM_2);
        final CountDownLatch processorLatch = new CountDownLatch(3);
        stream.process(new ProcessorSupplier<Integer, Integer>(){

            @Override
            public Processor<Integer, Integer> get() {
                return new KeyValueStoreProcessor(RestoreIntegrationTest.INPUT_STREAM_2, processorLatch);
            }
        }, INPUT_STREAM_2);
        Topology topology = streamsBuilder.build();
        this.kafkaStreams = new KafkaStreams(topology, this.props(this.applicationId + "-logging-disabled"));
        final CountDownLatch latch = new CountDownLatch(1);
        this.kafkaStreams.setStateListener(new KafkaStreams.StateListener(){

            @Override
            public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                    latch.countDown();
                }
            }
        });
        this.kafkaStreams.start();
        latch.await(30L, TimeUnit.SECONDS);
        Assert.assertTrue((boolean)processorLatch.await(30L, TimeUnit.SECONDS));
    }

    private void createStateForRestoration() throws ExecutionException, InterruptedException {
        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        try (KafkaProducer<Integer, Integer> producer = new KafkaProducer<Integer, Integer>(producerConfig, (Serializer<Integer>)new IntegerSerializer(), (Serializer<Integer>)new IntegerSerializer());){
            for (int i = 0; i < 10000; ++i) {
                producer.send(new ProducerRecord<Integer, Integer>(INPUT_STREAM, i, i));
            }
        }
        Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfig.put("group.id", this.applicationId);
        consumerConfig.put("key.deserializer", IntegerDeserializer.class);
        consumerConfig.put("value.deserializer", IntegerDeserializer.class);
        KafkaConsumer consumer = new KafkaConsumer(consumerConfig);
        List<TopicPartition> partitions = Arrays.asList(new TopicPartition(INPUT_STREAM, 0), new TopicPartition(INPUT_STREAM, 1));
        consumer.assign(partitions);
        consumer.seekToEnd(partitions);
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (TopicPartition partition : partitions) {
            long position = consumer.position(partition);
            offsets.put(partition, new OffsetAndMetadata(position + 1L));
        }
        consumer.commitSync(offsets);
        consumer.close();
    }

    public static class KeyValueStoreProcessor
    implements Processor<Integer, Integer> {
        private String topic;
        private final CountDownLatch processorLatch;
        private KeyValueStore<Integer, Integer> store;

        public KeyValueStoreProcessor(String topic, CountDownLatch processorLatch) {
            this.topic = topic;
            this.processorLatch = processorLatch;
        }

        @Override
        public void init(ProcessorContext context) {
            this.store = (KeyValueStore)context.getStateStore(this.topic);
        }

        @Override
        public void process(Integer key, Integer value) {
            if (key != null) {
                this.store.put(key, value);
                this.processorLatch.countDown();
            }
        }

        @Override
        public void punctuate(long timestamp) {
        }

        @Override
        public void close() {
        }
    }
}

