/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.ClientContext;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.ReadOpBase;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.ByteBufList;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchedReadOp
extends ReadOpBase
implements BookkeeperInternalCallbacks.BatchedReadEntryCallback {
    private static final Logger LOG = LoggerFactory.getLogger(BatchedReadOp.class);
    final int maxCount;
    final long maxSize;
    BatchedLedgerEntryRequest request;

    BatchedReadOp(LedgerHandle lh, ClientContext clientCtx, long startEntryId, int maxCount, long maxSize, boolean isRecoveryRead) {
        super(lh, clientCtx, startEntryId, -1L, isRecoveryRead);
        this.maxCount = maxCount;
        this.maxSize = maxSize;
    }

    @Override
    void initiate() {
        this.requestTimeNanos = MathUtils.nowInNano();
        List<BookieId> ensemble = this.getLedgerMetadata().getEnsembleAt(this.startEntryId);
        this.request = new SequenceReadRequest(ensemble, this.lh.ledgerId, this.startEntryId, this.maxCount, this.maxSize);
        this.request.read();
        if (this.clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) {
            this.speculativeTask = this.clientCtx.getConf().readSpeculativeRequestPolicy.get().initiateSpeculativeRequest(this.clientCtx.getScheduler(), this.request);
        }
    }

    @Override
    protected void submitCallback(int code) {
        if (!this.complete.compareAndSet(false, true)) {
            return;
        }
        this.cancelSpeculativeTask(true);
        long latencyNanos = MathUtils.elapsedNanos(this.requestTimeNanos);
        if (code != 0) {
            LOG.error("Read of ledger entry failed: L{} E{}-E{}, Sent to {}, Heard from {} : bitset = {}, Error = '{}'. First unread entry is ({}, rc = {})", new Object[]{this.lh.getId(), this.startEntryId, this.endEntryId, this.sentToHosts, this.heardFromHosts, this.heardFromHostsBitSet, BKException.getMessage(code), this.startEntryId, code});
            this.clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
            this.request.close();
            this.future.completeExceptionally(BKException.create(code));
        } else {
            this.clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
            this.future.complete(LedgerEntriesImpl.create(this.request.entries));
        }
    }

    @Override
    public void readEntriesComplete(int rc, long ledgerId, long startEntryId, ByteBufList bufList, Object ctx) {
        ReadOpBase.ReadContext rctx = (ReadOpBase.ReadContext)ctx;
        BatchedLedgerEntryRequest entry = (BatchedLedgerEntryRequest)rctx.entry;
        if (rc != 0) {
            entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc);
            return;
        }
        this.heardFromHosts.add(rctx.to);
        this.heardFromHostsBitSet.set(rctx.bookieIndex, true);
        bufList.retain();
        if (entry.complete(rctx.bookieIndex, rctx.to, bufList)) {
            if (!this.isRecoveryRead) {
                this.lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
            }
            this.submitCallback(0);
        } else {
            bufList.release();
        }
    }

    void sendReadTo(int bookieIndex, BookieId to, BatchedLedgerEntryRequest entry) throws InterruptedException {
        if (this.lh.throttler != null) {
            this.lh.throttler.acquire();
        }
        if (this.isRecoveryRead) {
            int flags = 5;
            this.clientCtx.getBookieClient().batchReadEntries(to, this.lh.ledgerId, entry.eId, this.maxCount, this.maxSize, this, new ReadOpBase.ReadContext(bookieIndex, to, entry), flags, this.lh.ledgerKey);
        } else {
            this.clientCtx.getBookieClient().batchReadEntries(to, this.lh.ledgerId, entry.eId, this.maxCount, this.maxSize, this, new ReadOpBase.ReadContext(bookieIndex, to, entry), 0);
        }
    }

    class SequenceReadRequest
    extends BatchedLedgerEntryRequest {
        static final int NOT_FOUND = -1;
        int nextReplicaIndexToReadFrom;
        final BitSet sentReplicas;
        final BitSet erroredReplicas;

        SequenceReadRequest(List<BookieId> ensemble, long lId, long eId, int maxCount, long maxSize) {
            super(ensemble, lId, eId, maxCount, maxSize);
            this.nextReplicaIndexToReadFrom = 0;
            this.sentReplicas = new BitSet(BatchedReadOp.this.lh.getLedgerMetadata().getWriteQuorumSize());
            this.erroredReplicas = new BitSet(BatchedReadOp.this.lh.getLedgerMetadata().getWriteQuorumSize());
        }

        @Override
        void read() {
            this.sendNextRead();
        }

        private synchronized int getNextReplicaIndexToReadFrom() {
            return this.nextReplicaIndexToReadFrom;
        }

        private BitSet getSentToBitSet() {
            BitSet b = new BitSet(this.ensemble.size());
            for (int i = 0; i < this.sentReplicas.length(); ++i) {
                if (!this.sentReplicas.get(i)) continue;
                b.set(this.writeSet.get(i));
            }
            return b;
        }

        private boolean readsOutstanding() {
            return this.sentReplicas.cardinality() - this.erroredReplicas.cardinality() > 0;
        }

        @Override
        synchronized BookieId maybeSendSpeculativeRead(BitSet heardFrom) {
            if (this.nextReplicaIndexToReadFrom >= BatchedReadOp.this.getLedgerMetadata().getWriteQuorumSize()) {
                return null;
            }
            BitSet sentTo = this.getSentToBitSet();
            sentTo.and(heardFrom);
            if (sentTo.cardinality() == 0) {
                BatchedReadOp.this.clientCtx.getClientStats().getSpeculativeReadCounter().inc();
                return this.sendNextRead();
            }
            return null;
        }

        synchronized BookieId sendNextRead() {
            if (this.nextReplicaIndexToReadFrom >= BatchedReadOp.this.getLedgerMetadata().getWriteQuorumSize()) {
                this.fail(this.firstError);
                return null;
            }
            int replica = this.nextReplicaIndexToReadFrom;
            int bookieIndex = this.writeSet.get(this.nextReplicaIndexToReadFrom);
            ++this.nextReplicaIndexToReadFrom;
            try {
                BookieId to = (BookieId)this.ensemble.get(bookieIndex);
                BatchedReadOp.this.sendReadTo(bookieIndex, to, this);
                BatchedReadOp.this.sentToHosts.add(to);
                this.sentReplicas.set(replica);
                return to;
            }
            catch (InterruptedException ie) {
                LOG.error("Interrupted reading entry " + this, (Throwable)ie);
                Thread.currentThread().interrupt();
                this.fail(-15);
                return null;
            }
        }

        @Override
        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) {
            super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
            int replica = this.writeSet.indexOf(bookieIndex);
            if (replica == -1) {
                LOG.error("Received error from a host which is not in the ensemble {} {}.", (Object)host, (Object)this.ensemble);
                return;
            }
            this.erroredReplicas.set(replica);
            if (BatchedReadOp.this.isRecoveryRead && this.numBookiesMissingEntry >= BatchedReadOp.this.requiredBookiesMissingEntryForRecovery) {
                this.fail(-13);
                return;
            }
            if (!this.readsOutstanding()) {
                this.sendNextRead();
            }
        }

        @Override
        boolean complete(int bookieIndex, BookieId host, ByteBufList bufList) {
            boolean completed = super.complete(bookieIndex, host, bufList);
            if (completed) {
                int numReplicasTried = this.getNextReplicaIndexToReadFrom();
                for (int i = 0; i < numReplicasTried - 1; ++i) {
                    int slowBookieIndex = this.writeSet.get(i);
                    BookieId slowBookieSocketAddress = (BookieId)this.ensemble.get(slowBookieIndex);
                    BatchedReadOp.this.clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, this.eId);
                }
            }
            return completed;
        }
    }

    abstract class BatchedLedgerEntryRequest
    extends ReadOpBase.LedgerEntryRequest {
        final long lId;
        final int maxCount;
        final long maxSize;
        final List<LedgerEntry> entries;

        BatchedLedgerEntryRequest(List<BookieId> ensemble, long lId, long eId, int maxCount, long maxSize) {
            super(BatchedReadOp.this, ensemble, eId);
            this.lId = lId;
            this.maxCount = maxCount;
            this.maxSize = maxSize;
            this.entries = new ArrayList<LedgerEntry>(maxCount);
        }

        boolean complete(int bookieIndex, BookieId host, ByteBufList bufList) {
            if (this.isComplete()) {
                return false;
            }
            if (!this.complete.getAndSet(true)) {
                for (int i = 0; i < bufList.size(); ++i) {
                    ByteBuf content;
                    ByteBuf buffer = bufList.getBuffer(i);
                    try {
                        content = BatchedReadOp.this.lh.macManager.verifyDigestAndReturnData(this.eId + (long)i, buffer);
                    }
                    catch (BKException.BKDigestMatchException e) {
                        BatchedReadOp.this.clientCtx.getClientStats().getReadOpDmCounter().inc();
                        this.logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", -5);
                        return false;
                    }
                    this.rc = 0;
                    LedgerEntryImpl entryImpl = LedgerEntryImpl.create(BatchedReadOp.this.lh.ledgerId, BatchedReadOp.this.startEntryId + (long)i);
                    entryImpl.setLength(buffer.getLong(24));
                    entryImpl.setEntryBuf(content);
                    this.entries.add(entryImpl);
                }
                this.writeSet.recycle();
                return true;
            }
            this.writeSet.recycle();
            return false;
        }

        @Override
        public String toString() {
            return String.format("L%d-E%d~%d s-%d", BatchedReadOp.this.lh.getId(), this.eId, this.eId + (long)this.maxCount, this.maxSize);
        }
    }
}

