/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.io.File;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.tests.SmokeTestUtil;

/**
 * Client that runs a Kafka Streams application with the {@code exactly_once}
 * processing guarantee and restarts it whenever one of its threads dies with
 * an uncaught exception.
 *
 * <p>The topology reads the {@code data} topic, echoes it to {@code echo}, and
 * writes per-key minimum and sum aggregates to {@code min} and {@code sum}.
 * When repartitioning is enabled the data is additionally piped through the
 * {@code repartition} topic and per-key maximum and count are written to
 * {@code max} and {@code cnt}.
 *
 * <p>The lines printed to stdout ({@code EOS-TEST-CLIENT-CLOSED},
 * {@code EOS-TEST-CLIENT-EXCEPTION}, {@code StateChange: ...}) are reproduced
 * verbatim; NOTE(review): they look like markers parsed by an external test
 * harness, so do not reword them without checking the consumer.
 */
public class EosTestClient extends SmokeTestUtil {
    static final String APP_ID = "EosTest";

    /** Upper bound, in seconds, for closing the instance during JVM shutdown. */
    private static final long SHUTDOWN_CLOSE_TIMEOUT_SECONDS = 300L;
    /** Upper bound, in seconds, for closing a failed instance before it is replaced. */
    private static final long RESTART_CLOSE_TIMEOUT_SECONDS = 60L;

