/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.DigestManager;
import org.apache.bookkeeper.client.DistributionSchedule;
import org.apache.bookkeeper.client.ExplicitLacFlushPolicy;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.client.LedgerRecoveryOp;
import org.apache.bookkeeper.client.MacDigestManager;
import org.apache.bookkeeper.client.PendingAddOp;
import org.apache.bookkeeper.client.PendingReadLacOp;
import org.apache.bookkeeper.client.PendingReadOp;
import org.apache.bookkeeper.client.ReadLastConfirmedAndEntryOp;
import org.apache.bookkeeper.client.ReadLastConfirmedOp;
import org.apache.bookkeeper.client.RoundRobinDistributionSchedule;
import org.apache.bookkeeper.client.SyncCallbackUtils;
import org.apache.bookkeeper.client.TryReadLastConfirmedOp;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.collections4.IteratorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LedgerHandle
implements WriteHandle {
    static final Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);
    final byte[] ledgerKey;
    LedgerMetadata metadata;
    final BookKeeper bk;
    final long ledgerId;
    long lastAddPushed;
    volatile long lastAddConfirmed;
    long length;
    final DigestManager macManager;
    final DistributionSchedule distributionSchedule;
    final RateLimiter throttler;
    final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
    final boolean enableParallelRecoveryRead;
    final int recoveryReadBatchSize;
    public static final long INVALID_ENTRY_ID = -1L;
    final AtomicInteger blockAddCompletions = new AtomicInteger(0);
    final AtomicInteger numEnsembleChanges = new AtomicInteger(0);
    Queue<PendingAddOp> pendingAddOps;
    ExplicitLacFlushPolicy explicitLacFlushPolicy;
    final Counter ensembleChangeCounter;
    final Counter lacUpdateHitsCounter;
    final Counter lacUpdateMissesCounter;
    private static final byte[] emptyLedgerKey;

    LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata, BookKeeper.DigestType digestType, byte[] password) throws GeneralSecurityException, NumberFormatException {
        this.bk = bk;
        this.metadata = metadata;
        this.pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
        this.enableParallelRecoveryRead = bk.getConf().getEnableParallelRecoveryRead();
        this.recoveryReadBatchSize = bk.getConf().getRecoveryReadBatchSize();
        if (metadata.isClosed()) {
            this.lastAddConfirmed = this.lastAddPushed = metadata.getLastEntryId();
            this.length = metadata.getLength();
        } else {
            this.lastAddPushed = -1L;
            this.lastAddConfirmed = -1L;
            this.length = 0L;
        }
        this.ledgerId = ledgerId;
        this.throttler = bk.getConf().getThrottleValue() > 0 ? RateLimiter.create((double)bk.getConf().getThrottleValue()) : null;
        this.macManager = DigestManager.instantiate(ledgerId, password, digestType);
        this.ledgerKey = password.length > 0 ? MacDigestManager.genDigest("ledger", password) : emptyLedgerKey;
        this.distributionSchedule = new RoundRobinDistributionSchedule(metadata.getWriteQuorumSize(), metadata.getAckQuorumSize(), metadata.getEnsembleSize());
        this.bookieFailureHistory = CacheBuilder.newBuilder().expireAfterWrite((long)bk.getConf().getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS).build((CacheLoader)new CacheLoader<BookieSocketAddress, Long>(){

            public Long load(BookieSocketAddress key) {
                return -1L;
            }
        });
        this.ensembleChangeCounter = bk.getStatsLogger().getCounter("NUM_ENSEMBLE_CHANGE");
        this.lacUpdateHitsCounter = bk.getStatsLogger().getCounter("LAC_UPDATE_HITS");
        this.lacUpdateMissesCounter = bk.getStatsLogger().getCounter("LAC_UPDATE_MISSES");
        bk.getStatsLogger().registerGauge("NUM_PENDING_ADD", (Gauge)new Gauge<Integer>(){

            public Integer getDefaultValue() {
                return 0;
            }

            public Integer getSample() {
                return LedgerHandle.this.pendingAddOps.size();
            }
        });
        this.initializeExplicitLacFlushPolicy();
    }

    protected void initializeExplicitLacFlushPolicy() {
        this.explicitLacFlushPolicy = !this.metadata.isClosed() && this.bk.getExplicitLacInterval() > 0 ? new ExplicitLacFlushPolicy.ExplicitLacFlushPolicyImpl(this) : ExplicitLacFlushPolicy.VOID_EXPLICITLAC_FLUSH_POLICY;
    }

    @Override
    public long getId() {
        return this.ledgerId;
    }

    @Override
    public synchronized long getLastAddConfirmed() {
        return this.lastAddConfirmed;
    }

    synchronized void setLastAddConfirmed(long lac) {
        this.lastAddConfirmed = lac;
    }

    @Override
    public synchronized long getLastAddPushed() {
        return this.lastAddPushed;
    }

    public byte[] getLedgerKey() {
        return Arrays.copyOf(this.ledgerKey, this.ledgerKey.length);
    }

    @Override
    public LedgerMetadata getLedgerMetadata() {
        return this.metadata;
    }

    public Map<String, byte[]> getCustomMetadata() {
        return this.metadata.getCustomMetadata();
    }

    public synchronized long getNumFragments() {
        return this.metadata.getEnsembles().size();
    }

    public synchronized long getNumBookies() {
        TreeMap<Long, ArrayList<BookieSocketAddress>> m = this.metadata.getEnsembles();
        HashSet s = Sets.newHashSet();
        for (ArrayList aList : m.values()) {
            s.addAll(aList);
        }
        return s.size();
    }

    DigestManager getDigestManager() {
        return this.macManager;
    }

    synchronized long addToLength(long delta) {
        this.length += delta;
        return this.length;
    }

    @Override
    public synchronized long getLength() {
        return this.length;
    }

    DistributionSchedule getDistributionSchedule() {
        return this.distributionSchedule;
    }

    void writeLedgerConfig(BookkeeperInternalCallbacks.GenericCallback<Void> writeCb) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Writing metadata to ledger manager: {}, {}", (Object)this.ledgerId, (Object)this.metadata.getVersion());
        }
        this.bk.getLedgerManager().writeLedgerMetadata(this.ledgerId, this.metadata, writeCb);
    }

    @Override
    public void close() throws InterruptedException, BKException {
        SyncCallbackUtils.waitForResult(this.asyncClose());
    }

    @Override
    public CompletableFuture<Void> asyncClose() {
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        SyncCallbackUtils.SyncCloseCallback callback = new SyncCallbackUtils.SyncCloseCallback(result);
        this.asyncClose(callback, null);
        this.explicitLacFlushPolicy.stopExplicitLacFlush();
        return result;
    }

    public void asyncClose(AsyncCallback.CloseCallback cb, Object ctx) {
        this.asyncCloseInternal(cb, ctx, -11);
    }

    @Override
    public synchronized boolean isClosed() {
        return this.metadata.isClosed();
    }

    void asyncCloseInternal(AsyncCallback.CloseCallback cb, Object ctx, int rc) {
        try {
            this.doAsyncCloseInternal(cb, ctx, rc);
        }
        catch (RejectedExecutionException re) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to close ledger {} : ", (Object)this.ledgerId, (Object)re);
            }
            this.errorOutPendingAdds(this.bk.getReturnRc(rc));
            cb.closeComplete(this.bk.getReturnRc(-15), this, ctx);
        }
    }

    void doAsyncCloseInternal(final AsyncCallback.CloseCallback cb, final Object ctx, final int rc) {
        this.bk.getMainWorkerPool().submitOrdered(this.ledgerId, new SafeRunnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void safeRun() {
                List<PendingAddOp> pendingAdds;
                long prevLength;
                long prevLastEntryId;
                DataFormats.LedgerMetadataFormat.State prevState;
                if (LedgerHandle.this.isClosed()) {
                    List<PendingAddOp> pendingAdds2;
                    LedgerHandle ledgerHandle = LedgerHandle.this;
                    synchronized (ledgerHandle) {
                        pendingAdds2 = LedgerHandle.this.drainPendingAddsToErrorOut();
                    }
                    LedgerHandle.this.errorOutPendingAdds(rc, pendingAdds2);
                    cb.closeComplete(0, LedgerHandle.this, ctx);
                    return;
                }
                LedgerHandle ledgerHandle = LedgerHandle.this;
                synchronized (ledgerHandle) {
                    prevState = LedgerHandle.this.metadata.getState();
                    prevLastEntryId = LedgerHandle.this.metadata.getLastEntryId();
                    prevLength = LedgerHandle.this.metadata.getLength();
                    pendingAdds = LedgerHandle.this.drainPendingAddsToErrorOut();
                    LedgerHandle.this.metadata.setLength(LedgerHandle.this.length);
                    LedgerHandle.this.metadata.close(LedgerHandle.this.lastAddConfirmed);
                    LedgerHandle.this.lastAddPushed = LedgerHandle.this.lastAddConfirmed;
                }
                LedgerHandle.this.errorOutPendingAdds(rc, pendingAdds);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Closing ledger: " + LedgerHandle.this.ledgerId + " at entryId: " + LedgerHandle.this.metadata.getLastEntryId() + " with this many bytes: " + LedgerHandle.this.metadata.getLength());
                }
                final class CloseCb
                extends OrderedSafeExecutor.OrderedSafeGenericCallback<Void> {
                    final /* synthetic */ DataFormats.LedgerMetadataFormat.State val$prevState;
                    final /* synthetic */ long val$prevLastEntryId;
                    final /* synthetic */ long val$prevLength;

                    CloseCb() {
                        this.val$prevState = state;
                        this.val$prevLastEntryId = l;
                        this.val$prevLength = l2;
                        super(LedgerHandle.this.bk.getMainWorkerPool(), LedgerHandle.this.ledgerId);
                    }

                    @Override
                    public void safeOperationComplete(final int rc, Void result) {
                        if (rc == -17) {
                            LedgerHandle.this.rereadMetadata((BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata>)new OrderedSafeExecutor.OrderedSafeGenericCallback<LedgerMetadata>(LedgerHandle.this.bk.getMainWorkerPool(), LedgerHandle.this.ledgerId){

                                @Override
                                public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
                                    if (newrc != 0) {
                                        LOG.error("Error reading new metadata from ledger {} when closing, code={}", (Object)LedgerHandle.this.ledgerId, (Object)newrc);
                                        cb.closeComplete(rc, LedgerHandle.this, ctx);
                                    } else {
                                        LedgerHandle.this.metadata.setState(val$prevState);
                                        if (val$prevState.equals((Object)DataFormats.LedgerMetadataFormat.State.CLOSED)) {
                                            LedgerHandle.this.metadata.close(val$prevLastEntryId);
                                        }
                                        LedgerHandle.this.metadata.setLength(val$prevLength);
                                        if (!LedgerHandle.this.metadata.isNewerThan(newMeta) && !LedgerHandle.this.metadata.isConflictWith(newMeta)) {
                                            LedgerHandle.this.metadata.setEnsembles(newMeta.getEnsembles());
                                            LedgerHandle.this.metadata.setVersion(newMeta.version);
                                            LedgerHandle.this.metadata.setLength(LedgerHandle.this.length);
                                            LedgerHandle.this.metadata.close(LedgerHandle.this.getLastAddConfirmed());
                                            LedgerHandle.this.writeLedgerConfig(new CloseCb(this, val$prevState, val$prevLastEntryId, val$prevLength));
                                            return;
                                        }
                                        LedgerHandle.this.metadata.setLength(LedgerHandle.this.length);
                                        LedgerHandle.this.metadata.close(LedgerHandle.this.getLastAddConfirmed());
                                        LOG.warn("Conditional update ledger metadata for ledger {} failed.", (Object)LedgerHandle.this.ledgerId);
                                        cb.closeComplete(rc, LedgerHandle.this, ctx);
                                    }
                                }

                                public String toString() {
                                    return String.format("ReReadMetadataForClose(%d)", LedgerHandle.this.ledgerId);
                                }
                            });
                        } else if (rc != 0) {
                            LOG.error("Error update ledger metadata for ledger {} : {}", (Object)LedgerHandle.this.ledgerId, (Object)rc);
                            cb.closeComplete(rc, LedgerHandle.this, ctx);
                        } else {
                            cb.closeComplete(0, LedgerHandle.this, ctx);
                        }
                    }

                    public String toString() {
                        return String.format("WriteLedgerConfigForClose(%d)", LedgerHandle.this.ledgerId);
                    }
                }
                LedgerHandle.this.writeLedgerConfig(new CloseCb());
            }

            public String toString() {
                return String.format("CloseLedgerHandle(%d)", LedgerHandle.this.ledgerId);
            }
        });
    }

    public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry) throws InterruptedException, BKException {
        CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<Enumeration<LedgerEntry>>();
        this.asyncReadEntries(firstEntry, lastEntry, new SyncCallbackUtils.SyncReadCallback(result), null);
        return SyncCallbackUtils.waitForResult(result);
    }

    public Enumeration<LedgerEntry> readUnconfirmedEntries(long firstEntry, long lastEntry) throws InterruptedException, BKException {
        CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<Enumeration<LedgerEntry>>();
        this.asyncReadUnconfirmedEntries(firstEntry, lastEntry, new SyncCallbackUtils.SyncReadCallback(result), null);
        return SyncCallbackUtils.waitForResult(result);
    }

    public void asyncReadEntries(long firstEntry, long lastEntry, AsyncCallback.ReadCallback cb, Object ctx) {
        if (firstEntry < 0L || firstEntry > lastEntry) {
            LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}", new Object[]{this.ledgerId, firstEntry, lastEntry});
            cb.readComplete(-14, this, null, ctx);
            return;
        }
        if (lastEntry > this.lastAddConfirmed) {
            LOG.error("ReadException on ledgerId:{} firstEntry:{} lastEntry:{}", new Object[]{this.ledgerId, firstEntry, lastEntry});
            cb.readComplete(-1, this, null, ctx);
            return;
        }
        this.asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx);
    }

    public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, AsyncCallback.ReadCallback cb, Object ctx) {
        if (firstEntry < 0L || firstEntry > lastEntry) {
            LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}", new Object[]{this.ledgerId, firstEntry, lastEntry});
            cb.readComplete(-14, this, null, ctx);
            return;
        }
        this.asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx);
    }

    @Override
    public CompletableFuture<LedgerEntries> read(long firstEntry, long lastEntry) {
        if (firstEntry < 0L || firstEntry > lastEntry) {
            LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}", new Object[]{this.ledgerId, firstEntry, lastEntry});
            return FutureUtils.exception((Throwable)new BKException.BKIncorrectParameterException());
        }
        if (lastEntry > this.lastAddConfirmed) {
            LOG.error("ReadException on ledgerId:{} firstEntry:{} lastEntry:{}", new Object[]{this.ledgerId, firstEntry, lastEntry});
            return FutureUtils.exception((Throwable)new BKException.BKReadException());
        }
        return this.readEntriesInternalAsync(firstEntry, lastEntry);
    }

    @Override
    public CompletableFuture<LedgerEntries> readUnconfirmed(long firstEntry, long lastEntry) {
        if (firstEntry < 0L || firstEntry > lastEntry) {
            LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}", new Object[]{this.ledgerId, firstEntry, lastEntry});
            return FutureUtils.exception((Throwable)new BKException.BKIncorrectParameterException());
        }
        return this.readEntriesInternalAsync(firstEntry, lastEntry);
    }

    void asyncReadEntriesInternal(long firstEntry, long lastEntry, final AsyncCallback.ReadCallback cb, final Object ctx) {
        if (!this.bk.isClosed()) {
            this.readEntriesInternalAsync(firstEntry, lastEntry).whenCompleteAsync((BiConsumer)new FutureEventListener<LedgerEntries>(){

                public void onSuccess(LedgerEntries entries) {
                    cb.readComplete(0, LedgerHandle.this, IteratorUtils.asEnumeration((Iterator)Iterators.transform(entries.iterator(), le -> {
                        LedgerEntry entry = new LedgerEntry((LedgerEntryImpl)le);
                        le.close();
                        return entry;
                    })), ctx);
                }

                public void onFailure(Throwable cause) {
                    if (cause instanceof BKException) {
                        BKException bke = (BKException)cause;
                        cb.readComplete(bke.getCode(), LedgerHandle.this, null, ctx);
                    } else {
                        cb.readComplete(-999, LedgerHandle.this, null, ctx);
                    }
                }
            }, (Executor)this.bk.getMainWorkerPool().chooseThread(this.ledgerId));
        } else {
            cb.readComplete(-19, this, null, ctx);
        }
    }

    CompletableFuture<LedgerEntries> readEntriesInternalAsync(long firstEntry, long lastEntry) {
        PendingReadOp op = new PendingReadOp(this, this.bk.getScheduler(), firstEntry, lastEntry);
        if (!this.bk.isClosed()) {
            this.bk.getMainWorkerPool().submitOrdered(this.ledgerId, op);
        } else {
            op.future().completeExceptionally(BKException.create(-19));
        }
        return op.future();
    }

    public long addEntry(byte[] data) throws InterruptedException, BKException {
        return this.addEntry(data, 0, data.length);
    }

    @Override
    public CompletableFuture<Long> append(ByteBuf data) {
        SyncCallbackUtils.SyncAddCallback callback = new SyncCallbackUtils.SyncAddCallback();
        this.asyncAddEntry(data, (AsyncCallback.AddCallback)callback, null);
        return callback;
    }

    public long addEntry(long entryId, byte[] data) throws InterruptedException, BKException {
        LOG.error("To use this feature Ledger must be created with createLedgerAdv interface.");
        throw BKException.create(-100);
    }

    public long addEntry(byte[] data, int offset, int length) throws InterruptedException, BKException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Adding entry {}", (Object)data);
        }
        SyncCallbackUtils.SyncAddCallback callback = new SyncCallbackUtils.SyncAddCallback();
        this.asyncAddEntry(data, offset, length, callback, null);
        return SyncCallbackUtils.waitForResult(callback);
    }

    public long addEntry(long entryId, byte[] data, int offset, int length) throws InterruptedException, BKException {
        LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
        throw BKException.create(-100);
    }

    public void asyncAddEntry(byte[] data, AsyncCallback.AddCallback cb, Object ctx) {
        this.asyncAddEntry(data, 0, data.length, cb, ctx);
    }

    public void asyncAddEntry(long entryId, byte[] data, AsyncCallback.AddCallback cb, Object ctx) {
        LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
        cb.addComplete(-100, this, entryId, ctx);
    }

    public void asyncAddEntry(byte[] data, int offset, int length, AsyncCallback.AddCallback cb, Object ctx) {
        if (offset < 0 || length < 0 || offset + length > data.length) {
            throw new ArrayIndexOutOfBoundsException("Invalid values for offset(" + offset + ") or length(" + length + ")");
        }
        this.asyncAddEntry(Unpooled.wrappedBuffer((byte[])data, (int)offset, (int)length), cb, ctx);
    }

    public void asyncAddEntry(ByteBuf data, AsyncCallback.AddCallback cb, Object ctx) {
        data.retain();
        PendingAddOp op = PendingAddOp.create(this, data, cb, ctx);
        this.doAsyncAddEntry(op);
    }

    public void asyncAddEntry(long entryId, byte[] data, int offset, int length, AsyncCallback.AddCallback cb, Object ctx) throws BKException {
        LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
        cb.addComplete(-100, this, entryId, ctx);
    }

    void asyncRecoveryAddEntry(byte[] data, int offset, int length, AsyncCallback.AddCallback cb, Object ctx) {
        PendingAddOp op = PendingAddOp.create(this, Unpooled.wrappedBuffer((byte[])data, (int)offset, (int)length), cb, ctx).enableRecoveryAdd();
        this.doAsyncAddEntry(op);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void doAsyncAddEntry(final PendingAddOp op) {
        if (this.throttler != null) {
            this.throttler.acquire();
        }
        boolean wasClosed = false;
        LedgerHandle ledgerHandle = this;
        synchronized (ledgerHandle) {
            if (this.metadata.isClosed()) {
                wasClosed = true;
            } else {
                long entryId = ++this.lastAddPushed;
                long currentLedgerLength = this.addToLength(op.payload.readableBytes());
                op.setEntryId(entryId);
                op.setLedgerLength(currentLedgerLength);
                this.pendingAddOps.add(op);
            }
        }
        if (wasClosed) {
            try {
                this.bk.getMainWorkerPool().submit(new SafeRunnable(){

                    public void safeRun() {
                        LOG.warn("Attempt to add to closed ledger: {}", (Object)LedgerHandle.this.ledgerId);
                        op.cb.addComplete(-11, LedgerHandle.this, -1L, op.ctx);
                    }

                    public String toString() {
                        return String.format("AsyncAddEntryToClosedLedger(lid=%d)", LedgerHandle.this.ledgerId);
                    }
                });
            }
            catch (RejectedExecutionException e) {
                op.cb.addComplete(this.bk.getReturnRc(-15), this, -1L, op.ctx);
            }
            return;
        }
        try {
            this.bk.getMainWorkerPool().submitOrdered(this.ledgerId, (SafeRunnable)op);
        }
        catch (RejectedExecutionException e) {
            op.cb.addComplete(this.bk.getReturnRc(-15), this, -1L, op.ctx);
        }
    }

    synchronized void updateLastConfirmed(long lac, long len) {
        if (lac > this.lastAddConfirmed) {
            this.lastAddConfirmed = lac;
            this.lacUpdateHitsCounter.inc();
        } else {
            this.lacUpdateMissesCounter.inc();
        }
        this.lastAddPushed = Math.max(this.lastAddPushed, lac);
        this.length = Math.max(this.length, len);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void asyncReadLastConfirmed(final AsyncCallback.ReadLastConfirmedCallback cb, final Object ctx) {
        long lastEntryId;
        boolean isClosed;
        LedgerHandle ledgerHandle = this;
        synchronized (ledgerHandle) {
            isClosed = this.metadata.isClosed();
            lastEntryId = this.metadata.getLastEntryId();
        }
        if (isClosed) {
            cb.readLastConfirmedComplete(0, lastEntryId, ctx);
            return;
        }
        ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback(){

            @Override
            public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
                if (rc == 0) {
                    LedgerHandle.this.updateLastConfirmed(data.lastAddConfirmed, data.length);
                    cb.readLastConfirmedComplete(rc, data.lastAddConfirmed, ctx);
                } else {
                    cb.readLastConfirmedComplete(rc, -1L, ctx);
                }
            }
        };
        new ReadLastConfirmedOp(this, innercb).initiate();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void asyncTryReadLastConfirmed(final AsyncCallback.ReadLastConfirmedCallback cb, final Object ctx) {
        long lastEntryId;
        boolean isClosed;
        LedgerHandle ledgerHandle = this;
        synchronized (ledgerHandle) {
            isClosed = this.metadata.isClosed();
            lastEntryId = this.metadata.getLastEntryId();
        }
        if (isClosed) {
            cb.readLastConfirmedComplete(0, lastEntryId, ctx);
            return;
        }
        ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback(){
            AtomicBoolean completed = new AtomicBoolean(false);

            @Override
            public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
                if (rc == 0) {
                    LedgerHandle.this.updateLastConfirmed(data.lastAddConfirmed, data.length);
                    if (this.completed.compareAndSet(false, true)) {
                        cb.readLastConfirmedComplete(rc, data.lastAddConfirmed, ctx);
                    }
                } else if (this.completed.compareAndSet(false, true)) {
                    cb.readLastConfirmedComplete(rc, -1L, ctx);
                }
            }
        };
        new TryReadLastConfirmedOp(this, innercb, this.getLastAddConfirmed()).initiate();
    }

    @Override
    public CompletableFuture<Long> tryReadLastAddConfirmed() {
        SyncCallbackUtils.FutureReadLastConfirmed result = new SyncCallbackUtils.FutureReadLastConfirmed();
        this.asyncTryReadLastConfirmed(result, null);
        return result;
    }

    @Override
    public CompletableFuture<Long> readLastAddConfirmed() {
        SyncCallbackUtils.FutureReadLastConfirmed result = new SyncCallbackUtils.FutureReadLastConfirmed();
        this.asyncReadLastConfirmed(result, null);
        return result;
    }

    @Override
    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntry(long entryId, long timeOutInMillis, boolean parallel) {
        SyncCallbackUtils.FutureReadLastConfirmedAndEntry result = new SyncCallbackUtils.FutureReadLastConfirmedAndEntry();
        this.asyncReadLastConfirmedAndEntry(entryId, timeOutInMillis, parallel, result, null);
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void asyncReadLastConfirmedAndEntry(long entryId, long timeOutInMillis, boolean parallel, final AsyncCallback.ReadLastConfirmedAndEntryCallback cb, final Object ctx) {
        long lac;
        boolean isClosed;
        LedgerHandle ledgerHandle = this;
        synchronized (ledgerHandle) {
            isClosed = this.metadata.isClosed();
            lac = this.metadata.getLastEntryId();
        }
        if (isClosed) {
            if (entryId > lac) {
                cb.readLastConfirmedAndEntryComplete(0, lac, null, ctx);
                return;
            }
        } else {
            lac = this.getLastAddConfirmed();
        }
        if (entryId <= lac) {
            this.asyncReadEntries(entryId, entryId, new AsyncCallback.ReadCallback(){

                @Override
                public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
                    if (0 == rc) {
                        if (seq.hasMoreElements()) {
                            cb.readLastConfirmedAndEntryComplete(rc, LedgerHandle.this.getLastAddConfirmed(), seq.nextElement(), ctx);
                        } else {
                            cb.readLastConfirmedAndEntryComplete(rc, LedgerHandle.this.getLastAddConfirmed(), null, ctx);
                        }
                    } else {
                        cb.readLastConfirmedAndEntryComplete(rc, -1L, null, ctx);
                    }
                }
            }, ctx);
            return;
        }
        ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb = new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback(){
            AtomicBoolean completed = new AtomicBoolean(false);

            @Override
            public void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, LedgerEntry entry) {
                if (rc == 0) {
                    if (this.completed.compareAndSet(false, true)) {
                        cb.readLastConfirmedAndEntryComplete(rc, lastAddConfirmed, entry, ctx);
                    }
                } else if (this.completed.compareAndSet(false, true)) {
                    cb.readLastConfirmedAndEntryComplete(rc, -1L, null, ctx);
                }
            }
        };
        new ReadLastConfirmedAndEntryOp(this, innercb, entryId - 1L, timeOutInMillis, this.bk.getScheduler()).parallelRead(parallel).initiate();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long readLastConfirmed() throws InterruptedException, BKException {
        LastConfirmedCtx ctx = new LastConfirmedCtx();
        this.asyncReadLastConfirmed(new SyncCallbackUtils.SyncReadLastConfirmedCallback(), ctx);
        LastConfirmedCtx lastConfirmedCtx = ctx;
        synchronized (lastConfirmedCtx) {
            while (!ctx.ready()) {
                ctx.wait();
            }
        }
        if (ctx.getRC() != 0) {
            throw BKException.create(ctx.getRC());
        }
        return ctx.getlastConfirmed();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long tryReadLastConfirmed() throws InterruptedException, BKException {
        LastConfirmedCtx ctx = new LastConfirmedCtx();
        this.asyncTryReadLastConfirmed(new SyncCallbackUtils.SyncReadLastConfirmedCallback(), ctx);
        LastConfirmedCtx lastConfirmedCtx = ctx;
        synchronized (lastConfirmedCtx) {
            while (!ctx.ready()) {
                ctx.wait();
            }
        }
        if (ctx.getRC() != 0) {
            throw BKException.create(ctx.getRC());
        }
        return ctx.getlastConfirmed();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void asyncReadExplicitLastConfirmed(final AsyncCallback.ReadLastConfirmedCallback cb, final Object ctx) {
        boolean isClosed;
        LedgerHandle ledgerHandle = this;
        synchronized (ledgerHandle) {
            isClosed = this.metadata.isClosed();
            if (isClosed) {
                this.lastAddConfirmed = this.metadata.getLastEntryId();
                this.length = this.metadata.getLength();
            }
        }
        if (isClosed) {
            cb.readLastConfirmedComplete(0, this.lastAddConfirmed, ctx);
            return;
        }
        PendingReadLacOp.LacCallback innercb = new PendingReadLacOp.LacCallback(){

            @Override
            public void getLacComplete(int rc, long lac) {
                if (rc == 0) {
                    LedgerHandle.this.updateLastConfirmed(lac, 0L);
                    cb.readLastConfirmedComplete(rc, lac, ctx);
                } else {
                    cb.readLastConfirmedComplete(rc, -1L, ctx);
                }
            }
        };
        new PendingReadLacOp(this, innercb).initiate();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long readExplicitLastConfirmed() throws InterruptedException, BKException {
        LastConfirmedCtx ctx = new LastConfirmedCtx();
        this.asyncReadExplicitLastConfirmed(new SyncCallbackUtils.SyncReadLastConfirmedCallback(), ctx);
        LastConfirmedCtx lastConfirmedCtx = ctx;
        synchronized (lastConfirmedCtx) {
            while (!ctx.ready()) {
                ctx.wait();
            }
        }
        if (ctx.getRC() != 0) {
            throw BKException.create(ctx.getRC());
        }
        return ctx.getlastConfirmed();
    }

    void handleUnrecoverableErrorDuringAdd(int rc) {
        if (this.metadata.isInRecovery()) {
            this.errorOutPendingAdds(rc);
            return;
        }
        LOG.error("Closing ledger {} due to error {}", (Object)this.ledgerId, (Object)rc);
        this.asyncCloseInternal(NoopCloseCallback.instance, null, rc);
    }

    void errorOutPendingAdds(int rc) {
        this.errorOutPendingAdds(rc, this.drainPendingAddsToErrorOut());
    }

    synchronized List<PendingAddOp> drainPendingAddsToErrorOut() {
        PendingAddOp pendingAddOp;
        ArrayList<PendingAddOp> opsDrained = new ArrayList<PendingAddOp>(this.pendingAddOps.size());
        while ((pendingAddOp = this.pendingAddOps.poll()) != null) {
            this.addToLength(-pendingAddOp.entryLength);
            opsDrained.add(pendingAddOp);
        }
        return opsDrained;
    }

    void errorOutPendingAdds(int rc, List<PendingAddOp> ops) {
        for (PendingAddOp op : ops) {
            op.submitCallback(rc);
        }
    }

    void sendAddSuccessCallbacks() {
        PendingAddOp pendingAddOp;
        while ((pendingAddOp = this.pendingAddOps.peek()) != null && this.blockAddCompletions.get() == 0) {
            if (!pendingAddOp.completed) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("pending add not completed: {}", (Object)pendingAddOp);
                }
                return;
            }
            if (pendingAddOp.entryId != 0L && pendingAddOp.entryId != this.lastAddConfirmed + 1L) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Head of the queue entryId: {} is not lac: {} + 1", (Object)pendingAddOp.entryId, (Object)this.lastAddConfirmed);
                }
                return;
            }
            this.pendingAddOps.remove();
            this.explicitLacFlushPolicy.updatePiggyBackedLac(this.lastAddConfirmed);
            this.lastAddConfirmed = pendingAddOp.entryId;
            pendingAddOp.submitCallback(0);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    EnsembleInfo replaceBookieInMetadata(Map<Integer, BookieSocketAddress> failedBookies, int ensembleChangeIdx) throws BKException.BKNotEnoughBookiesException {
        ArrayList<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>();
        long newEnsembleStartEntry = this.getLastAddConfirmed() + 1L;
        HashSet<Integer> replacedBookies = new HashSet<Integer>();
        LedgerMetadata ledgerMetadata = this.metadata;
        synchronized (ledgerMetadata) {
            newEnsemble.addAll(this.metadata.currentEnsemble);
            for (Map.Entry<Integer, BookieSocketAddress> entry : failedBookies.entrySet()) {
                int idx = entry.getKey();
                BookieSocketAddress addr = entry.getValue();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[EnsembleChange-L{}-{}] : replacing bookie: {} index: {}", new Object[]{this.getId(), ensembleChangeIdx, addr, idx});
                }
                if (!newEnsemble.get(idx).equals(addr)) {
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.debug("Write did not succeed to {}, bookieIndex {}, but we have already fixed it.", (Object)addr, (Object)idx);
                    continue;
                }
                try {
                    BookieSocketAddress newBookie = this.bk.bookieWatcher.replaceBookie(this.metadata.getEnsembleSize(), this.metadata.getWriteQuorumSize(), this.metadata.getAckQuorumSize(), this.metadata.getCustomMetadata(), newEnsemble, idx, new HashSet<BookieSocketAddress>(failedBookies.values()));
                    newEnsemble.set(idx, newBookie);
                    replacedBookies.add(idx);
                }
                catch (BKException.BKNotEnoughBookiesException e) {
                    if (replacedBookies.size() > 0) break;
                    throw e;
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("[EnsembleChange-L{}-{}] : changing ensemble from: {} to: {} starting at entry: {}, failed bookies: {}, replaced bookies: {}", new Object[]{this.ledgerId, ensembleChangeIdx, this.metadata.currentEnsemble, newEnsemble, this.getLastAddConfirmed() + 1L, failedBookies, replacedBookies});
            }
            this.metadata.addEnsemble(newEnsembleStartEntry, newEnsemble);
        }
        return new EnsembleInfo(newEnsemble, failedBookies, replacedBookies);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void handleBookieFailure(Map<Integer, BookieSocketAddress> failedBookies) {
        int curBlockAddCompletions = this.blockAddCompletions.incrementAndGet();
        if (this.bk.disableEnsembleChangeFeature.isAvailable()) {
            this.blockAddCompletions.decrementAndGet();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ensemble change is disabled. Retry sending to failed bookies {} for ledger {}.", failedBookies, (Object)this.ledgerId);
            }
            this.unsetSuccessAndSendWriteRequest(failedBookies.keySet());
            return;
        }
        int curNumEnsembleChanges = this.numEnsembleChanges.incrementAndGet();
        LedgerMetadata ledgerMetadata = this.metadata;
        synchronized (ledgerMetadata) {
            try {
                EnsembleInfo ensembleInfo = this.replaceBookieInMetadata(failedBookies, curNumEnsembleChanges);
                if (ensembleInfo.replacedBookies.isEmpty()) {
                    this.blockAddCompletions.decrementAndGet();
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[EnsembleChange-L{}-{}] : writing new ensemble info = {}, block add completions = {}", new Object[]{this.getId(), curNumEnsembleChanges, ensembleInfo, curBlockAddCompletions});
                }
                this.writeLedgerConfig(new ChangeEnsembleCb(ensembleInfo, curBlockAddCompletions, curNumEnsembleChanges));
            }
            catch (BKException.BKNotEnoughBookiesException e) {
                LOG.error("Could not get additional bookie to remake ensemble, closing ledger: {}", (Object)this.ledgerId);
                this.handleUnrecoverableErrorDuringAdd(e.getCode());
                return;
            }
        }
    }

    void unsetSuccessAndSendWriteRequest(Set<Integer> bookies) {
        for (PendingAddOp pendingAddOp : this.pendingAddOps) {
            for (Integer bookieIndex : bookies) {
                pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex);
            }
        }
    }

    void rereadMetadata(BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata> cb) {
        this.bk.getLedgerManager().readLedgerMetadata(this.ledgerId, cb);
    }

    void registerOperationFailureOnBookie(BookieSocketAddress bookie, long entryId) {
        if (this.bk.getConf().getEnableBookieFailureTracking()) {
            this.bookieFailureHistory.put((Object)bookie, (Object)entryId);
        }
    }

    void recover(BookkeeperInternalCallbacks.GenericCallback<Void> finalCb) {
        this.recover(finalCb, null, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void recover(BookkeeperInternalCallbacks.GenericCallback<Void> finalCb, final @VisibleForTesting BookkeeperInternalCallbacks.ReadEntryListener listener, final boolean forceRecovery) {
        final BookkeeperInternalCallbacks.TimedGenericCallback<Void> cb = new BookkeeperInternalCallbacks.TimedGenericCallback<Void>(finalCb, 0, this.bk.getRecoverOpLogger());
        boolean wasClosed = false;
        boolean wasInRecovery = false;
        LedgerHandle ledgerHandle = this;
        synchronized (ledgerHandle) {
            if (this.metadata.isClosed()) {
                if (forceRecovery) {
                    wasClosed = false;
                    wasInRecovery = false;
                    this.metadata.markLedgerInRecovery();
                } else {
                    this.lastAddConfirmed = this.lastAddPushed = this.metadata.getLastEntryId();
                    this.length = this.metadata.getLength();
                    wasClosed = true;
                }
            } else {
                wasClosed = false;
                if (this.metadata.isInRecovery()) {
                    wasInRecovery = true;
                } else {
                    wasInRecovery = false;
                    this.metadata.markLedgerInRecovery();
                }
            }
        }
        if (wasClosed) {
            cb.operationComplete(0, null);
            return;
        }
        if (wasInRecovery) {
            new LedgerRecoveryOp(this, cb).parallelRead(this.enableParallelRecoveryRead).readBatchSize(this.recoveryReadBatchSize).setEntryListener(listener).initiate();
            return;
        }
        this.writeLedgerConfig((BookkeeperInternalCallbacks.GenericCallback<Void>)new OrderedSafeExecutor.OrderedSafeGenericCallback<Void>(this.bk.getMainWorkerPool(), this.ledgerId){

            @Override
            public void safeOperationComplete(int rc, Void result) {
                if (rc == -17) {
                    LedgerHandle.this.rereadMetadata((BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata>)new OrderedSafeExecutor.OrderedSafeGenericCallback<LedgerMetadata>(LedgerHandle.this.bk.getMainWorkerPool(), LedgerHandle.this.ledgerId){

                        @Override
                        public void safeOperationComplete(int rc, LedgerMetadata newMeta) {
                            if (rc != 0) {
                                cb.operationComplete(rc, null);
                            } else {
                                LedgerHandle.this.metadata = newMeta;
                                LedgerHandle.this.recover(cb, listener, forceRecovery);
                            }
                        }

                        public String toString() {
                            return String.format("ReReadMetadataForRecover(%d)", LedgerHandle.this.ledgerId);
                        }
                    });
                } else if (rc == 0) {
                    new LedgerRecoveryOp(LedgerHandle.this, cb).parallelRead(LedgerHandle.this.enableParallelRecoveryRead).readBatchSize(LedgerHandle.this.recoveryReadBatchSize).setEntryListener(listener).initiate();
                } else {
                    LOG.error("Error writing ledger config {} of ledger {}", (Object)rc, (Object)LedgerHandle.this.ledgerId);
                    cb.operationComplete(rc, null);
                }
            }

            public String toString() {
                return String.format("WriteLedgerConfigForRecover(%d)", LedgerHandle.this.ledgerId);
            }
        });
    }

    static {
        try {
            emptyLedgerKey = MacDigestManager.genDigest("ledger", new byte[0]);
        }
        catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }

    static class NoopCloseCallback
    implements AsyncCallback.CloseCallback {
        static NoopCloseCallback instance = new NoopCloseCallback();

        NoopCloseCallback() {
        }

        @Override
        public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
            if (rc != 0) {
                LOG.warn("Close failed: " + BKException.getMessage(rc));
            }
        }
    }

    private final class ReReadLedgerMetadataCb
    extends OrderedSafeExecutor.OrderedSafeGenericCallback<LedgerMetadata> {
        private final int rc;
        private final EnsembleInfo ensembleInfo;
        private final int curBlockAddCompletions;
        private final int ensembleChangeIdx;

        ReReadLedgerMetadataCb(int rc, EnsembleInfo ensembleInfo, int curBlockAddCompletions, int ensembleChangeIdx) {
            super(LedgerHandle.this.bk.getMainWorkerPool(), LedgerHandle.this.ledgerId);
            this.rc = rc;
            this.ensembleInfo = ensembleInfo;
            this.curBlockAddCompletions = curBlockAddCompletions;
            this.ensembleChangeIdx = ensembleChangeIdx;
        }

        @Override
        public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
            if (newrc != 0) {
                LOG.error("[EnsembleChange-L{}-{}] : error re-reading metadata to address ensemble change conflicts, code=", new Object[]{LedgerHandle.this.ledgerId, this.ensembleChangeIdx, newrc});
                LedgerHandle.this.handleUnrecoverableErrorDuringAdd(this.rc);
            } else if (!this.resolveConflict(newMeta)) {
                LOG.error("[EnsembleChange-L{}-{}] : could not resolve ledger metadata conflict while changing ensemble to: {}, local meta data is \n {} \n, zk meta data is \n {} \n, closing ledger", new Object[]{LedgerHandle.this.ledgerId, this.ensembleChangeIdx, this.ensembleInfo.newEnsemble, LedgerHandle.this.metadata, newMeta});
                LedgerHandle.this.handleUnrecoverableErrorDuringAdd(this.rc);
            }
        }

        private boolean resolveConflict(LedgerMetadata newMeta) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("[EnsembleChange-L{}-{}] : resolving conflicts - local metadata = \n {} \n, zk metadata = \n {} \n", new Object[]{LedgerHandle.this.ledgerId, this.ensembleChangeIdx, LedgerHandle.this.metadata, newMeta});
            }
            if (LedgerHandle.this.metadata.getState() != newMeta.getState()) {
                if (LOG.isDebugEnabled()) {
                    LOG.info("[EnsembleChange-L{}-{}] : resolving conflicts but state changed, local metadata = \n {} \n, zk metadata = \n {} \n", new Object[]{LedgerHandle.this.ledgerId, this.ensembleChangeIdx, LedgerHandle.this.metadata, newMeta});
                }
                return false;
            }
            int diff = newMeta.getEnsembles().size() - LedgerHandle.this.metadata.getEnsembles().size();
            if (0 != diff) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[EnsembleChange-L{}-{}] : resolving conflicts but ensembles have {} differences, local metadata = \n {} \n, zk metadata = \n {} \n", new Object[]{LedgerHandle.this.ledgerId, this.ensembleChangeIdx, diff, LedgerHandle.this.metadata, newMeta});
                }
                if (-1 == diff) {
                    return this.updateMetadataIfPossible(newMeta);
                }
                return false;
            }
            if (!this.areFailedBookiesReplaced(newMeta, this.ensembleInfo)) {
                if (this.areFailedBookiesReplaced(LedgerHandle.this.metadata, this.ensembleInfo)) {
                    return this.updateMetadataIfPossible(newMeta);
                }
            } else {
                LedgerHandle.this.ensembleChangeCounter.inc();
                int newBlockAddCompletions = LedgerHandle.this.blockAddCompletions.decrementAndGet();
                LedgerHandle.this.unsetSuccessAndSendWriteRequest(this.ensembleInfo.replacedBookies);
                if (LOG.isDebugEnabled()) {
                    LOG.info("[EnsembleChange-L{}-{}] : resolved conflicts, block add complectiosn {} => {}.", new Object[]{LedgerHandle.this.ledgerId, this.ensembleChangeIdx, this.curBlockAddCompletions, newBlockAddCompletions});
                }
            }
            return true;
        }

        private boolean areFailedBookiesReplaced(LedgerMetadata newMeta, EnsembleInfo ensembleInfo) {
            boolean replaced = true;
            for (Integer replacedBookieIdx : ensembleInfo.replacedBookies) {
                BookieSocketAddress failedBookieAddr = (BookieSocketAddress)ensembleInfo.failedBookies.get(replacedBookieIdx);
                BookieSocketAddress replacedBookieAddr = newMeta.currentEnsemble.get(replacedBookieIdx);
                replaced &= !Objects.equal((Object)replacedBookieAddr, (Object)failedBookieAddr);
            }
            return replaced;
        }

        private boolean updateMetadataIfPossible(LedgerMetadata newMeta) {
            if (LedgerHandle.this.metadata.isNewerThan(newMeta)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[EnsembleChange-L{}-{}] : reread metadata because local metadata is newer.", new Object[]{LedgerHandle.this.ledgerId, this.ensembleChangeIdx});
                }
                LedgerHandle.this.rereadMetadata(this);
                return true;
            }
            if (LedgerHandle.this.metadata.isConflictWith(newMeta)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[EnsembleChange-L{}-{}] : metadata is conflicted, local metadata = \n {} \n, zk metadata = \n {} \n", new Object[]{LedgerHandle.this.ledgerId, this.ensembleChangeIdx, LedgerHandle.this.metadata, newMeta});
                }
                return false;
            }
            if (LOG.isDebugEnabled()) {
                LOG.info("[EnsembleChange-L{}-{}] : resolved ledger metadata conflict and writing to zookeeper, local meta data is \n {} \n, zk meta data is \n {}.", new Object[]{LedgerHandle.this.ledgerId, this.ensembleChangeIdx, LedgerHandle.this.metadata, newMeta});
            }
            LedgerHandle.this.metadata.setVersion(newMeta.getVersion());
            LedgerHandle.this.metadata.mergeEnsembles(newMeta.getEnsembles());
            LedgerHandle.this.writeLedgerConfig(new ChangeEnsembleCb(this.ensembleInfo, this.curBlockAddCompletions, this.ensembleChangeIdx));
            return true;
        }

        public String toString() {
            return String.format("ReReadLedgerMetadata(%d)", LedgerHandle.this.ledgerId);
        }
    }

    private final class ChangeEnsembleCb
    extends OrderedSafeExecutor.OrderedSafeGenericCallback<Void> {
        private final EnsembleInfo ensembleInfo;
        private final int curBlockAddCompletions;
        private final int ensembleChangeIdx;

        ChangeEnsembleCb(EnsembleInfo ensembleInfo, int curBlockAddCompletions, int ensembleChangeIdx) {
            super(LedgerHandle.this.bk.getMainWorkerPool(), LedgerHandle.this.ledgerId);
            this.ensembleInfo = ensembleInfo;
            this.curBlockAddCompletions = curBlockAddCompletions;
            this.ensembleChangeIdx = ensembleChangeIdx;
        }

        @Override
        public void safeOperationComplete(int rc, Void result) {
            if (rc == -17) {
                LedgerHandle.this.ensembleChangeCounter.inc();
                if (LOG.isDebugEnabled()) {
                    LOG.info("[EnsembleChange-L{}-{}] : encountered version conflicts, re-read ledger metadata.", (Object)LedgerHandle.this.getId(), (Object)this.ensembleChangeIdx);
                }
                LedgerHandle.this.rereadMetadata(new ReReadLedgerMetadataCb(rc, this.ensembleInfo, this.curBlockAddCompletions, this.ensembleChangeIdx));
                return;
            }
            if (rc != 0) {
                LOG.error("[EnsembleChange-L{}-{}] : could not persist ledger metadata : info = {}, closing ledger : {}.", new Object[]{LedgerHandle.this.getId(), this.ensembleChangeIdx, this.ensembleInfo, rc});
                LedgerHandle.this.handleUnrecoverableErrorDuringAdd(rc);
                return;
            }
            int newBlockAddCompletions = LedgerHandle.this.blockAddCompletions.decrementAndGet();
            if (LOG.isDebugEnabled()) {
                LOG.info("[EnsembleChange-L{}-{}] : completed ensemble change, block add completion {} => {}", new Object[]{LedgerHandle.this.getId(), this.ensembleChangeIdx, this.curBlockAddCompletions, newBlockAddCompletions});
            }
            LedgerHandle.this.ensembleChangeCounter.inc();
            LedgerHandle.this.unsetSuccessAndSendWriteRequest(this.ensembleInfo.replacedBookies);
        }

        public String toString() {
            return String.format("ChangeEnsemble(%d)", LedgerHandle.this.ledgerId);
        }
    }

    static final class EnsembleInfo {
        private final ArrayList<BookieSocketAddress> newEnsemble;
        private final Map<Integer, BookieSocketAddress> failedBookies;
        final Set<Integer> replacedBookies;

        public EnsembleInfo(ArrayList<BookieSocketAddress> newEnsemble, Map<Integer, BookieSocketAddress> failedBookies, Set<Integer> replacedBookies) {
            this.newEnsemble = newEnsemble;
            this.failedBookies = failedBookies;
            this.replacedBookies = replacedBookies;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("Ensemble Info : failed bookies = ").append(this.failedBookies).append(", replaced bookies = ").append(this.replacedBookies).append(", new ensemble = ").append(this.newEnsemble);
            return sb.toString();
        }
    }

    static class LastConfirmedCtx {
        static final long ENTRY_ID_PENDING = -10L;
        long response = -10L;
        int rc;

        LastConfirmedCtx() {
        }

        void setLastConfirmed(long lastConfirmed) {
            this.response = lastConfirmed;
        }

        long getlastConfirmed() {
            return this.response;
        }

        void setRC(int rc) {
            this.rc = rc;
        }

        int getRC() {
            return this.rc;
        }

        boolean ready() {
            return this.response != -10L;
        }
    }
}

