/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.stream.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.stream.StreamAdapter;

public class KafkaStreamer<T, K, V>
extends StreamAdapter<T, K, V> {
    private static final long DFLT_RETRY_TIMEOUT = 10000L;
    private IgniteLogger log;
    private ExecutorService executor;
    private String topic;
    private int threads;
    private ConsumerConfig consumerCfg;
    private Decoder<K> keyDecoder;
    private Decoder<V> valDecoder;
    private ConsumerConnector consumer;
    private long retryTimeout = 10000L;
    private volatile boolean stopped;

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public void setThreads(int threads) {
        this.threads = threads;
    }

    public void setConsumerConfig(ConsumerConfig consumerCfg) {
        this.consumerCfg = consumerCfg;
    }

    public void setKeyDecoder(Decoder<K> keyDecoder) {
        this.keyDecoder = keyDecoder;
    }

    public void setValueDecoder(Decoder<V> valDecoder) {
        this.valDecoder = valDecoder;
    }

    public void setRetryTimeout(long retryTimeout) {
        A.ensure((retryTimeout > 0L ? 1 : 0) != 0, (String)"retryTimeout > 0");
        this.retryTimeout = retryTimeout;
    }

    public void start() {
        A.notNull((Object)this.getStreamer(), (String)"streamer");
        A.notNull((Object)this.getIgnite(), (String)"ignite");
        A.notNull((Object)this.topic, (String)"topic");
        A.notNull(this.keyDecoder, (String)"key decoder");
        A.notNull(this.valDecoder, (String)"value decoder");
        A.notNull((Object)this.consumerCfg, (String)"kafka consumer config");
        A.ensure((this.threads > 0 ? 1 : 0) != 0, (String)"threads > 0");
        this.log = this.getIgnite().log();
        this.consumer = Consumer.createJavaConsumerConnector((ConsumerConfig)this.consumerCfg);
        HashMap<String, Integer> topicCntMap = new HashMap<String, Integer>();
        topicCntMap.put(this.topic, this.threads);
        Map consumerMap = this.consumer.createMessageStreams(topicCntMap, this.keyDecoder, this.valDecoder);
        List streams = (List)consumerMap.get(this.topic);
        this.executor = Executors.newFixedThreadPool(this.threads);
        this.stopped = false;
        for (final KafkaStream stream : streams) {
            this.executor.submit(new Runnable(){

                @Override
                public void run() {
                    while (!KafkaStreamer.this.stopped) {
                        try {
                            ConsumerIterator it = stream.iterator();
                            while (it.hasNext() && !KafkaStreamer.this.stopped) {
                                MessageAndMetadata msg = it.next();
                                try {
                                    KafkaStreamer.this.getStreamer().addData(msg.key(), msg.message());
                                }
                                catch (Exception e) {
                                    U.error((IgniteLogger)KafkaStreamer.this.log, (Object)("Message is ignored due to an error [msg=" + msg + ']'), (Throwable)e);
                                }
                            }
                        }
                        catch (Exception e) {
                            U.error((IgniteLogger)KafkaStreamer.this.log, (Object)("Message can't be consumed from stream. Retry after " + KafkaStreamer.this.retryTimeout + " ms."), (Throwable)e);
                            try {
                                Thread.sleep(KafkaStreamer.this.retryTimeout);
                            }
                            catch (InterruptedException interruptedException) {}
                        }
                    }
                }
            });
        }
    }

    public void stop() {
        block5: {
            this.stopped = true;
            if (this.consumer != null) {
                this.consumer.shutdown();
            }
            if (this.executor != null) {
                this.executor.shutdown();
                try {
                    if (!this.executor.awaitTermination(5000L, TimeUnit.MILLISECONDS) && this.log.isDebugEnabled()) {
                        this.log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
                    }
                }
                catch (InterruptedException e) {
                    if (!this.log.isDebugEnabled()) break block5;
                    this.log.debug("Interrupted during shutdown, exiting uncleanly.");
                }
            }
        }
    }
}

