/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.kafka;

import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.kafka.Consumers;
import cz.o2.proxima.direct.kafka.ElementConsumer;
import cz.o2.proxima.direct.kafka.ElementSerializer;
import cz.o2.proxima.direct.kafka.KafkaAccessor;
import cz.o2.proxima.direct.kafka.KafkaConsumerFactory;
import cz.o2.proxima.direct.kafka.OffsetCommitter;
import cz.o2.proxima.direct.kafka.TopicOffset;
import cz.o2.proxima.direct.kafka.Utils;
import cz.o2.proxima.direct.time.MinimalPartitionWatermarkEstimator;
import cz.o2.proxima.functional.BiConsumer;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.base.Functions;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.PartitionInfo;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.storage.AbstractStorage;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.time.PartitionedWatermarkEstimator;
import cz.o2.proxima.time.WatermarkEstimator;
import cz.o2.proxima.time.WatermarkEstimatorFactory;
import cz.o2.proxima.time.WatermarkIdlePolicyFactory;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaLogReader
extends AbstractStorage
implements CommitLogReader {
    private static final Logger log = LoggerFactory.getLogger(KafkaLogReader.class);
    final KafkaAccessor accessor;
    private final Context context;
    private final long consumerPollInterval;
    private final long maxBytesPerSec;
    private final String topic;
    private final Map<String, Object> cfg;

    KafkaLogReader(KafkaAccessor accessor, Context context) {
        super(accessor.getEntityDescriptor(), accessor.getUri());
        this.accessor = accessor;
        this.context = context;
        this.consumerPollInterval = accessor.getConsumerPollInterval();
        this.maxBytesPerSec = accessor.getMaxBytesPerSec();
        this.topic = accessor.getTopic();
        this.cfg = accessor.getCfg();
        log.debug("Created {} for accessor {}", (Object)((Object)((Object)this)).getClass().getSimpleName(), (Object)accessor);
    }

    public ObserveHandle observe(String name, Position position, LogObserver observer) {
        return this.observeKafka(name, null, position, false, observer);
    }

    public ObserveHandle observePartitions(String name, @Nullable Collection<Partition> partitions, Position position, boolean stopAtCurrent, LogObserver observer) {
        return this.observeKafka(null, partitions, position, stopAtCurrent, observer);
    }

    public ObserveHandle observeBulk(String name, Position position, boolean stopAtCurrent, LogObserver observer) {
        return this.observeKafkaBulk(name, null, position, stopAtCurrent, observer);
    }

    public ObserveHandle observeBulkPartitions(String name, Collection<Partition> partitions, Position position, boolean stopAtCurrent, LogObserver observer) {
        return this.observeKafkaBulk(null, KafkaLogReader.createDefaultOffsets(partitions), position, stopAtCurrent, observer);
    }

    public ObserveHandle observeBulkOffsets(Collection<Offset> offsets, LogObserver observer) {
        return this.observeKafkaBulk(null, offsets, Position.CURRENT, false, observer);
    }

    public List<Partition> getPartitions() {
        List<PartitionInfo> partitions;
        try (KafkaConsumer<Object, Object> consumer = this.createConsumer();){
            partitions = consumer.partitionsFor(this.topic);
        }
        return partitions.stream().map(p -> Partition.of((int)p.partition())).collect(Collectors.toList());
    }

    @VisibleForTesting
    ObserveHandle observeKafka(@Nullable String name, @Nullable Collection<Partition> partitions, Position position, boolean stopAtCurrent, LogObserver observer) {
        try {
            return this.processConsumer(name, KafkaLogReader.createDefaultOffsets(partitions), position, stopAtCurrent, name != null, observer, this.context.getExecutorService());
        }
        catch (InterruptedException ex) {
            log.warn("Interrupted waiting for kafka observer to start", ex);
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }

    private ObserveHandle observeKafkaBulk(@Nullable String name, @Nullable Collection<Offset> offsets, Position position, boolean stopAtCurrent, LogObserver observer) {
        Preconditions.checkArgument((name != null || offsets != null ? 1 : 0) != 0, (Object)"Either name or offsets have to be non null");
        Preconditions.checkArgument((position != null ? 1 : 0) != 0, (Object)"Position cannot be null");
        try {
            return this.processConsumerBulk(name, offsets, position, stopAtCurrent, name != null, observer, this.context.getExecutorService());
        }
        catch (InterruptedException ex) {
            log.warn("Interrupted waiting for kafka observer to start", ex);
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }

    @VisibleForTesting
    ObserveHandle processConsumer(@Nullable String name, @Nullable Collection<Offset> offsets, Position position, boolean stopAtCurrent, boolean commitToKafka, LogObserver observer, ExecutorService executor) throws InterruptedException {
        Map kafkaCommitMap = Collections.synchronizedMap(new HashMap());
        OffsetCommitter<TopicPartition> offsetCommitter = this.createOffsetCommitter();
        BiConsumer & Serializable preWrite = (BiConsumer & Serializable)(tp, r) -> {
            long offset = r.offset();
            offsetCommitter.register((TopicPartition)tp, offset, 1, () -> {
                OffsetAndMetadata mtd = new OffsetAndMetadata(offset + 1L);
                if (commitToKafka) {
                    kafkaCommitMap.put(tp, mtd);
                }
            });
        };
        Consumers.OnlineConsumer<Object, Object> onlineConsumer = new Consumers.OnlineConsumer<Object, Object>(observer, offsetCommitter, (Factory<Map<TopicPartition, OffsetAndMetadata>>)(Factory & Serializable)() -> {
            Map map = kafkaCommitMap;
            synchronized (map) {
                HashMap clone = new HashMap(kafkaCommitMap);
                kafkaCommitMap.clear();
                return clone;
            }
        });
        AtomicReference<ObserveHandle> handle = new AtomicReference<ObserveHandle>();
        this.submitConsumerWithObserver(name, offsets, position, stopAtCurrent, (BiConsumer<TopicPartition, ConsumerRecord<Object, Object>>)preWrite, onlineConsumer, executor, handle);
        return KafkaLogReader.dynamicHandle(handle);
    }

    @VisibleForTesting
    ObserveHandle processConsumerBulk(@Nullable String name, @Nullable Collection<Offset> offsets, Position position, boolean stopAtCurrent, boolean commitToKafka, LogObserver observer, ExecutorService executor) throws InterruptedException {
        Map kafkaCommitMap = Collections.synchronizedMap(new HashMap());
        Consumers.BulkConsumer<Object, Object> bulkConsumer = new Consumers.BulkConsumer<Object, Object>(this.topic, observer, (BiConsumer<TopicPartition, Long>)(BiConsumer & Serializable)(tp, o) -> {
            if (commitToKafka) {
                OffsetAndMetadata off = new OffsetAndMetadata((long)o);
                kafkaCommitMap.put(tp, off);
            }
        }, (Factory<Map<TopicPartition, OffsetAndMetadata>>)(Factory & Serializable)() -> {
            Map map = kafkaCommitMap;
            synchronized (map) {
                HashMap clone = new HashMap(kafkaCommitMap);
                kafkaCommitMap.clear();
                return clone;
            }
        }, kafkaCommitMap::clear);
        AtomicReference<ObserveHandle> handle = new AtomicReference<ObserveHandle>();
        this.submitConsumerWithObserver(name, offsets, position, stopAtCurrent, (BiConsumer<TopicPartition, ConsumerRecord<Object, Object>>)(BiConsumer & Serializable)(tp, r) -> {}, bulkConsumer, executor, handle);
        return KafkaLogReader.dynamicHandle(handle);
    }

    private void submitConsumerWithObserver(@Nullable String name, @Nullable Collection<Offset> offsets, Position position, boolean stopAtCurrent, BiConsumer<TopicPartition, ConsumerRecord<Object, Object>> preWrite, ElementConsumer<Object, Object> consumer, ExecutorService executor, AtomicReference<ObserveHandle> handle) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean completed = new AtomicBoolean();
        AtomicBoolean shutdown = new AtomicBoolean();
        List seekOffsets = Collections.synchronizedList(new ArrayList());
        executor.submit(() -> {
            block21: {
                handle.set(this.createObserveHandle(shutdown, seekOffsets, consumer, latch));
                AtomicReference<Object> watermarkEstimator = new AtomicReference<Object>(null);
                ConcurrentHashMap<Integer, Integer> emptyPollCount = new ConcurrentHashMap<Integer, Integer>();
                Duration pollDuration = Duration.ofMillis(this.consumerPollInterval);
                AtomicReference<KafkaConsumer<Object, Object>> consumerRef = new AtomicReference<KafkaConsumer<Object, Object>>();
                consumer.onStart();
                ConsumerRebalanceListener listener = this.listener(name, consumerRef, consumer, emptyPollCount, watermarkEstimator);
                ElementSerializer<Object, Object> serializer = this.accessor.getSerializer();
                try {
                    KafkaConsumer<Object, Object> kafka = this.createConsumer(name, offsets, name != null ? listener : null, position);
                    Throwable throwable = null;
                    try {
                        Map<TopicPartition, Long> endOffsets;
                        consumerRef.set(kafka);
                        ConsumerRecords<Object, Object> poll = kafka.poll(pollDuration);
                        Map<TopicPartition, Long> map = endOffsets = stopAtCurrent ? this.findNonEmptyEndOffsets(kafka) : null;
                        if (log.isDebugEnabled()) {
                            log.debug("End offsets of current assignment {}: {}", (Object)kafka.assignment(), (Object)endOffsets);
                        }
                        if (offsets != null) {
                            listener.onPartitionsAssigned(kafka.assignment());
                        }
                        latch.countDown();
                        AtomicReference<Throwable> error = new AtomicReference<Throwable>();
                        do {
                            if (poll.isEmpty()) {
                                Optional.ofNullable(watermarkEstimator.get()).ifPresent(consumer::onIdle);
                            }
                            this.logConsumerWatermark(name, offsets, watermarkEstimator, poll.count());
                            poll = this.seekToNewOffsetsIfNeeded(seekOffsets, consumer, watermarkEstimator, kafka, poll);
                            long bytesPerPoll = this.maxBytesPerSec < Long.MAX_VALUE ? Math.max(1L, this.maxBytesPerSec / (1000L * this.consumerPollInterval)) : Long.MAX_VALUE;
                            long bytesPolled = 0L;
                            emptyPollCount.replaceAll((k, v) -> v + 1);
                            for (ConsumerRecord<Object, Object> consumerRecord : poll) {
                                Long end;
                                bytesPolled += (long)(consumerRecord.serializedKeySize() + consumerRecord.serializedValueSize());
                                TopicPartition tp = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                                emptyPollCount.put(tp.partition(), 0);
                                preWrite.accept((Object)tp, consumerRecord);
                                StreamElement ingest = serializer.read(consumerRecord, this.getEntityDescriptor());
                                if (ingest != null) {
                                    ((PartitionedWatermarkEstimator)watermarkEstimator.get()).update(tp.partition(), ingest);
                                }
                                boolean cont = consumer.consumeWithConfirm(ingest, tp, consumerRecord.offset(), watermarkEstimator.get(), error::set);
                                if (!cont) {
                                    log.info("Terminating consumption by request");
                                    completed.set(true);
                                    break;
                                }
                                if (!stopAtCurrent || (end = endOffsets.get(tp)) == null || end - 1L > consumerRecord.offset()) continue;
                                log.debug("Reached end of partition {} at offset {}", (Object)tp, (Object)consumerRecord.offset());
                                endOffsets.remove(tp);
                            }
                            this.increaseWatermarkOnEmptyPolls(emptyPollCount, watermarkEstimator);
                            this.flushCommits(kafka, consumer);
                            this.rethrowErrorIfPresent(name, error);
                            this.terminateIfConsumed(stopAtCurrent, kafka, endOffsets, completed);
                            this.waitToReduceThroughput(bytesPolled, bytesPerPoll);
                            poll = kafka.poll(pollDuration);
                        } while (!shutdown.get() && !completed.get() && !Thread.currentThread().isInterrupted());
                        if (log.isDebugEnabled()) {
                            log.debug("Terminating poll loop for assignment {}: shutdown: {}, completed: {}, interrupted: {}", kafka.assignment(), shutdown.get(), completed.get(), Thread.currentThread().isInterrupted());
                        }
                        if (!Thread.currentThread().isInterrupted()) {
                            consumer.onCompleted();
                        } else {
                            consumer.onCancelled();
                        }
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (kafka != null) {
                            KafkaLogReader.$closeResource(throwable, kafka);
                        }
                    }
                }
                catch (InterruptedException ex) {
                    log.info("Interrupted while polling kafka. Terminating consumption.", ex);
                    Thread.currentThread().interrupt();
                    consumer.onCancelled();
                }
                catch (Throwable err) {
                    log.error("Error processing consumer {}", (Object)name, (Object)err);
                    if (!consumer.onError(err)) break block21;
                    try {
                        this.submitConsumerWithObserver(name, offsets, position, stopAtCurrent, preWrite, consumer, executor, handle);
                    }
                    catch (InterruptedException ex) {
                        log.warn("Interrupted while restarting observer");
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(ex);
                    }
                }
            }
        });
        latch.await();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ConsumerRecords<Object, Object> seekToNewOffsetsIfNeeded(List<TopicOffset> seekOffsets, ElementConsumer<Object, Object> consumer, AtomicReference<PartitionedWatermarkEstimator> watermarkEstimator, KafkaConsumer<Object, Object> kafka, ConsumerRecords<Object, Object> poll) {
        List<Offset> list = seekOffsets;
        synchronized (list) {
            if (!seekOffsets.isEmpty()) {
                List<Offset> toSeek = seekOffsets;
                Utils.seekToOffsets(this.topic, toSeek, kafka);
                consumer.onAssign(kafka, kafka.assignment().stream().map(tp -> new TopicOffset(tp.partition(), kafka.position((TopicPartition)tp), ((PartitionedWatermarkEstimator)watermarkEstimator.get()).getWatermark())).collect(Collectors.toList()));
                log.info("Seeked consumer to offsets {} as requested", (Object)seekOffsets);
                seekOffsets.clear();
                return ConsumerRecords.empty();
            }
        }
        return poll;
    }

    private void logConsumerWatermark(@Nullable String name, @Nullable Collection<Offset> offsets, AtomicReference<PartitionedWatermarkEstimator> watermarkEstimator, int polledCount) {
        if (log.isDebugEnabled()) {
            log.debug("Current watermark of consumer name {} with offsets {} on {} poll'd records is {}", name, offsets, polledCount, Optional.ofNullable(watermarkEstimator.get()).map(PartitionedWatermarkEstimator::getWatermark).orElse(Long.MIN_VALUE));
        }
    }

    private void rethrowErrorIfPresent(@Nullable String consumerName, AtomicReference<Throwable> error) {
        Throwable errorThrown = error.getAndSet(null);
        if (errorThrown != null) {
            log.warn("Error during processing {}", (Object)consumerName, (Object)errorThrown);
            throw new RuntimeException(errorThrown);
        }
    }

    private void terminateIfConsumed(boolean stopAtCurrent, KafkaConsumer<?, ?> consumer, Map<TopicPartition, Long> endOffsets, AtomicBoolean completed) {
        if (stopAtCurrent && endOffsets.isEmpty()) {
            log.info("Assignment {} reached end of current data. Terminating consumption.", (Object)consumer.assignment());
            completed.set(true);
        }
    }

    private void waitToReduceThroughput(long bytesPolled, long bytesPerPoll) throws InterruptedException {
        long sleepDuration = bytesPolled * this.consumerPollInterval / bytesPerPoll;
        if (sleepDuration > 0L) {
            TimeUnit.MILLISECONDS.sleep(sleepDuration);
        }
    }

    private void flushCommits(KafkaConsumer<Object, Object> kafka, ElementConsumer<?, ?> consumer) {
        Map<TopicPartition, OffsetAndMetadata> commitMapClone = consumer.prepareOffsetsForCommit();
        if (!commitMapClone.isEmpty()) {
            kafka.commitSync(commitMapClone);
        }
    }

    private void increaseWatermarkOnEmptyPolls(Map<Integer, Integer> emptyPollCount, AtomicReference<PartitionedWatermarkEstimator> watermarkEstimator) {
        int numEmptyPolls = emptyPollCount.size();
        emptyPollCount.entrySet().stream().filter(e -> (Integer)e.getValue() >= numEmptyPolls).forEach(e -> ((PartitionedWatermarkEstimator)watermarkEstimator.get()).idle(((Integer)e.getKey()).intValue()));
    }

    private ObserveHandle createObserveHandle(final AtomicBoolean shutdown, final List<TopicOffset> seekOffsets, final ElementConsumer<?, ?> consumer, final CountDownLatch latch) {
        return new ObserveHandle(){

            public void close() {
                shutdown.set(true);
            }

            public List<Offset> getCommittedOffsets() {
                return consumer.getCommittedOffsets();
            }

            public void resetOffsets(List<Offset> offsets) {
                seekOffsets.addAll(offsets);
            }

            public List<Offset> getCurrentOffsets() {
                return consumer.getCurrentOffsets();
            }

            public void waitUntilReady() throws InterruptedException {
                latch.await();
            }
        };
    }

    private Map<TopicPartition, Long> findNonEmptyEndOffsets(KafkaConsumer<Object, Object> kafka) {
        Set<TopicPartition> assignment = kafka.assignment();
        Map<TopicPartition, Long> beginning = kafka.beginningOffsets(assignment);
        return kafka.endOffsets(assignment).entrySet().stream().filter(entry -> (Long)beginning.get(entry.getKey()) < (Long)entry.getValue()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    private KafkaConsumer<Object, Object> createConsumer() {
        return this.createConsumer(UUID.randomUUID().toString(), null, null, Position.NEWEST);
    }

    @VisibleForTesting
    KafkaConsumer<Object, Object> createConsumer(@Nullable String name, @Nullable Collection<Offset> offsets, @Nullable ConsumerRebalanceListener listener, Position position) {
        KafkaConsumer<Object, Object> consumer;
        Preconditions.checkArgument((name != null || listener == null ? 1 : 0) != 0, (Object)"Please use either named group (with listener) or offsets without listener");
        KafkaConsumerFactory factory = this.accessor.createConsumerFactory();
        if ("".equals(name)) {
            throw new IllegalArgumentException("Consumer group cannot be empty string");
        }
        if (name != null) {
            consumer = factory.create(name, listener);
        } else if (offsets != null) {
            List<Partition> partitions = offsets.stream().map(Offset::getPartition).collect(Collectors.toList());
            consumer = factory.create(partitions);
        } else {
            throw new IllegalArgumentException("Need either name or offsets to observe");
        }
        this.validateTopic(consumer, this.topic);
        if (position == Position.OLDEST) {
            if (offsets == null) {
                if (consumer.assignment().isEmpty()) {
                    consumer.poll(Duration.ofMillis(this.accessor.getAssignmentTimeoutMillis()));
                }
                Set<TopicPartition> assignment = consumer.assignment();
                log.info("Seeking consumer name {} to beginning of partitions {}", (Object)name, (Object)assignment);
                consumer.seekToBeginning(assignment);
            } else {
                List<TopicPartition> tps = offsets.stream().map(p -> new TopicPartition(this.topic, p.getPartition().getId())).collect(Collectors.toList());
                log.info("Seeking given partitions {} to beginning", (Object)tps);
                consumer.seekToBeginning(tps);
            }
        } else if (position == Position.CURRENT) {
            log.info("Seeking to given offsets {}", (Object)offsets);
            Utils.seekToOffsets(this.topic, offsets, consumer);
        } else {
            log.info("Starting to process kafka partitions from newest data");
        }
        return consumer;
    }

    @VisibleForTesting
    void validateTopic(KafkaConsumer<?, ?> consumer, String topicToValidate) {
        List<PartitionInfo> partitions = consumer.partitionsFor(topicToValidate);
        Preconditions.checkArgument((partitions != null && !partitions.isEmpty() ? 1 : 0) != 0, (String)"Received null or empty partitions for topic [%s]. Please check that the topic exists and has at least one partition.", (Object)topicToValidate);
    }

    public boolean hasExternalizableOffsets() {
        return true;
    }

    public CommitLogReader.Factory asFactory() {
        KafkaAccessor accessor = this.accessor;
        Context context = this.context;
        return (CommitLogReader.Factory & Serializable)repo -> new KafkaLogReader(accessor, context);
    }

    private static Collection<Offset> createDefaultOffsets(Collection<Partition> partitions) {
        if (partitions != null) {
            return partitions.stream().map(p -> new TopicOffset(p.getId(), -1L, Long.MIN_VALUE)).collect(Collectors.toList());
        }
        return null;
    }

    private static ObserveHandle dynamicHandle(final AtomicReference<ObserveHandle> proxy) {
        return new ObserveHandle(){

            public void close() {
                ((ObserveHandle)proxy.get()).close();
            }

            public List<Offset> getCommittedOffsets() {
                return ((ObserveHandle)proxy.get()).getCommittedOffsets();
            }

            public void resetOffsets(List<Offset> offsets) {
                ((ObserveHandle)proxy.get()).resetOffsets(offsets);
            }

            public List<Offset> getCurrentOffsets() {
                return ((ObserveHandle)proxy.get()).getCurrentOffsets();
            }

            public void waitUntilReady() throws InterruptedException {
                ((ObserveHandle)proxy.get()).waitUntilReady();
            }
        };
    }

    private OffsetCommitter<TopicPartition> createOffsetCommitter() {
        return new OffsetCommitter<TopicPartition>(this.accessor.getLogStaleCommitIntervalNs(), this.accessor.getAutoCommitIntervalNs());
    }

    private ConsumerRebalanceListener listener(final String name, final AtomicReference<KafkaConsumer<Object, Object>> kafka, final ElementConsumer<Object, Object> consumer, final Map<Integer, Integer> emptyPollCount, final AtomicReference<PartitionedWatermarkEstimator> watermarkEstimator) {
        return new ConsumerRebalanceListener(){

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> parts) {
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> parts) {
                List partitions = parts.stream().map(TopicPartition::partition).sorted().collect(Collectors.toList());
                emptyPollCount.clear();
                for (Integer partition : partitions) {
                    emptyPollCount.put(partition, 0);
                }
                if (partitions.isEmpty()) {
                    watermarkEstimator.set(KafkaLogReader.createWatermarkEstimatorForEmptyParts());
                } else {
                    watermarkEstimator.set(new MinimalPartitionWatermarkEstimator(partitions.stream().collect(Collectors.toMap(Functions.identity(), item -> this.createWatermarkEstimator()))));
                }
                Optional.ofNullable((KafkaConsumer)kafka.get()).ifPresent(c -> consumer.onAssign(c, (Collection<TopicOffset>)(name != null ? this.getCommittedTopicOffsets(parts, (KafkaConsumer<Object, Object>)c) : this.getCurrentTopicOffsets(parts, (KafkaConsumer<Object, Object>)c))));
            }

            List<TopicOffset> getCurrentTopicOffsets(Collection<TopicPartition> parts, KafkaConsumer<Object, Object> c) {
                return parts.stream().map(tp -> new TopicOffset(tp.partition(), c.position((TopicPartition)tp), ((PartitionedWatermarkEstimator)watermarkEstimator.get()).getWatermark())).collect(Collectors.toList());
            }

            List<TopicOffset> getCommittedTopicOffsets(Collection<TopicPartition> parts, KafkaConsumer<Object, Object> c) {
                HashMap<TopicPartition, OffsetAndMetadata> committed = new HashMap<TopicPartition, OffsetAndMetadata>(c.committed(new HashSet<TopicPartition>(parts)));
                for (TopicPartition tp : parts) {
                    committed.putIfAbsent(tp, null);
                }
                return committed.entrySet().stream().map(entry -> {
                    long offset = entry.getValue() == null ? 0L : ((OffsetAndMetadata)entry.getValue()).offset();
                    return new TopicOffset(((TopicPartition)entry.getKey()).partition(), offset, ((PartitionedWatermarkEstimator)watermarkEstimator.get()).getWatermark());
                }).collect(Collectors.toList());
            }

            private WatermarkEstimator createWatermarkEstimator() {
                WatermarkIdlePolicyFactory idlePolicyFactory = KafkaLogReader.this.accessor.getWatermarkConfiguration().getWatermarkIdlePolicyFactory();
                WatermarkEstimatorFactory estimatorFactory = KafkaLogReader.this.accessor.getWatermarkConfiguration().getWatermarkEstimatorFactory();
                return estimatorFactory.create(KafkaLogReader.this.cfg, idlePolicyFactory);
            }
        };
    }

    private static PartitionedWatermarkEstimator createWatermarkEstimatorForEmptyParts() {
        return (PartitionedWatermarkEstimator & Serializable)() -> Long.MAX_VALUE;
    }

    public KafkaAccessor getAccessor() {
        return this.accessor;
    }

    public Context getContext() {
        return this.context;
    }
}

