/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.cloudpubsub.internal;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.internal.AckSetTracker;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.PriorityQueue;

public class AckSetTrackerImpl
extends TrivialProxyService
implements AckSetTracker {
    private final CloseableMonitor monitor = new CloseableMonitor();
    @GuardedBy(value="monitor.monitor")
    private final Committer committer;
    @GuardedBy(value="monitor.monitor")
    private final Deque<Receipt> receipts = new ArrayDeque<Receipt>();
    @GuardedBy(value="monitor.monitor")
    private final PriorityQueue<Offset> acks = new PriorityQueue();

    public AckSetTrackerImpl(Committer committer) throws ApiException {
        super(committer);
        this.committer = committer;
    }

    @Override
    public Runnable track(SequencedMessage message) throws CheckedApiException {
        Offset messageOffset = message.offset();
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            CheckedApiPreconditions.checkArgument(this.receipts.isEmpty() || this.receipts.peekLast().offset.value() < messageOffset.value());
            Receipt receipt = new Receipt(messageOffset, this);
            this.receipts.addLast(receipt);
            Runnable runnable = receipt::onAck;
            return runnable;
        }
    }

    @Override
    public void waitUntilCommitted() throws CheckedApiException {
        ImmutableList receiptsCopy;
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            receiptsCopy = ImmutableList.copyOf(this.receipts);
        }
        receiptsCopy.forEach(Receipt::clear);
        h = this.monitor.enter();
        var3_2 = null;
        try {
            this.receipts.clear();
            this.acks.clear();
            this.committer.waitUntilEmpty();
        }
        catch (Throwable throwable) {
            var3_2 = throwable;
            throw throwable;
        }
        finally {
            if (h != null) {
                if (var3_2 != null) {
                    try {
                        h.close();
                    }
                    catch (Throwable throwable) {
                        var3_2.addSuppressed(throwable);
                    }
                } else {
                    h.close();
                }
            }
        }
    }

    private void onAck(Offset offset) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.acks.add(offset);
            Optional<Object> prefixAckedOffset = Optional.empty();
            while (!this.receipts.isEmpty() && !this.acks.isEmpty() && this.receipts.peekFirst().offset.value() == this.acks.peek().value()) {
                prefixAckedOffset = Optional.of(this.acks.remove());
                this.receipts.removeFirst();
            }
            if (prefixAckedOffset.isPresent()) {
                ApiFuture<Void> future = this.committer.commitOffset(Offset.of(((Offset)prefixAckedOffset.get()).value() + 1L));
                ExtractStatus.addFailureHandler(future, this::onPermanentError);
            }
        }
    }

    private static class Receipt {
        final Offset offset;
        private final CloseableMonitor m = new CloseableMonitor();
        @GuardedBy(value="m.monitor")
        private boolean wasAcked = false;
        @GuardedBy(value="m.monitor")
        private Optional<AckSetTrackerImpl> tracker;

        Receipt(Offset offset, AckSetTrackerImpl tracker) {
            this.offset = offset;
            this.tracker = Optional.of(tracker);
        }

        void clear() {
            try (CloseableMonitor.Hold h = this.m.enter();){
                this.tracker = Optional.empty();
            }
        }

        void onAck() {
            try (CloseableMonitor.Hold h = this.m.enter();){
                if (!this.tracker.isPresent()) {
                    return;
                }
                if (this.wasAcked) {
                    CheckedApiException e = new CheckedApiException("Duplicate acks are not allowed.", StatusCode.Code.FAILED_PRECONDITION);
                    this.tracker.get().onPermanentError(e);
                    throw e.underlying;
                }
                this.wasAcked = true;
                this.tracker.get().onAck(this.offset);
            }
        }
    }
}

