/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.sink.kafka;

import com.google.common.base.Charsets;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.util.Utf8;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.shared.kafka.test.KafkaPartitionTestUtil;
import org.apache.flume.shared.kafka.test.PartitionOption;
import org.apache.flume.shared.kafka.test.PartitionTestScenario;
import org.apache.flume.sink.kafka.KafkaSink;
import org.apache.flume.sink.kafka.util.TestUtil;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;

public class TestKafkaSink {
    private static final TestUtil testUtil = TestUtil.getInstance();
    private final Set<String> usedTopics = new HashSet<String>();

    @BeforeClass
    public static void setup() {
        testUtil.prepare();
        ArrayList<String> topics = new ArrayList<String>(3);
        topics.add("default-flume-topic");
        topics.add("static-topic");
        topics.add("custom-topic");
        topics.add("test-avro-header-topic");
        testUtil.initTopicList(topics);
    }

    @AfterClass
    public static void tearDown() {
        testUtil.tearDown();
    }

    @Test
    public void testKafkaProperties() {
        KafkaSink kafkaSink = new KafkaSink();
        Context context = new Context();
        context.put("kafka.kafka.topic", "");
        context.put("kafka.producer.value.serializer", "override.default.serializer");
        context.put("kafka.producer.fake.property", "kafka.property.value");
        context.put("kafka.bootstrap.servers", "localhost:9092,localhost:9092");
        context.put("brokerList", "real-broker-list");
        Configurables.configure((Object)kafkaSink, (Context)context);
        Properties kafkaProps = kafkaSink.getKafkaProps();
        Assert.assertEquals((Object)kafkaProps.getProperty("key.serializer"), (Object)"org.apache.kafka.common.serialization.StringSerializer");
        Assert.assertEquals((Object)kafkaProps.getProperty("value.serializer"), (Object)"override.default.serializer");
        Assert.assertEquals((Object)kafkaProps.getProperty("fake.property"), (Object)"kafka.property.value");
        Assert.assertEquals((Object)kafkaProps.getProperty("bootstrap.servers"), (Object)"localhost:9092,localhost:9092");
    }

    @Test
    public void testOldProperties() {
        KafkaSink kafkaSink = new KafkaSink();
        Context context = new Context();
        context.put("topic", "test-topic");
        context.put("batchSize", "300");
        context.put("brokerList", "localhost:9092,localhost:9092");
        context.put("requiredAcks", "all");
        Configurables.configure((Object)kafkaSink, (Context)context);
        Properties kafkaProps = kafkaSink.getKafkaProps();
        Assert.assertEquals((Object)kafkaSink.getTopic(), (Object)"test-topic");
        Assert.assertEquals((long)kafkaSink.getBatchSize(), (long)300L);
        Assert.assertEquals((Object)kafkaProps.getProperty("bootstrap.servers"), (Object)"localhost:9092,localhost:9092");
        Assert.assertEquals((Object)kafkaProps.getProperty("acks"), (Object)"all");
    }

