/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.stream.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.stream.StreamAdapter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaStreamer<K, V>
extends StreamAdapter<ConsumerRecord, K, V> {
    private static final long DFLT_TIMEOUT = 100L;
    private IgniteLogger log;
    private ExecutorService executor;
    private List<String> topics;
    private int threads;
    private Properties consumerCfg;
    private long timeout = 100L;
    private final List<ConsumerTask> consumerTasks = new ArrayList<ConsumerTask>();

    public void setTopic(List<String> topics) {
        this.topics = topics;
    }

    public void setThreads(int threads) {
        this.threads = threads;
    }

    public void setConsumerConfig(Properties consumerCfg) {
        this.consumerCfg = consumerCfg;
    }

    public void setTimeout(long timeout) {
        A.ensure((timeout > 0L ? 1 : 0) != 0, (String)"timeout > 0");
        this.timeout = timeout;
    }

    public void start() {
        A.notNull((Object)this.getStreamer(), (String)"streamer");
        A.notNull((Object)this.getIgnite(), (String)"ignite");
        A.notNull(this.topics, (String)"topics");
        A.notNull((Object)this.consumerCfg, (String)"kafka consumer config");
        A.ensure((this.threads > 0 ? 1 : 0) != 0, (String)"threads > 0");
        A.ensure((null != this.getSingleTupleExtractor() || null != this.getMultipleTupleExtractor() ? 1 : 0) != 0, (String)"Extractor must be configured");
        this.log = this.getIgnite().log();
        this.executor = Executors.newFixedThreadPool(this.threads);
        IntStream.range(0, this.threads).forEach(i -> this.consumerTasks.add(new ConsumerTask(this.consumerCfg)));
        for (ConsumerTask task : this.consumerTasks) {
            this.executor.submit(task);
        }
    }

    public void stop() {
        block5: {
            for (ConsumerTask task : this.consumerTasks) {
                task.stop();
            }
            if (this.executor != null) {
                this.executor.shutdown();
                try {
                    if (!this.executor.awaitTermination(5000L, TimeUnit.MILLISECONDS) && this.log.isDebugEnabled()) {
                        this.log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
                    }
                }
                catch (InterruptedException ignored) {
                    if (!this.log.isDebugEnabled()) break block5;
                    this.log.debug("Interrupted during shutdown, exiting uncleanly.");
                }
            }
        }
    }

    class ConsumerTask
    implements Callable<Void> {
        private final KafkaConsumer<?, ?> consumer;
        private volatile boolean stopped;

        public ConsumerTask(Properties consumerCfg) {
            this.consumer = new KafkaConsumer(consumerCfg);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Void call() {
            this.consumer.subscribe((Collection)KafkaStreamer.this.topics);
            try {
                while (!this.stopped) {
                    for (ConsumerRecord record : this.consumer.poll(KafkaStreamer.this.timeout)) {
                        try {
                            KafkaStreamer.this.addMessage(record);
                        }
                        catch (Exception e) {
                            U.error((IgniteLogger)KafkaStreamer.this.log, (Object)("Record is ignored due to an error [record = " + record + ']'), (Throwable)e);
                        }
                    }
                }
            }
            catch (WakeupException we) {
                if (KafkaStreamer.this.log.isInfoEnabled()) {
                    KafkaStreamer.this.log.info("Consumer is being stopped.");
                }
            }
            catch (KafkaException ke) {
                KafkaStreamer.this.log.error("Kafka error", (Throwable)ke);
            }
            finally {
                this.consumer.close();
            }
            return null;
        }

        public void stop() {
            this.stopped = true;
            if (this.consumer != null) {
                this.consumer.wakeup();
            }
        }
    }
}

