/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.tests.SystemTestUtil;

public class StreamsStandByReplicaTest {
    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("StreamsStandByReplicaTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
            Exit.exit((int)1);
        }
        System.out.println("StreamsTest instance started");
        String propFileName = args[0];
        String additionalConfigs = args[1];
        Properties streamsProperties = Utils.loadProps((String)propFileName);
        String kafka = streamsProperties.getProperty("bootstrap.servers");
        if (kafka == null) {
            System.err.println("No bootstrap kafka servers specified in bootstrap.servers");
            Exit.exit((int)1);
        }
        streamsProperties.put("application.id", "kafka-streams-standby-tasks");
        streamsProperties.put("commit.interval.ms", (Object)100L);
        streamsProperties.put("num.standby.replicas", (Object)1);
        streamsProperties.put("statestore.cache.max.bytes", (Object)0);
        streamsProperties.put("default.key.serde", Serdes.String().getClass());
        streamsProperties.put("default.value.serde", Serdes.String().getClass());
        streamsProperties.put(StreamsConfig.producerPrefix((String)"enable.idempotence"), (Object)true);
        if (additionalConfigs == null) {
            System.err.println("additional configs are not provided");
            System.err.flush();
            Exit.exit((int)1);
        }
        Map<String, String> updated = SystemTestUtil.parseConfigs(additionalConfigs);
        System.out.println("Updating configs with " + updated);
        String sourceTopic = updated.remove("sourceTopic");
        String sinkTopic1 = updated.remove("sinkTopic1");
        String sinkTopic2 = updated.remove("sinkTopic2");
        if (sourceTopic == null || sinkTopic1 == null || sinkTopic2 == null) {
            System.err.println(String.format("one or more required topics null sourceTopic[%s], sinkTopic1[%s], sinkTopic2[%s]", sourceTopic, sinkTopic1, sinkTopic2));
            System.err.flush();
            Exit.exit((int)1);
        }
        streamsProperties.putAll(updated);
        if (!StreamsStandByReplicaTest.confirmCorrectConfigs(streamsProperties)) {
            System.err.println(String.format("ERROR: Did not have all required configs expected  to contain %s, %s,  %s,  %s", StreamsConfig.consumerPrefix((String)"max.poll.interval.ms"), StreamsConfig.producerPrefix((String)"retries"), StreamsConfig.producerPrefix((String)"request.timeout.ms"), StreamsConfig.producerPrefix((String)"max.block.ms")));
            Exit.exit((int)1);
        }
        StreamsBuilder builder = new StreamsBuilder();
        String inMemoryStoreName = "in-memory-store";
        String persistentMemoryStoreName = "persistent-memory-store";
        KeyValueBytesStoreSupplier inMemoryStoreSupplier = Stores.inMemoryKeyValueStore((String)"in-memory-store");
        KeyValueBytesStoreSupplier persistentStoreSupplier = Stores.persistentKeyValueStore((String)"persistent-memory-store");
        Serde stringSerde = Serdes.String();
        ValueMapper countMapper = Object::toString;
        KStream inputStream = builder.stream(sourceTopic, Consumed.with((Serde)stringSerde, (Serde)stringSerde));
        inputStream.groupByKey().count(Materialized.as((KeyValueBytesStoreSupplier)inMemoryStoreSupplier)).toStream().mapValues(countMapper).to(sinkTopic1, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        inputStream.groupByKey().count(Materialized.as((KeyValueBytesStoreSupplier)persistentStoreSupplier)).toStream().mapValues(countMapper).to(sinkTopic2, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
        streams.setUncaughtExceptionHandler(e -> {
            System.err.println("FATAL: An unexpected exception " + e);
            e.printStackTrace(System.err);
            System.err.flush();
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                Set threadMetadata = streams.metadataForLocalThreads();
                for (ThreadMetadata threadMetadatum : threadMetadata) {
                    System.out.println("ACTIVE_TASKS:" + threadMetadatum.activeTasks().size() + " STANDBY_TASKS:" + threadMetadatum.standbyTasks().size());
                }
            }
        });
        System.out.println("Start Kafka Streams");
        streams.start();
        Exit.addShutdownHook((String)"streams-shutdown-hook", () -> {
            StreamsStandByReplicaTest.shutdown(streams);
            System.out.println("Shut down streams now");
        });
    }

    private static void shutdown(KafkaStreams streams) {
        streams.close(Duration.ofSeconds(10L));
    }

    private static boolean confirmCorrectConfigs(Properties properties) {
        return properties.containsKey(StreamsConfig.consumerPrefix((String)"max.poll.interval.ms")) && properties.containsKey(StreamsConfig.producerPrefix((String)"retries")) && properties.containsKey(StreamsConfig.producerPrefix((String)"request.timeout.ms")) && properties.containsKey(StreamsConfig.producerPrefix((String)"max.block.ms"));
    }
}

