/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;

public class ShutdownDeadlockTest {
    private final String kafka;

    public ShutdownDeadlockTest(String kafka) {
        this.kafka = kafka;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        String topic = "source";
        Properties props = new Properties();
        props.setProperty("application.id", "shouldNotDeadlock");
        props.setProperty("bootstrap.servers", this.kafka);
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("source", Consumed.with(Serdes.String(), Serdes.String()));
        source.foreach(new ForeachAction<String, String>(){

            @Override
            public void apply(String key, String value) {
                throw new RuntimeException("KABOOM!");
            }
        });
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                Exit.exit(1);
            }
        });
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){

            @Override
            public void run() {
                streams.close(5L, TimeUnit.SECONDS);
            }
        }));
        Properties producerProps = new Properties();
        producerProps.put("client.id", "SmokeTest");
        producerProps.put("bootstrap.servers", this.kafka);
        producerProps.put("key.serializer", StringSerializer.class);
        producerProps.put("value.serializer", StringSerializer.class);
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProps);
        producer.send(new ProducerRecord<String, String>("source", "a", "a"));
        producer.flush();
        streams.start();
        ShutdownDeadlockTest shutdownDeadlockTest = this;
        synchronized (shutdownDeadlockTest) {
            try {
                this.wait();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }
}

