/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.kafka;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.kafka.PullSubscriberFactory;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.Executor;

class SinglePartitionSubscriber
extends ProxyService {
    private final PullSubscriberFactory subscriberFactory;
    private final Partition partition;
    private final Committer committer;
    private final boolean enableReset;
    private final CloseableMonitor monitor = new CloseableMonitor();
    private BlockingPullSubscriber subscriber;
    private boolean needsCommitting = false;
    private Optional<Offset> lastReceived = Optional.empty();

    SinglePartitionSubscriber(PullSubscriberFactory subscriberFactory, Partition partition, SeekRequest initialSeek, Committer committer, boolean enableReset) throws CheckedApiException {
        super(new ApiService[0]);
        this.subscriberFactory = subscriberFactory;
        this.partition = partition;
        this.committer = committer;
        this.enableReset = enableReset;
        this.subscriber = subscriberFactory.newPullSubscriber(partition, initialSeek, this::onSubscriberReset);
        this.addServices(new ApiService[]{committer});
    }

    protected void start() {
    }

    protected void stop() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.subscriber.close();
        }
    }

    protected void handlePermanentError(CheckedApiException error) {
        this.stop();
    }

    void clientSeek(SeekRequest request) throws CheckedApiException {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.subscriber.close();
            this.subscriber = this.subscriberFactory.newPullSubscriber(this.partition, request, this::onSubscriberReset);
        }
    }

    ApiFuture<Void> onData() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            ApiFuture apiFuture = this.subscriber.onData();
            return apiFuture;
        }
    }

    private ArrayDeque<SequencedMessage> pullMessages() throws CheckedApiException {
        ArrayDeque<SequencedMessage> messages = new ArrayDeque<SequencedMessage>();
        Optional message = this.subscriber.messageIfAvailable();
        while (message.isPresent()) {
            messages.add((SequencedMessage)message.get());
            message = this.subscriber.messageIfAvailable();
        }
        return messages;
    }

    ArrayDeque<SequencedMessage> getMessages() throws CheckedApiException {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            ArrayDeque<SequencedMessage> messages = this.pullMessages();
            if (!messages.isEmpty()) {
                this.lastReceived = Optional.of(Offset.of((long)((SequencedMessage)Iterables.getLast(messages)).getCursor().getOffset()));
                this.needsCommitting = true;
            }
            ArrayDeque<SequencedMessage> arrayDeque = messages;
            return arrayDeque;
        }
    }

    Optional<Long> position() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            Optional<Long> optional = this.lastReceived.map(lastReceived -> lastReceived.value() + 1L);
            return optional;
        }
    }

    ApiFuture<Void> commitOffset(Offset offset) {
        return this.committer.commitOffset(offset);
    }

    Optional<ApiFuture<Offset>> autoCommit() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (!this.needsCommitting) {
                Optional<ApiFuture<Offset>> optional = Optional.empty();
                return optional;
            }
            Preconditions.checkState((boolean)this.lastReceived.isPresent());
            this.needsCommitting = false;
            Offset toCommit = Offset.of((long)(this.lastReceived.get().value() + 1L));
            Optional<ApiFuture<Offset>> optional = Optional.of(ApiFutures.transform((ApiFuture)this.committer.commitOffset(toCommit), ignored -> toCommit, (Executor)MoreExecutors.directExecutor()));
            return optional;
        }
    }

    private boolean onSubscriberReset() throws CheckedApiException {
        if (!this.enableReset) {
            return false;
        }
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.pullMessages();
            this.needsCommitting = false;
        }
        this.committer.waitUntilEmpty();
        return true;
    }
}

