/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.cloudpubsub.internal;

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.internal.AckSetTracker;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Preconditions;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Status;
import io.grpc.StatusException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Tracks the offsets of received messages together with the offsets that have been acked, and
 * commits to the {@link Committer} whenever the acked offsets form a contiguous prefix of the
 * received offsets.
 *
 * <p>All access to the receipt and ack collections happens while holding {@code monitor}.
 */
public class AckSetTrackerImpl
extends ProxyService
implements AckSetTracker {
    private final CloseableMonitor monitor = new CloseableMonitor();
    @GuardedBy(value="monitor.monitor")
    private final Committer committer;
    // Offsets of tracked messages in the order they were received; track() enforces that they
    // are strictly increasing.
    @GuardedBy(value="monitor.monitor")
    private final Deque<Offset> receipts = new ArrayDeque<>();
    // Acked offsets that have not yet been folded into a commit; the smallest one is at the head.
    @GuardedBy(value="monitor.monitor")
    private final PriorityQueue<Offset> acks = new PriorityQueue<>();

    /**
     * Creates a tracker that commits through the given committer.
     *
     * @param committer the committer that receives offset commits; it is also registered through
     *     {@code addServices} (presumably so the parent manages its lifecycle; confirm against
     *     {@code ProxyService})
     * @throws StatusException if registering the committer fails
     */
    public AckSetTrackerImpl(Committer committer) throws StatusException {
        this.addServices(committer);
        this.committer = committer;
    }

    /** No additional work is performed on start. */
    @Override
    protected void start() {
    }

    /** No additional work is performed on stop. */
    @Override
    protected void stop() {
    }

    /** No additional handling is performed for permanent errors. */
    @Override
    protected void handlePermanentError(StatusException error) {
    }

    /**
     * Records the message's offset as received and returns the callback that acks it.
     *
     * @param message the message to track; its offset must be strictly greater than the offset of
     *     the most recently tracked message that has not yet been committed past
     * @return a {@link Runnable} that acks the message when run. Running it more than once reports
     *     a {@code FAILED_PRECONDITION} error through {@code onPermanentError} and throws the
     *     corresponding runtime exception.
     * @throws StatusException presumably when the ordering precondition is violated (raised by
     *     {@code Preconditions.checkArgument}; confirm against that helper)
     */
    @Override
    public Runnable track(SequencedMessage message) throws StatusException {
        final Offset messageOffset = message.offset();
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            Preconditions.checkArgument(this.receipts.isEmpty() || this.receipts.peekLast().value() < messageOffset.value());
            this.receipts.addLast(messageOffset);
        }
        return new Runnable(){
            // Flipped on the first run so that any later run is detected as a duplicate ack.
            private final AtomicBoolean wasAcked = new AtomicBoolean(false);

            @Override
            public void run() {
                if (this.wasAcked.getAndSet(true)) {
                    Status s = Status.FAILED_PRECONDITION.withDescription("Duplicate acks are not allowed.");
                    AckSetTrackerImpl.this.onPermanentError(s.asException());
                    throw s.asRuntimeException();
                }
                AckSetTrackerImpl.this.onAck(messageOffset);
            }
        };
    }

    /**
     * Registers an ack for the given offset and, if the oldest outstanding receipts are now all
     * acked, commits the offset immediately following the last of them.
     *
     * @param offset the offset of the message being acked
     */
    private void onAck(Offset offset) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.acks.add(offset);
            Optional<Offset> prefixAckedOffset = Optional.empty();
            // Pop matching heads: while the oldest receipt equals the smallest pending ack, that
            // message is acked and every earlier one has already been removed.
            while (!this.receipts.isEmpty() && !this.acks.isEmpty() && this.receipts.peekFirst().value() == this.acks.peek().value()) {
                prefixAckedOffset = Optional.of(this.acks.remove());
                this.receipts.removeFirst();
            }
            if (prefixAckedOffset.isPresent()) {
                // Convert from the last acked offset to the first unacked offset.
                ApiFuture<Void> future = this.committer.commitOffset(Offset.of(prefixAckedOffset.get().value() + 1L));
                ExtractStatus.addFailureHandler(future, this::onPermanentError);
            }
        }
    }
}

