/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.ClientContext;
import org.apache.bookkeeper.client.DistributionSchedule;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadataUtils;
import org.apache.bookkeeper.client.SpeculativeRequestExecutionPolicy;
import org.apache.bookkeeper.client.SpeculativeRequestExecutor;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PendingReadOp
implements BookkeeperInternalCallbacks.ReadEntryCallback,
SafeRunnable {
    private static final Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
    // Handle of a scheduled speculative-read task: assigned in initiate(), cancelled and
    // cleared by cancelSpeculativeTask().
    private ScheduledFuture<?> speculativeTask = null;
    // One request per entry id in [startEntryId, endEntryId], appended in ascending order by initiate().
    protected final List<LedgerEntryRequest> seq;
    // Completed (normally or exceptionally) exactly once by submitCallback().
    private final CompletableFuture<LedgerEntries> future;
    // Bookies that returned a successful response; updated in readEntryComplete().
    private final Set<BookieSocketAddress> heardFromHosts;
    // Same information as heardFromHosts, keyed by bookie index; sized from the ensemble size.
    private final BitSet heardFromHostsBitSet;
    // Bookies a SequenceReadRequest has sent a read to; only used for logging in this class.
    private final Set<BookieSocketAddress> sentToHosts = new HashSet<BookieSocketAddress>();
    LedgerHandle lh;
    final ClientContext clientCtx;
    // Entries still outstanding; decremented by submitCallback() on each successful entry.
    long numPendingEntries;
    // Inclusive bounds of the entry range being read.
    final long startEntryId;
    final long endEntryId;
    // Timestamp taken in initiate(); used by submitCallback() to compute the operation latency.
    long requestTimeNanos;
    // writeQuorumSize - ackQuorumSize + 1; during a recovery read, an entry request fails with
    // code -13 once this many bookies have reported the entry (or ledger) as missing.
    final int requiredBookiesMissingEntryForRecovery;
    final boolean isRecoveryRead;
    // When true, initiate() builds ParallelReadRequest instances instead of SequenceReadRequest.
    boolean parallelRead = false;
    // Ensures the completion logic in submitCallback() runs only once.
    final AtomicBoolean complete = new AtomicBoolean(false);
    // Set by allowFailFastOnUnwritableChannel(); not read anywhere in this class.
    boolean allowFailFast = false;

    PendingReadOp(LedgerHandle lh, ClientContext clientCtx, long startEntryId, long endEntryId, boolean isRecoveryRead) {
        this.seq = new ArrayList<LedgerEntryRequest>((int)(endEntryId + 1L - startEntryId));
        this.future = new CompletableFuture();
        this.lh = lh;
        this.clientCtx = clientCtx;
        this.startEntryId = startEntryId;
        this.endEntryId = endEntryId;
        this.isRecoveryRead = isRecoveryRead;
        this.allowFailFast = false;
        this.numPendingEntries = endEntryId - startEntryId + 1L;
        this.requiredBookiesMissingEntryForRecovery = this.getLedgerMetadata().getWriteQuorumSize() - this.getLedgerMetadata().getAckQuorumSize() + 1;
        this.heardFromHosts = new HashSet<BookieSocketAddress>();
        this.heardFromHostsBitSet = new BitSet(this.getLedgerMetadata().getEnsembleSize());
    }

    /**
     * @return the future that is completed with the entries read, or completed
     *         exceptionally if the read fails
     */
    CompletableFuture<LedgerEntries> future() {
        return future;
    }

    /**
     * @return the metadata currently exposed by the ledger handle this operation reads from
     */
    protected LedgerMetadata getLedgerMetadata() {
        return lh.getLedgerMetadata();
    }

    /**
     * Cancels the tracked speculative-read task, if there is one, and forgets it.
     *
     * @param mayInterruptIfRunning forwarded to {@link ScheduledFuture#cancel(boolean)}
     */
    protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
        if (speculativeTask == null) {
            return;
        }
        speculativeTask.cancel(mayInterruptIfRunning);
        speculativeTask = null;
    }

    /**
     * @return the currently tracked speculative-read task, or {@code null} when none is
     *         scheduled or it has been cancelled
     */
    public ScheduledFuture<?> getSpeculativeTask() {
        return speculativeTask;
    }

    /**
     * Selects between parallel and sequential per-entry reads. Takes effect when
     * {@link #initiate()} builds the entry requests.
     *
     * @param enabled {@code true} to read each entry from all of its replicas at once
     * @return this operation, for call chaining
     */
    PendingReadOp parallelRead(boolean enabled) {
        parallelRead = enabled;
        return this;
    }

    /**
     * Turns on the {@code allowFailFast} flag of this operation.
     */
    void allowFailFastOnUnwritableChannel() {
        allowFailFast = true;
    }

    /**
     * Hands this operation to the main worker pool, ordered by ledger id;
     * the pool then runs {@link #safeRun()}.
     */
    public void submit() {
        clientCtx.getMainWorkerPool()
                .executeOrdered(lh.ledgerId, (SafeRunnable)this);
    }

    /**
     * Builds one request per entry in {@code [startEntryId, endEntryId]} and then
     * issues the reads.
     *
     * <p>The ensemble is looked up again only when the entry id reaches the next
     * ensemble change reported by the ledger metadata. At least one request is
     * always created because the range loop tests its bound after the body.
     */
    void initiate() {
        requestTimeNanos = MathUtils.nowInNano();

        long entryId = startEntryId;
        long ensembleChangeAt = startEntryId;
        List<BookieSocketAddress> currentEnsemble = null;
        do {
            if (entryId == ensembleChangeAt) {
                currentEnsemble = getLedgerMetadata().getEnsembleAt(entryId);
                ensembleChangeAt = LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), entryId);
            }
            LedgerEntryRequest request;
            if (parallelRead) {
                request = new ParallelReadRequest(currentEnsemble, lh.ledgerId, entryId);
            } else {
                request = new SequenceReadRequest(currentEnsemble, lh.ledgerId, entryId);
            }
            seq.add(request);
            ++entryId;
        } while (entryId <= endEntryId);

        for (LedgerEntryRequest request : seq) {
            request.read();
            // Speculative reads only apply to sequential requests and only when a policy is configured.
            if (!parallelRead && clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) {
                // NOTE(review): the field is overwritten for every entry, so only the task of the
                // last entry remains reachable through cancelSpeculativeTask() — confirm this is intended.
                speculativeTask = ((SpeculativeRequestExecutionPolicy)clientCtx.getConf().readSpeculativeRequestPolicy.get())
                        .initiateSpeculativeRequest((ScheduledExecutorService)clientCtx.getScheduler(), request);
            }
        }
    }

    /**
     * Entry point used by the worker pool; simply starts the read via {@link #initiate()}.
     */
    @Override
    public void safeRun() {
        initiate();
    }

    /**
     * Sends a read for {@code entry} to a single bookie, registering this operation
     * as the callback.
     *
     * @param bookieIndex index of the target bookie, carried back to the callback in the read context
     * @param to          address of the target bookie
     * @param entry       the per-entry request the response belongs to
     * @throws InterruptedException if interrupted while acquiring the ledger handle's throttler
     */
    void sendReadTo(int bookieIndex, BookieSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
        if (lh.throttler != null) {
            lh.throttler.acquire();
        }

        // Recovery reads are sent with flag value 4, all other reads with 0.
        // NOTE(review): 4 is presumably the protocol's high-priority flag — confirm.
        final int flags;
        if (isRecoveryRead) {
            flags = 4;
        } else {
            flags = 0;
        }
        clientCtx.getBookieClient().readEntry(
                to, lh.ledgerId, entry.eId, this, new ReadContext(bookieIndex, to, entry), flags);
    }

    /**
     * Callback for a single bookie response.
     *
     * <p>A non-zero {@code rc} is passed to the entry request, which records the error
     * and may retry. A zero {@code rc} marks the bookie as heard from and tries to
     * complete the entry with the returned buffer.
     *
     * @param rc       result code; {@code 0} means success
     * @param ledgerId ledger the response belongs to (used for logging only)
     * @param entryId  entry the response belongs to (unused here; the entry comes from {@code ctx})
     * @param buffer   response payload
     * @param ctx      the {@code ReadContext} created when the read was sent
     */
    @Override
    public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) {
        final ReadContext readCtx = (ReadContext)ctx;
        final LedgerEntryRequest request = readCtx.entry;

        if (rc == 0) {
            heardFromHosts.add(readCtx.to);
            heardFromHostsBitSet.set(readCtx.bookieIndex, true);

            // Take a reference for the entry; it is dropped again below if the entry does not take the buffer.
            buffer.retain();
            final boolean accepted = request.complete(readCtx.bookieIndex, readCtx.to, buffer);
            if (!accepted) {
                buffer.release();
            } else {
                // The last-add-confirmed value is not advanced by recovery reads.
                if (!isRecoveryRead) {
                    lh.updateLastConfirmed(readCtx.getLastAddConfirmed(), 0L);
                }
                submitCallback(0);
            }

            if (numPendingEntries < 0L) {
                LOG.error("Read too many values for ledger {} : [{}, {}].", ledgerId, startEntryId, endEntryId);
            }
            return;
        }

        request.logErrorAndReattemptRead(readCtx.bookieIndex, readCtx.to, "Error: " + BKException.getMessage(rc), rc);
    }

    /**
     * Reports the outcome of one entry request and, when appropriate, completes the
     * whole operation.
     *
     * <p>A success ({@code code == 0}) decrements the pending-entry counter and only
     * proceeds once it reaches zero. Any other code proceeds immediately. The
     * completion logic itself runs at most once: it cancels the speculative task,
     * records the latency, and completes the future.
     *
     * @param code {@code 0} for a successfully read entry, otherwise the failure code
     */
    protected void submitCallback(int code) {
        final boolean success = (0 == code);
        if (success) {
            numPendingEntries -= 1L;
            if (numPendingEntries != 0L) {
                return;
            }
        }
        if (!complete.compareAndSet(false, true)) {
            return;
        }

        cancelSpeculativeTask(true);
        final long latencyNanos = MathUtils.elapsedNanos((long)requestTimeNanos);

        if (success) {
            clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
            future.complete(LedgerEntriesImpl.create(Lists.transform(this.seq, input -> input.entryImpl)));
            return;
        }

        // Locate the first entry request that has not finished, for the diagnostic message.
        long firstUnread = -1L;
        Integer firstRc = null;
        for (LedgerEntryRequest request : seq) {
            if (!request.isComplete()) {
                firstUnread = request.eId;
                firstRc = request.rc;
                break;
            }
        }
        LOG.error("Read of ledger entry failed: L{} E{}-E{}, Sent to {}, Heard from {} : bitset = {}, Error = '{}'. First unread entry is ({}, rc = {})",
                lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet,
                BKException.getMessage(code), firstUnread, firstRc);
        clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
        // Close every entry request before failing the future.
        for (LedgerEntryRequest request : seq) {
            request.close();
        }
        future.completeExceptionally(BKException.create(code));
    }

    /**
     * Per-request context passed to the bookie client and handed back in
     * {@code readEntryComplete}. It ties a response to the bookie it came from and
     * the entry request it belongs to, and carries the last-add-confirmed value
     * stored through {@link #setLastAddConfirmed(long)}.
     */
    private static class ReadContext implements BookkeeperInternalCallbacks.ReadEntryCallbackCtx {
        final int bookieIndex;
        final BookieSocketAddress to;
        final LedgerEntryRequest entry;
        /** Last-add-confirmed value; stays at -1 until a value is set. */
        long lac;

        ReadContext(int bookieIndex, BookieSocketAddress to, LedgerEntryRequest entry) {
            this.lac = -1L;
            this.bookieIndex = bookieIndex;
            this.to = to;
            this.entry = entry;
        }

        @Override
        public void setLastAddConfirmed(long lac) {
            this.lac = lac;
        }

        @Override
        public long getLastAddConfirmed() {
            return lac;
        }
    }

    /**
     * Entry request that reads from the replicas of the write set one at a time.
     *
     * <p>The first read goes to the first replica. A further replica is tried when
     * every outstanding read has failed, or when a speculative read is requested and
     * none of the bookies already asked has answered.
     *
     * <p>The base class calls {@code writeSet.recycle()} as soon as the request
     * completes or fails. This class therefore never reads {@code writeSet} after
     * that point.
     */
    class SequenceReadRequest
    extends LedgerEntryRequest {
        /** Value returned by {@code writeSet.indexOf} when the bookie index is not in the write set. */
        static final int NOT_FOUND = -1;
        /** Position in the write set of the next replica to try. */
        int nextReplicaIndexToReadFrom;
        /** Replica positions a read has been sent to. */
        final BitSet sentReplicas;
        /** Replica positions that answered with an error. */
        final BitSet erroredReplicas;

        SequenceReadRequest(List<BookieSocketAddress> ensemble, long lId, long eId) {
            super(ensemble, lId, eId);
            this.nextReplicaIndexToReadFrom = 0;
            this.sentReplicas = new BitSet(PendingReadOp.this.lh.getLedgerMetadata().getWriteQuorumSize());
            this.erroredReplicas = new BitSet(PendingReadOp.this.lh.getLedgerMetadata().getWriteQuorumSize());
        }

        private synchronized int getNextReplicaIndexToReadFrom() {
            return this.nextReplicaIndexToReadFrom;
        }

        /**
         * Translates the replica positions already sent to into a bit set keyed by
         * bookie index within the ensemble.
         */
        private BitSet getSentToBitSet() {
            BitSet b = new BitSet(this.ensemble.size());
            for (int i = 0; i < this.sentReplicas.length(); ++i) {
                if (!this.sentReplicas.get(i)) continue;
                b.set(this.writeSet.get(i));
            }
            return b;
        }

        /** @return {@code true} while at least one sent read has not come back with an error */
        private boolean readsOutstanding() {
            return this.sentReplicas.cardinality() - this.erroredReplicas.cardinality() > 0;
        }

        /**
         * Sends a read to the next replica when none of the bookies already asked is
         * part of {@code heardFrom}.
         *
         * @param heardFrom bookie indices that have answered successfully
         * @return the bookie the speculative read was sent to, or {@code null} if none was sent
         */
        @Override
        synchronized BookieSocketAddress maybeSendSpeculativeRead(BitSet heardFrom) {
            if (this.nextReplicaIndexToReadFrom >= PendingReadOp.this.getLedgerMetadata().getWriteQuorumSize()) {
                return null;
            }
            BitSet sentTo = this.getSentToBitSet();
            sentTo.and(heardFrom);
            if (sentTo.cardinality() == 0) {
                PendingReadOp.this.clientCtx.getClientStats().getSpeculativeReadCounter().inc();
                return this.sendNextRead();
            }
            return null;
        }

        @Override
        void read() {
            this.sendNextRead();
        }

        /**
         * Sends a read to the next untried replica. When every replica of the write
         * quorum has been tried, the request fails with the first recorded error.
         *
         * @return the bookie the read was sent to, or {@code null} if no read was sent
         */
        synchronized BookieSocketAddress sendNextRead() {
            if (this.nextReplicaIndexToReadFrom >= PendingReadOp.this.getLedgerMetadata().getWriteQuorumSize()) {
                this.fail(this.firstError);
                return null;
            }
            int replica = this.nextReplicaIndexToReadFrom;
            int bookieIndex = this.writeSet.get(this.nextReplicaIndexToReadFrom);
            ++this.nextReplicaIndexToReadFrom;
            try {
                BookieSocketAddress to = this.ensemble.get(bookieIndex);
                PendingReadOp.this.sendReadTo(bookieIndex, to, this);
                PendingReadOp.this.sentToHosts.add(to);
                this.sentReplicas.set(replica);
                return to;
            }
            catch (InterruptedException ie) {
                LOG.error("Interrupted reading entry " + this, (Throwable)ie);
                Thread.currentThread().interrupt();
                // NOTE(review): -15 is presumably the "interrupted" BKException code — confirm.
                this.fail(-15);
                return null;
            }
        }

        /**
         * Records an error from one replica and moves on to the next replica once no
         * read is outstanding. Errors that arrive after the request has finished are
         * only recorded by the base class and are not retried.
         */
        @Override
        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress host, String errMsg, int rc) {
            super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
            if (this.isComplete()) {
                // The request already completed or failed, so writeSet has been recycled:
                // it must not be consulted, and there is nothing left to retry.
                return;
            }
            int replica = this.writeSet.indexOf(bookieIndex);
            if (replica == NOT_FOUND) {
                LOG.error("Received error from a host which is not in the ensemble {} {}.", (Object)host, (Object)this.ensemble);
                return;
            }
            this.erroredReplicas.set(replica);
            if (PendingReadOp.this.isRecoveryRead && this.numBookiesMissingEntry >= PendingReadOp.this.requiredBookiesMissingEntryForRecovery) {
                // Enough bookies reported the entry missing during a recovery read.
                this.fail(-13);
                return;
            }
            if (!this.readsOutstanding()) {
                this.sendNextRead();
            }
        }

        /**
         * Completes the entry. On the first successful completion, every replica that
         * was tried before the last one sent is registered with the placement policy
         * as slow.
         *
         * <p>The slow-bookie addresses are resolved before delegating to the base
         * class, because a successful base-class completion recycles {@code writeSet}.
         *
         * @return {@code true} if this call completed the entry
         */
        @Override
        boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer) {
            List<BookieSocketAddress> slowBookies = null;
            if (!this.isComplete()) {
                int numReplicasTried = this.getNextReplicaIndexToReadFrom();
                if (numReplicasTried > 1) {
                    slowBookies = new ArrayList<>(numReplicasTried - 1);
                    for (int i = 0; i < numReplicasTried - 1; ++i) {
                        slowBookies.add(this.ensemble.get(this.writeSet.get(i)));
                    }
                }
            }
            boolean completed = super.complete(bookieIndex, host, buffer);
            if (completed && slowBookies != null) {
                for (BookieSocketAddress slowBookieSocketAddress : slowBookies) {
                    PendingReadOp.this.clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, this.eId);
                }
            }
            return completed;
        }
    }

    class ParallelReadRequest
    extends LedgerEntryRequest {
        int numPendings;

        ParallelReadRequest(List<BookieSocketAddress> ensemble, long lId, long eId) {
            super(ensemble, lId, eId);
            this.numPendings = this.writeSet.size();
        }

        @Override
        void read() {
            for (int i = 0; i < this.writeSet.size(); ++i) {
                BookieSocketAddress to = (BookieSocketAddress)this.ensemble.get(this.writeSet.get(i));
                try {
                    PendingReadOp.this.sendReadTo(this.writeSet.get(i), to, this);
                    continue;
                }
                catch (InterruptedException ie) {
                    LOG.error("Interrupted reading entry {} : ", (Object)this, (Object)ie);
                    Thread.currentThread().interrupt();
                    this.fail(-15);
                    return;
                }
            }
        }

        @Override
        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress host, String errMsg, int rc) {
            super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
            --this.numPendings;
            if (PendingReadOp.this.isRecoveryRead && this.numBookiesMissingEntry >= PendingReadOp.this.requiredBookiesMissingEntryForRecovery) {
                this.fail(-13);
            } else if (this.numPendings == 0) {
                this.fail(this.firstError);
            }
        }

        @Override
        BookieSocketAddress maybeSendSpeculativeRead(BitSet heardFromHostsBitSet) {
            return null;
        }
    }

    abstract class LedgerEntryRequest
    implements SpeculativeRequestExecutor,
    AutoCloseable {
        final AtomicBoolean complete = new AtomicBoolean(false);
        int rc = 0;
        int firstError = 0;
        int numBookiesMissingEntry = 0;
        final List<BookieSocketAddress> ensemble;
        final DistributionSchedule.WriteSet writeSet;
        final LedgerEntryImpl entryImpl;
        final long eId;

        LedgerEntryRequest(List<BookieSocketAddress> ensemble, long lId, long eId) {
            this.entryImpl = LedgerEntryImpl.create(lId, eId);
            this.ensemble = ensemble;
            this.eId = eId;
            this.writeSet = PendingReadOp.this.clientCtx.getConf().enableReorderReadSequence ? PendingReadOp.this.clientCtx.getPlacementPolicy().reorderReadSequence(ensemble, PendingReadOp.this.lh.getBookiesHealthInfo(), PendingReadOp.this.lh.getWriteSetForReadOperation(eId)) : PendingReadOp.this.lh.getWriteSetForReadOperation(eId);
        }

        @Override
        public void close() {
            this.entryImpl.close();
        }

        abstract void read();

        boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer) {
            ByteBuf content;
            try {
                content = PendingReadOp.this.lh.macManager.verifyDigestAndReturnData(this.eId, buffer);
            }
            catch (BKException.BKDigestMatchException e) {
                PendingReadOp.this.clientCtx.getClientStats().getReadOpDmCounter().inc();
                this.logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", -5);
                return false;
            }
            if (!this.complete.getAndSet(true)) {
                this.rc = 0;
                this.entryImpl.setLength(buffer.getLong(24));
                this.entryImpl.setEntryBuf(content);
                this.writeSet.recycle();
                return true;
            }
            return false;
        }

        boolean fail(int rc) {
            if (this.complete.compareAndSet(false, true)) {
                this.rc = rc;
                PendingReadOp.this.submitCallback(rc);
                this.writeSet.recycle();
                return true;
            }
            return false;
        }

        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress host, String errMsg, int rc) {
            if (0 == this.firstError || -13 == this.firstError || -7 == this.firstError) {
                this.firstError = rc;
            } else if (-8 == this.firstError && -13 != rc && -7 != rc) {
                this.firstError = rc;
            }
            if (-13 == rc || -7 == rc) {
                ++this.numBookiesMissingEntry;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("No such entry found on bookie.  L{} E{} bookie: {}", new Object[]{PendingReadOp.this.lh.ledgerId, this.eId, host});
                }
            } else if (LOG.isInfoEnabled()) {
                LOG.info("{} while reading L{} E{} from bookie: {}", new Object[]{errMsg, PendingReadOp.this.lh.ledgerId, this.eId, host});
            }
            PendingReadOp.this.lh.recordReadErrorOnBookie(bookieIndex);
        }

        abstract BookieSocketAddress maybeSendSpeculativeRead(BitSet var1);

        boolean isComplete() {
            return this.complete.get();
        }

        int getRc() {
            return this.rc;
        }

        public String toString() {
            return String.format("L%d-E%d", PendingReadOp.this.lh.getId(), this.eId);
        }

        @Override
        public ListenableFuture<Boolean> issueSpeculativeRequest() {
            return PendingReadOp.this.clientCtx.getMainWorkerPool().submitOrdered(PendingReadOp.this.lh.getId(), (Callable)new Callable<Boolean>(){

                @Override
                public Boolean call() throws Exception {
                    if (!LedgerEntryRequest.this.isComplete() && null != LedgerEntryRequest.this.maybeSendSpeculativeRead(PendingReadOp.this.heardFromHostsBitSet)) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Send speculative read for {}. Hosts sent are {},  Hosts heard are {}, ensemble is {}.", new Object[]{this, PendingReadOp.this.sentToHosts, PendingReadOp.this.heardFromHostsBitSet, LedgerEntryRequest.this.ensemble});
                        }
                        return true;
                    }
                    return false;
                }
            });
        }
    }
}