    @Test
    public void testDefaultTopic() {
        KafkaSink kafkaSink = new KafkaSink();
        Context context = this.prepareDefaultContext();
        Configurables.configure((Object)kafkaSink, (Context)context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure((Object)memoryChannel, (Context)context);
        kafkaSink.setChannel((Channel)memoryChannel);
        kafkaSink.start();
        String msg = "default-topic-test";
        Transaction tx = memoryChannel.getTransaction();
        tx.begin();
        Event event = EventBuilder.withBody((byte[])msg.getBytes());
        memoryChannel.put(event);
        tx.commit();
        tx.close();
        try {
            Sink.Status status = kafkaSink.process();
            if (status == Sink.Status.BACKOFF) {
                Assert.fail((String)"Error Occurred");
            }
        }
        catch (EventDeliveryException eventDeliveryException) {
            // empty catch block
        }
        this.checkMessageArrived(msg, "default-flume-topic");
    }

    private void checkMessageArrived(String msg, String topic) {
        this.checkMessageArrived(msg, topic, null, null);
    }

    private void checkMessageArrived(String msg, String topic, Long timestamp, Headers headers) {
        ConsumerRecords<String, String> recs = this.pollConsumerRecords(topic);
        Assert.assertNotNull(recs);
        Assert.assertTrue((recs.count() > 0 ? 1 : 0) != 0);
        Iterator iter = recs.records(topic).iterator();
        boolean match = false;
        while (iter.hasNext()) {
            ConsumerRecord record = (ConsumerRecord)iter.next();
            if (!msg.equals(record.value()) || timestamp != null && !timestamp.equals(record.timestamp()) || headers != null && !this.validateHeaders(headers, record.headers())) continue;
            match = true;
            break;
        }
        Assert.assertTrue((String)("No message matches " + msg), (boolean)match);
    }

    private boolean validateHeaders(Headers expected, Headers actual) {
        return expected.equals(actual);
    }

    @Test
    public void testStaticTopic() {
        Context context = this.prepareDefaultContext();
        context.put("kafka.topic", "static-topic");
        String msg = "static-topic-test";
        try {
            Sink.Status status = this.prepareAndSend(context, msg);
            if (status == Sink.Status.BACKOFF) {
                Assert.fail((String)"Error Occurred");
            }
        }
        catch (EventDeliveryException eventDeliveryException) {
            // empty catch block
        }
        this.checkMessageArrived(msg, "static-topic");
    }

    @Test
    public void testTopicAndKeyFromHeader() {
        KafkaSink kafkaSink = new KafkaSink();
        Context context = this.prepareDefaultContext();
        Configurables.configure((Object)kafkaSink, (Context)context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure((Object)memoryChannel, (Context)context);
        kafkaSink.setChannel((Channel)memoryChannel);
        kafkaSink.start();
        String msg = "test-topic-and-key-from-header";
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("topic", "custom-topic");
        headers.put("key", "custom-key");
        Transaction tx = memoryChannel.getTransaction();
        tx.begin();
        Event event = EventBuilder.withBody((byte[])msg.getBytes(), headers);
        memoryChannel.put(event);
        tx.commit();
        tx.close();
        try {
            Sink.Status status = kafkaSink.process();
            if (status == Sink.Status.BACKOFF) {
                Assert.fail((String)"Error Occurred");
            }
        }
        catch (EventDeliveryException eventDeliveryException) {
            // empty catch block
        }
        this.checkMessageArrived(msg, "custom-topic");
    }

    @Test
    public void testTimestampAndHeaders() {
        KafkaSink kafkaSink = new KafkaSink();
        Context context = this.prepareDefaultContext();
        context.put("timestampHeader", "timestamp");
        context.put("header.correlator", "FLUME_CORRELATOR");
        context.put("header.method", "FLUME_METHOD");
        Configurables.configure((Object)kafkaSink, (Context)context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure((Object)memoryChannel, (Context)context);
        kafkaSink.setChannel((Channel)memoryChannel);
        kafkaSink.start();
        String msg = "test-topic-and-key-from-header";
        HashMap<String, String> headers = new HashMap<String, String>();
        long now = System.currentTimeMillis();
        headers.put("timestamp", Long.toString(now));
        headers.put("topic", "custom-topic");
        headers.put("key", "custom-key");
        headers.put("correlator", "12345");
        headers.put("method", "testTimestampAndHeaders");
        Transaction tx = memoryChannel.getTransaction();
        tx.begin();
        Event event = EventBuilder.withBody((byte[])msg.getBytes(), headers);
        memoryChannel.put(event);
        tx.commit();
        tx.close();
        try {
            Sink.Status status = kafkaSink.process();
            if (status == Sink.Status.BACKOFF) {
                Assert.fail((String)"Error Occurred");
            }
        }
        catch (EventDeliveryException status) {
            // empty catch block
        }
        RecordHeaders expected = new RecordHeaders();
        expected.add((Header)new RecordHeader("FLUME_CORRELATOR", "12345".getBytes(StandardCharsets.UTF_8)));
        expected.add((Header)new RecordHeader("FLUME_METHOD", "testTimestampAndHeaders".getBytes(StandardCharsets.UTF_8)));
        this.checkMessageArrived(msg, "custom-topic", now, (Headers)expected);
    }

    @Test
    public void testTopicFromConfHeader() {
        String customTopicHeader = "customTopicHeader";
        KafkaSink kafkaSink = new KafkaSink();
        Context context = this.prepareDefaultContext();
        context.put("topicHeader", customTopicHeader);
        Configurables.configure((Object)kafkaSink, (Context)context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure((Object)memoryChannel, (Context)context);
        kafkaSink.setChannel((Channel)memoryChannel);
        kafkaSink.start();
        String msg = "test-topic-from-config-header";
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put(customTopicHeader, "custom-topic");
        Transaction tx = memoryChannel.getTransaction();
        tx.begin();
        Event event = EventBuilder.withBody((byte[])msg.getBytes(), headers);
        memoryChannel.put(event);
        tx.commit();
        tx.close();
        try {
            Sink.Status status = kafkaSink.process();
            if (status == Sink.Status.BACKOFF) {
                Assert.fail((String)"Error Occurred");
            }
        }
        catch (EventDeliveryException eventDeliveryException) {
            // empty catch block
        }
        this.checkMessageArrived(msg, "custom-topic");
    }

    @Test
    public void testTopicNotFromConfHeader() {
        KafkaSink kafkaSink = new KafkaSink();
        Context context = this.prepareDefaultContext();
        context.put("allowTopicOverride", "false");
        context.put("topicHeader", "foo");
        Configurables.configure((Object)kafkaSink, (Context)context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure((Object)memoryChannel, (Context)context);
        kafkaSink.setChannel((Channel)memoryChannel);
        kafkaSink.start();
        String msg = "test-topic-from-config-header";
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("topic", "custom-topic");
        headers.put("foo", "custom-topic");
        Transaction tx = memoryChannel.getTransaction();
        tx.begin();
        Event event = EventBuilder.withBody((byte[])msg.getBytes(), headers);
        memoryChannel.put(event);
        tx.commit();
        tx.close();
        try {
            Sink.Status status = kafkaSink.process();
            if (status == Sink.Status.BACKOFF) {
                Assert.fail((String)"Error Occurred");
            }
        }
        catch (EventDeliveryException eventDeliveryException) {
            // empty catch block
        }
        this.checkMessageArrived(msg, "default-flume-topic");
    }

    @Test
    public void testReplaceSubStringOfTopicWithHeaders() {
        String topic = "test-avro-header-topic";
        KafkaSink kafkaSink = new KafkaSink();
        Context context = this.prepareDefaultContext();
        context.put("kafka.topic", "%{header1}-topic");
        Configurables.configure((Object)kafkaSink, (Context)context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure((Object)memoryChannel, (Context)context);
        kafkaSink.setChannel((Channel)memoryChannel);
        kafkaSink.start();
        String msg = "test-replace-substring-of-topic-with-headers";
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("header1", "test-avro-header");
        Transaction tx = memoryChannel.getTransaction();
        tx.begin();
        Event event = EventBuilder.withBody((byte[])msg.getBytes(), headers);
        memoryChannel.put(event);
        tx.commit();
        tx.close();
        try {
            Sink.Status status = kafkaSink.process();
            if (status == Sink.Status.BACKOFF) {
                Assert.fail((String)"Error Occurred");
            }
        }
        catch (EventDeliveryException eventDeliveryException) {
            // empty catch block
        }
        this.checkMessageArrived(msg, topic);
    }

    @Test
    public void testAvroEvent() throws IOException {
        KafkaSink kafkaSink = new KafkaSink();
        Context context = this.prepareDefaultContext();
        context.put("useFlumeEventFormat", "true");
        Configurables.configure((Object)kafkaSink, (Context)context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure((Object)memoryChannel, (Context)context);
        kafkaSink.setChannel((Channel)memoryChannel);
        kafkaSink.start();
        String msg = "test-avro-event";
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("topic", "custom-topic");
        headers.put("key", "custom-key");
        headers.put("header1", "test-avro-header");
        Transaction tx = memoryChannel.getTransaction();
        tx.begin();
        Event event = EventBuilder.withBody((byte[])msg.getBytes(), headers);
        memoryChannel.put(event);
        tx.commit();
        tx.close();
        try {
            Sink.Status status = kafkaSink.process();
            if (status == Sink.Status.BACKOFF) {
                Assert.fail((String)"Error Occurred");
            }
        }
        catch (EventDeliveryException status) {
            // empty catch block
        }
        String topic = "custom-topic";
        ConsumerRecords<String, String> recs = this.pollConsumerRecords(topic);
        Assert.assertNotNull(recs);
        Assert.assertTrue((recs.count() > 0 ? 1 : 0) != 0);
        ConsumerRecord consumerRecord = (ConsumerRecord)recs.iterator().next();
        ByteArrayInputStream in = new ByteArrayInputStream(((String)consumerRecord.value()).getBytes());
        BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder((InputStream)in, null);
        SpecificDatumReader reader = new SpecificDatumReader(AvroFlumeEvent.class);
        AvroFlumeEvent avroevent = (AvroFlumeEvent)reader.read(null, (Decoder)decoder);
        String eventBody = new String(avroevent.getBody().array(), Charsets.UTF_8);
        Map eventHeaders = avroevent.getHeaders();
        Assert.assertEquals((Object)msg, (Object)eventBody);
        Assert.assertEquals((Object)"custom-key", (Object)consumerRecord.key());
        Assert.assertEquals((Object)"test-avro-header", (Object)((CharSequence)eventHeaders.get(new Utf8("header1"))).toString());
        Assert.assertEquals((Object)"custom-key", (Object)((CharSequence)eventHeaders.get(new Utf8("key"))).toString());
    }

    private ConsumerRecords<String, String> pollConsumerRecords(String topic) {
        return this.pollConsumerRecords(topic, 20);
    }

    private ConsumerRecords<String, String> pollConsumerRecords(String topic, int maxIter) {
        ConsumerRecords<String, String> recs = null;
        for (int i = 0; i < maxIter && (recs = testUtil.getNextMessageFromConsumer(topic)).count() <= 0; ++i) {
            try {
                Thread.sleep(1000L);
                continue;
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        return recs;
    }

    @Test
    public void testEmptyChannel() throws EventDeliveryException {
        KafkaSink kafkaSink = new KafkaSink();
        Context context = this.prepareDefaultContext();
        Configurables.configure((Object)kafkaSink, (Context)context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure((Object)memoryChannel, (Context)context);
        kafkaSink.setChannel((Channel)memoryChannel);
        kafkaSink.start();
        Sink.Status status = kafkaSink.process();
        if (status != Sink.Status.BACKOFF) {
            Assert.fail((String)"Error Occurred");
        }
        ConsumerRecords<String, String> recs = this.pollConsumerRecords("default-flume-topic", 2);
        Assert.assertNotNull(recs);
        Assert.assertEquals((long)recs.count(), (long)0L);
    }

    @Test
    public void testPartitionHeaderSet() throws Exception {
        this.doPartitionHeader(PartitionTestScenario.PARTITION_ID_HEADER_ONLY);
    }

    @Test
    public void testPartitionHeaderNotSet() throws Exception {
        this.doPartitionHeader(PartitionTestScenario.NO_PARTITION_HEADERS);
    }

    @Test
    public void testStaticPartitionAndHeaderSet() throws Exception {
        this.doPartitionHeader(PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID);
    }

    @Test
    public void testStaticPartitionHeaderNotSet() throws Exception {
        this.doPartitionHeader(PartitionTestScenario.STATIC_HEADER_ONLY);
    }

    @Test
    public void testPartitionHeaderMissing() throws Exception {
        this.doPartitionErrors(PartitionOption.NOTSET);
    }

    @Test
    public void testPartitionHeaderOutOfRange() throws Exception {
        KafkaSink kafkaSink = new KafkaSink();
        try {
            this.doPartitionErrors(PartitionOption.VALIDBUTOUTOFRANGE, (Sink)kafkaSink);
            Assert.fail();
        }
        catch (EventDeliveryException eventDeliveryException) {
            // empty catch block
        }
        SinkCounter sinkCounter = (SinkCounter)Whitebox.getInternalState((Object)kafkaSink, (String)"counter");
        Assert.assertEquals((long)1L, (long)sinkCounter.getEventWriteFail());
    }

    @Test(expected=EventDeliveryException.class)
    public void testPartitionHeaderInvalid() throws Exception {
        this.doPartitionErrors(PartitionOption.NOTANUMBER);
    }

    @Test
    public void testDefaultSettingsOnReConfigure() {
        String sampleProducerProp = "compression.type";
        String sampleProducerVal = "snappy";
        Context context = this.prepareDefaultContext();
        context.put("kafka.producer." + sampleProducerProp, sampleProducerVal);
        KafkaSink kafkaSink = new KafkaSink();
        Configurables.configure((Object)kafkaSink, (Context)context);
        Assert.assertEquals((Object)sampleProducerVal, (Object)kafkaSink.getKafkaProps().getProperty(sampleProducerProp));
        context = this.prepareDefaultContext();
        Configurables.configure((Object)kafkaSink, (Context)context);
        Assert.assertNull((Object)kafkaSink.getKafkaProps().getProperty(sampleProducerProp));
    }

    private void doPartitionErrors(PartitionOption option) throws Exception {
        this.doPartitionErrors(option, (Sink)new KafkaSink());
    }

    private void doPartitionErrors(PartitionOption option, Sink kafkaSink) throws Exception {
        Context context = this.prepareDefaultContext();
        context.put("partitionIdHeader", "partition-header");
        Configurables.configure((Object)kafkaSink, (Context)context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure((Object)memoryChannel, (Context)context);
        kafkaSink.setChannel((Channel)memoryChannel);
        kafkaSink.start();
        String topic = this.findUnusedTopic();
        this.createTopic(topic, 5);
        Transaction tx = memoryChannel.getTransaction();
        tx.begin();
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("topic", topic);
        switch (option) {
            case VALIDBUTOUTOFRANGE: {
                headers.put("partition-header", "9");
                break;
            }
            case NOTSET: {
                headers.put("wrong-header", "2");
                break;
            }
            case NOTANUMBER: {
                headers.put("partition-header", "not-a-number");
                break;
            }
        }
        Event event = EventBuilder.withBody((byte[])String.valueOf(9).getBytes(), headers);
        memoryChannel.put(event);
        tx.commit();
        tx.close();
        Sink.Status status = kafkaSink.process();
        Assert.assertEquals((Object)Sink.Status.READY, (Object)status);
        this.deleteTopic(topic);
    }

    private void doPartitionHeader(PartitionTestScenario scenario) throws Exception {
        int numPtns = 5;
        int numMsgs = 50;
        Integer staticPtn = 3;
        String topic = this.findUnusedTopic();
        this.createTopic(topic, 5);
        Context context = this.prepareDefaultContext();
        context.put("flumeBatchSize", "100");
        if (scenario == PartitionTestScenario.PARTITION_ID_HEADER_ONLY || scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID) {
            context.put("partitionIdHeader", "partition-header");
        }
        if (scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID || scenario == PartitionTestScenario.STATIC_HEADER_ONLY) {
            context.put("defaultPartitionId", staticPtn.toString());
        }
        KafkaSink kafkaSink = new KafkaSink();
        Configurables.configure((Object)kafkaSink, (Context)context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure((Object)memoryChannel, (Context)context);
        kafkaSink.setChannel((Channel)memoryChannel);
        kafkaSink.start();
        HashMap partitionMap = new HashMap(5);
        for (int i = 0; i < 5; ++i) {
            partitionMap.put(i, new ArrayList());
        }
        Transaction tx = memoryChannel.getTransaction();
        tx.begin();
        List orderedEvents = KafkaPartitionTestUtil.generateSkewedMessageList((PartitionTestScenario)scenario, (int)50, partitionMap, (int)5, (int)staticPtn);
        for (Event event : orderedEvents) {
            event.getHeaders().put("topic", topic);
            memoryChannel.put(event);
        }
        tx.commit();
        tx.close();
        Sink.Status status = kafkaSink.process();
        Assert.assertEquals((Object)Sink.Status.READY, (Object)status);
        Properties props = new Properties();
        props.put("bootstrap.servers", testUtil.getKafkaServerUrl());
        props.put("group.id", "group_1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("auto.offset.reset", "earliest");
        Map resultsMap = KafkaPartitionTestUtil.retrieveRecordsFromPartitions((String)topic, (int)5, (Properties)props);
        KafkaPartitionTestUtil.checkResultsAgainstSkew((PartitionTestScenario)scenario, partitionMap, (Map)resultsMap, (int)staticPtn, (int)50);
        memoryChannel.stop();
        kafkaSink.stop();
        this.deleteTopic(topic);
    }

    private Context prepareDefaultContext() {
        Context context = new Context();
        context.put("kafka.bootstrap.servers", testUtil.getKafkaServerUrl());
        context.put("flumeBatchSize", "1");
        return context;
    }

    private Sink.Status prepareAndSend(Context context, String msg) throws EventDeliveryException {
        KafkaSink kafkaSink = new KafkaSink();
        Configurables.configure((Object)kafkaSink, (Context)context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure((Object)memoryChannel, (Context)context);
        kafkaSink.setChannel((Channel)memoryChannel);
        kafkaSink.start();
        Transaction tx = memoryChannel.getTransaction();
        tx.begin();
        Event event = EventBuilder.withBody((byte[])msg.getBytes());
        memoryChannel.put(event);
        tx.commit();
        tx.close();
        return kafkaSink.process();
    }

    private void createTopic(String topicName, int numPartitions) {
        testUtil.createTopics(Collections.singletonList(topicName), numPartitions);
    }

    private void deleteTopic(String topicName) {
        testUtil.deleteTopic(topicName);
    }

    private String findUnusedTopic() {
        String newTopic = null;
        boolean topicFound = false;
        while (!topicFound) {
            newTopic = RandomStringUtils.randomAlphabetic((int)8);
            if (this.usedTopics.contains(newTopic)) continue;
            this.usedTopics.add(newTopic);
            topicFound = true;
        }
        return newTopic;
    }

    @Test
    public void testSslTopic() {
        KafkaSink kafkaSink = new KafkaSink();
        Context context = this.prepareDefaultContext();
        context.put("kafka.bootstrap.servers", testUtil.getKafkaServerSslUrl());
        context.put("kafka.producer.security.protocol", "SSL");
        context.put("kafka.producer.ssl.truststore.location", "src/test/resources/truststorefile.jks");
        context.put("kafka.producer.ssl.truststore.password", "password");
        context.put("kafka.producer.ssl.disableTLSHostnameVerification", "true");
        Configurables.configure((Object)kafkaSink, (Context)context);
        MemoryChannel memoryChannel = new MemoryChannel();
        context = this.prepareDefaultContext();
        Configurables.configure((Object)memoryChannel, (Context)context);
        kafkaSink.setChannel((Channel)memoryChannel);
        kafkaSink.start();
        String msg = "default-topic-test";
        Transaction tx = memoryChannel.getTransaction();
        tx.begin();
        Event event = EventBuilder.withBody((byte[])msg.getBytes());
        memoryChannel.put(event);
        tx.commit();
        tx.close();
        try {
            Sink.Status status = kafkaSink.process();
            if (status == Sink.Status.BACKOFF) {
                Assert.fail((String)"Error Occurred");
            }
        }
        catch (EventDeliveryException eventDeliveryException) {
            // empty catch block
        }
        this.checkMessageArrived(msg, "default-flume-topic");
    }
}

