/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.File;
import java.util.Properties;
import junit.framework.Assert;
import kafka.api.FetchRequestBuilder;
import kafka.api.FetchResponse;
import kafka.consumer.SimpleConsumer;
import kafka.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.serializer.StringEncoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.KafkaServer$;
import kafka.server.NotRunning$;
import kafka.utils.IntEncoder;
import kafka.utils.TestUtils$;
import kafka.utils.Utils$;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness;
import kafka.zk.ZooKeeperTestHarness$class;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkException;
import org.junit.Test;
import org.scalatest.junit.JUnit3Suite;
import scala.Array$;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.collection.Iterable$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

@ScalaSignature(bytes="\u0006\u0001E4A!\u0001\u0002\u0001\u000f\t\u00112+\u001a:wKJ\u001c\u0006.\u001e;e_^tG+Z:u\u0015\t\u0019A!\u0001\u0004tKJ4XM\u001d\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011c\u0001\u0001\t%A\u0011\u0011\u0002E\u0007\u0002\u0015)\u00111\u0002D\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0003\u001b9\t\u0011b]2bY\u0006$Xm\u001d;\u000b\u0003=\t1a\u001c:h\u0013\t\t\"BA\u0006K+:LGoM*vSR,\u0007CA\n\u0017\u001b\u0005!\"BA\u000b\u0005\u0003\tQ8.\u0003\u0002\u0018)\t!\"l\\8LK\u0016\u0004XM\u001d+fgRD\u0015M\u001d8fgNDQ!\u0007\u0001\u0005\u0002i\ta\u0001P5oSRtD#A\u000e\u0011\u0005q\u0001Q\"\u0001\u0002\t\u000fy\u0001!\u0019!C\u0001?\u0005!\u0001o\u001c:u+\u0005\u0001\u0003CA\u0011%\u001b\u0005\u0011#\"A\u0012\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0015\u0012#aA%oi\"1q\u0005\u0001Q\u0001\n\u0001\nQ\u0001]8si\u0002Bq!\u000b\u0001C\u0002\u0013\u0005!&A\u0003qe>\u00048/F\u0001,!\ta\u0013'D\u0001.\u0015\tqs&\u0001\u0003vi&d'\"\u0001\u0019\u0002\t)\fg/Y\u0005\u0003e5\u0012!\u0002\u0015:pa\u0016\u0014H/[3t\u0011\u0019!\u0004\u0001)A\u0005W\u00051\u0001O]8qg\u0002BqA\u000e\u0001C\u0002\u0013\u0005q'\u0001\u0004d_:4\u0017nZ\u000b\u0002qA\u0011A$O\u0005\u0003u\t\u00111bS1gW\u0006\u001cuN\u001c4jO\"1A\b\u0001Q\u0001\na\nqaY8oM&<\u0007\u0005C\u0004?\u0001\t\u0007I\u0011A 
\u0002\t!|7\u000f^\u000b\u0002\u0001B\u0011\u0011\tR\u0007\u0002\u0005*\u00111iL\u0001\u0005Y\u0006tw-\u0003\u0002F\u0005\n11\u000b\u001e:j]\u001eDaa\u0012\u0001!\u0002\u0013\u0001\u0015!\u00025pgR\u0004\u0003bB%\u0001\u0005\u0004%\taP\u0001\u0006i>\u0004\u0018n\u0019\u0005\u0007\u0017\u0002\u0001\u000b\u0011\u0002!\u0002\rQ|\u0007/[2!\u0011\u001di\u0005A1A\u0005\u00029\u000bQa]3oiF*\u0012a\u0014\t\u0004!V\u0003U\"A)\u000b\u0005I\u001b\u0016!C5n[V$\u0018M\u00197f\u0015\t!&%\u0001\u0006d_2dWm\u0019;j_:L!AV)\u0003\t1K7\u000f\u001e\u0005\u00071\u0002\u0001\u000b\u0011B(\u0002\rM,g\u000e^\u0019!\u0011\u001dQ\u0006A1A\u0005\u00029\u000bQa]3oiJBa\u0001\u0018\u0001!\u0002\u0013y\u0015AB:f]R\u0014\u0004\u0005C\u0003_\u0001\u0011\u0005q,A\tuKN$8\t\\3b]NCW\u000f\u001e3po:$\u0012\u0001\u0019\t\u0003C\u0005L!A\u0019\u0012\u0003\tUs\u0017\u000e\u001e\u0015\u0003;\u0012\u0004\"!Z4\u000e\u0003\u0019T!a\u0003\b\n\u0005!4'\u0001\u0002+fgRDQA\u001b\u0001\u0005\u0002}\u000bq\u0005^3ti\u000ecW-\u00198TQV$Hm\\<o/&$\b\u000eR3mKR,Gk\u001c9jG\u0016s\u0017M\u00197fI\"\u0012\u0011\u000e\u001a\u0005\u0006[\u0002!\taX\u0001$i\u0016\u001cHo\u00117fC:\u001c\u0006.\u001e;e_^t\u0017I\u001a;fe\u001a\u000b\u0017\u000e\\3e'R\f'\u000f^;qQ\taG\rC\u0003q\u0001\u0011\u0005q,\u0001\u000fwKJLg-\u001f(p]\u0012\u000bW-\\8o)\"\u0014X-\u00193t'R\fG/^:")
public class ServerShutdownTest
extends JUnit3Suite
implements ZooKeeperTestHarness {
    // Fixture state owned by this test class; each field is assigned exactly once in the constructor.
    private final int port;
    private final Properties props;
    private final KafkaConfig config;
    private final String host;
    private final String topic;
    private final List<String> sent1;
    private final List<String> sent2;
    // State backing the ZooKeeperTestHarness members. These fields are written through the
    // harness setter methods of this class rather than in the constructor, so they must not be
    // declared final: Java rejects both the assignment in the setters and a final field that the
    // constructor leaves unassigned.
    private String zkConnect;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zkClient;
    private int zkConnectionTimeout;
    private int zkSessionTimeout;

    /** Returns the current value of the {@code zkConnect} field. */
    @Override
    public String zkConnect() {
        return zkConnect;
    }

    /** Returns the {@link EmbeddedZookeeper} instance currently stored on this test, possibly {@code null}. */
    @Override
    public EmbeddedZookeeper zookeeper() {
        return zookeeper;
    }

    /** Stores the given {@link EmbeddedZookeeper} instance on this test. */
    @Override
    public void zookeeper_$eq(EmbeddedZookeeper x$1) {
        zookeeper = x$1;
    }

    /** Returns the {@link ZkClient} currently stored on this test, possibly {@code null}. */
    @Override
    public ZkClient zkClient() {
        return zkClient;
    }

    /** Stores the given {@link ZkClient} on this test. */
    @Override
    public void zkClient_$eq(ZkClient x$1) {
        zkClient = x$1;
    }

    /** Returns the current value of the {@code zkConnectionTimeout} field. */
    @Override
    public int zkConnectionTimeout() {
        return zkConnectionTimeout;
    }

    /** Returns the current value of the {@code zkSessionTimeout} field. */
    @Override
    public int zkSessionTimeout() {
        return zkSessionTimeout;
    }

    /**
     * Invokes the superclass {@code setUp()}.
     * The mangled name suggests a compiler-generated bridge that lets the
     * ZooKeeperTestHarness implementation reach the superclass method - TODO confirm.
     */
    @Override
    public void kafka$zk$ZooKeeperTestHarness$$super$setUp() {
        super.setUp();
    }

    /**
     * Invokes the superclass {@code tearDown()}; counterpart of the {@code super$setUp} bridge above.
     */
    @Override
    public void kafka$zk$ZooKeeperTestHarness$$super$tearDown() {
        super.tearDown();
    }

    /** Assigns the {@code zkConnect} field; presumably called from the harness initializer - verify. */
    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq(String x$1) {
        this.zkConnect = x$1;
    }

    /** Assigns the {@code zkConnectionTimeout} field; presumably called from the harness initializer - verify. */
    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int x$1) {
        this.zkConnectionTimeout = x$1;
    }

    /** Assigns the {@code zkSessionTimeout} field; presumably called from the harness initializer - verify. */
    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int x$1) {
        this.zkSessionTimeout = x$1;
    }

    /** Per-test setup: delegates entirely to the static {@code ZooKeeperTestHarness$class.setUp} implementation. */
    @Override
    public void setUp() {
        ZooKeeperTestHarness$class.setUp(this);
    }

    /** Per-test teardown: delegates entirely to the static {@code ZooKeeperTestHarness$class.tearDown} implementation. */
    @Override
    public void tearDown() {
        ZooKeeperTestHarness$class.tearDown(this);
    }

    /** Returns the port chosen for the broker under test when this fixture was constructed. */
    public int port() {
        return port;
    }

    /** Returns the broker properties built from {@link #port()} when this fixture was constructed. */
    public Properties props() {
        return props;
    }

    /** Returns the {@link KafkaConfig} wrapping {@link #props()}. */
    public KafkaConfig config() {
        return config;
    }

    /** Returns the host name used by the test consumer ({@code "localhost"}). */
    public String host() {
        return host;
    }

    /** Returns the name of the topic the tests produce to and fetch from ({@code "test"}). */
    public String topic() {
        return topic;
    }

    /** Returns the first batch of message payloads ({@code "hello"}, {@code "there"}). */
    public List<String> sent1() {
        return sent1;
    }

    /** Returns the second batch of message payloads ({@code "more"}, {@code "messages"}). */
    public List<String> sent2() {
        return sent2;
    }

    /**
     * End-to-end check that a broker can be shut down and restarted without losing data.
     *
     * Sequence: start a broker, produce {@link #sent1()}, shut the broker down and assert that the
     * recovery-point checkpoint file in every log directory exists and is non-empty; then start a
     * second broker on the same config, fetch and verify {@link #sent1()}, produce {@link #sent2()},
     * fetch and verify it, and finally shut everything down, delete the log directories and assert
     * that no matching non-daemon thread is left running.
     *
     * NOTE(review): both fetch loops poll without a timeout, so the test hangs rather than fails if
     * the expected messages never become available.
     */
    @Test
    public void testCleanShutdown() {
        // The server is held in an ObjectRef so the anonymous function below can read it and so it
        // can be replaced by the restarted instance later in the test.
        ObjectRef server = new ObjectRef((Object)new KafkaServer(this.config(), KafkaServer$.MODULE$.$lessinit$greater$default$2()));
        ((KafkaServer)server.elem).startup();
        Producer producer = TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaConfig[]{this.config()})))), StringEncoder.class.getName(), IntEncoder.class.getName(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5());
        // The two literal 1s are presumably partition count and replication factor - TODO confirm against TestUtils.createTopic.
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), 1, 1, (Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{(KafkaServer)server.elem}))), TestUtils$.MODULE$.createTopic$default$6());
        // Send the first batch: each payload becomes a KeyedMessage on the test topic with key 0.
        producer.send((Seq)this.sent1().map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ServerShutdownTest $outer;

            public final KeyedMessage<Object, String> apply(String m) {
                return new KeyedMessage(this.$outer.topic(), (Object)BoxesRunTime.boxToInteger((int)0), (Object)m);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        }, List$.MODULE$.canBuildFrom()));
        // Shut the broker down, then check every log directory for a non-empty recovery-point checkpoint file.
        ((KafkaServer)server.elem).shutdown();
        this.config().logDirs().foreach((Function1)new Serializable(this, server){
            public static final long serialVersionUID = 0L;
            private final ObjectRef server$1;

            public final void apply(String logDir) {
                File OffsetCheckpointFile = new File(logDir, ((KafkaServer)this.server$1.elem).logManager().RecoveryPointCheckpointFile());
                Assert.assertTrue((boolean)OffsetCheckpointFile.exists());
                Assert.assertTrue((OffsetCheckpointFile.length() > 0L ? 1 : 0) != 0);
            }
            {
                this.server$1 = server$1;
            }
        });
        producer.close();
        // Restart: a fresh KafkaServer on the same config, then wait for metadata of partition 0 of the topic.
        server.elem = new KafkaServer(this.config(), KafkaServer$.MODULE$.$lessinit$greater$default$2());
        ((KafkaServer)server.elem).startup();
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{(KafkaServer)server.elem}))), this.topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        producer = TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaConfig[]{this.config()})))), StringEncoder.class.getName(), IntEncoder.class.getName(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5());
        // Numeric arguments are presumably socket timeout and buffer size, the empty string a client id - verify against SimpleConsumer.
        SimpleConsumer consumer = new SimpleConsumer(this.host(), this.port(), 1000000, 65536, "");
        // Poll partition 0 from offset 0 until the returned message set contains valid bytes (no timeout).
        ByteBufferMessageSet fetchedMessage = null;
        while (fetchedMessage == null || fetchedMessage.validBytes() == 0) {
            FetchResponse fetched = consumer.fetch(new FetchRequestBuilder().addFetch(this.topic(), 0, 0L, 10000).maxWait(0).build());
            fetchedMessage = fetched.messageSet(this.topic(), 0);
        }
        // The payloads read back after the restart must equal the first batch.
        Assert.assertEquals(this.sent1(), (Object)fetchedMessage.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply(MessageAndOffset m) {
                return Utils$.MODULE$.readString(m.message().payload(), Utils$.MODULE$.readString$default$2());
            }
        }, Iterable$.MODULE$.canBuildFrom()));
        // Continue from the offset following the last message fetched so far.
        long newOffset = ((MessageAndOffset)fetchedMessage.last()).nextOffset();
        // Send the second batch through the new producer, again keyed with 0.
        producer.send((Seq)this.sent2().map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ServerShutdownTest $outer;

            public final KeyedMessage<Object, String> apply(String m) {
                return new KeyedMessage(this.$outer.topic(), (Object)BoxesRunTime.boxToInteger((int)0), (Object)m);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        }, List$.MODULE$.canBuildFrom()));
        // Poll from newOffset until data arrives; unlike the first loop, this request does not set maxWait.
        fetchedMessage = null;
        while (fetchedMessage == null || fetchedMessage.validBytes() == 0) {
            FetchResponse fetched = consumer.fetch(new FetchRequestBuilder().addFetch(this.topic(), 0, newOffset, 10000).build());
            fetchedMessage = fetched.messageSet(this.topic(), 0);
        }
        Assert.assertEquals(this.sent2(), (Object)fetchedMessage.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply(MessageAndOffset m) {
                return Utils$.MODULE$.readString(m.message().payload(), Utils$.MODULE$.readString$default$2());
            }
        }, Iterable$.MODULE$.canBuildFrom()));
        // Release clients and the broker, remove the log directories, then check for leftover threads.
        consumer.close();
        producer.close();
        ((KafkaServer)server.elem).shutdown();
        Utils$.MODULE$.rm(((KafkaServer)server.elem).config().logDirs());
        this.verifyNonDaemonThreadsStatus();
    }

    /**
     * Starts a broker configured with {@code delete.topic.enable=true}, shuts it down straight
     * away, waits for the shutdown to finish, deletes its log directories and then asserts that
     * no matching non-daemon thread is left running.
     */
    @Test
    public void testCleanShutdownWithDeleteTopicEnabled() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        Properties brokerProps = testUtils.createBrokerConfig(0, this.port(), testUtils.createBrokerConfig$default$3());
        brokerProps.setProperty("delete.topic.enable", "true");

        KafkaServer broker = new KafkaServer(new KafkaConfig(brokerProps), KafkaServer$.MODULE$.$lessinit$greater$default$2());
        broker.startup();
        broker.shutdown();
        broker.awaitShutdown();

        // Clean up on-disk state before checking for leftover threads.
        Utils$.MODULE$.rm(broker.config().logDirs());
        this.verifyNonDaemonThreadsStatus();
    }

    /**
     * Verifies that a broker whose startup fails can still be cleaned up without hanging.
     *
     * The broker is pointed at a ZooKeeper address that is not expected to resolve, so
     * {@code startup()} should throw a {@link ZkException}. In that case the broker state must be
     * {@code NotRunning}; afterwards the test waits for shutdown, deletes the log directories and
     * asserts that no matching non-daemon thread is left running. Any other outcome (startup
     * succeeding, or a different exception type) fails the test.
     *
     * The specific {@code ZkException} handler must be listed before the catch-all
     * {@code Throwable} handler; in the opposite order the specific handler is unreachable and the
     * expected-failure path could never pass.
     */
    @Test
    public void testCleanShutdownAfterFailedStartup() {
        Properties newProps = TestUtils$.MODULE$.createBrokerConfig(0, this.port(), TestUtils$.MODULE$.createBrokerConfig$default$3());
        newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535");
        KafkaConfig newConfig = new KafkaConfig(newProps);
        KafkaServer server = new KafkaServer(newConfig, KafkaServer$.MODULE$.$lessinit$greater$default$2());
        try {
            server.startup();
            throw this.fail("Expected KafkaServer setup to fail, throw exception");
        }
        catch (ZkException zkException) {
            // Expected path: startup failed on the ZooKeeper connection and the broker must report NotRunning.
            Assert.assertEquals((byte)server.brokerState().currentState(), (byte)NotRunning$.MODULE$.state());
            // Defensive: make sure the broker is stopped so that awaitShutdown() below cannot block.
            if (server.brokerState().currentState() != NotRunning$.MODULE$.state()) {
                server.shutdown();
            }
        }
        catch (Throwable throwable) {
            throw this.fail("Expected KafkaServer setup to fail with connection exception but caught a different exception.");
        }
        // Reached only via the ZkException handler; both other paths throw.
        server.awaitShutdown();
        Utils$.MODULE$.rm(server.config().logDirs());
        this.verifyNonDaemonThreadsStatus();
    }

    /**
     * Asserts that no thread in the JVM is simultaneously non-daemon, alive, and an instance of a
     * class whose name starts with {@code "kafka"} (compared after lower-casing).
     *
     * The class name is taken from {@link Class#getName()} rather than
     * {@link Class#getCanonicalName()}: the canonical name is {@code null} for anonymous and local
     * classes, so a thread created as an anonymous {@code Thread} subclass would otherwise cause a
     * {@link NullPointerException} here. Whenever a canonical name exists, it and the binary name
     * begin with the same package / top-level class characters, so the prefix test is unchanged.
     */
    public void verifyNonDaemonThreadsStatus() {
        int lingeringKafkaThreads = 0;
        // getAllStackTraces() returns a snapshot map, so iterating its key set is safe while threads come and go.
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (!t.isDaemon() && t.isAlive() && t.getClass().getName().toLowerCase().startsWith("kafka")) {
                lingeringKafkaThreads++;
            }
        }
        Assert.assertEquals((int)0, (int)lingeringKafkaThreads);
    }

    public ServerShutdownTest() {
        ZooKeeperTestHarness$class.$init$(this);
        this.port = TestUtils$.MODULE$.choosePort();
        this.props = TestUtils$.MODULE$.createBrokerConfig(0, this.port(), TestUtils$.MODULE$.createBrokerConfig$default$3());
        this.config = new KafkaConfig(this.props());
        this.host = "localhost";
        this.topic = "test";
        this.sent1 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"hello", "there"}));
        this.sent2 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"more", "messages"}));
    }
}

