/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.replication;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerChecker;
import org.apache.bookkeeper.client.LedgerFragment;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.replication.BookieLedgerIndexer;
import org.apache.bookkeeper.replication.ReplicationEnableCb;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Auditor {
    private static final Logger LOG = LoggerFactory.getLogger(Auditor.class);
    private final ServerConfiguration conf;
    private BookKeeper bkc;
    private BookKeeperAdmin admin;
    private BookieLedgerIndexer bookieLedgerIndexer;
    private LedgerManager ledgerManager;
    private LedgerUnderreplicationManager ledgerUnderreplicationManager;
    private final ScheduledExecutorService executor;
    private List<String> knownBookies = new ArrayList<String>();
    private final String bookieIdentifier;
    private final StatsLogger statsLogger;
    private final OpStatsLogger numUnderReplicatedLedger;
    private final OpStatsLogger uRLPublishTimeForLostBookies;
    private final OpStatsLogger bookieToLedgersMapCreationTime;
    private final OpStatsLogger checkAllLedgersTime;
    private final Counter numLedgersChecked;
    private final OpStatsLogger numFragmentsPerLedger;
    private final OpStatsLogger numBookiesPerLedger;
    private final Counter numBookieAuditsDelayed;
    private final Counter numDelayedBookieAuditsCancelled;
    private volatile Future<?> auditTask;
    private Set<String> bookiesToBeAudited = Sets.newHashSet();
    private volatile int lostBookieRecoveryDelayBeforeChange;
    private final Runnable bookieCheck = new Runnable(){

        @Override
        public void run() {
            if (Auditor.this.auditTask == null) {
                Auditor.this.startAudit(true);
            } else {
                LOG.info("Audit already scheduled; skipping periodic bookie check");
            }
        }
    };

    public Auditor(final String bookieIdentifier, ServerConfiguration conf, ZooKeeper zkc, StatsLogger statsLogger) throws ReplicationException.UnavailableException {
        this.conf = conf;
        this.bookieIdentifier = bookieIdentifier;
        this.statsLogger = statsLogger;
        this.numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger("NUM_UNDER_REPLICATED_LEDGERS");
        this.uRLPublishTimeForLostBookies = this.statsLogger.getOpStatsLogger("URL_PUBLISH_TIME_FOR_LOST_BOOKIE");
        this.bookieToLedgersMapCreationTime = this.statsLogger.getOpStatsLogger("BOOKIE_TO_LEDGERS_MAP_CREATION_TIME");
        this.checkAllLedgersTime = this.statsLogger.getOpStatsLogger("CHECK_ALL_LEDGERS_TIME");
        this.numLedgersChecked = this.statsLogger.getCounter("NUM_LEDGERS_CHECKED");
        this.numFragmentsPerLedger = statsLogger.getOpStatsLogger("NUM_FRAGMENTS_PER_LEDGER");
        this.numBookiesPerLedger = statsLogger.getOpStatsLogger("NUM_BOOKIES_PER_LEDGER");
        this.numBookieAuditsDelayed = this.statsLogger.getCounter("NUM_BOOKIE_AUDITS_DELAYED");
        this.numDelayedBookieAuditsCancelled = this.statsLogger.getCounter("NUM_DELAYED_BOOKIE_AUDITS_CANCELLED");
        this.initialize(conf, zkc);
        this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "AuditorBookie-" + bookieIdentifier);
                t.setDaemon(true);
                return t;
            }
        });
    }

    private void initialize(ServerConfiguration conf, ZooKeeper zkc) throws ReplicationException.UnavailableException {
        try {
            ClientConfiguration clientConfiguration = new ClientConfiguration(conf);
            clientConfiguration.setClientRole("system");
            LOG.info("AuthProvider used by the Auditor is {}", (Object)clientConfiguration.getClientAuthProviderFactoryClass());
            this.bkc = new BookKeeper(clientConfiguration, zkc);
            LedgerManagerFactory ledgerManagerFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, this.bkc.getMetadataClientDriver().getLayoutManager());
            this.ledgerManager = ledgerManagerFactory.newLedgerManager();
            this.bookieLedgerIndexer = new BookieLedgerIndexer(this.ledgerManager);
            this.ledgerUnderreplicationManager = ledgerManagerFactory.newLedgerUnderreplicationManager();
            this.admin = new BookKeeperAdmin(this.bkc, this.statsLogger);
            if (this.ledgerUnderreplicationManager.initializeLostBookieRecoveryDelay(conf.getLostBookieRecoveryDelay())) {
                LOG.info("Initializing lostBookieRecoveryDelay zNode to the conif value: {}", (Object)conf.getLostBookieRecoveryDelay());
            } else {
                LOG.info("Valid lostBookieRecoveryDelay zNode is available, so not creating lostBookieRecoveryDelay zNode as part of Auditor initialization ");
            }
            this.lostBookieRecoveryDelayBeforeChange = this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
        }
        catch (ReplicationException.CompatibilityException ce) {
            throw new ReplicationException.UnavailableException("CompatibilityException while initializing Auditor", ce);
        }
        catch (IOException | BKException | KeeperException ioe) {
            throw new ReplicationException.UnavailableException("Exception while initializing Auditor", ioe);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while initializing Auditor", ie);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void submitShutdownTask() {
        Auditor auditor = this;
        synchronized (auditor) {
            if (this.executor.isShutdown()) {
                return;
            }
            this.executor.submit(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    Auditor auditor = Auditor.this;
                    synchronized (auditor) {
                        Auditor.this.executor.shutdown();
                    }
                }
            });
        }
    }

    @VisibleForTesting
    synchronized Future<?> submitAuditTask() {
        if (this.executor.isShutdown()) {
            SettableFuture f = SettableFuture.create();
            f.setException((Throwable)new ReplicationException.BKAuditException("Auditor shutting down"));
            return f;
        }
        return this.executor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    Auditor.this.waitIfLedgerReplicationDisabled();
                    int lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
                    List availableBookies = Auditor.this.getAvailableBookies();
                    Collection newBookies = CollectionUtils.subtract((Iterable)availableBookies, (Iterable)Auditor.this.knownBookies);
                    Auditor.this.knownBookies.addAll(newBookies);
                    if (!Auditor.this.bookiesToBeAudited.isEmpty() && Auditor.this.knownBookies.containsAll(Auditor.this.bookiesToBeAudited)) {
                        if (Auditor.this.auditTask != null && Auditor.this.auditTask.cancel(false)) {
                            Auditor.this.auditTask = null;
                            Auditor.this.numDelayedBookieAuditsCancelled.inc();
                        }
                        Auditor.this.bookiesToBeAudited.clear();
                    }
                    Auditor.this.bookiesToBeAudited.addAll(CollectionUtils.subtract((Iterable)Auditor.this.knownBookies, (Iterable)availableBookies));
                    if (Auditor.this.bookiesToBeAudited.size() == 0) {
                        return;
                    }
                    Auditor.this.knownBookies.removeAll(Auditor.this.bookiesToBeAudited);
                    if (lostBookieRecoveryDelay == 0) {
                        Auditor.this.startAudit(false);
                        Auditor.this.bookiesToBeAudited.clear();
                        return;
                    }
                    if (Auditor.this.bookiesToBeAudited.size() > 1) {
                        LOG.info("Multiple bookie failure; not delaying bookie audit. Bookies lost now: {}; All lost bookies: {}", (Object)CollectionUtils.subtract((Iterable)Auditor.this.knownBookies, (Iterable)availableBookies), (Object)Auditor.this.bookiesToBeAudited);
                        if (Auditor.this.auditTask != null && Auditor.this.auditTask.cancel(false)) {
                            Auditor.this.auditTask = null;
                            Auditor.this.numDelayedBookieAuditsCancelled.inc();
                        }
                        Auditor.this.startAudit(false);
                        Auditor.this.bookiesToBeAudited.clear();
                        return;
                    }
                    if (Auditor.this.auditTask == null) {
                        Auditor.this.auditTask = Auditor.this.executor.schedule(new Runnable(){

                            @Override
                            public void run() {
                                Auditor.this.startAudit(false);
                                Auditor.this.auditTask = null;
                                Auditor.this.bookiesToBeAudited.clear();
                            }
                        }, (long)lostBookieRecoveryDelay, TimeUnit.SECONDS);
                        Auditor.this.numBookieAuditsDelayed.inc();
                        LOG.info("Delaying bookie audit by {} secs for {}", (Object)lostBookieRecoveryDelay, (Object)Auditor.this.bookiesToBeAudited);
                    }
                }
                catch (BKException bke) {
                    LOG.error("Exception getting bookie list", (Throwable)bke);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted while watching available bookies ", (Throwable)ie);
                }
                catch (ReplicationException.UnavailableException ue) {
                    LOG.error("Exception while watching available bookies", (Throwable)ue);
                }
            }
        });
    }

    synchronized Future<?> submitLostBookieRecoveryDelayChangedEvent() {
        if (this.executor.isShutdown()) {
            SettableFuture f = SettableFuture.create();
            f.setException((Throwable)new ReplicationException.BKAuditException("Auditor shutting down"));
            return f;
        }
        return this.executor.submit(new Runnable(){
            int lostBookieRecoveryDelay = -1;

            @Override
            public void run() {
                try {
                    Auditor.this.waitIfLedgerReplicationDisabled();
                    this.lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
                    if (Auditor.this.auditTask != null) {
                        LOG.info("lostBookieRecoveryDelay period has been changed so canceling the pending AuditTask");
                        Auditor.this.auditTask.cancel(false);
                        Auditor.this.numDelayedBookieAuditsCancelled.inc();
                    }
                    if (this.lostBookieRecoveryDelay == 0 || this.lostBookieRecoveryDelay == Auditor.this.lostBookieRecoveryDelayBeforeChange) {
                        LOG.info("lostBookieRecoveryDelay has been set to 0 or reset to its previous value, so starting AuditTask. Current lostBookieRecoveryDelay: {}, previous lostBookieRecoveryDelay: {}", (Object)this.lostBookieRecoveryDelay, (Object)Auditor.this.lostBookieRecoveryDelayBeforeChange);
                        Auditor.this.startAudit(false);
                        Auditor.this.auditTask = null;
                        Auditor.this.bookiesToBeAudited.clear();
                    } else if (Auditor.this.auditTask != null) {
                        LOG.info("lostBookieRecoveryDelay has been set to {}, so rescheduling AuditTask accordingly", (Object)this.lostBookieRecoveryDelay);
                        Auditor.this.auditTask = Auditor.this.executor.schedule(new Runnable(){

                            @Override
                            public void run() {
                                Auditor.this.startAudit(false);
                                Auditor.this.auditTask = null;
                                Auditor.this.bookiesToBeAudited.clear();
                            }
                        }, (long)this.lostBookieRecoveryDelay, TimeUnit.SECONDS);
                        Auditor.this.numBookieAuditsDelayed.inc();
                    }
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted while for LedgersReplication to be enabled ", (Throwable)ie);
                }
                catch (ReplicationException.UnavailableException ue) {
                    LOG.error("Exception while reading from ZK", (Throwable)ue);
                }
                finally {
                    if (this.lostBookieRecoveryDelay != -1) {
                        Auditor.this.lostBookieRecoveryDelayBeforeChange = this.lostBookieRecoveryDelay;
                    }
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        LOG.info("I'm starting as Auditor Bookie. ID: {}", (Object)this.bookieIdentifier);
        Auditor auditor = this;
        synchronized (auditor) {
            if (this.executor.isShutdown()) {
                return;
            }
            long interval = this.conf.getAuditorPeriodicCheckInterval();
            if (interval > 0L) {
                LOG.info("Auditor periodic ledger checking enabled 'auditorPeriodicCheckInterval' {} seconds", (Object)interval);
                this.executor.scheduleAtFixedRate(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            if (!Auditor.this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                                LOG.info("Ledger replication disabled, skipping");
                                return;
                            }
                            Stopwatch stopwatch = Stopwatch.createStarted();
                            Auditor.this.checkAllLedgers();
                            Auditor.this.checkAllLedgersTime.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
                        }
                        catch (KeeperException ke) {
                            LOG.error("Exception while running periodic check", (Throwable)ke);
                        }
                        catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                            LOG.error("Interrupted while running periodic check", (Throwable)ie);
                        }
                        catch (BKException bke) {
                            LOG.error("Exception running periodic check", (Throwable)bke);
                        }
                        catch (IOException ioe) {
                            LOG.error("I/O exception running periodic check", (Throwable)ioe);
                        }
                        catch (ReplicationException.UnavailableException ue) {
                            LOG.error("Underreplication manager unavailable running periodic check", (Throwable)ue);
                        }
                    }
                }, interval, interval, TimeUnit.SECONDS);
            } else {
                LOG.info("Periodic checking disabled");
            }
            try {
                this.watchBookieChanges();
                this.knownBookies = this.getAvailableBookies();
            }
            catch (BKException bke) {
                LOG.error("Couldn't get bookie list, exiting", (Throwable)bke);
                this.submitShutdownTask();
            }
            try {
                this.ledgerUnderreplicationManager.notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb());
            }
            catch (ReplicationException.UnavailableException ue) {
                LOG.error("Exception while registering for LostBookieRecoveryDelay change notification", (Throwable)ue);
                this.submitShutdownTask();
            }
            long bookieCheckInterval = this.conf.getAuditorPeriodicBookieCheckInterval();
            if (bookieCheckInterval == 0L) {
                LOG.info("Auditor periodic bookie checking disabled, running once check now anyhow");
                this.executor.submit(this.bookieCheck);
            } else {
                LOG.info("Auditor periodic bookie checking enabled 'auditorPeriodicBookieCheckInterval' {} seconds", (Object)bookieCheckInterval);
                this.executor.scheduleAtFixedRate(this.bookieCheck, 0L, bookieCheckInterval, TimeUnit.SECONDS);
            }
        }
    }

    private void waitIfLedgerReplicationDisabled() throws ReplicationException.UnavailableException, InterruptedException {
        ReplicationEnableCb cb = new ReplicationEnableCb();
        if (!this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
            LOG.info("LedgerReplication is disabled externally through Zookeeper, since DISABLE_NODE ZNode is created, so waiting untill it is enabled");
            this.ledgerUnderreplicationManager.notifyLedgerReplicationEnabled(cb);
            cb.await();
        }
    }

    private List<String> getAvailableBookies() throws BKException {
        Collection<BookieSocketAddress> availableBkAddresses = this.admin.getAvailableBookies();
        Collection<BookieSocketAddress> readOnlyBkAddresses = this.admin.getReadOnlyBookies();
        availableBkAddresses.addAll(readOnlyBkAddresses);
        ArrayList<String> availableBookies = new ArrayList<String>();
        for (BookieSocketAddress addr : availableBkAddresses) {
            availableBookies.add(addr.toString());
        }
        return availableBookies;
    }

    private void watchBookieChanges() throws BKException {
        this.admin.watchWritableBookiesChanged(bookies -> this.submitAuditTask());
        this.admin.watchReadOnlyBookiesChanged(bookies -> this.submitAuditTask());
    }

    private void startAudit(boolean shutDownTask) {
        try {
            this.auditBookies();
            shutDownTask = false;
        }
        catch (BKException bke) {
            LOG.error("Exception getting bookie list", (Throwable)bke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while watching available bookies ", (Throwable)ie);
        }
        catch (ReplicationException.BKAuditException bke) {
            LOG.error("Exception while watching available bookies", (Throwable)bke);
        }
        if (shutDownTask) {
            this.submitShutdownTask();
        }
    }

    private void auditBookies() throws ReplicationException.BKAuditException, InterruptedException, BKException {
        try {
            this.waitIfLedgerReplicationDisabled();
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Underreplication unavailable, skipping audit.Will retry after a period");
            return;
        }
        Stopwatch stopwatch = Stopwatch.createStarted();
        Map<String, Set<Long>> ledgerDetails = this.generateBookie2LedgersIndex();
        try {
            if (!this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                this.executor.submit(this.bookieCheck);
                return;
            }
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Underreplication unavailable, skipping audit.Will retry after a period");
            return;
        }
        List<String> availableBookies = this.getAvailableBookies();
        Set<String> knownBookies = ledgerDetails.keySet();
        Collection lostBookies = CollectionUtils.subtract(knownBookies, availableBookies);
        this.bookieToLedgersMapCreationTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        if (lostBookies.size() > 0) {
            try {
                FutureUtils.result(this.handleLostBookiesAsync(lostBookies, ledgerDetails), ReplicationException.EXCEPTION_HANDLER);
            }
            catch (ReplicationException e) {
                throw new ReplicationException.BKAuditException(e.getMessage(), e.getCause());
            }
            this.uRLPublishTimeForLostBookies.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        }
    }

    private Map<String, Set<Long>> generateBookie2LedgersIndex() throws ReplicationException.BKAuditException {
        return this.bookieLedgerIndexer.getBookieToLedgerIndex();
    }

    private CompletableFuture<?> handleLostBookiesAsync(Collection<String> lostBookies, Map<String, Set<Long>> ledgerDetails) {
        LOG.info("Following are the failed bookies: {}, and searching its ledgers for re-replication", lostBookies);
        return FutureUtils.processList((List)Lists.newArrayList(lostBookies), bookieIP -> this.publishSuspectedLedgersAsync(Lists.newArrayList((Object[])new String[]{bookieIP}), (Set)ledgerDetails.get(bookieIP)), null);
    }

    private CompletableFuture<?> publishSuspectedLedgersAsync(Collection<String> missingBookies, Set<Long> ledgers) {
        if (null == ledgers || ledgers.size() == 0) {
            LOG.info("There is no ledgers for the failed bookie: {}", missingBookies);
            return FutureUtils.Void();
        }
        LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, missingBookies);
        this.numUnderReplicatedLedger.registerSuccessfulValue((long)ledgers.size());
        return FutureUtils.processList((List)Lists.newArrayList(ledgers), ledgerId -> this.ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync((long)ledgerId, missingBookies), null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
        ZooKeeperClient newzk = ZooKeeperClient.newBuilder().connectString(ZKMetadataDriverBase.resolveZkServers(this.conf)).sessionTimeoutMs(this.conf.getZkTimeout()).build();
        BookKeeper client = new BookKeeper(new ClientConfiguration(this.conf), newzk);
        BookKeeperAdmin admin = new BookKeeperAdmin(client, this.statsLogger);
        try {
            LedgerChecker checker = new LedgerChecker(client);
            CompletableFuture processFuture = new CompletableFuture();
            BookkeeperInternalCallbacks.Processor<Long> checkLedgersProcessor = (ledgerId, callback) -> {
                try {
                    if (!this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                        LOG.info("Ledger rereplication has been disabled, aborting periodic check");
                        FutureUtils.complete((CompletableFuture)processFuture, null);
                        return;
                    }
                }
                catch (ReplicationException.UnavailableException ue) {
                    LOG.error("Underreplication manager unavailable running periodic check", (Throwable)ue);
                    FutureUtils.complete((CompletableFuture)processFuture, null);
                    return;
                }
                admin.asyncOpenLedgerNoRecovery((long)ledgerId, (rc, lh, ctx) -> {
                    if (0 == rc) {
                        checker.checkLedger(lh, new ProcessLostFragmentsCb(lh, callback), this.conf.getAuditorLedgerVerificationPercentage());
                        this.numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
                        this.numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
                        this.numLedgersChecked.inc();
                    } else if (-7 == rc) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Ledger {} was deleted before we could check it", ledgerId);
                        }
                        callback.processResult(0, null, null);
                    } else {
                        LOG.error("Couldn't open ledger {} to check : {}", ledgerId, (Object)BKException.getMessage(rc));
                        callback.processResult(rc, null, null);
                    }
                }, null);
            };
            this.ledgerManager.asyncProcessLedgers(checkLedgersProcessor, (rc, path, ctx) -> {
                if (0 == rc) {
                    FutureUtils.complete((CompletableFuture)processFuture, null);
                } else {
                    FutureUtils.completeExceptionally((CompletableFuture)processFuture, (Throwable)BKException.create(rc));
                }
            }, null, 0, -1);
            FutureUtils.result(processFuture, BKException.HANDLER);
        }
        finally {
            admin.close();
            client.close();
            newzk.close();
        }
    }

    public void shutdown() {
        LOG.info("Shutting down auditor");
        this.executor.shutdown();
        try {
            while (!this.executor.awaitTermination(30L, TimeUnit.SECONDS)) {
                LOG.warn("Executor not shutting down, interrupting");
                this.executor.shutdownNow();
            }
            this.admin.close();
            this.bkc.close();
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while shutting down auditor bookie", (Throwable)ie);
        }
        catch (BKException bke) {
            LOG.warn("Exception while shutting down auditor bookie", (Throwable)bke);
        }
    }

    public boolean isRunning() {
        return !this.executor.isShutdown();
    }

    int getLostBookieRecoveryDelayBeforeChange() {
        return this.lostBookieRecoveryDelayBeforeChange;
    }

    Future<?> getAuditTask() {
        return this.auditTask;
    }

    private class ProcessLostFragmentsCb
    implements BookkeeperInternalCallbacks.GenericCallback<Set<LedgerFragment>> {
        final LedgerHandle lh;
        final AsyncCallback.VoidCallback callback;

        ProcessLostFragmentsCb(LedgerHandle lh, AsyncCallback.VoidCallback callback) {
            this.lh = lh;
            this.callback = callback;
        }

        @Override
        public void operationComplete(int rc, Set<LedgerFragment> fragments) {
            if (rc == 0) {
                HashSet bookies = Sets.newHashSet();
                for (LedgerFragment f : fragments) {
                    bookies.addAll(f.getAddresses());
                }
                Auditor.this.publishSuspectedLedgersAsync(bookies.stream().map(BookieSocketAddress::toString).collect(Collectors.toList()), Sets.newHashSet((Object[])new Long[]{this.lh.getId()})).whenComplete((result, cause) -> {
                    if (null != cause) {
                        LOG.error("Auditor exception publishing suspected ledger {} with lost bookies {}", new Object[]{this.lh.getId(), bookies, cause});
                        this.callback.processResult(-200, null, null);
                    } else {
                        this.callback.processResult(0, null, null);
                    }
                });
            } else {
                this.callback.processResult(rc, null, null);
            }
            this.lh.closeAsync().whenComplete((result, cause) -> {
                if (null != cause) {
                    LOG.warn("Error closing ledger {} : {}", (Object)this.lh.getId(), (Object)cause.getMessage());
                }
            });
        }
    }

    private class LostBookieRecoveryDelayChangedCb
    implements BookkeeperInternalCallbacks.GenericCallback<Void> {
        private LostBookieRecoveryDelayChangedCb() {
        }

        @Override
        public void operationComplete(int rc, Void result) {
            try {
                Auditor.this.ledgerUnderreplicationManager.notifyLostBookieRecoveryDelayChanged(this);
            }
            catch (ReplicationException.UnavailableException ae) {
                LOG.error("Exception while registering for a LostBookieRecoveryDelay notification", (Throwable)ae);
            }
            Auditor.this.submitLostBookieRecoveryDelayChangedEvent();
        }
    }
}

