/*
 * Decompiled with CFR 0.152.
 */
package io.druid.firehose.kafka;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.InvalidMessageException;

public class KafkaEightFirehoseFactory
implements FirehoseFactory<ByteBufferInputRowParser> {
    private static final Logger log = new Logger(KafkaEightFirehoseFactory.class);
    @JsonProperty
    private final Properties consumerProps;
    @JsonProperty
    private final String feed;

    @JsonCreator
    public KafkaEightFirehoseFactory(@JsonProperty(value="consumerProps") Properties consumerProps, @JsonProperty(value="feed") String feed) {
        this.consumerProps = consumerProps;
        this.feed = feed;
    }

    public Firehose connect(ByteBufferInputRowParser firehoseParser) throws IOException {
        Sets.SetView newDimExclus = Sets.union((Set)firehoseParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), (Set)Sets.newHashSet((Object[])new String[]{"feed"}));
        final ByteBufferInputRowParser theParser = firehoseParser.withParseSpec(firehoseParser.getParseSpec().withDimensionsSpec(firehoseParser.getParseSpec().getDimensionsSpec().withDimensionExclusions((Set)newDimExclus)));
        final ConsumerConnector connector = Consumer.createJavaConsumerConnector((ConsumerConfig)new ConsumerConfig(this.consumerProps));
        Map streams = connector.createMessageStreams((Map)ImmutableMap.of((Object)this.feed, (Object)1));
        List streamList = (List)streams.get(this.feed);
        if (streamList == null || streamList.size() != 1) {
            return null;
        }
        KafkaStream stream = (KafkaStream)streamList.get(0);
        final ConsumerIterator iter = stream.iterator();
        return new Firehose(){

            public boolean hasMore() {
                return iter.hasNext();
            }

            public InputRow nextRow() {
                try {
                    byte[] message = (byte[])iter.next().message();
                    if (message == null) {
                        return null;
                    }
                    return theParser.parse((Object)ByteBuffer.wrap(message));
                }
                catch (InvalidMessageException e) {
                    log.error((Throwable)e, "Message failed its checksum and it is corrupt, will skip it", new Object[0]);
                    return null;
                }
            }

            public Runnable commit() {
                return new Runnable(){

                    @Override
                    public void run() {
                        log.info("committing offsets", new Object[0]);
                        connector.commitOffsets();
                    }
                };
            }

            public void close() throws IOException {
                connector.shutdown();
            }
        };
    }
}

