/*
 * Decompiled with CFR 0.152.
 */
package cascading.local.tap.kafka.commit;

import cascading.local.tap.kafka.commit.CommitListener;
import cascading.local.tap.kafka.commit.CommittingRebalanceListener;
import cascading.local.tap.kafka.commit.OffsetRecorderIterator;
import cascading.local.tap.kafka.decorator.ForwardingConsumer;
import cascading.local.tap.kafka.decorator.ForwardingConsumerRecords;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommittingConsumer<K, V>
extends ForwardingConsumer<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(CommittingConsumer.class);
    Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
    CommitListener listener = new CommitListener(){

        @Override
        public void onClose(Consumer consumer, Map<TopicPartition, OffsetAndMetadata> offsets) {
            LOG.info("committing offsets on close");
        }

        @Override
        public void onRevoke(Consumer consumer, Map<TopicPartition, OffsetAndMetadata> offsets) {
            LOG.info("committing offsets on partition revoke");
        }

        @Override
        public boolean onFail(Consumer consumer, RuntimeException exception, Map<TopicPartition, OffsetAndMetadata> offsets) {
            LOG.error("failed committing offsets", (Throwable)exception);
            return true;
        }
    };

    public CommittingConsumer(Properties properties, CommitListener listener) {
        super(properties);
        this.listener = listener;
    }

    public CommittingConsumer(Properties properties) {
        super(properties);
    }

    @Override
    protected KafkaConsumer<K, V> createKafkaConsumerInstance(Properties properties) {
        boolean autoCommitEnabled = Boolean.parseBoolean(properties.getProperty("enable.auto.commit"));
        if (autoCommitEnabled) {
            LOG.info("disabling kafka auto-commit");
        }
        properties.setProperty("enable.auto.commit", "false");
        return super.createKafkaConsumerInstance(properties);
    }

    @Override
    public void subscribe(Collection<String> collection) {
        super.subscribe(collection, new CommittingRebalanceListener(this.getConsumer(), this.listener, this.currentOffsets));
    }

    @Override
    public void subscribe(Collection<String> collection, ConsumerRebalanceListener consumerRebalanceListener) {
        super.subscribe(collection, new CommittingRebalanceListener(consumerRebalanceListener, this.getConsumer(), this.listener, this.currentOffsets));
    }

    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
        super.subscribe(pattern, new CommittingRebalanceListener(consumerRebalanceListener, this.getConsumer(), this.listener, this.currentOffsets));
    }

    @Override
    public ConsumerRecords<K, V> poll(long l) {
        return new ForwardingConsumerRecords<K, V>(super.poll(l)){

            @Override
            public Iterator<ConsumerRecord<K, V>> iterator() {
                return new OffsetRecorderIterator(CommittingConsumer.this.currentOffsets, super.iterator());
            }
        };
    }

    @Override
    public void close() {
        try {
            this.listener.onClose(this.getConsumer(), this.currentOffsets);
            this.getConsumer().commitSync(this.currentOffsets);
        }
        catch (RuntimeException exception) {
            if (this.listener.onFail(this.getConsumer(), exception, this.currentOffsets)) {
                throw exception;
            }
        }
        finally {
            super.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close(long l, TimeUnit timeUnit) {
        try {
            this.listener.onClose(this.getConsumer(), this.currentOffsets);
            this.getConsumer().commitSync(this.currentOffsets);
        }
        catch (RuntimeException exception) {
            if (this.listener.onFail(this.getConsumer(), exception, this.currentOffsets)) {
                throw exception;
            }
        }
        finally {
            super.close(l, timeUnit);
        }
    }
}

