/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.kafka;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.wire.Assigner;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.PartitionAssignmentReceiver;
import com.google.cloud.pubsublite.kafka.ConsumerFactory;
import com.google.cloud.pubsublite.kafka.KafkaExceptionUtils;
import com.google.cloud.pubsublite.kafka.SharedBehavior;
import com.google.cloud.pubsublite.kafka.SingleSubscriptionConsumer;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.protobuf.util.Timestamps;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

/**
 * A Kafka {@link Consumer} of raw byte keys and values that is bound to the single
 * subscription path and single topic path supplied at construction time.
 */
class PubsubLiteConsumer
implements Consumer<byte[], byte[]> {
    // Timeout applied by the overloads that do not take an explicit Duration.
    private static final Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
    private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
    private final SubscriptionPath subscriptionPath;
    private final TopicPath topicPath;
    private final SharedBehavior shared;
    private final ConsumerFactory consumerFactory;
    private final AssignerFactory assignerFactory;
    private final CursorClient cursorClient;
    private final TopicStatsClient topicStatsClient;
    // Resources closed one by one in close(Duration).
    private final List<AutoCloseable> toClose;
    // Set by subscribe(Collection, ConsumerRebalanceListener); cleared by unsubscribe().
    private Optional<Assigner> assigner = Optional.empty();
    // Set by subscribe or assign; cleared by unsubscribe().
    private Optional<SingleSubscriptionConsumer> consumer = Optional.empty();

    /**
     * Creates a consumer bound to one subscription and one topic.
     *
     * @param subscriptionPath subscription whose cursors are read by {@code committed}
     * @param topicPath the only topic this consumer accepts
     * @param shared shared helper used for partition lookups
     * @param consumerFactory creates the underlying consumer on subscribe/assign
     * @param assignerFactory creates the assigner on subscribe
     * @param cursorClient client used to list committed cursors
     * @param topicStatsClient client used to compute head and event-time cursors
     * @param resources extra resources to close when this consumer is closed; they are
     *     closed before the clients and the shared helper
     */
    PubsubLiteConsumer(SubscriptionPath subscriptionPath, TopicPath topicPath, SharedBehavior shared, ConsumerFactory consumerFactory, AssignerFactory assignerFactory, CursorClient cursorClient, TopicStatsClient topicStatsClient, AutoCloseable ... resources) {
        this.subscriptionPath = subscriptionPath;
        this.topicPath = topicPath;
        this.shared = shared;
        this.consumerFactory = consumerFactory;
        this.assignerFactory = assignerFactory;
        this.cursorClient = cursorClient;
        this.topicStatsClient = topicStatsClient;
        // An explicitly typed builder keeps the element type AutoCloseable; the raw
        // builder with Object casts produced a list of Object that does not match the field.
        this.toClose = ImmutableList.<AutoCloseable>builder()
            .add(resources)
            .add(cursorClient)
            .add(topicStatsClient)
            .add(shared)
            .build();
    }

    /** Converts a Pub/Sub Lite partition into a Kafka partition named after this consumer's topic path. */
    private TopicPartition toTopicPartition(Partition partition) {
        String topicName = topicPath.toString();
        int partitionIndex = (int)partition.value();
        return new TopicPartition(topicName, partitionIndex);
    }

    /**
     * Returns the underlying consumer.
     *
     * @throws IllegalStateException if no consumer has been created yet
     */
    private SingleSubscriptionConsumer requireValidConsumer() {
        return consumer.orElseThrow(() -> new IllegalStateException("Neither subscribe nor assign has been called."));
    }

    /**
     * Always fails; this consumer has no client instance id.
     *
     * @throws IllegalStateException on every call
     */
    public Uuid clientInstanceId(Duration timeout) {
        final String reason = "Pub/Sub Lite Kafka Connector does not support telemetry";
        throw new IllegalStateException(reason);
    }

    /**
     * Returns the underlying consumer's current partitions as Kafka topic partitions.
     *
     * @throws IllegalStateException if neither subscribe nor assign has been called
     */
    public Set<TopicPartition> assignment() {
        SingleSubscriptionConsumer active = requireValidConsumer();
        return active.assignment()
            .stream()
            .map(partition -> toTopicPartition(partition))
            .collect(Collectors.toSet());
    }

    /** Returns the configured topic name once a consumer exists, otherwise an empty set. */
    public Set<String> subscription() {
        if (!consumer.isPresent()) {
            return ImmutableSet.of();
        }
        return ImmutableSet.of(topicPath.toString());
    }

    /** Delegates to the two-argument pattern overload with a listener that does nothing. */
    public void subscribe(Pattern pattern) {
        ConsumerRebalanceListener noOpListener = new NoOpRebalanceListener();
        subscribe(pattern, noOpListener);
    }

    /**
     * Pattern subscription is not supported.
     *
     * @throws UnsupportedOperationException on every call
     */
    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
        final String reason = "Pattern assignment is not available for Pub/Sub Lite.";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Verifies that {@code topic} parses to the topic path this consumer was built with.
     *
     * @throws UnsupportedOperationException if it parses to a different topic path
     */
    private void checkTopic(String topic) {
        TopicPath parsed;
        try {
            parsed = TopicPath.parse(topic);
        }
        catch (ApiException e) {
            // Parse failures are rethrown in their Kafka form.
            throw KafkaExceptionUtils.toKafka(e);
        }
        if (parsed.equals(topicPath)) {
            return;
        }
        throw new UnsupportedOperationException("Pub/Sub Lite consumers may only interact with the one topic they are configured for.");
    }

    /** Validates the topic of {@code topicPartition} and returns its partition as a Pub/Sub Lite {@link Partition}. */
    private Partition checkTopicGetPartition(TopicPartition topicPartition) {
        checkTopic(topicPartition.topic());
        final long partitionIndex = topicPartition.partition();
        try {
            return Partition.of(partitionIndex);
        }
        catch (ApiException e) {
            throw KafkaExceptionUtils.toKafka(e);
        }
    }

    /**
     * Builds the receiver that applies each new partition assignment.
     *
     * <p>For every assignment it computes the difference from the previous one, reports removed
     * partitions through {@code onPartitionsLost} and new ones through
     * {@code onPartitionsAssigned}, then hands the assignment to the underlying consumer.
     *
     * @param listener rebalance listener notified of lost and added partitions
     */
    private PartitionAssignmentReceiver newAssignmentReceiver(ConsumerRebalanceListener listener) {
        AtomicReference<Set<Partition>> lastPartitions = new AtomicReference<Set<Partition>>(ImmutableSet.of());
        return newAssignment -> {
            Set<Partition> previousAssignment = lastPartitions.get();
            Set<Partition> removed = new HashSet<>(previousAssignment);
            removed.removeAll(newAssignment);
            Set<Partition> added = new HashSet<>(newAssignment);
            added.removeAll(previousAssignment);
            if (!removed.isEmpty()) {
                listener.onPartitionsLost(removed.stream().map(this::toTopicPartition).collect(Collectors.toSet()));
            }
            if (!added.isEmpty()) {
                listener.onPartitionsAssigned(added.stream().map(this::toTopicPartition).collect(Collectors.toSet()));
            }
            this.consumer.get().setAssignment(newAssignment);
            // Keep an immutable snapshot instead of downcasting the caller's set, which
            // would fail for any Set implementation other than ImmutableSet.
            lastPartitions.set(ImmutableSet.copyOf(newAssignment));
        };
    }

    /** Delegates to the two-argument overload with a listener that does nothing. */
    public void subscribe(Collection<String> collection) {
        ConsumerRebalanceListener noOpListener = new NoOpRebalanceListener();
        subscribe(collection, noOpListener);
    }

    /**
     * Subscribes to this consumer's single topic and starts an assigner that delivers partition
     * assignments to {@code consumerRebalanceListener}.
     *
     * <p>An empty collection is treated as {@link #unsubscribe()}, the same way
     * {@code assign} treats an empty collection. Calling this again while already subscribed is
     * a no-op.
     *
     * @throws UnsupportedOperationException if more than one topic is given, or the topic is not
     *     the configured one
     * @throws IllegalStateException if {@code assign} was used first
     */
    public void subscribe(Collection<String> collection, ConsumerRebalanceListener consumerRebalanceListener) {
        if (collection.isEmpty()) {
            this.unsubscribe();
            return;
        }
        if (collection.size() != 1) {
            throw new UnsupportedOperationException("Subscribing to multiple topics is not available for Pub/Sub Lite.");
        }
        this.checkTopic(collection.iterator().next());
        if (this.consumer.isPresent()) {
            if (this.assigner.isPresent()) {
                return;
            }
            throw new IllegalStateException("Called subscribe after calling assign.");
        }
        // The consumer must exist before the assigner starts: the assignment receiver
        // forwards assignments to it.
        this.consumer = Optional.of(this.consumerFactory.newConsumer());
        try {
            Assigner created = this.assignerFactory.New(this.newAssignmentReceiver(consumerRebalanceListener));
            this.assigner = Optional.of(created);
            created.startAsync().awaitRunning();
        }
        catch (RuntimeException e) {
            // Roll back so a failed subscribe does not leave an orphaned consumer that makes
            // later subscribe calls look like "subscribe after assign".
            this.assigner = Optional.empty();
            try {
                this.consumer.ifPresent(instance -> instance.close(INFINITE_DURATION));
            }
            catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            this.consumer = Optional.empty();
            if (e instanceof ApiException) {
                throw KafkaExceptionUtils.toKafka(e);
            }
            throw e;
        }
    }

    /**
     * Manually assigns partitions of the configured topic to this consumer.
     *
     * <p>An empty collection is treated as {@link #unsubscribe()}.
     *
     * @throws IllegalStateException if {@code subscribe} was used first
     */
    public void assign(Collection<TopicPartition> collection) {
        if (collection.isEmpty()) {
            unsubscribe();
            return;
        }
        if (assigner.isPresent()) {
            throw new IllegalStateException("Called assign after calling subscribe.");
        }
        Set<Partition> litePartitions = collection.stream()
            .map(topicPartition -> checkTopicGetPartition(topicPartition))
            .collect(Collectors.toSet());
        // Create the underlying consumer lazily on the first assign call.
        if (!consumer.isPresent()) {
            consumer = Optional.of(consumerFactory.newConsumer());
        }
        SingleSubscriptionConsumer target = consumer.get();
        target.setAssignment(litePartitions);
    }

    /**
     * Stops the assigner (if any), closes the underlying consumer (if any) and clears both.
     *
     * <p>State is reset and the consumer is closed even if stopping the assigner throws, so a
     * failed shutdown cannot leave a stale assigner or an unclosed consumer behind. The
     * exception still propagates to the caller.
     */
    public void unsubscribe() {
        try {
            this.assigner.ifPresent(instance -> instance.stopAsync().awaitTerminated());
        }
        finally {
            this.assigner = Optional.empty();
            try {
                this.consumer.ifPresent(instance -> instance.close(INFINITE_DURATION));
            }
            finally {
                this.consumer = Optional.empty();
            }
        }
    }

    /** Polls with a timeout given in milliseconds. */
    public ConsumerRecords<byte[], byte[]> poll(long l) {
        Duration timeout = Duration.ofMillis(l);
        return poll(timeout);
    }

    /**
     * Polls the underlying consumer.
     *
     * @throws IllegalStateException if neither subscribe nor assign has been called
     */
    public ConsumerRecords<byte[], byte[]> poll(Duration timeout) {
        SingleSubscriptionConsumer active = requireValidConsumer();
        return active.poll(timeout);
    }

    /**
     * Validates every topic partition in {@code map} and converts the entries to Pub/Sub Lite
     * partitions and offsets. Any failure during conversion is rethrown in its Kafka form.
     */
    Map<Partition, Offset> checkAndTransformOffsets(Map<TopicPartition, OffsetAndMetadata> map) {
        ImmutableMap.Builder<Partition, Offset> converted = ImmutableMap.builder();
        try {
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
                Partition partition = checkTopicGetPartition(entry.getKey());
                Offset offset = Offset.of(entry.getValue().offset());
                converted.put(partition, offset);
            }
        }
        catch (Throwable t) {
            throw KafkaExceptionUtils.toKafka(t);
        }
        return converted.build();
    }

    /** Commits the given offsets, waiting with the class's infinite timeout. */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
        final Duration noLimit = INFINITE_DURATION;
        commitSync(map, noLimit);
    }

    /**
     * Commits the given offsets and blocks until the commit completes or {@code duration}
     * elapses. Every failure is rethrown in its Kafka form.
     *
     * <p>If the wait is interrupted, the thread's interrupt flag is restored before the
     * failure is rethrown, so callers further up can still observe the interruption.
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map, Duration duration) {
        try {
            this.requireValidConsumer().commit(this.checkAndTransformOffsets(map)).get(duration.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw KafkaExceptionUtils.toKafka(e);
        }
        catch (Throwable t) {
            throw KafkaExceptionUtils.toKafka(t);
        }
    }

    /**
     * Commits the given offsets without blocking and reports the outcome to
     * {@code offsetCommitCallback}.
     *
     * <p>A {@code null} callback is tolerated: success is then silent and failure is logged as
     * a warning, instead of raising a NullPointerException inside the future callback.
     *
     * @throws IllegalStateException if neither subscribe nor assign has been called
     */
    public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> map, final OffsetCommitCallback offsetCommitCallback) {
        ApiFutures.addCallback(this.requireValidConsumer().commit(this.checkAndTransformOffsets(map)), new ApiFutureCallback<Void>(){

            public void onFailure(Throwable throwable) {
                if (offsetCommitCallback == null) {
                    logger.atWarning().withCause(throwable).log("Failed to commit offsets.");
                    return;
                }
                offsetCommitCallback.onComplete(null, KafkaExceptionUtils.toKafka(throwable));
            }

            public void onSuccess(Void result) {
                if (offsetCommitCallback != null) {
                    offsetCommitCallback.onComplete(map, null);
                }
            }
        });
    }

    /** Commits all offsets, waiting with the class's infinite timeout. */
    public void commitSync() {
        final Duration noLimit = INFINITE_DURATION;
        commitSync(noLimit);
    }

    /**
     * Asks the underlying consumer to commit everything and blocks until that completes or
     * {@code duration} elapses. Every failure is rethrown in its Kafka form.
     *
     * <p>If the wait is interrupted, the thread's interrupt flag is restored before the
     * failure is rethrown.
     */
    public void commitSync(Duration duration) {
        try {
            this.requireValidConsumer().commitAll().get(duration.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw KafkaExceptionUtils.toKafka(e);
        }
        catch (Throwable t) {
            throw KafkaExceptionUtils.toKafka(t);
        }
    }

    /**
     * Asks the underlying consumer to commit everything without blocking and reports the
     * committed offsets, converted to Kafka types, to {@code offsetCommitCallback}.
     *
     * <p>A {@code null} callback is tolerated: success is then silent and failure is logged as
     * a warning, instead of raising a NullPointerException inside the future callback.
     *
     * @throws IllegalStateException if neither subscribe nor assign has been called
     */
    public void commitAsync(final OffsetCommitCallback offsetCommitCallback) {
        ApiFutures.addCallback(this.requireValidConsumer().commitAll(), new ApiFutureCallback<Map<Partition, Offset>>(){

            public void onFailure(Throwable throwable) {
                if (offsetCommitCallback == null) {
                    logger.atWarning().withCause(throwable).log("Failed to commit offsets.");
                    return;
                }
                offsetCommitCallback.onComplete(null, KafkaExceptionUtils.toKafka(throwable));
            }

            public void onSuccess(Map<Partition, Offset> map) {
                if (offsetCommitCallback == null) {
                    return;
                }
                ImmutableMap.Builder<TopicPartition, OffsetAndMetadata> result = ImmutableMap.builder();
                map.forEach((partition, offset) -> result.put(PubsubLiteConsumer.this.toTopicPartition(partition), new OffsetAndMetadata(offset.value())));
                offsetCommitCallback.onComplete(result.build(), null);
            }
        });
    }

    /** Commits everything without blocking; a failure is only logged as a warning. */
    public void commitAsync() {
        OffsetCommitCallback logOnFailure = (offsets, error) -> {
            if (error == null) {
                return;
            }
            logger.atWarning().withCause(error).log("Failed to commit offsets.");
        };
        commitAsync(logOnFailure);
    }

    /**
     * Seeks one partition of the configured topic to the offset {@code l}.
     *
     * @throws IllegalStateException if neither subscribe nor assign has been called
     */
    public void seek(TopicPartition topicPartition, long l) {
        Partition partition = checkTopicGetPartition(topicPartition);
        SingleSubscriptionConsumer active = requireValidConsumer();
        SeekRequest request = SeekRequest.newBuilder()
            .setCursor(Cursor.newBuilder().setOffset(l))
            .build();
        active.doSeek(partition, request);
    }

    /** Seeks to the offset carried by {@code offsetAndMetadata}. */
    public void seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        long targetOffset = offsetAndMetadata.offset();
        seek(topicPartition, targetOffset);
    }

    /** Seeks each given partition to offset 0. */
    public void seekToBeginning(Collection<TopicPartition> collection) {
        collection.forEach(topicPartition -> {
            // The consumer is looked up before the partition is validated, per element.
            SingleSubscriptionConsumer active = requireValidConsumer();
            Partition partition = checkTopicGetPartition(topicPartition);
            Cursor start = Cursor.newBuilder().setOffset(0L).build();
            active.doSeek(partition, SeekRequest.newBuilder().setCursor(start).build());
        });
    }

    /** Seeks each given partition to the {@code HEAD} named target. */
    public void seekToEnd(Collection<TopicPartition> collection) {
        collection.forEach(topicPartition -> {
            // The consumer is looked up before the partition is validated, per element.
            SingleSubscriptionConsumer active = requireValidConsumer();
            Partition partition = checkTopicGetPartition(topicPartition);
            SeekRequest toHead = SeekRequest.newBuilder().setNamedTarget(SeekRequest.NamedTarget.HEAD).build();
            active.doSeek(partition, toHead);
        });
    }

    /** Returns the position for a partition, waiting with the class's infinite timeout. */
    public long position(TopicPartition topicPartition) {
        final Duration noLimit = INFINITE_DURATION;
        return position(topicPartition, noLimit);
    }

    /**
     * Returns the position the underlying consumer reports for the partition; if it reports
     * none, falls back to the committed offset.
     *
     * @throws IllegalStateException if neither subscribe nor assign has been called
     */
    public long position(TopicPartition partition, Duration timeout) {
        Partition litePartition = checkTopicGetPartition(partition);
        Optional<Long> known = requireValidConsumer().position(litePartition);
        if (!known.isPresent()) {
            OffsetAndMetadata fallback = committed(partition, timeout);
            return fallback.offset();
        }
        return known.get();
    }

    /** Returns the committed offset for one partition, waiting with the class's infinite timeout. */
    public OffsetAndMetadata committed(TopicPartition topicPartition) {
        final Duration noLimit = INFINITE_DURATION;
        return committed(topicPartition, noLimit);
    }

    /** Looks up the committed offset for one partition through the set-based overload. */
    public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) {
        Set<TopicPartition> single = ImmutableSet.of(topicPartition);
        Map<TopicPartition, OffsetAndMetadata> byPartition = committed(single, duration);
        return byPartition.get(topicPartition);
    }

    /** Returns committed offsets for the given partitions, waiting with the class's infinite timeout. */
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set) {
        final Duration noLimit = INFINITE_DURATION;
        return committed(set, noLimit);
    }

    /**
     * Returns the committed offset of each requested partition, read from the subscription's
     * partition cursors. A partition with no listed cursor is reported at offset 0.
     *
     * <p>Failures while listing cursors are rethrown in their Kafka form. If the wait is
     * interrupted, the thread's interrupt flag is restored first.
     *
     * @param partitions partitions of the configured topic
     * @param timeout maximum time to wait for the cursor listing
     */
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout) {
        Set<Partition> targets = partitions.stream().map(this::checkTopicGetPartition).collect(Collectors.toSet());
        try {
            Map<Partition, Offset> cursors = this.cursorClient.listPartitionCursors(this.subscriptionPath).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            ImmutableMap.Builder<TopicPartition, OffsetAndMetadata> output = ImmutableMap.builder();
            for (Partition partition : targets) {
                Offset offset = cursors.getOrDefault(partition, Offset.of(0L));
                output.put(this.toTopicPartition(partition), new OffsetAndMetadata(offset.value()));
            }
            return output.build();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw KafkaExceptionUtils.toKafka(e);
        }
        catch (Throwable t) {
            throw KafkaExceptionUtils.toKafka(t);
        }
    }

    /** Returns an empty metrics map. */
    public Map<MetricName, ? extends Metric> metrics() {
        final Map<MetricName, Metric> none = ImmutableMap.of();
        return none;
    }

    /** Returns partition info for the topic, waiting with the class's infinite timeout. */
    public List<PartitionInfo> partitionsFor(String s) {
        final Duration noLimit = INFINITE_DURATION;
        return partitionsFor(s, noLimit);
    }

    /** Validates {@code topic}, then asks the shared helper for the configured topic's partitions. */
    public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
        checkTopic(topic);
        List<PartitionInfo> partitionInfos = shared.partitionsFor(topicPath, timeout);
        return partitionInfos;
    }

    /** Lists topics, waiting with the class's infinite timeout. */
    public Map<String, List<PartitionInfo>> listTopics() {
        final Duration noLimit = INFINITE_DURATION;
        return listTopics(noLimit);
    }

    /** Returns a single-entry map from the configured topic's name to its partitions. */
    public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
        String topicName = topicPath.toString();
        List<PartitionInfo> partitionInfos = partitionsFor(topicName, timeout);
        return ImmutableMap.of(topicName, partitionInfos);
    }

    /** Resolves offsets for timestamps, waiting with the class's infinite timeout. */
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map) {
        final Duration noLimit = INFINITE_DURATION;
        return offsetsForTimes(map, noLimit);
    }

    /**
     * For each partition, asks the topic stats client for the cursor matching the requested
     * event time (epoch milliseconds) and returns its offset paired with that requested time.
     *
     * <p>A partition for which no cursor is returned maps to {@code null}. All failures are
     * rethrown in their Kafka form. If the wait is interrupted, the thread's interrupt flag is
     * restored first.
     *
     * @param map requested timestamp per partition of the configured topic
     * @param duration maximum time to wait for all lookups together
     */
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map, Duration duration) {
        try {
            Map<TopicPartition, ApiFuture<Optional<Cursor>>> cursors = new HashMap<>();
            for (Map.Entry<TopicPartition, Long> request : map.entrySet()) {
                Partition partition = this.checkTopicGetPartition(request.getKey());
                long eventTimeMillis = request.getValue();
                cursors.put(request.getKey(), this.topicStatsClient.computeCursorForEventTime(this.topicPath, partition, Timestamps.fromMillis(eventTimeMillis)));
            }
            // Wait once for every lookup; the per-entry get() calls below then return immediately.
            ApiFutures.allAsList(cursors.values()).get(duration.toMillis(), TimeUnit.MILLISECONDS);
            Map<TopicPartition, OffsetAndTimestamp> output = new HashMap<>();
            for (Map.Entry<TopicPartition, ApiFuture<Optional<Cursor>>> entry : cursors.entrySet()) {
                OffsetAndTimestamp offsetAndTime = null;
                Optional<Cursor> cursor = entry.getValue().get();
                if (cursor.isPresent()) {
                    offsetAndTime = new OffsetAndTimestamp(cursor.get().getOffset(), Preconditions.checkNotNull(map.get(entry.getKey())).longValue());
                }
                output.put(entry.getKey(), offsetAndTime);
            }
            return output;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw KafkaExceptionUtils.toKafka(e);
        }
        catch (Throwable t) {
            throw KafkaExceptionUtils.toKafka(t);
        }
    }

    /** Returns beginning offsets, passing the class's infinite timeout. */
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection) {
        final Duration noLimit = INFINITE_DURATION;
        return beginningOffsets(collection, noLimit);
    }

    /**
     * Reports offset 0 as the beginning offset of every given partition, after checking that
     * each one names the configured topic. {@code duration} is not used.
     *
     * <p>A partition that appears more than once in {@code collection} is reported once; the
     * immutable map builder would otherwise reject the duplicate key when built.
     */
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Duration duration) {
        ImmutableMap.Builder<TopicPartition, Long> results = ImmutableMap.builder();
        Set<TopicPartition> seen = new HashSet<>();
        for (TopicPartition topicPartition : collection) {
            this.checkTopic(topicPartition.topic());
            if (seen.add(topicPartition)) {
                results.put(topicPartition, 0L);
            }
        }
        return results.build();
    }

    /** Returns end offsets, waiting with the class's infinite timeout. */
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection) {
        final Duration noLimit = INFINITE_DURATION;
        return endOffsets(collection, noLimit);
    }

    /**
     * Returns the head cursor offset of every given partition, as computed by the topic stats
     * client.
     *
     * <p>A partition that appears more than once in {@code collection} is looked up once. All
     * failures are rethrown in their Kafka form. If the wait is interrupted, the thread's
     * interrupt flag is restored first.
     *
     * @param collection partitions of the configured topic
     * @param duration maximum time to wait for all lookups together
     */
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Duration duration) {
        try {
            Map<TopicPartition, ApiFuture<Cursor>> cursors = new HashMap<>();
            for (TopicPartition topicPartition : collection) {
                Partition partition = this.checkTopicGetPartition(topicPartition);
                // Skip repeats so a duplicated partition neither fails nor triggers a second lookup.
                if (!cursors.containsKey(topicPartition)) {
                    cursors.put(topicPartition, this.topicStatsClient.computeHeadCursor(this.topicPath, partition));
                }
            }
            // Wait once for every lookup; the per-entry get() calls below then return immediately.
            ApiFutures.allAsList(cursors.values()).get(duration.toMillis(), TimeUnit.MILLISECONDS);
            ImmutableMap.Builder<TopicPartition, Long> output = ImmutableMap.builder();
            for (Map.Entry<TopicPartition, ApiFuture<Cursor>> entry : cursors.entrySet()) {
                output.put(entry.getKey(), entry.getValue().get().getOffset());
            }
            return output.build();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw KafkaExceptionUtils.toKafka(e);
        }
        catch (Throwable t) {
            throw KafkaExceptionUtils.toKafka(t);
        }
    }

    /** Closes this consumer, passing the class's infinite timeout. */
    public void close() {
        final Duration noLimit = INFINITE_DURATION;
        close(noLimit);
    }

    /**
     * Unsubscribes and then closes every resource registered at construction.
     *
     * <p>The resources are closed even if {@link #unsubscribe()} throws, so a failed shutdown
     * does not leak them; that exception still propagates afterwards. A failure to close an
     * individual resource is logged and does not stop the remaining ones from being closed.
     * {@code timeout} is not used.
     */
    public void close(Duration timeout) {
        try {
            this.unsubscribe();
        }
        finally {
            for (AutoCloseable closeable : this.toClose) {
                try {
                    closeable.close();
                }
                catch (Exception e) {
                    logger.atSevere().withCause(e).log("Error closing %s during Consumer shutdown.", closeable.getClass().getSimpleName());
                }
            }
        }
    }

    /** Returns group metadata whose group id is the subscription path's string form. */
    public ConsumerGroupMetadata groupMetadata() {
        String groupId = subscriptionPath.toString();
        return new ConsumerGroupMetadata(groupId);
    }

    /** Always returns an empty set. */
    public Set<TopicPartition> paused() {
        final Set<TopicPartition> none = ImmutableSet.of();
        return none;
    }

    /** Does nothing except log a warning. */
    public void pause(Collection<TopicPartition> collection) {
        logger.atWarning()
            .log("Calling pause on a Pub/Sub Lite Consumer is a no-op. Configure the amount of outstanding bytes and messages instead.");
    }

    /** Does nothing except log a warning. */
    public void resume(Collection<TopicPartition> collection) {
        logger.atWarning()
            .log("Calling resume on a Pub/Sub Lite Consumer is a no-op. Configure the amount of outstanding bytes and messages instead.");
    }

    /** Does nothing except log a warning. */
    public void enforceRebalance() {
        logger.atWarning()
            .log("Calling enforceRebalance on a Pub/Sub Lite Consumer is a no-op.");
    }

    /** Does nothing except log a warning; {@code reason} is not used. */
    public void enforceRebalance(String reason) {
        logger.atWarning()
            .log("Calling enforceRebalance on a Pub/Sub Lite Consumer is a no-op.");
    }

    /** Logs a warning and returns an empty value for every partition. */
    public OptionalLong currentLag(TopicPartition topicPartition) {
        logger.atWarning()
            .log("Calling currentLag on a Pub/Sub Lite Consumer always returns empty.");
        final OptionalLong unknown = OptionalLong.empty();
        return unknown;
    }

    /**
     * Forwards the wakeup to the underlying consumer.
     *
     * @throws IllegalStateException if neither subscribe nor assign has been called
     */
    public void wakeup() {
        SingleSubscriptionConsumer active = requireValidConsumer();
        active.wakeup();
    }

    /** Rebalance listener that ignores every notification; used when the caller supplies none. */
    private static class NoOpRebalanceListener
    implements ConsumerRebalanceListener {
        private NoOpRebalanceListener() {
            // No state to initialise.
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Intentionally empty.
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Intentionally empty.
        }
    }
}

