/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.test.TestUtils;

public class StreamsBrokerDownResilienceTest {
    private static final int KEY = 0;
    private static final int VALUE = 1;
    private static final String SOURCE_TOPIC_1 = "streamsResilienceSource";
    private static final String SINK_TOPIC = "streamsResilienceSink";

    public static void main(String[] args) {
        System.out.println("StreamsTest instance started");
        String kafka = args.length > 0 ? args[0] : "localhost:9092";
        String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
        String additionalConfigs = args.length > 2 ? args[2] : null;
        Serde<String> stringSerde = Serdes.String();
        Properties streamsProperties = new Properties();
        streamsProperties.put("bootstrap.servers", kafka);
        streamsProperties.put("application.id", "kafka-streams-resilience");
        streamsProperties.put("default.key.serde", Serdes.String().getClass());
        streamsProperties.put("default.value.serde", Serdes.String().getClass());
        streamsProperties.put("commit.interval.ms", (Object)100);
        if (additionalConfigs != null && !additionalConfigs.equalsIgnoreCase("none")) {
            Map<String, String> updated = StreamsBrokerDownResilienceTest.updatedConfigs(additionalConfigs);
            System.out.println("Updating configs with " + updated);
            streamsProperties.putAll(updated);
        }
        if (!StreamsBrokerDownResilienceTest.confirmCorrectConfigs(streamsProperties)) {
            System.err.println(String.format("ERROR: Did not have all required configs expected  to contain %s %s %s %s", StreamsConfig.consumerPrefix("max.poll.interval.ms"), StreamsConfig.producerPrefix("retries"), StreamsConfig.producerPrefix("request.timeout.ms"), StreamsConfig.producerPrefix("max.block.ms")));
            System.exit(1);
        }
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(Collections.singletonList(SOURCE_TOPIC_1), Consumed.with(stringSerde, stringSerde)).peek(new ForeachAction<String, String>(){

            @Override
            public void apply(String key, String value) {
                System.out.println("received key " + key + " and value " + value);
            }
        }).to(SINK_TOPIC);
        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.err.println("FATAL: An unexpected exception " + e);
                System.err.flush();
                streams.close(30L, TimeUnit.SECONDS);
            }
        });
        System.out.println("Start Kafka Streams");
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){

            @Override
            public void run() {
                System.out.println("Shutting down streams now");
                streams.close(10L, TimeUnit.SECONDS);
            }
        }));
    }

    private static boolean confirmCorrectConfigs(Properties properties) {
        return properties.containsKey(StreamsConfig.consumerPrefix("max.poll.interval.ms")) && properties.containsKey(StreamsConfig.producerPrefix("retries")) && properties.containsKey(StreamsConfig.producerPrefix("request.timeout.ms")) && properties.containsKey(StreamsConfig.producerPrefix("max.block.ms"));
    }

    private static Map<String, String> updatedConfigs(String formattedConfigs) {
        String[] parts = formattedConfigs.split(",");
        HashMap<String, String> updatedConfigs = new HashMap<String, String>();
        for (String part : parts) {
            String[] keyValue = part.split("=");
            updatedConfigs.put(keyValue[0], keyValue[1]);
        }
        return updatedConfigs;
    }
}

