/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client;

import java.util.BitSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.ClientContext;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.DistributionSchedule;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.SpeculativeRequestExecutor;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ReadOpBase
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ReadOpBase.class);
    protected ScheduledFuture<?> speculativeTask = null;
    protected final CompletableFuture<LedgerEntries> future;
    protected final Set<BookieId> heardFromHosts;
    protected final BitSet heardFromHostsBitSet;
    protected final Set<BookieId> sentToHosts = new HashSet<BookieId>();
    LedgerHandle lh;
    protected ClientContext clientCtx;
    protected final long startEntryId;
    protected long requestTimeNanos;
    protected final int requiredBookiesMissingEntryForRecovery;
    protected final boolean isRecoveryRead;
    protected final AtomicBoolean complete = new AtomicBoolean(false);
    protected boolean allowFailFast = false;
    long numPendingEntries;
    final long endEntryId;

    protected ReadOpBase(LedgerHandle lh, ClientContext clientCtx, long startEntryId, long endEntryId, boolean isRecoveryRead) {
        this.lh = lh;
        this.future = new CompletableFuture();
        this.startEntryId = startEntryId;
        this.endEntryId = endEntryId;
        this.isRecoveryRead = isRecoveryRead;
        this.requiredBookiesMissingEntryForRecovery = this.getLedgerMetadata().getWriteQuorumSize() - this.getLedgerMetadata().getAckQuorumSize() + 1;
        this.heardFromHosts = new HashSet<BookieId>();
        this.heardFromHostsBitSet = new BitSet(this.getLedgerMetadata().getEnsembleSize());
        this.allowFailFast = false;
        this.clientCtx = clientCtx;
    }

    protected LedgerMetadata getLedgerMetadata() {
        return this.lh.getLedgerMetadata();
    }

    protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
        if (this.speculativeTask != null) {
            this.speculativeTask.cancel(mayInterruptIfRunning);
            this.speculativeTask = null;
        }
    }

    public ScheduledFuture<?> getSpeculativeTask() {
        return this.speculativeTask;
    }

    CompletableFuture<LedgerEntries> future() {
        return this.future;
    }

    void allowFailFastOnUnwritableChannel() {
        this.allowFailFast = true;
    }

    public void submit() {
        this.clientCtx.getMainWorkerPool().executeOrdered(this.lh.ledgerId, (Runnable)this);
    }

    @Override
    public void run() {
        this.initiate();
    }

    abstract void initiate();

    protected abstract void submitCallback(int var1);

    protected static class ReadContext
    implements BookkeeperInternalCallbacks.ReadEntryCallbackCtx {
        final int bookieIndex;
        final BookieId to;
        final LedgerEntryRequest entry;
        long lac = -1L;

        ReadContext(int bookieIndex, BookieId to, LedgerEntryRequest entry) {
            this.bookieIndex = bookieIndex;
            this.to = to;
            this.entry = entry;
        }

        @Override
        public void setLastAddConfirmed(long lac) {
            this.lac = lac;
        }

        @Override
        public long getLastAddConfirmed() {
            return this.lac;
        }
    }

    abstract class LedgerEntryRequest
    implements SpeculativeRequestExecutor {
        final AtomicBoolean complete = new AtomicBoolean(false);
        int rc = 0;
        int firstError = 0;
        int numBookiesMissingEntry = 0;
        final long eId;
        final List<BookieId> ensemble;
        final DistributionSchedule.WriteSet writeSet;

        LedgerEntryRequest(List<BookieId> ensemble, long eId) {
            this.ensemble = ensemble;
            this.eId = eId;
            this.writeSet = ReadOpBase.this.clientCtx.getConf().enableReorderReadSequence ? ReadOpBase.this.clientCtx.getPlacementPolicy().reorderReadSequence(ensemble, ReadOpBase.this.lh.getBookiesHealthInfo(), ReadOpBase.this.lh.getWriteSetForReadOperation(eId)) : ReadOpBase.this.lh.getWriteSetForReadOperation(eId);
        }

        public void close() {
            if (this.complete.compareAndSet(false, true)) {
                this.rc = -999;
                this.writeSet.recycle();
            }
        }

        abstract void read();

        boolean fail(int rc) {
            if (this.complete.compareAndSet(false, true)) {
                this.rc = rc;
                this.writeSet.recycle();
                ReadOpBase.this.submitCallback(rc);
                return true;
            }
            return false;
        }

        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) {
            if (0 == this.firstError || -13 == this.firstError || -7 == this.firstError) {
                this.firstError = rc;
            } else if (-8 == this.firstError && -13 != rc && -7 != rc) {
                this.firstError = rc;
            }
            if (-13 == rc || -7 == rc) {
                ++this.numBookiesMissingEntry;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("No such entry found on bookie.  L{} E{} bookie: {}", new Object[]{ReadOpBase.this.lh.ledgerId, this.eId, host});
                }
            } else if (LOG.isInfoEnabled()) {
                LOG.info("{} while reading L{} E{} from bookie: {}", new Object[]{errMsg, ReadOpBase.this.lh.ledgerId, this.eId, host});
            }
            ReadOpBase.this.lh.recordReadErrorOnBookie(bookieIndex);
        }

        abstract BookieId maybeSendSpeculativeRead(BitSet var1);

        boolean isComplete() {
            return this.complete.get();
        }

        int getRc() {
            return this.rc;
        }

        public String toString() {
            return String.format("L%d-E%d", ReadOpBase.this.lh.getId(), this.eId);
        }

        @Override
        public ListenableFuture<Boolean> issueSpeculativeRequest() {
            return ReadOpBase.this.clientCtx.getMainWorkerPool().submitOrdered(ReadOpBase.this.lh.getId(), new Callable<Boolean>(){

                @Override
                public Boolean call() throws Exception {
                    if (!LedgerEntryRequest.this.isComplete() && null != LedgerEntryRequest.this.maybeSendSpeculativeRead(ReadOpBase.this.heardFromHostsBitSet)) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Send speculative read for {}. Hosts sent are {},  Hosts heard are {}, ensemble is {}.", new Object[]{this, ReadOpBase.this.sentToHosts, ReadOpBase.this.heardFromHostsBitSet, LedgerEntryRequest.this.ensemble});
                        }
                        return true;
                    }
                    return false;
                }
            });
        }
    }
}

