/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.io.File;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.tests.SmokeTestUtil;

/**
 * Kafka Streams smoke-test client.
 *
 * <p>Builds a topology that reads the {@code "data"} topic, echoes it to {@code "echo"},
 * computes windowed min / max / sum / count aggregates (written to {@code "min"},
 * {@code "max"}, {@code "sum"}, {@code "cnt"}), and derives {@code "dif"}, {@code "avg"} and
 * {@code "tagg"} from those. The client is started on a separate thread and prints marker
 * lines to standard output when it is closed or when a stream thread fails.
 *
 * <p>NOTE(review): the marker strings are presumably parsed by external tooling; do not
 * change them without confirming.
 */
public class SmokeTestClient
extends SmokeTestUtil {
    private static final String EXCEPTION_MARKER = "SMOKE-TEST-CLIENT-EXCEPTION";
    private static final String CLOSED_MARKER = "SMOKE-TEST-CLIENT-CLOSED";

    private final String kafka;
    private final File stateDir;
    private KafkaStreams streams;
    private Thread thread;
    // Written by the uncaught-exception handler and read by close() (which also runs on the
    // shutdown-hook thread), so it must be volatile to be visible across threads.
    private volatile boolean uncaughtException = false;

    /**
     * @param stateDir directory passed to the client as its {@code state.dir}
     * @param kafka    value passed to the client as its {@code bootstrap.servers}
     */
    public SmokeTestClient(File stateDir, String kafka) {
        this.stateDir = stateDir;
        this.kafka = kafka;
    }

    /**
     * Creates the streams client, installs an uncaught-exception handler and a JVM shutdown
     * hook that calls {@link #close()}, and starts the client on a new thread.
     */
    public void start() {
        this.streams = SmokeTestClient.createKafkaStreams(this.stateDir, this.kafka);
        this.streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println(EXCEPTION_MARKER);
                SmokeTestClient.this.uncaughtException = true;
                e.printStackTrace();
            }
        });
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){

            @Override
            public void run() {
                SmokeTestClient.this.close();
            }
        }));
        this.thread = new Thread(){

            @Override
            public void run() {
                SmokeTestClient.this.streams.start();
            }
        };
        this.thread.start();
    }

    /**
     * Closes the streams client (waiting up to five seconds), prints the closed marker unless
     * an uncaught exception was recorded, and then waits for the launcher thread to finish.
     * A failure while waiting is reported with the exception marker instead of being thrown.
     */
    public void close() {
        this.streams.close(5L, TimeUnit.SECONDS);
        if (!this.uncaughtException) {
            System.out.println(CLOSED_MARKER);
        }
        try {
            this.thread.join();
        }
        catch (InterruptedException ex) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            System.out.println(EXCEPTION_MARKER);
        }
        catch (RuntimeException ex) {
            // Kept from the original broad catch: any runtime failure while joining is
            // reported through the marker rather than escaping (e.g. from the shutdown hook).
            System.out.println(EXCEPTION_MARKER);
        }
    }

    /**
     * Assembles the configuration used by the smoke-test streams client.
     */
    private static Properties buildProperties(File stateDir, String kafka) {
        Properties props = new Properties();
        props.put("application.id", "SmokeTest");
        props.put("state.dir", stateDir.toString());
        props.put("bootstrap.servers", kafka);
        props.put("num.stream.threads", 3);
        props.put("num.standby.replicas", 2);
        props.put("buffered.records.per.partition", 100);
        props.put("commit.interval.ms", 1000);
        props.put("replication.factor", 3);
        props.put("auto.offset.reset", "earliest");
        props.put("retries", Integer.MAX_VALUE);
        props.put("acks", "all");
        props.put(StreamsConfig.producerPrefix("request.timeout.ms"), 80000);
        return props;
    }

    /**
     * Builds the smoke-test topology and wraps it in a {@link KafkaStreams} client.
     *
     * @param stateDir directory used as {@code state.dir}
     * @param kafka    bootstrap servers
     * @return a configured client that has not been started
     */
    private static KafkaStreams createKafkaStreams(File stateDir, String kafka) {
        Properties props = SmokeTestClient.buildProperties(stateDir, kafka);
        StreamsBuilder builder = new StreamsBuilder();
        Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
        Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);

        // Source: echo every record, then drop records whose value is Integer.MAX_VALUE.
        // NOTE(review): Integer.MAX_VALUE is presumably an end-of-data sentinel written by the
        // test driver -- confirm against the producer side.
        KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
        source.to(stringSerde, intSerde, "echo");
        KStream<String, Integer> data = source.filter(new Predicate<String, Integer>(){

            @Override
            public boolean test(String key, Integer value) {
                return value == null || value != Integer.MAX_VALUE;
            }
        });
        data.process(SmokeTestUtil.printProcessorSupplier("data"));
        KGroupedStream<String, Integer> groupedData = data.groupByKey(Serialized.with(stringSerde, intSerde));

        // SmokeTestUtil.Unwindow is used as a raw type below, exactly as in the original;
        // its type parameters are declared outside this file.

        // min (one-day windows)
        groupedData.aggregate(new Initializer<Integer>(){

            @Override
            public Integer apply() {
                return Integer.MAX_VALUE;
            }
        }, new Aggregator<String, Integer, Integer>(){

            @Override
            public Integer apply(String aggKey, Integer value, Integer aggregate) {
                return value < aggregate ? value : aggregate;
            }
        }, TimeWindows.of(TimeUnit.DAYS.toMillis(1L)), intSerde, "uwin-min").toStream().map(new SmokeTestUtil.Unwindow()).to(stringSerde, intSerde, "min");
        KTable<String, Integer> minTable = builder.table("min", stringIntConsumed);
        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"));

        // max (two-day windows)
        groupedData.aggregate(new Initializer<Integer>(){

            @Override
            public Integer apply() {
                return Integer.MIN_VALUE;
            }
        }, new Aggregator<String, Integer, Integer>(){

            @Override
            public Integer apply(String aggKey, Integer value, Integer aggregate) {
                return value > aggregate ? value : aggregate;
            }
        }, TimeWindows.of(TimeUnit.DAYS.toMillis(2L)), intSerde, "uwin-max").toStream().map(new SmokeTestUtil.Unwindow()).to(stringSerde, intSerde, "max");
        KTable<String, Integer> maxTable = builder.table("max", stringIntConsumed);
        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"));

        // sum (two-day windows), accumulated as long
        groupedData.aggregate(new Initializer<Long>(){

            @Override
            public Long apply() {
                return 0L;
            }
        }, new Aggregator<String, Integer, Long>(){

            @Override
            public Long apply(String aggKey, Integer value, Long aggregate) {
                return (long) value + aggregate;
            }
        }, TimeWindows.of(TimeUnit.DAYS.toMillis(2L)), longSerde, "win-sum").toStream().map(new SmokeTestUtil.Unwindow()).to(stringSerde, longSerde, "sum");
        KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));

        // count (two-day windows)
        groupedData.count(TimeWindows.of(TimeUnit.DAYS.toMillis(2L)), "uwin-cnt").toStream().map(new SmokeTestUtil.Unwindow()).to(stringSerde, longSerde, "cnt");
        KTable<String, Long> cntTable = builder.table("cnt", stringLongConsumed);
        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"));

        // dif = max - min
        maxTable.join(minTable, new ValueJoiner<Integer, Integer, Integer>(){

            @Override
            public Integer apply(Integer value1, Integer value2) {
                return value1 - value2;
            }
        }).to(stringSerde, intSerde, "dif");

        // avg = sum / cnt
        sumTable.join(cntTable, new ValueJoiner<Long, Long, Double>(){

            @Override
            public Double apply(Long value1, Long value2) {
                return (double) value1 / (double) value2;
            }
        }).to(stringSerde, doubleSerde, "avg");

        // tagg: re-group the count table and aggregate into the in-memory store "cntByCnt".
        // The explicit <String, Long> witness is required: without it K and V are inferred as
        // Object and the withKeySerde/withValueSerde calls do not type-check.
        SmokeTestUtil.Agg agg = new SmokeTestUtil.Agg();
        cntTable.groupBy(agg.selector(), Serialized.with(stringSerde, longSerde))
                .aggregate(agg.init(), agg.adder(), agg.remover(),
                        Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()))
                .to(stringSerde, longSerde, "tagg");

        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);
        // NOTE(review): start() calls setUncaughtExceptionHandler again on the returned client.
        // If that later call replaces this handler, the code below never runs -- confirm
        // which behaviour is intended.
        streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
                streamsClient.close(30L, TimeUnit.SECONDS);
            }
        });
        return streamsClient;
    }
}

