/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.TestTaskAssignor;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(value="integration")
@Timeout(value=600L)
public class RebalanceIntegrationTest {
    private static final Logger LOG = LoggerFactory.getLogger(RebalanceIntegrationTest.class);
    private static final int NUM_BROKERS = 3;
    private static final int MAX_POLL_INTERVAL_MS = 30000;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3, Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"auto.create.topics.enable", (Object)"true"), Utils.mkEntry((Object)"transaction.max.timeout.ms", (Object)"2147483647")})));
    private String applicationId;
    private static final int NUM_TOPIC_PARTITIONS = 2;
    private static final String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
    private static final String SINGLE_PARTITION_OUTPUT_TOPIC = "singlePartitionOutputTopic";
    private static final AtomicInteger TEST_NUMBER = new AtomicInteger(0);

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void createTopics() throws Exception {
        this.applicationId = "appId-" + TEST_NUMBER.getAndIncrement();
        CLUSTER.deleteTopics(MULTI_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
        CLUSTER.createTopics(SINGLE_PARTITION_OUTPUT_TOPIC);
        CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, 2, 1);
    }

    private void checkResultPerKey(List<KeyValue<Long, Long>> result, List<KeyValue<Long, Long>> expectedResult) {
        HashSet<Long> allKeys = new HashSet<Long>();
        this.addAllKeys(allKeys, result);
        this.addAllKeys(allKeys, expectedResult);
        for (Long key : allKeys) {
            MatcherAssert.assertThat((String)"The records do not match what expected", this.getAllRecordPerKey(key, result), (Matcher)CoreMatchers.equalTo(this.getAllRecordPerKey(key, expectedResult)));
        }
    }

    private void addAllKeys(Set<Long> allKeys, List<KeyValue<Long, Long>> records) {
        for (KeyValue<Long, Long> record : records) {
            allKeys.add((Long)record.key);
        }
    }

    private List<KeyValue<Long, Long>> getAllRecordPerKey(Long key, List<KeyValue<Long, Long>> records) {
        ArrayList<KeyValue<Long, Long>> recordsPerKey = new ArrayList<KeyValue<Long, Long>>(records.size());
        for (KeyValue<Long, Long> record : records) {
            if (!((Long)record.key).equals(key)) continue;
            recordsPerKey.add(record);
        }
        return recordsPerKey;
    }

    @Test
    public void shouldCommitAllTasksIfRevokedTaskTriggerPunctuation() throws Exception {
        final AtomicBoolean requestCommit = new AtomicBoolean(false);
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(MULTI_PARTITION_INPUT_TOPIC).process(() -> new Processor<Long, Long, Long, Long>(){
            ProcessorContext context;

            public void init(ProcessorContext<Long, Long> context) {
                this.context = context;
                AtomicReference<Cancellable> cancellable = new AtomicReference<Cancellable>();
                cancellable.set(context.schedule(Duration.ofSeconds(1L), PunctuationType.WALL_CLOCK_TIME, time -> {
                    context.forward(new Record((Object)((long)(context.taskId().partition() + 1) * 100L), (Object)(-((long)context.taskId().partition() + 1L)), context.currentSystemTimeMs()));
                    ((Cancellable)cancellable.get()).cancel();
                }));
            }

            public void process(Record<Long, Long> record) {
                this.context.forward(record.withValue((Object)((RecordMetadata)this.context.recordMetadata().get()).offset()));
                if (requestCommit.get()) {
                    this.context.commit();
                }
            }
        }, new String[0]).to(SINGLE_PARTITION_OUTPUT_TOPIC);
        Properties properties = new Properties();
        properties.put("statestore.cache.max.bytes", (Object)0);
        properties.put("commit.interval.ms", (Object)Integer.MAX_VALUE);
        properties.put(StreamsConfig.consumerPrefix((String)"max.poll.records"), (Object)1);
        properties.put(StreamsConfig.consumerPrefix((String)"metadata.max.age.ms"), "1000");
        properties.put(StreamsConfig.consumerPrefix((String)"auto.offset.reset"), "earliest");
        properties.put(StreamsConfig.consumerPrefix((String)"session.timeout.ms"), (Object)29999);
        properties.put(StreamsConfig.consumerPrefix((String)"max.poll.interval.ms"), (Object)30000);
        properties.put(StreamsConfig.producerPrefix((String)"transaction.timeout.ms"), (Object)Integer.MAX_VALUE);
        properties.put("task.assignor.class", TestTaskAssignor.class.getName());
        Properties config = StreamsTestUtils.getStreamsConfig(this.applicationId, CLUSTER.bootstrapServers(), Serdes.LongSerde.class.getName(), Serdes.LongSerde.class.getName(), properties);
        try (KafkaStreams streams = new KafkaStreams(builder.build(), config);){
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
            List<KeyValue<Long, Long>> expectedUncommittedResultBeforeRebalance = Arrays.asList(KeyValue.pair((Object)100L, (Object)-1L), KeyValue.pair((Object)200L, (Object)-2L));
            List<KeyValue<Long, Long>> uncommittedRecordsBeforeRebalance = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class), SINGLE_PARTITION_OUTPUT_TOPIC, expectedUncommittedResultBeforeRebalance.size());
            this.checkResultPerKey(uncommittedRecordsBeforeRebalance, expectedUncommittedResultBeforeRebalance);
            streams.addStreamThread();
            List<KeyValue<Long, Long>> expectedUncommittedResultAfterRebalance = Arrays.asList(KeyValue.pair((Object)100L, (Object)-1L), KeyValue.pair((Object)200L, (Object)-2L));
            List<KeyValue<Long, Long>> uncommittedRecordsAfterRebalance = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class), SINGLE_PARTITION_OUTPUT_TOPIC, expectedUncommittedResultAfterRebalance.size());
            this.checkResultPerKey(uncommittedRecordsAfterRebalance, expectedUncommittedResultAfterRebalance);
        }
    }
}

