/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag(value="integration")
@Timeout(value=600L)
public class StandbyTaskEOSIntegrationTest {
    private static final long REBALANCE_TIMEOUT = Duration.ofMinutes(2L).toMillis();
    private static final int KEY_0 = 0;
    private static final int KEY_1 = 1;
    private final AtomicBoolean skipRecord = new AtomicBoolean(false);
    private String appId;
    private String inputTopic;
    private String storeName;
    private String outputTopic;
    private KafkaStreams streamInstanceOne;
    private KafkaStreams streamInstanceTwo;
    private KafkaStreams streamInstanceOneRecovery;
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void createTopics(TestInfo testInfo) throws Exception {
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        this.appId = "app-" + safeTestName;
        this.inputTopic = "input-" + safeTestName;
        this.outputTopic = "output-" + safeTestName;
        this.storeName = "store-" + safeTestName;
        CLUSTER.deleteTopicsAndWait(this.inputTopic, this.outputTopic, this.appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
        CLUSTER.createTopic(this.inputTopic, 1, 3);
        CLUSTER.createTopic(this.outputTopic, 1, 3);
    }

    @AfterEach
    public void cleanUp() {
        if (this.streamInstanceOne != null) {
            this.streamInstanceOne.close();
        }
        if (this.streamInstanceTwo != null) {
            this.streamInstanceTwo.close();
        }
        if (this.streamInstanceOneRecovery != null) {
            this.streamInstanceOneRecovery.close();
        }
    }

    @ParameterizedTest
    @ValueSource(strings={"exactly_once", "exactly_once_v2"})
    public void shouldSurviveWithOneTaskAsStandby(String eosConfig) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue((Object)0, (Object)0)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, (Properties)new Properties()), 10L);
        String stateDirPath = TestUtils.tempDirectory((String)this.appId).getPath();
        CountDownLatch instanceLatch = new CountDownLatch(1);
        this.streamInstanceOne = this.buildStreamWithDirtyStateDir(stateDirPath + "/" + this.appId + "-1/", instanceLatch, eosConfig);
        this.streamInstanceTwo = this.buildStreamWithDirtyStateDir(stateDirPath + "/" + this.appId + "-2/", instanceLatch, eosConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Arrays.asList(this.streamInstanceOne, this.streamInstanceTwo), Duration.ofSeconds(60L));
        Assertions.assertTrue((boolean)instanceLatch.await(15L, TimeUnit.SECONDS));
        this.streamInstanceOne.close();
        this.streamInstanceTwo.close();
        this.streamInstanceOne.cleanUp();
        this.streamInstanceTwo.cleanUp();
    }

    private KafkaStreams buildStreamWithDirtyStateDir(String stateDirPath, CountDownLatch recordProcessLatch, String eosConfig) throws Exception {
        StreamsBuilder builder = new StreamsBuilder();
        TaskId taskId = new TaskId(0, 0);
        Properties props = this.props(stateDirPath, eosConfig);
        StateDirectory stateDirectory = new StateDirectory(new StreamsConfig((Map)props), (Time)new MockTime(), true, false);
        new OffsetCheckpoint(new File(stateDirectory.getOrCreateDirectoryForTask(taskId), ".checkpoint")).write(Collections.singletonMap(new TopicPartition("unknown-topic", 0), 5L));
        Assertions.assertTrue((boolean)new File(stateDirectory.getOrCreateDirectoryForTask(taskId), "rocksdb/KSTREAM-AGGREGATE-STATE-STORE-0000000001").mkdirs());
        builder.stream(this.inputTopic, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer())).groupByKey().count().toStream().peek((key, value) -> recordProcessLatch.countDown());
        return new KafkaStreams(builder.build(), props);
    }

    @ParameterizedTest
    @ValueSource(strings={"exactly_once", "exactly_once_v2"})
    public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing(String eosConfig) throws Exception {
        long time = System.currentTimeMillis();
        String base = TestUtils.tempDirectory((String)this.appId).getPath();
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue((Object)0, (Object)0)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, (Properties)new Properties()), 10L + time);
        this.streamInstanceOne = this.buildWithDeduplicationTopology(base + "-1", eosConfig);
        this.streamInstanceTwo = this.buildWithDeduplicationTopology(base + "-2", eosConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(this.streamInstanceOne);
        IntegrationTestUtils.waitUntilMinRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputTopic, 1);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(this.streamInstanceTwo);
        TestUtils.waitForCondition(() -> ((ReadOnlyKeyValueStore)this.streamInstanceTwo.store(StoreQueryParameters.fromNameAndType((String)this.storeName, (QueryableStoreType)QueryableStoreTypes.keyValueStore()).enableStaleStores())).get((Object)0) != null, (long)REBALANCE_TIMEOUT, (String)"Could not get key from standby store");
        TestUtils.waitForCondition(() -> ((ReadOnlyKeyValueStore)this.streamInstanceOne.store(StoreQueryParameters.fromNameAndType((String)this.storeName, (QueryableStoreType)QueryableStoreTypes.keyValueStore()))).get((Object)0) != null, (String)"Could not get key from main store");
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue((Object)1, (Object)0)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, (Properties)new Properties()), 10L + time);
        TestUtils.waitForCondition(() -> this.streamInstanceOne.state() == KafkaStreams.State.ERROR, (String)"Stream instance 1 did not go into error state");
        this.streamInstanceOne.close();
        IntegrationTestUtils.waitUntilMinRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputTopic, 2);
        this.streamInstanceOneRecovery = this.buildWithDeduplicationTopology(base + "-1", eosConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(this.streamInstanceOneRecovery);
        TestUtils.waitForCondition(() -> ((ReadOnlyKeyValueStore)this.streamInstanceOneRecovery.store(StoreQueryParameters.fromNameAndType((String)this.storeName, (QueryableStoreType)QueryableStoreTypes.keyValueStore()).enableStaleStores())).get((Object)0) != null, (String)"Could not get key from recovered standby store");
        this.streamInstanceTwo.close();
        TestUtils.waitForCondition(() -> ((ReadOnlyKeyValueStore)this.streamInstanceOneRecovery.store(StoreQueryParameters.fromNameAndType((String)this.storeName, (QueryableStoreType)QueryableStoreTypes.keyValueStore()))).get((Object)0) != null, (long)REBALANCE_TIMEOUT, (String)"Could not get key from recovered main store");
        this.skipRecord.set(false);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue((Object)1, (Object)0)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, (Properties)new Properties()), 10L + time);
        TestUtils.waitForCondition(() -> this.streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR, (String)("Stream instance 1 did not go into error state. Is in " + this.streamInstanceOneRecovery.state() + " state."));
    }

    private KafkaStreams buildWithDeduplicationTopology(String stateDirPath, String eosConfig) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)this.storeName), (Serde)Serdes.Integer(), (Serde)Serdes.Integer()));
        builder.stream(this.inputTopic).transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>(){
            private KeyValueStore store;

            public void init(ProcessorContext context) {
                this.store = (KeyValueStore)context.getStateStore(StandbyTaskEOSIntegrationTest.this.storeName);
            }

            public KeyValue<Integer, Integer> transform(Integer key, Integer value) {
                if (StandbyTaskEOSIntegrationTest.this.skipRecord.get()) {
                    return KeyValue.pair((Object)key, (Object)value);
                }
                if (this.store.get((Object)key) != null) {
                    return null;
                }
                this.store.put((Object)key, (Object)value);
                this.store.flush();
                if (key == 1) {
                    StandbyTaskEOSIntegrationTest.this.skipRecord.set(true);
                    throw new RuntimeException("Injected test error");
                }
                return KeyValue.pair((Object)key, (Object)value);
            }

            public void close() {
            }
        }, new String[]{this.storeName}).to(this.outputTopic);
        return new KafkaStreams(builder.build(), this.props(stateDirPath, eosConfig));
    }

    private Properties props(String stateDirPath, String eosConfig) {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", this.appId);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("statestore.cache.max.bytes", (Object)0);
        streamsConfiguration.put("state.dir", stateDirPath);
        streamsConfiguration.put("num.standby.replicas", (Object)1);
        streamsConfiguration.put("processing.guarantee", eosConfig);
        streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("commit.interval.ms", (Object)1000L);
        streamsConfiguration.put("acceptable.recovery.lag", (Object)0);
        streamsConfiguration.put("auto.offset.reset", "earliest");
        return streamsConfiguration;
    }
}

