/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.kafka;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.ApiServiceUtils;
import com.google.cloud.pubsublite.kafka.ApiFuturesExtensions;
import com.google.cloud.pubsublite.kafka.CommitterFactory;
import com.google.cloud.pubsublite.kafka.KafkaExceptionUtils;
import com.google.cloud.pubsublite.kafka.PullSubscriberFactory;
import com.google.cloud.pubsublite.kafka.RecordTransforms;
import com.google.cloud.pubsublite.kafka.SinglePartitionSubscriber;
import com.google.cloud.pubsublite.kafka.SingleSubscriptionConsumer;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.MoreExecutors;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

/**
 * {@link SingleSubscriptionConsumer} implementation that keeps one {@link SinglePartitionSubscriber}
 * per assigned partition and exposes polling, committing and seeking across all of them.
 *
 * <p>All access to {@code partitions} and {@code assignmentChanged} happens while holding
 * {@code monitor}. {@code wakeupTriggered} is completed without holding the monitor.
 */
class SingleSubscriptionConsumerImpl
implements SingleSubscriptionConsumer {
    private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
    private final TopicPath topic;
    private final boolean autocommit;
    private final PullSubscriberFactory subscriberFactory;
    private final CommitterFactory committerFactory;
    private final CloseableMonitor monitor = new CloseableMonitor();
    // Subscribers for the currently assigned partitions; only touched while holding `monitor`.
    private final Map<Partition, SinglePartitionSubscriber> partitions = new HashMap<>();
    // Completed and then swapped for a fresh future each time setAssignment() finishes, so that a
    // poll sleeping on the old future stops waiting.
    private SettableApiFuture<Void> assignmentChanged = SettableApiFuture.create();
    // Completed by wakeup().
    // NOTE(review): this future is never reset, so once wakeup() has been called every later
    // poll() throws WakeupException. Confirm whether a one-shot wakeup is intended.
    private final SettableApiFuture<Void> wakeupTriggered = SettableApiFuture.create();

    /**
     * @param topic topic used to build the {@link TopicPartition} keys and records returned by
     *     {@link #poll(Duration)}
     * @param autocommit whether {@link #poll(Duration)} triggers {@link #commitAll()} before
     *     polling; also forwarded to every {@link SinglePartitionSubscriber}
     * @param subscriberFactory factory handed to each {@link SinglePartitionSubscriber}
     * @param committerFactory factory used to create one committer per newly assigned partition
     */
    SingleSubscriptionConsumerImpl(TopicPath topic, boolean autocommit, PullSubscriberFactory subscriberFactory, CommitterFactory committerFactory) {
        this.topic = topic;
        this.autocommit = autocommit;
        this.subscriberFactory = subscriberFactory;
        this.committerFactory = committerFactory;
    }

    /**
     * Replaces the set of assigned partitions.
     *
     * <p>Subscribers for partitions missing from {@code assignment} are removed and shut down
     * (blocking). Partitions that are new get a subscriber positioned at the committed cursor,
     * which is started (blocking) before being registered. Partitions present both before and
     * after are left untouched. Finally, pollers waiting on the previous assignment are signalled.
     *
     * @param assignment the complete new set of partitions for this consumer
     */
    @Override
    public void setAssignment(Set<Partition> assignment) {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            // Iterate over a snapshot of the keys because the map is mutated while streaming.
            List<SinglePartitionSubscriber> unassigned = ImmutableSet.copyOf(this.partitions.keySet()).stream()
                    .filter(p -> !assignment.contains(p))
                    .map(this.partitions::remove)
                    .collect(Collectors.toList());
            ApiServiceUtils.blockingShutdown(unassigned);
            assignment.stream()
                    .filter(p -> !this.partitions.containsKey(p))
                    .forEach(ExtractStatus.rethrowAsRuntime((Partition partition) -> {
                        SinglePartitionSubscriber subscriber = new SinglePartitionSubscriber(
                                this.subscriberFactory,
                                partition,
                                SeekRequest.newBuilder().setNamedTarget(SeekRequest.NamedTarget.COMMITTED_CURSOR).build(),
                                this.committerFactory.newCommitter(partition),
                                this.autocommit);
                        subscriber.startAsync().awaitRunning();
                        this.partitions.put(partition, subscriber);
                    }));
            // Wake any in-flight poll, then arm a fresh signal for the next assignment change.
            this.assignmentChanged.set(null);
            this.assignmentChanged = SettableApiFuture.create();
        }
        catch (Throwable t) {
            throw ExtractStatus.toCanonical(t).underlying;
        }
    }

    /**
     * Returns the partitions currently assigned to this consumer.
     *
     * @return an immutable snapshot taken under the monitor; it does not reflect later calls to
     *     {@link #setAssignment(Set)} and cannot be used to modify this consumer's state
     */
    @Override
    public Set<Partition> assignment() {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            // Copy instead of returning keySet(): the live view would escape the monitor and
            // would allow callers to remove entries from the internal map.
            return ImmutableSet.copyOf(this.partitions.keySet());
        }
    }

    /**
     * Waits up to {@code duration} for a wakeup, an assignment change, or data on any assigned
     * partition, then collects the messages of every assigned partition.
     *
     * @param duration maximum time to wait for one of the signals
     * @return the messages per partition, or an empty map if the wait timed out
     * @throws WakeupException if {@link #wakeup()} has been called
     * @throws KafkaException for any other failure, converted by {@link KafkaExceptionUtils}
     */
    private Map<Partition, Queue<SequencedMessage>> doPoll(Duration duration) {
        try {
            ImmutableList.Builder<ApiFuture<Void>> stopSleepingSignals = ImmutableList.builder();
            try (CloseableMonitor.Hold h = this.monitor.enter()) {
                stopSleepingSignals.add(this.wakeupTriggered);
                stopSleepingSignals.add(this.assignmentChanged);
                this.partitions.values().forEach(subscriber -> stopSleepingSignals.add(subscriber.onData()));
            }
            // The wait happens outside the monitor so other methods are not blocked meanwhile.
            try {
                ApiFuturesExtensions.whenFirstDone(stopSleepingSignals.build()).get(duration.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e) {
                return ImmutableMap.of();
            }
            try (CloseableMonitor.Hold h = this.monitor.enter()) {
                if (this.wakeupTriggered.isDone()) {
                    throw new WakeupException();
                }
                Map<Partition, Queue<SequencedMessage>> partitionQueues = new HashMap<>();
                this.partitions.forEach(ExtractStatus.rethrowAsRuntime((Partition partition, SinglePartitionSubscriber subscriber) -> {
                    partitionQueues.put(partition, subscriber.getMessages());
                }));
                return partitionQueues;
            }
        }
        catch (Throwable t) {
            throw KafkaExceptionUtils.toKafka(t);
        }
    }

    /**
     * Polls all assigned partitions and converts the received messages into Kafka records.
     *
     * <p>When autocommit is enabled, {@link #commitAll()} is triggered first; a failed commit is
     * only logged at warning level and does not fail the poll.
     *
     * @param duration maximum time to wait for data
     * @return the records keyed by topic partition; partitions without messages are omitted
     */
    // The record map stays raw because the record element type is not imported by this file; the
    // lists only ever hold what RecordTransforms.fromMessage returns.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public ConsumerRecords<byte[], byte[]> poll(Duration duration) {
        if (this.autocommit) {
            ApiFuture<Map<Partition, Offset>> future = this.commitAll();
            ApiFutures.addCallback(future, new ApiFutureCallback<Map<Partition, Offset>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    logger.atWarning().withCause(throwable).log("Failed to commit offsets.");
                }

                @Override
                public void onSuccess(Map<Partition, Offset> result) {
                    // Nothing to do: autocommit results are not surfaced to the caller.
                }
            }, MoreExecutors.directExecutor());
        }
        Map<Partition, Queue<SequencedMessage>> partitionQueues = this.doPoll(duration);
        Map records = new HashMap();
        partitionQueues.forEach((partition, queue) -> {
            if (queue.isEmpty()) {
                return;
            }
            List partitionRecords = queue.stream()
                    .map(message -> RecordTransforms.fromMessage(message, this.topic, partition))
                    .collect(Collectors.toList());
            records.put(new TopicPartition(this.topic.toString(), (int)partition.value()), partitionRecords);
        });
        return new ConsumerRecords(records);
    }

    /**
     * Asks every assigned partition's subscriber to auto-commit.
     *
     * @return a future holding the committed offset for each partition whose subscriber returned
     *     a commit future; partitions whose subscriber returned an empty {@link Optional} are
     *     absent from the map
     */
    @Override
    public ApiFuture<Map<Partition, Offset>> commitAll() {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            List<ApiFuture<Map.Entry<Partition, Offset>>> commitFutures = new ArrayList<>();
            this.partitions.forEach((partition, subscriber) -> {
                Optional<ApiFuture<Offset>> commitFuture = subscriber.autoCommit();
                if (!commitFuture.isPresent()) {
                    return;
                }
                // Pair each committed offset with its partition so the results can be keyed.
                ApiFuture<Map.Entry<Partition, Offset>> entryFuture = ApiFutures.transform(
                        commitFuture.get(),
                        offset -> new AbstractMap.SimpleEntry<Partition, Offset>(partition, offset),
                        MoreExecutors.directExecutor());
                commitFutures.add(entryFuture);
            });
            ApiFuture<List<Map.Entry<Partition, Offset>>> allCommits = ApiFutures.allAsList(commitFutures);
            return ApiFutures.transform(allCommits, results -> {
                Map<Partition, Offset> committed = results.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
                return ImmutableMap.copyOf(committed);
            }, MoreExecutors.directExecutor());
        }
    }

    /**
     * Commits the given offsets on their partitions.
     *
     * <p>Every partition is checked before any commit is issued, so a request that names an
     * unassigned partition fails without committing anything.
     *
     * @param commitOffsets the offset to commit for each partition
     * @return a future that completes once all commits have completed
     * @throws CommitFailedException if any partition in {@code commitOffsets} is not assigned to
     *     this consumer
     */
    @Override
    public ApiFuture<Void> commit(Map<Partition, Offset> commitOffsets) {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            // Validate first: failing halfway through would leave a partial, iteration-order
            // dependent subset of the offsets committed.
            commitOffsets.keySet().forEach(partition -> {
                if (!this.partitions.containsKey(partition)) {
                    throw new CommitFailedException("Tried to commit to partition " + partition.value() + " which is not assigned to this consumer.");
                }
            });
            ImmutableList.Builder<ApiFuture<Void>> commitFutures = ImmutableList.builder();
            commitOffsets.forEach((partition, offset) -> commitFutures.add(this.partitions.get(partition).commitOffset(offset)));
            return ApiFutures.transform(ApiFutures.allAsList(commitFutures.build()), ignored -> null, MoreExecutors.directExecutor());
        }
    }

    /**
     * Forwards a client-initiated seek to the subscriber of {@code partition}.
     *
     * @param partition the partition to seek on
     * @param request the seek to perform
     * @throws IllegalStateException if {@code partition} is not assigned to this consumer
     * @throws KafkaException for any other failure, converted by {@link KafkaExceptionUtils}
     */
    @Override
    public void doSeek(Partition partition, SeekRequest request) throws KafkaException {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            SinglePartitionSubscriber subscriber = this.partitions.get(partition);
            if (subscriber == null) {
                throw new IllegalStateException("Received seek for partition " + partition.value() + " which is not assigned to this consumer.");
            }
            subscriber.clientSeek(request);
        }
        catch (IllegalStateException e) {
            // Propagated unchanged rather than being converted below.
            throw e;
        }
        catch (Throwable t) {
            throw KafkaExceptionUtils.toKafka(t);
        }
    }

    /**
     * Returns the position reported by the subscriber of {@code partition}.
     *
     * @param partition the partition to query
     * @return the subscriber's position, or empty if the partition is not assigned
     */
    @Override
    public Optional<Long> position(Partition partition) {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            SinglePartitionSubscriber subscriber = this.partitions.get(partition);
            if (subscriber == null) {
                return Optional.empty();
            }
            return subscriber.position();
        }
    }

    /**
     * Shuts down the subscribers of all assigned partitions, blocking until done.
     *
     * @param duration currently unused by this implementation
     * @throws KafkaException if shutting down fails, converted by {@link KafkaExceptionUtils}
     */
    @Override
    public void close(Duration duration) {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            ApiServiceUtils.blockingShutdown(this.partitions.values());
        }
        catch (Throwable t) {
            throw KafkaExceptionUtils.toKafka(t);
        }
    }

    /** Completes the wakeup signal so that a waiting or subsequent poll throws {@link WakeupException}. */
    @Override
    public void wakeup() {
        this.wakeupTriggered.set(null);
    }
}

