/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import io.streamnative.pulsar.handlers.kop.MessageFetchContext;
import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
import io.streamnative.pulsar.handlers.kop.storage.ReplicaManager;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.FetchRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A delayed operation that parks a fetch request until either new data shows up on one of the
 * fetched partitions or the delay expires, and then completes {@code callback}.
 *
 * <p>On completion the operation either hands back the results of the initial read
 * ({@code readRecordsResult}) or, when {@link #tryComplete()} observed that a partition advanced,
 * re-reads the partitions through {@link ReplicaManager#readFromLocalLog} and hands back the
 * fresh results instead.
 *
 * <p>NOTE(review): {@code forceComplete()} is inherited from {@link DelayedOperation}; it
 * presumably invokes {@link #onComplete()} at most once. Confirm against that class.
 */
public class DelayedFetch
extends DelayedOperation {
    private static final Logger log = LoggerFactory.getLogger(DelayedFetch.class);
    /** Future handed in by the caller; completed exactly with the map of per-partition results. */
    private final CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> callback;
    private final ReplicaManager replicaManager;
    /** Bytes reported as readable by the initial read (constructor argument). */
    private final long bytesReadable;
    private final int fetchMaxBytes;
    private final boolean readCommitted;
    private final Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo;
    /** Results of the initial read; returned as-is when no re-read is performed. */
    private final Map<TopicPartition, PartitionLog.ReadRecordsResult> readRecordsResult;
    private final MessageFetchContext context;
    // The two flags are boxed Booleans driven through AtomicReferenceFieldUpdater, whose
    // compareAndSet compares references. This works because autoboxing goes through
    // Boolean.valueOf, which always yields the canonical Boolean.TRUE / Boolean.FALSE instances.
    protected volatile Boolean hasError;
    protected volatile Boolean produceHappened;
    protected volatile int maxReadEntriesNum;
    protected static final AtomicReferenceFieldUpdater<DelayedFetch, Boolean> HAS_ERROR_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DelayedFetch.class, Boolean.class, "hasError");
    protected static final AtomicReferenceFieldUpdater<DelayedFetch, Boolean> IS_PRODUCE_HAPPENED_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DelayedFetch.class, Boolean.class, "produceHappened");
    protected static final AtomicIntegerFieldUpdater<DelayedFetch> MAX_READ_ENTRIES_NUM_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DelayedFetch.class, "maxReadEntriesNum");

    /**
     * Creates a delayed fetch.
     *
     * @param delayMs           delay passed to the {@link DelayedOperation} super constructor
     * @param fetchMaxBytes     byte budget forwarded to the re-read
     * @param bytesReadable     bytes already readable from the initial read
     * @param readCommitted     forwarded to {@link ReplicaManager#readFromLocalLog}
     * @param context           fetch context; also supplies the initial max-read-entries value
     * @param replicaManager    used to look up partition logs and to re-read them
     * @param readPartitionInfo per-partition fetch parameters forwarded to the re-read
     * @param readRecordsResult results of the initial read
     * @param callback          future to complete with the final per-partition results
     */
    public DelayedFetch(long delayMs, int fetchMaxBytes, long bytesReadable, boolean readCommitted, MessageFetchContext context, ReplicaManager replicaManager, Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo, Map<TopicPartition, PartitionLog.ReadRecordsResult> readRecordsResult, CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> callback) {
        super(delayMs, Optional.empty());
        this.readCommitted = readCommitted;
        this.context = context;
        this.callback = callback;
        this.readRecordsResult = readRecordsResult;
        this.readPartitionInfo = readPartitionInfo;
        this.replicaManager = replicaManager;
        this.bytesReadable = bytesReadable;
        this.fetchMaxBytes = fetchMaxBytes;
        this.maxReadEntriesNum = context.getMaxReadEntriesNum();
        this.hasError = false;
        this.produceHappened = false;
    }

    /** Only emits a debug log line; no state is changed here. */
    @Override
    public void onExpiration() {
        if (log.isDebugEnabled()) {
            log.debug("Delayed fetch on expiration triggered.");
        }
    }

    /**
     * Completes {@code callback} unless it is already done.
     *
     * <p>If an error was flagged, or no partition was seen to advance, the initial results are
     * returned unchanged. Otherwise the partitions are re-read; on success the fresh results are
     * returned and the initial results are recycled. If the re-read fails, the initial results
     * are returned (and therefore not recycled) so that the callback is never left incomplete.
     */
    @Override
    public void onComplete() {
        if (this.callback.isDone()) {
            return;
        }
        boolean errorSeen = HAS_ERROR_UPDATER.get(this);
        boolean produceSeen = IS_PRODUCE_HAPPENED_UPDATER.get(this);
        if (errorSeen || !produceSeen) {
            this.callback.complete(this.readRecordsResult);
            return;
        }
        this.replicaManager.readFromLocalLog(this.readCommitted, this.fetchMaxBytes, this.maxReadEntriesNum, this.readPartitionInfo, this.context).whenComplete((freshResult, error) -> {
            if (error != null) {
                // Without this branch a failed re-read would leave the callback pending forever.
                // The initial results are handed to the caller, so they must NOT be recycled.
                log.warn("Delayed fetch failed to re-read from local log, falling back to the initial read result.", error);
                this.callback.complete(this.readRecordsResult);
                return;
            }
            this.context.getStatsLogger().getWaitingFetchesTriggered().add(1L);
            this.callback.complete(freshResult);
            // The initial results are superseded by the fresh read; release them.
            this.readRecordsResult.forEach((ignore, result) -> result.recycle());
        });
    }

    /**
     * Checks whether the operation can complete now.
     *
     * <p>For each partition of the initial read, the partition's current last position is
     * compared with the last position recorded by the initial read:
     * <ul>
     *   <li>current last position equal to {@code PositionImpl.EARLIEST}: flag an error and
     *       force completion;</li>
     *   <li>current last position ahead of the recorded one: optionally enlarge
     *       {@code maxReadEntriesNum}, mark that a produce happened and force completion.</li>
     * </ul>
     *
     * @return {@code true} if the callback is already done or this call completed the
     *         operation; {@code false} otherwise
     */
    @Override
    public boolean tryComplete() {
        if (this.callback.isDone()) {
            return true;
        }
        for (Map.Entry<TopicPartition, PartitionLog.ReadRecordsResult> entry : this.readRecordsResult.entrySet()) {
            TopicPartition tp = entry.getKey();
            PartitionLog.ReadRecordsResult result = entry.getValue();
            PartitionLog partitionLog = this.replicaManager.getPartitionLog(tp, this.context.getNamespacePrefix());
            PositionImpl currLastPosition = (PositionImpl)partitionLog.getLastPosition(this.context.getTopicManager());
            if (currLastPosition.compareTo(PositionImpl.EARLIEST) == 0) {
                HAS_ERROR_UPDATER.set(this, true);
                return this.forceComplete();
            }
            PositionImpl lastPosition = (PositionImpl)result.lastPosition();
            if (currLastPosition.compareTo(lastPosition) <= 0) {
                continue;
            }
            // Only adjust when the initial read already reported some readable bytes
            // (the original condition "fetchMaxBytes - bytesReadable != fetchMaxBytes").
            if (this.bytesReadable != 0L) {
                int adjustedMaxReadEntriesNum = adjustMaxReadEntriesNum(this.fetchMaxBytes, this.bytesReadable, this.maxReadEntriesNum);
                if (log.isDebugEnabled()) {
                    log.debug("The fetch max bytes is {}, byte readable is {}, try to adjust the max read entries num to: {}.", this.fetchMaxBytes, this.bytesReadable, adjustedMaxReadEntriesNum);
                }
                MAX_READ_ENTRIES_NUM_UPDATER.set(this, adjustedMaxReadEntriesNum);
            }
            // Only the caller that flips the flag from false to true forces completion.
            return IS_PRODUCE_HAPPENED_UPDATER.compareAndSet(this, false, true) && this.forceComplete();
        }
        return false;
    }

    /**
     * Scales the max-read-entries value by the share of the byte budget that is still unused:
     * {@code max + 2 * max * remaining / fetchMaxBytes}, with {@code remaining} clamped to
     * {@code [0, fetchMaxBytes]} and the result clamped to {@code Integer.MAX_VALUE}.
     *
     * <p>The multiplication is done before the division, in {@code long} arithmetic. Dividing
     * first (as an {@code int} ratio) truncates to zero for every partial budget and makes the
     * adjustment a no-op. A non-positive {@code fetchMaxBytes} or {@code maxReadEntriesNum}
     * leaves the value unchanged, which also avoids a division by zero.
     *
     * <p>NOTE(review): the scaling formula is taken from the original expression; confirm the
     * intended growth factor with the owners of the fetch path.
     */
    private static int adjustMaxReadEntriesNum(int fetchMaxBytes, long bytesReadable, int maxReadEntriesNum) {
        if (fetchMaxBytes <= 0 || maxReadEntriesNum <= 0) {
            return maxReadEntriesNum;
        }
        long remainingBytes = Math.min(Math.max((long)fetchMaxBytes - bytesReadable, 0L), (long)fetchMaxBytes);
        // remainingBytes and maxReadEntriesNum are both < 2^31, so 2 * their product fits in a long.
        long extraEntries = remainingBytes * 2L * (long)maxReadEntriesNum / (long)fetchMaxBytes;
        return (int)Math.min((long)Integer.MAX_VALUE, (long)maxReadEntriesNum + extraEntries);
    }
}

