/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.tests.SmokeTestUtil;

public class EosTestClient
extends SmokeTestUtil {
    static final String APP_ID = "EosTest";
    private final Properties properties;
    private final boolean withRepartitioning;
    private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false);
    private KafkaStreams streams;
    private boolean uncaughtException;
    private volatile boolean isRunning = true;

    EosTestClient(Properties properties, boolean withRepartitioning) {
        this.properties = properties;
        this.withRepartitioning = withRepartitioning;
    }

    public void start() {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){

            @Override
            public void run() {
                EosTestClient.this.isRunning = false;
                EosTestClient.this.streams.close(Duration.ofSeconds(300L));
                EosTestClient.this.waitForStateTransitionCallback();
                if (!EosTestClient.this.uncaughtException) {
                    System.out.println(System.currentTimeMillis());
                    System.out.println("EOS-TEST-CLIENT-CLOSED");
                    System.out.flush();
                }
            }
        }));
        while (this.isRunning) {
            if (this.streams == null) {
                this.uncaughtException = false;
                this.streams = this.createKafkaStreams(this.properties);
                this.streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

                    @Override
                    public void uncaughtException(Thread t, Throwable e) {
                        System.out.println(System.currentTimeMillis());
                        System.out.println("EOS-TEST-CLIENT-EXCEPTION");
                        e.printStackTrace();
                        System.out.flush();
                        EosTestClient.this.uncaughtException = true;
                    }
                });
                this.streams.setStateListener(new KafkaStreams.StateListener(){

                    public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
                        System.out.println(System.currentTimeMillis());
                        System.out.println("StateChange: " + oldState + " -> " + newState);
                        System.out.flush();
                        if (newState == KafkaStreams.State.NOT_RUNNING) {
                            EosTestClient.this.notRunningCallbackReceived.set(true);
                        }
                    }
                });
                this.streams.start();
            }
            if (this.uncaughtException) {
                this.streams.close(Duration.ofSeconds(60000L));
                this.streams = null;
            }
            EosTestClient.sleep(1000L);
        }
    }

    private KafkaStreams createKafkaStreams(Properties props) {
        props.put("application.id", APP_ID);
        props.put("num.stream.threads", (Object)1);
        props.put("num.standby.replicas", (Object)2);
        props.put("replication.factor", (Object)3);
        props.put("processing.guarantee", "exactly_once");
        props.put("cache.max.bytes.buffering", (Object)0);
        props.put("default.key.serde", Serdes.String().getClass());
        props.put("default.value.serde", Serdes.Integer().getClass());
        StreamsBuilder builder = new StreamsBuilder();
        KStream data = builder.stream("data");
        data.to("echo");
        data.process(SmokeTestUtil.printProcessorSupplier("data"), new String[0]);
        KGroupedStream groupedData = data.groupByKey();
        groupedData.aggregate((Initializer)new Initializer<Integer>(){

            public Integer apply() {
                return Integer.MAX_VALUE;
            }
        }, (Aggregator)new Aggregator<String, Integer, Integer>(){

            public Integer apply(String aggKey, Integer value, Integer aggregate) {
                return value < aggregate ? value : aggregate;
            }
        }, Materialized.with(null, (Serde)intSerde)).toStream().to("min", Produced.with((Serde)stringSerde, (Serde)intSerde));
        groupedData.aggregate((Initializer)new Initializer<Long>(){

            public Long apply() {
                return 0L;
            }
        }, (Aggregator)new Aggregator<String, Integer, Long>(){

            public Long apply(String aggKey, Integer value, Long aggregate) {
                return (long)value.intValue() + aggregate;
            }
        }, Materialized.with(null, (Serde)longSerde)).toStream().to("sum", Produced.with((Serde)stringSerde, (Serde)longSerde));
        if (this.withRepartitioning) {
            KStream repartitionedData = data.through("repartition");
            repartitionedData.process(SmokeTestUtil.printProcessorSupplier("repartition"), new String[0]);
            KGroupedStream groupedDataAfterRepartitioning = repartitionedData.groupByKey();
            groupedDataAfterRepartitioning.aggregate((Initializer)new Initializer<Integer>(){

                public Integer apply() {
                    return Integer.MIN_VALUE;
                }
            }, (Aggregator)new Aggregator<String, Integer, Integer>(){

                public Integer apply(String aggKey, Integer value, Integer aggregate) {
                    return value > aggregate ? value : aggregate;
                }
            }, Materialized.with(null, (Serde)intSerde)).toStream().to("max", Produced.with((Serde)stringSerde, (Serde)intSerde));
            groupedDataAfterRepartitioning.count().toStream().to("cnt", Produced.with((Serde)stringSerde, (Serde)longSerde));
        }
        return new KafkaStreams(builder.build(), props);
    }

    private void waitForStateTransitionCallback() {
        long maxWaitTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(300L);
        while (!this.notRunningCallbackReceived.get() && System.currentTimeMillis() < maxWaitTime) {
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException interruptedException) {}
        }
        if (!this.notRunningCallbackReceived.get()) {
            System.err.println("State transition callback to NOT_RUNNING never received. Timed out after 5 minutes.");
            System.err.flush();
        }
    }
}

