/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.kafka;

import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.ObserverUtils;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.kafka.ElementConsumer;
import cz.o2.proxima.direct.kafka.OffsetCommitter;
import cz.o2.proxima.direct.kafka.TopicOffset;
import cz.o2.proxima.direct.kafka.Utils;
import cz.o2.proxima.functional.BiConsumer;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.time.WatermarkSupplier;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Consumers {
    private static final Logger log = LoggerFactory.getLogger(Consumers.class);

    /** Private so the class cannot be instantiated from outside; it only groups the nested consumer classes. */
    private Consumers() {
    }

    static final class BulkConsumer<K, V>
    extends ConsumerBase<K, V> {
        private final String topic;
        private final LogObserver observer;
        private final BiConsumer<TopicPartition, Long> commit;
        private final Factory<Map<TopicPartition, OffsetAndMetadata>> prepareCommit;
        private final Runnable onStart;

        BulkConsumer(String topic, LogObserver observer, BiConsumer<TopicPartition, Long> commit, Factory<Map<TopicPartition, OffsetAndMetadata>> prepareCommit, Runnable onStart) {
            this.topic = topic;
            this.observer = observer;
            this.commit = commit;
            this.prepareCommit = prepareCommit;
            this.onStart = onStart;
        }

        @Override
        public boolean consumeWithConfirm(@Nullable StreamElement element, TopicPartition tp, long offset, WatermarkSupplier watermarkSupplier, Consumer<Throwable> errorHandler) {
            this.processing.put(tp.partition(), offset);
            this.watermark = watermarkSupplier.getWatermark();
            if (element != null) {
                return this.observer.onNext(element, this.context(tp, offset, watermarkSupplier, errorHandler));
            }
            return true;
        }

        private LogObserver.OnNextContext context(TopicPartition tp, long offset, WatermarkSupplier watermarkSupplier, Consumer<Throwable> errorHandler) {
            HashMap toCommit = new HashMap(this.processing);
            return ObserverUtils.asOnNextContext((LogObserver.OffsetCommitter & Serializable)(succ, err) -> {
                if (succ) {
                    toCommit.forEach((part, off) -> this.committed.compute(part, (k, v) -> Math.max((Long)MoreObjects.firstNonNull((Object)v, (Object)0L), off + 1L)));
                    this.committed.forEach((p, o) -> this.commit.accept((Object)new TopicPartition(tp.topic(), (int)p), o));
                } else {
                    errorHandler.accept(err);
                }
            }, (Offset)new TopicOffset(tp.partition(), offset, watermarkSupplier.getWatermark()));
        }

        @Override
        public List<TopicOffset> getCurrentOffsets() {
            return TopicOffset.fromMap(this.processing, this.watermark);
        }

        @Override
        public List<TopicOffset> getCommittedOffsets() {
            return TopicOffset.fromMap(this.committed, this.watermark);
        }

        @Override
        LogObserver observer() {
            return this.observer;
        }

        @Override
        public Map<TopicPartition, OffsetAndMetadata> prepareOffsetsForCommit() {
            return (Map)this.prepareCommit.apply();
        }

        @Override
        public void onAssign(KafkaConsumer<K, V> consumer, Collection<TopicOffset> offsets) {
            super.onAssign(consumer, offsets);
            this.observer.onRepartition(ObserverUtils.asRepartitionContext((Collection)offsets.stream().map(TopicOffset::getPartition).collect(Collectors.toList())));
            Utils.seekToOffsets(this.topic, (List)offsets, consumer);
        }

        @Override
        public void onStart() {
            this.onStart.run();
        }

        @Override
        public void onIdle(WatermarkSupplier watermarkSupplier) {
            this.observer.onIdle(ObserverUtils.asOnIdleContext((WatermarkSupplier)watermarkSupplier));
        }
    }

    static final class OnlineConsumer<K, V>
    extends ConsumerBase<K, V> {
        private final LogObserver observer;
        private final OffsetCommitter<TopicPartition> committer;
        private final Factory<Map<TopicPartition, OffsetAndMetadata>> prepareCommit;

        OnlineConsumer(LogObserver observer, OffsetCommitter<TopicPartition> committer, Factory<Map<TopicPartition, OffsetAndMetadata>> prepareCommit) {
            this.observer = observer;
            this.committer = committer;
            this.prepareCommit = prepareCommit;
        }

        @Override
        public boolean consumeWithConfirm(@Nullable StreamElement element, TopicPartition tp, long offset, WatermarkSupplier watermarkSupplier, Consumer<Throwable> errorHandler) {
            this.processing.put(tp.partition(), offset);
            this.watermark = watermarkSupplier.getWatermark();
            if (element != null) {
                return this.observer.onNext(element, ObserverUtils.asOnNextContext((LogObserver.OffsetCommitter & Serializable)(succ, exc) -> {
                    if (succ) {
                        this.committed.compute(tp.partition(), (k, v) -> v == null || v <= offset ? offset + 1L : v);
                        this.committer.confirm(tp, offset);
                    } else {
                        errorHandler.accept(exc);
                    }
                }, (Offset)new TopicOffset(tp.partition(), offset, this.watermark)));
            }
            this.committed.compute(tp.partition(), (k, v) -> v == null || v <= offset ? offset + 1L : v);
            this.committer.confirm(tp, offset);
            return true;
        }

        @Override
        public List<TopicOffset> getCurrentOffsets() {
            return TopicOffset.fromMap(this.processing, this.watermark);
        }

        @Override
        public List<TopicOffset> getCommittedOffsets() {
            return TopicOffset.fromMap(this.committed, this.watermark);
        }

        @Override
        public Map<TopicPartition, OffsetAndMetadata> prepareOffsetsForCommit() {
            return (Map)this.prepareCommit.apply();
        }

        @Override
        LogObserver observer() {
            return this.observer;
        }

        @Override
        public void onAssign(KafkaConsumer<K, V> consumer, Collection<TopicOffset> offsets) {
            super.onAssign(consumer, offsets);
            this.committer.clear();
            this.observer.onRepartition(ObserverUtils.asRepartitionContext((Collection)offsets.stream().map(TopicOffset::getPartition).collect(Collectors.toList())));
        }

        @Override
        public void onStart() {
            this.committer.clear();
        }

        @Override
        public void onIdle(WatermarkSupplier watermarkSupplier) {
            this.observer.onIdle(ObserverUtils.asOnIdleContext((WatermarkSupplier)watermarkSupplier));
        }
    }

    /**
     * Shared state and observer delegation for the consumers in this file.
     *
     * @param <K> key type of the underlying {@link KafkaConsumer}
     * @param <V> value type of the underlying {@link KafkaConsumer}
     */
    private static abstract class ConsumerBase<K, V>
    implements ElementConsumer<K, V> {
        /** Partition id to committed offset; reset in {@link #onAssign}. Wrapped in a synchronized map. */
        final Map<Integer, Long> committed = Collections.synchronizedMap(new HashMap<>());
        /** Partition id to the offset most recently recorded by a subclass. Wrapped in a synchronized map. */
        final Map<Integer, Long> processing = Collections.synchronizedMap(new HashMap<>());
        /** Watermark most recently stored by a subclass. */
        long watermark;

        private ConsumerBase() {
        }

        /** Forwards completion to the observer. */
        @Override
        public void onCompleted() {
            this.observer().onCompleted();
        }

        /** Forwards cancellation to the observer. */
        @Override
        public void onCancelled() {
            this.observer().onCancelled();
        }

        /**
         * Forwards the error to the observer.
         *
         * @return whatever the observer's {@code onError} returns
         */
        @Override
        public boolean onError(Throwable err) {
            return this.observer().onError(err);
        }

        /**
         * Replaces the committed offsets with the given ones, keyed by partition id.
         *
         * <p>{@code Collectors.toMap} throws {@link IllegalStateException} when two offsets share
         * a partition id.
         */
        @Override
        public void onAssign(KafkaConsumer<K, V> consumer, Collection<TopicOffset> offsets) {
            // Build the new view first so a failure in toMap leaves the old state untouched.
            final Map<Integer, Long> assigned = offsets.stream()
                .collect(Collectors.toMap(o -> o.getPartition().getId(), TopicOffset::getOffset));
            this.committed.clear();
            this.committed.putAll(assigned);
            // NOTE(review): `processing` is not reset here, so entries for partitions that are
            // no longer assigned stay in it - confirm this is intended.
        }

        /** Returns the observer that lifecycle callbacks are delegated to. */
        abstract LogObserver observer();
    }
}

