/*
 * Decompiled with CFR 0.152.
 */
package kafka.producer;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Properties;
import junit.framework.Assert;
import kafka.admin.AdminUtils$;
import kafka.api.ProducerRequest;
import kafka.api.ProducerResponse;
import kafka.api.ProducerResponseStatus;
import kafka.common.ErrorMapping$;
import kafka.common.TopicAndPartition;
import kafka.integration.KafkaServerTestHarness;
import kafka.integration.KafkaServerTestHarness$class;
import kafka.message.ByteBufferMessageSet;
import kafka.message.CompressionCodec;
import kafka.message.Message;
import kafka.message.Message$;
import kafka.message.MessageSet$;
import kafka.message.NoCompressionCodec$;
import kafka.producer.SyncProducer;
import kafka.producer.SyncProducerConfig;
import kafka.producer.SyncProducerConfig$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
import kafka.utils.TestUtils$;
import kafka.utils.TestZKUtils$;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness$class;
import org.I0Itec.zkclient.ZkClient;
import org.junit.Test;
import org.scalatest.junit.JUnit3Suite;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.TraitSetter;

@ScalaSignature(bytes="\u0006\u0001E4A!\u0001\u0002\u0001\u000f\t\u00012+\u001f8d!J|G-^2feR+7\u000f\u001e\u0006\u0003\u0007\u0011\t\u0001\u0002\u001d:pIV\u001cWM\u001d\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011c\u0001\u0001\t%A\u0011\u0011\u0002E\u0007\u0002\u0015)\u00111\u0002D\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0003\u001b9\t\u0011b]2bY\u0006$Xm\u001d;\u000b\u0003=\t1a\u001c:h\u0013\t\t\"BA\u0006K+:LGoM*vSR,\u0007CA\n\u0017\u001b\u0005!\"BA\u000b\u0005\u0003-Ig\u000e^3he\u0006$\u0018n\u001c8\n\u0005]!\"AF&bM.\f7+\u001a:wKJ$Vm\u001d;ICJtWm]:\t\u000be\u0001A\u0011\u0001\u000e\u0002\rqJg.\u001b;?)\u0005Y\u0002C\u0001\u000f\u0001\u001b\u0005\u0011\u0001b\u0002\u0010\u0001\u0001\u0004%IaH\u0001\r[\u0016\u001c8/Y4f\u0005f$Xm]\u000b\u0002AA\u0019\u0011\u0005\n\u0014\u000e\u0003\tR\u0011aI\u0001\u0006g\u000e\fG.Y\u0005\u0003K\t\u0012Q!\u0011:sCf\u0004\"!I\u0014\n\u0005!\u0012#\u0001\u0002\"zi\u0016DqA\u000b\u0001A\u0002\u0013%1&\u0001\tnKN\u001c\u0018mZ3CsR,7o\u0018\u0013fcR\u0011Af\f\t\u0003C5J!A\f\u0012\u0003\tUs\u0017\u000e\u001e\u0005\ba%\n\t\u00111\u0001!\u0003\rAH%\r\u0005\u0007e\u0001\u0001\u000b\u0015\u0002\u0011\u0002\u001b5,7o]1hK\nKH/Z:!\u0011\u001d!\u0004A1A\u0005\u0002U\nqaY8oM&<7/F\u00017!\r9DHP\u0007\u0002q)\u0011\u0011HO\u0001\nS6lW\u000f^1cY\u0016T!a\u000f\u0012\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002>q\t!A*[:u!\ty$)D\u0001A\u0015\t\tE!\u0001\u0004tKJ4XM]\u0005\u0003\u0007\u0002\u00131bS1gW\u0006\u001cuN\u001c4jO\"1Q\t\u0001Q\u0001\nY\n\u0001bY8oM&<7\u000f\t\u0005\b\u000f\u0002\u0011\r\u0011\"\u0001I\u0003AQxn\\6fKB,'oQ8o]\u0016\u001cG/F\u0001J!\tQu*D\u0001L\u0015\taU*\u0001\u0003mC:<'\"\u0001(\u0002\t)\fg/Y\u0005\u0003!.\u0013aa\u0015;sS:<\u0007B\u0002*\u0001A\u0003%\u0011*A\t{_>\\W-\u001a9fe\u000e{gN\\3di\u0002BQ\u0001\u0016\u0001\u0005\u0002U\u000b1\u0003^3tiJ+\u0017m\u00195bE2,7+\u001a:wKJ$\u0012\u0001\f\u0015\u0003'^\u0003\"\u0001\u0017.\u000e\u0003eS!a\u0003\b\n\u0005mK&\u0001\u0002+fgRDQ!\u0018\u0001\u0005\u0002U\u000bq\u0003^3ti\u0016k\u0007\u000f^=Qe>$WoY3SKF,Xm\u001d;)\u0005q;\u0006\"\u00021\u0001\t\u0003)\u0016a\u0006;fgRlUm]:bO\u0016\u001c\u0016N_3U_>d\u0015M]4fQ\tyv\u000bC\u0003d\u0001\u0011\u0005Q+\u0001\u0012uKN$X*Z:tC\u001e,7+\u001b>f)>|G*\u0019:hK^KG\u000f[!dWj+'o\u001c\u0015\u0003E^CQA\u001a\u0001\u0005\u0002U\u000bA\u0005^3tiB\u0013x\u000eZ;dK\u000e{'O]3di2L(+Z2fSZ,7OU3ta>t7/\u001a\u0015\u0003K^CQ!\u001b\u0001\u0005\u0002U\u000ba\u0003^3tiB\u0013x\u000eZ;dKJ\u001c\u0015M\u001c+j[\u0016|W\u000f\u001e\u0015\u0003Q^CQ\u0001\u001c\u0001\u0005\u0002U\u000b\u0001\u0005^3tiB\u0013x\u000eZ;dKJ+\u0017/^3ti^KG\u000f\u001b(p%\u0016\u001c\bo\u001c8tK\"\u00121n\u0016\u0005\u0006_\u0002!\t!V\u0001\u0016i\u0016\u001cHOT8u\u000b:|Wo\u001a5SKBd\u0017nY1tQ\tqw\u000b")
public class SyncProducerTest
extends JUnit3Suite
implements KafkaServerTestHarness {
    private byte[] messageBytes;
    private final List<KafkaConfig> configs;
    private final String zookeeperConnect;
    private List<KafkaServer> servers;
    private String brokerList;
    private final String zkConnect;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zkClient;
    private final int zkConnectionTimeout;
    private final int zkSessionTimeout;

    @Override
    public List<KafkaServer> servers() {
        return this.servers;
    }

    @Override
    @TraitSetter
    public void servers_$eq(List<KafkaServer> x$1) {
        this.servers = x$1;
    }

    @Override
    public String brokerList() {
        return this.brokerList;
    }

    @Override
    @TraitSetter
    public void brokerList_$eq(String x$1) {
        this.brokerList = x$1;
    }

    @Override
    public /* synthetic */ void kafka$integration$KafkaServerTestHarness$$super$setUp() {
        ZooKeeperTestHarness$class.setUp(this);
    }

    @Override
    public /* synthetic */ void kafka$integration$KafkaServerTestHarness$$super$tearDown() {
        ZooKeeperTestHarness$class.tearDown(this);
    }

    @Override
    public void setUp() {
        KafkaServerTestHarness$class.setUp(this);
    }

    @Override
    public void tearDown() {
        KafkaServerTestHarness$class.tearDown(this);
    }

    @Override
    public String zkConnect() {
        return this.zkConnect;
    }

    @Override
    public EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    @Override
    public void zookeeper_$eq(EmbeddedZookeeper x$1) {
        this.zookeeper = x$1;
    }

    @Override
    public ZkClient zkClient() {
        return this.zkClient;
    }

    @Override
    public void zkClient_$eq(ZkClient x$1) {
        this.zkClient = x$1;
    }

    @Override
    public int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    @Override
    public int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    @Override
    public /* synthetic */ void kafka$zk$ZooKeeperTestHarness$$super$setUp() {
        super.setUp();
    }

    @Override
    public /* synthetic */ void kafka$zk$ZooKeeperTestHarness$$super$tearDown() {
        super.tearDown();
    }

    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq(String x$1) {
        this.zkConnect = x$1;
    }

    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int x$1) {
        this.zkConnectionTimeout = x$1;
    }

    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int x$1) {
        this.zkSessionTimeout = x$1;
    }

    private byte[] messageBytes() {
        return this.messageBytes;
    }

    private void messageBytes_$eq(byte[] x$1) {
        this.messageBytes = x$1;
    }

    @Override
    public List<KafkaConfig> configs() {
        return this.configs;
    }

    public String zookeeperConnect() {
        return this.zookeeperConnect;
    }

    @Test
    public void testReachableServer() {
        KafkaServer server = (KafkaServer)this.servers().head();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(server.socketServer().port());
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        long firstStart = SystemTime$.MODULE$.milliseconds();
        try {
            ProducerResponse response = producer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(this.messageBytes())})), 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
            Assert.assertNotNull((Object)response);
        }
        catch (Exception exception) {
            Assert.fail((String)new StringBuilder().append((Object)"Unexpected failure sending message to broker. ").append((Object)exception.getMessage()).toString());
        }
        long firstEnd = SystemTime$.MODULE$.milliseconds();
        Assert.assertTrue((firstEnd - firstStart < 500L ? 1 : 0) != 0);
        long secondStart = SystemTime$.MODULE$.milliseconds();
        try {
            ProducerResponse response = producer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(this.messageBytes())})), 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
            Assert.assertNotNull((Object)response);
        }
        catch (Exception exception) {
            Assert.fail((String)new StringBuilder().append((Object)"Unexpected failure sending message to broker. ").append((Object)exception.getMessage()).toString());
        }
        long secondEnd = SystemTime$.MODULE$.milliseconds();
        Assert.assertTrue((secondEnd - secondStart < 500L ? 1 : 0) != 0);
        try {
            ProducerResponse response = producer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(this.messageBytes())})), 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
            Assert.assertNotNull((Object)response);
        }
        catch (Exception exception) {
            Assert.fail((String)new StringBuilder().append((Object)"Unexpected failure sending message to broker. ").append((Object)exception.getMessage()).toString());
        }
    }

    @Test
    public void testEmptyProduceRequest() {
        int ackTimeoutMs;
        short ack;
        String clientId;
        int correlationId;
        ProducerRequest emptyRequest;
        KafkaServer server = (KafkaServer)this.servers().head();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(server.socketServer().port());
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        ProducerResponse response = producer.send(emptyRequest = new ProducerRequest(correlationId = 0, clientId = SyncProducerConfig$.MODULE$.DefaultClientId(), ack = 1, ackTimeoutMs = SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs(), (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$)));
        Assert.assertTrue((response != null ? 1 : 0) != 0);
        Assert.assertTrue((!response.hasError() && response.status().size() == 0 ? 1 : 0) != 0);
    }

    @Test
    public void testMessageSizeTooLarge() {
        KafkaServer server = (KafkaServer)this.servers().head();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(server.socketServer().port());
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        TestUtils$.MODULE$.createTopic(this.zkClient(), "test", 1, 1, (Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.createTopic$default$6());
        Message message1 = new Message(new byte[((KafkaConfig)this.configs().apply(0)).messageMaxBytes() + 1]);
        ByteBufferMessageSet messageSet1 = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{message1}));
        ProducerResponse response1 = producer.send(TestUtils$.MODULE$.produceRequest("test", 0, messageSet1, 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        Assert.assertEquals((int)1, (int)response1.status().count((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Tuple2<TopicAndPartition, ProducerResponseStatus> x$1) {
                return ((ProducerResponseStatus)x$1._2()).error() != ErrorMapping$.MODULE$.NoError();
            }
        }));
        Assert.assertEquals((short)ErrorMapping$.MODULE$.MessageSizeTooLargeCode(), (short)((ProducerResponseStatus)response1.status().apply((Object)new TopicAndPartition("test", 0))).error());
        Assert.assertEquals((long)-1L, (long)((ProducerResponseStatus)response1.status().apply((Object)new TopicAndPartition("test", 0))).offset());
        int safeSize = ((KafkaConfig)this.configs().apply(0)).messageMaxBytes() - Message$.MODULE$.MessageOverhead() - MessageSet$.MODULE$.LogOverhead() - 1;
        Message message2 = new Message(new byte[safeSize]);
        ByteBufferMessageSet messageSet2 = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{message2}));
        ProducerResponse response2 = producer.send(TestUtils$.MODULE$.produceRequest("test", 0, messageSet2, 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        Assert.assertEquals((int)1, (int)response1.status().count((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Tuple2<TopicAndPartition, ProducerResponseStatus> x$2) {
                return ((ProducerResponseStatus)x$2._2()).error() != ErrorMapping$.MODULE$.NoError();
            }
        }));
        Assert.assertEquals((short)ErrorMapping$.MODULE$.NoError(), (short)((ProducerResponseStatus)response2.status().apply((Object)new TopicAndPartition("test", 0))).error());
        Assert.assertEquals((long)0L, (long)((ProducerResponseStatus)response2.status().apply((Object)new TopicAndPartition("test", 0))).offset());
    }

    @Test
    public void testMessageSizeTooLargeWithAckZero() {
        KafkaServer server = (KafkaServer)this.servers().head();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(server.socketServer().port());
        props.put("request.required.acks", "0");
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        AdminUtils$.MODULE$.createTopic(this.zkClient(), "test", 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), "test", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        producer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(new byte[((KafkaConfig)this.configs().apply(0)).messageMaxBytes() + 1])})), 0, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        try {
            producer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(new byte[((KafkaConfig)this.configs().apply(0)).messageMaxBytes() + 1])})), 0, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        }
        catch (Throwable throwable) {
            throw throwable;
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    @Test
    public void testProduceCorrectlyReceivesResponse() {
        KafkaServer server = (KafkaServer)this.servers().head();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(server.socketServer().port());
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        ByteBufferMessageSet messages2 = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(this.messageBytes())}));
        ProducerRequest request = TestUtils$.MODULE$.produceRequestWithAcks((Seq<String>)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"topic1", "topic2", "topic3"}), (Seq<Object>)Predef$.MODULE$.wrapIntArray(new int[]{0}), messages2, 1, TestUtils$.MODULE$.produceRequestWithAcks$default$5(), TestUtils$.MODULE$.produceRequestWithAcks$default$6(), TestUtils$.MODULE$.produceRequestWithAcks$default$7());
        ProducerResponse response = producer.send(request);
        Assert.assertNotNull((Object)response);
        Assert.assertEquals((int)request.correlationId(), (int)response.correlationId());
        Assert.assertEquals((int)3, (int)response.status().size());
        response.status().values().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(ProducerResponseStatus x0$1) {
                ProducerResponseStatus producerResponseStatus = x0$1;
                if (producerResponseStatus != null) {
                    short error = producerResponseStatus.error();
                    long nextOffset = producerResponseStatus.offset();
                    Assert.assertEquals((short)ErrorMapping$.MODULE$.UnknownTopicOrPartitionCode(), (short)error);
                    Assert.assertEquals((long)-1L, (long)nextOffset);
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return;
                }
                throw new MatchError((Object)producerResponseStatus);
            }
        });
        AdminUtils$.MODULE$.createTopic(this.zkClient(), "topic1", 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), "topic1", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        AdminUtils$.MODULE$.createTopic(this.zkClient(), "topic3", 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), "topic3", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        ProducerResponse response2 = producer.send(request);
        Assert.assertNotNull((Object)response2);
        Assert.assertEquals((int)request.correlationId(), (int)response2.correlationId());
        Assert.assertEquals((int)3, (int)response2.status().size());
        Assert.assertEquals((short)ErrorMapping$.MODULE$.NoError(), (short)((ProducerResponseStatus)response2.status().apply((Object)new TopicAndPartition("topic1", 0))).error());
        Assert.assertEquals((short)ErrorMapping$.MODULE$.NoError(), (short)((ProducerResponseStatus)response2.status().apply((Object)new TopicAndPartition("topic3", 0))).error());
        Assert.assertEquals((long)0L, (long)((ProducerResponseStatus)response2.status().apply((Object)new TopicAndPartition("topic1", 0))).offset());
        Assert.assertEquals((long)0L, (long)((ProducerResponseStatus)response2.status().apply((Object)new TopicAndPartition("topic3", 0))).offset());
        Assert.assertEquals((short)ErrorMapping$.MODULE$.UnknownTopicOrPartitionCode(), (short)((ProducerResponseStatus)response2.status().apply((Object)new TopicAndPartition("topic2", 0))).error());
        Assert.assertEquals((long)-1L, (long)((ProducerResponseStatus)response2.status().apply((Object)new TopicAndPartition("topic2", 0))).offset());
    }

    @Test
    public void testProducerCanTimeout() {
        int timeoutMs = 500;
        KafkaServer server = (KafkaServer)this.servers().head();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(server.socketServer().port());
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        ByteBufferMessageSet messages2 = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(this.messageBytes())}));
        ProducerRequest request = TestUtils$.MODULE$.produceRequest("topic1", 0, messages2, 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7());
        server.requestHandlerPool().shutdown();
        long t1 = SystemTime$.MODULE$.milliseconds();
        try {
            producer.send(request);
            Assert.fail((String)"Should have received timeout exception since request handling is stopped.");
        }
        catch (Throwable throwable) {
            Assert.fail((String)new StringBuilder().append((Object)"Unexpected exception when expecting timeout: ").append((Object)throwable).toString());
        }
        catch (SocketTimeoutException socketTimeoutException) {
            // empty catch block
        }
        long t2 = SystemTime$.MODULE$.milliseconds();
        Assert.assertTrue((t2 - t1 >= (long)timeoutMs ? 1 : 0) != 0);
    }

    @Test
    public void testProduceRequestWithNoResponse() {
        int ackTimeoutMs;
        short ack;
        String clientId;
        int correlationId;
        ProducerRequest emptyRequest;
        KafkaServer server = (KafkaServer)this.servers().head();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(server.socketServer().port());
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        ProducerResponse response = producer.send(emptyRequest = new ProducerRequest(correlationId = 0, clientId = SyncProducerConfig$.MODULE$.DefaultClientId(), ack = 0, ackTimeoutMs = SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs(), (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$)));
        Assert.assertTrue((response == null ? 1 : 0) != 0);
    }

    @Test
    public void testNotEnoughReplicas() {
        String topicName = "minisrtest";
        KafkaServer server = (KafkaServer)this.servers().head();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(server.socketServer().port());
        props.put("request.required.acks", "-1");
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        Properties topicProps = new Properties();
        topicProps.put("min.insync.replicas", "2");
        AdminUtils$.MODULE$.createTopic(this.zkClient(), topicName, 1, 1, topicProps);
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), topicName, 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        ProducerResponse response = producer.send(TestUtils$.MODULE$.produceRequest(topicName, 0, new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(this.messageBytes())})), -1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        Assert.assertEquals((short)ErrorMapping$.MODULE$.NotEnoughReplicasCode(), (short)((ProducerResponseStatus)response.status().apply((Object)new TopicAndPartition(topicName, 0))).error());
    }

    public SyncProducerTest() {
        ZooKeeperTestHarness$class.$init$(this);
        KafkaServerTestHarness$class.$init$(this);
        this.messageBytes = new byte[2];
        this.configs = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaConfig[]{new KafkaConfig((Properties)TestUtils$.MODULE$.createBrokerConfigs(1, false).head())}));
        this.zookeeperConnect = TestZKUtils$.MODULE$.zookeeperConnect();
    }
}