    private final String kafka;
    private final File stateDir;
    private final boolean withRepartitioning;
    private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false);
    // The three fields below are shared between the thread calling start(), the
    // shutdown hook thread and the uncaught-exception handler (which the JDK
    // invokes on the dying thread), hence volatile.
    private volatile KafkaStreams streams;
    private volatile boolean uncaughtException;
    private volatile boolean isRunning = true;

    /**
     * @param kafka              value used for {@code bootstrap.servers}
     * @param stateDir           directory used for {@code state.dir}
     * @param withRepartitioning whether to add the repartition/max/count sub-topology
     */
    EosTestClient(String kafka, File stateDir, boolean withRepartitioning) {
        this.kafka = kafka;
        this.stateDir = stateDir;
        this.withRepartitioning = withRepartitioning;
    }

    /**
     * Registers a JVM shutdown hook and then blocks, polling once per second,
     * until that hook clears {@code isRunning}. While polling it (re)creates
     * the Kafka Streams instance whenever none exists and closes and discards
     * the current instance after an uncaught exception was reported.
     */
    public void start() {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

            @Override
            public void run() {
                isRunning = false;
                // Snapshot the field: the polling loop may null it concurrently,
                // and it is null before the first instance has been created.
                final KafkaStreams current = streams;
                if (current != null) {
                    // The timeout is passed in seconds together with TimeUnit.SECONDS;
                    // converting it to millis first would inflate the wait 1000-fold.
                    current.close(SHUTDOWN_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    // Wait for the NOT_RUNNING callback so its printout precedes
                    // the CLOSED marker below.
                    waitForStateTransitionCallback();
                }
                if (!uncaughtException) {
                    System.out.println(System.currentTimeMillis());
                    System.out.println("EOS-TEST-CLIENT-CLOSED");
                    System.out.flush();
                }
            }
        }));

        while (isRunning) {
            if (streams == null) {
                uncaughtException = false;
                // A previous instance may already have reported NOT_RUNNING;
                // clear the latch so the shutdown hook waits for this instance.
                notRunningCallbackReceived.set(false);
                streams = createKafkaStreams(stateDir, kafka);
                streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

                    @Override
                    public void uncaughtException(Thread t, Throwable e) {
                        System.out.println(System.currentTimeMillis());
                        System.out.println("EOS-TEST-CLIENT-EXCEPTION");
                        e.printStackTrace();
                        System.out.flush();
                        uncaughtException = true;
                    }
                });
                streams.setStateListener(new KafkaStreams.StateListener() {

                    @Override
                    public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
                        System.out.println(System.currentTimeMillis());
                        System.out.println("StateChange: " + oldState + " -> " + newState);
                        System.out.flush();
                        if (newState == KafkaStreams.State.NOT_RUNNING) {
                            notRunningCallbackReceived.set(true);
                        }
                    }
                });
                streams.start();
            }
            if (uncaughtException) {
                // Drop the failed instance; the next iteration builds a fresh one.
                streams.close(RESTART_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                streams = null;
            }
            sleep(1000L);
        }
    }

    /**
     * Builds (but does not start) a Kafka Streams instance for the test topology.
     *
     * @param stateDir directory used for {@code state.dir}
     * @param kafka    value used for {@code bootstrap.servers}
     * @return a new, not yet started {@link KafkaStreams} instance
     */
    private KafkaStreams createKafkaStreams(File stateDir, String kafka) {
        Properties props = new Properties();
        props.put("application.id", APP_ID);
        props.put("state.dir", stateDir.toString());
        props.put("bootstrap.servers", kafka);
        props.put("num.stream.threads", 1);
        props.put("num.standby.replicas", 2);
        props.put("replication.factor", 3);
        props.put("processing.guarantee", "exactly_once");
        props.put("cache.max.bytes.buffering", 0);
        props.put("default.key.serde", Serdes.String().getClass());
        props.put("default.value.serde", Serdes.Integer().getClass());
        props.put(StreamsConfig.producerPrefix("request.timeout.ms"), 60000);

        StreamsBuilder builder = new StreamsBuilder();
        // Typed to match the default String/Integer serdes configured above.
        KStream<String, Integer> data = builder.stream("data");
        data.to("echo");
        data.process(SmokeTestUtil.printProcessorSupplier("data"));

        KGroupedStream<String, Integer> groupedData = data.groupByKey();

        // Per-key minimum -> "min"
        groupedData.aggregate(new Initializer<Integer>() {

            @Override
            public Integer apply() {
                return Integer.MAX_VALUE;
            }
        }, new Aggregator<String, Integer, Integer>() {

            @Override
            public Integer apply(String aggKey, Integer value, Integer aggregate) {
                return value < aggregate ? value : aggregate;
            }
        }, Materialized.with(null, intSerde)).toStream().to("min", Produced.with(stringSerde, intSerde));

        // Per-key sum (accumulated as long) -> "sum"
        groupedData.aggregate(new Initializer<Long>() {

            @Override
            public Long apply() {
                return 0L;
            }
        }, new Aggregator<String, Integer, Long>() {

            @Override
            public Long apply(String aggKey, Integer value, Long aggregate) {
                return (long) value.intValue() + aggregate;
            }
        }, Materialized.with(null, longSerde)).toStream().to("sum", Produced.with(stringSerde, longSerde));

        if (this.withRepartitioning) {
            KStream<String, Integer> repartitionedData = data.through("repartition");
            repartitionedData.process(SmokeTestUtil.printProcessorSupplier("repartition"));

            KGroupedStream<String, Integer> groupedDataAfterRepartitioning = repartitionedData.groupByKey();

            // Per-key maximum -> "max"
            groupedDataAfterRepartitioning.aggregate(new Initializer<Integer>() {

                @Override
                public Integer apply() {
                    return Integer.MIN_VALUE;
                }
            }, new Aggregator<String, Integer, Integer>() {

                @Override
                public Integer apply(String aggKey, Integer value, Integer aggregate) {
                    return value > aggregate ? value : aggregate;
                }
            }, Materialized.with(null, intSerde)).toStream().to("max", Produced.with(stringSerde, intSerde));

            // Per-key record count -> "cnt"
            groupedDataAfterRepartitioning.count().toStream().to("cnt", Produced.with(stringSerde, longSerde));
        }
        return new KafkaStreams(builder.build(), props);
    }

    /**
     * Polls every 500 ms, for at most five minutes, until the state listener
     * has observed a transition to {@code NOT_RUNNING}. Prints a message to
     * stderr if the deadline passes without that callback.
     */
    private void waitForStateTransitionCallback() {
        long maxWaitTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(300L);
        while (!this.notRunningCallbackReceived.get() && System.currentTimeMillis() < maxWaitTime) {
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException ignored) {
                // Deliberately ignored: this is a best-effort wait and the loop
                // condition re-checks both the flag and the deadline.
            }
        }
        if (!this.notRunningCallbackReceived.get()) {
            System.err.println("State transition callback to NOT_RUNNING never received. Timed out after 5 minutes.");
            System.err.flush();
        }
    }
}

