/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;

public class StreamsUpgradeToCooperativeRebalanceTest {
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires one argument (properties-file) but no args provided");
        }
        System.out.println("Args are " + Arrays.toString(args));
        String propFileName = args[0];
        Properties streamsProperties = Utils.loadProps((String)propFileName);
        Properties config = new Properties();
        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest)");
        System.out.println("props=" + streamsProperties);
        config.put("application.id", "cooperative-rebalance-upgrade");
        config.put("default.key.serde", Serdes.String().getClass());
        config.put("default.value.serde", Serdes.String().getClass());
        config.put("commit.interval.ms", (Object)1000);
        config.putAll((Map<?, ?>)streamsProperties);
        String sourceTopic = streamsProperties.getProperty("source.topic", "source");
        String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
        String taskDelimiter = "#";
        final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
        final String upgradePhase = streamsProperties.getProperty("upgrade.phase", "");
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(sourceTopic).peek((ForeachAction)new ForeachAction<String, String>(){
            int recordCounter = 0;

            public void apply(String key, String value) {
                if (this.recordCounter++ % reportInterval == 0) {
                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, this.recordCounter));
                    System.out.flush();
                }
            }
        }).to(sinkTopic);
        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
                Set allThreadMetadata = streams.localThreadsMetadata();
                StringBuilder taskReportBuilder = new StringBuilder();
                ArrayList<String> activeTasks = new ArrayList<String>();
                ArrayList<String> standbyTasks = new ArrayList<String>();
                for (ThreadMetadata threadMetadata : allThreadMetadata) {
                    StreamsUpgradeToCooperativeRebalanceTest.getTasks(threadMetadata.activeTasks(), activeTasks);
                    if (threadMetadata.standbyTasks().isEmpty()) continue;
                    StreamsUpgradeToCooperativeRebalanceTest.getTasks(threadMetadata.standbyTasks(), standbyTasks);
                }
                StreamsUpgradeToCooperativeRebalanceTest.addTasksToBuilder(activeTasks, taskReportBuilder);
                taskReportBuilder.append("#");
                if (!standbyTasks.isEmpty()) {
                    StreamsUpgradeToCooperativeRebalanceTest.addTasksToBuilder(standbyTasks, taskReportBuilder);
                }
                System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
            }
            if (newState == KafkaStreams.State.REBALANCING) {
                System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
            }
        });
        streams.start();
        Exit.addShutdownHook((String)"streams-shutdown-hook", () -> {
            streams.close();
            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
            System.out.flush();
        });
    }

    private static void addTasksToBuilder(List<String> tasks, StringBuilder builder) {
        if (!tasks.isEmpty()) {
            for (String task : tasks) {
                builder.append(task).append(",");
            }
            builder.setLength(builder.length() - 1);
        }
    }

    private static void getTasks(Set<TaskMetadata> taskMetadata, List<String> taskList) {
        for (TaskMetadata task : taskMetadata) {
            Set topicPartitions = task.topicPartitions();
            for (TopicPartition topicPartition : topicPartitions) {
                taskList.add(topicPartition.toString());
            }
        }
    }
}

