/*
 * Decompiled with CFR 0.152.
 */
package com.espertech.esperio.kafka;

import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPStatement;
import com.espertech.esperio.kafka.EsperIOKafkaOutputFlowControllerByAnnotatedStmt;
import com.espertech.esperio.kafka.SupportAwaitUtil;
import com.espertech.esperio.kafka.SupportBean;
import com.espertech.esperio.kafka.SupportConstants;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * Exception performing whole class analysis ignored.
 */
public class TestKafkaOutput
extends TestCase {
    private static final Logger log = LoggerFactory.getLogger(TestKafkaOutput.class);

    public void testOutput() {
        Properties adapterProps = SupportConstants.getOutputPluginProps();
        adapterProps.put("esperio.kafka.topics", "esperio_regression_output_t1");
        adapterProps.put("key.serializer", StringSerializer.class.getName());
        adapterProps.put("value.serializer", StringSerializer.class.getName());
        adapterProps.put("esperio.kafka.output.flowcontroller", EsperIOKafkaOutputFlowControllerByAnnotatedStmt.class.getName());
        EPServiceProvider epService = SupportConstants.getEngineWKafkaOutput((String)this.getClass().getSimpleName(), (Properties)adapterProps);
        epService.getEPAdministrator().getConfiguration().addEventType("SupportBean", SupportBean.class);
        EPStatement statement = epService.getEPAdministrator().createEPL("@name('first') @KafkaOutputDefault select * from SupportBean");
        KafkaConsumer consumer = this.initConsumer();
        List<TopicPartition> topicPartitions = Collections.singletonList(new TopicPartition("esperio_regression_output_t1", 0));
        consumer.assign(topicPartitions);
        int numMessages = this.countMessages(topicPartitions, consumer);
        consumer.seek((TopicPartition)topicPartitions.iterator().next(), 0L);
        this.sendAndAwait(epService, consumer, "E1");
        this.sendAndAwait(epService, consumer, "E2");
        int numMessagesAfter = this.countMessages(topicPartitions, consumer);
        statement.destroy();
        epService.getEPRuntime().sendEvent(new SupportBean("XXX", -1));
        TestKafkaOutput.assertEquals((int)2, (int)(numMessagesAfter - numMessages));
        epService.getEPAdministrator().createEPL("@name('second') @KafkaOutputDefault select stringProp from SupportBean");
        this.sendAndAwait(epService, consumer, "E3");
        epService.destroy();
    }

    private int countMessages(Collection<TopicPartition> topicPartitions, KafkaConsumer<String, String> consumer) {
        consumer.seek(topicPartitions.iterator().next(), 0L);
        int count = 0;
        boolean more = true;
        while (more) {
            ConsumerRecords rows = consumer.poll(1000L);
            count += rows.count();
            more = !rows.isEmpty();
        }
        return count;
    }

    private void sendAndAwait(EPServiceProvider epService, KafkaConsumer<String, String> consumer, String eventId) {
        String uniqueId = eventId + "___" + UUID.randomUUID().toString();
        epService.getEPRuntime().sendEvent(new SupportBean(uniqueId, 10));
        SupportAwaitUtil.awaitOrFail((long)10L, (TimeUnit)TimeUnit.SECONDS, (String)"failed to receive expected event", () -> {
            ConsumerRecords rows = consumer.poll(1000L);
            Iterator it = rows.iterator();
            boolean found = false;
            while (it.hasNext()) {
                ConsumerRecord row = (ConsumerRecord)it.next();
                log.info("Received: {}", row.value());
                if (!((String)row.value()).contains(uniqueId)) continue;
                found = true;
            }
            return found ? Boolean.valueOf(true) : null;
        });
    }

    private KafkaConsumer<String, String> initConsumer() {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "127.0.0.1:9092");
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());
        consumerProps.put("group.id", "esperio_regression_output_t1__mygroup");
        return new KafkaConsumer(consumerProps);
    }
}

