/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.replication.AuditorBookieCheckTask;
import org.apache.bookkeeper.replication.AuditorCheckAllLedgersTask;
import org.apache.bookkeeper.replication.AuditorPlacementPolicyCheckTask;
import org.apache.bookkeeper.replication.AuditorReplicasCheckTask;
import org.apache.bookkeeper.replication.AuditorStats;
import org.apache.bookkeeper.replication.AuditorTask;
import org.apache.bookkeeper.replication.BookieLedgerIndexer;
import org.apache.bookkeeper.replication.ReplicationEnableCb;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.bookkeeper.shaded.com.google.common.collect.Iterators;
import org.apache.bookkeeper.shaded.com.google.common.collect.Lists;
import org.apache.bookkeeper.shaded.com.google.common.collect.Sets;
import org.apache.bookkeeper.shaded.com.google.common.util.concurrent.SettableFuture;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Auditor
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(Auditor.class);
    private final ServerConfiguration conf;
    private final BookKeeper bkc;
    private final boolean ownBkc;
    private final BookKeeperAdmin admin;
    private final boolean ownAdmin;
    private BookieLedgerIndexer bookieLedgerIndexer;
    private LedgerManager ledgerManager;
    private LedgerUnderreplicationManager ledgerUnderreplicationManager;
    private final ScheduledExecutorService executor;
    private List<String> knownBookies = new ArrayList<String>();
    private final String bookieIdentifier;
    protected volatile Future<?> auditTask;
    private final Set<String> bookiesToBeAudited = Sets.newHashSet();
    private volatile int lostBookieRecoveryDelayBeforeChange;
    protected AuditorBookieCheckTask auditorBookieCheckTask;
    protected AuditorTask auditorCheckAllLedgersTask;
    protected AuditorTask auditorPlacementPolicyCheckTask;
    protected AuditorTask auditorReplicasCheckTask;
    private final List<AuditorTask> allAuditorTasks = Lists.newArrayList();
    private final AuditorStats auditorStats;

    static BookKeeper createBookKeeperClient(ServerConfiguration conf) throws InterruptedException, IOException {
        return Auditor.createBookKeeperClient(conf, NullStatsLogger.INSTANCE);
    }

    static BookKeeper createBookKeeperClient(ServerConfiguration conf, StatsLogger statsLogger) throws InterruptedException, IOException {
        ClientConfiguration clientConfiguration = new ClientConfiguration(conf);
        clientConfiguration.setClientRole("system");
        try {
            return BookKeeper.forConfig(clientConfiguration).statsLogger(statsLogger).build();
        }
        catch (BKException e) {
            throw new IOException("Failed to create bookkeeper client", e);
        }
    }

    static BookKeeper createBookKeeperClientThrowUnavailableException(ServerConfiguration conf) throws ReplicationException.UnavailableException {
        try {
            return Auditor.createBookKeeperClient(conf);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Failed to create bookkeeper client", e);
        }
        catch (IOException e) {
            throw new ReplicationException.UnavailableException("Failed to create bookkeeper client", e);
        }
    }

    public Auditor(String bookieIdentifier, ServerConfiguration conf, StatsLogger statsLogger) throws ReplicationException.UnavailableException {
        this(bookieIdentifier, conf, Auditor.createBookKeeperClientThrowUnavailableException(conf), true, statsLogger);
    }

    public Auditor(String bookieIdentifier, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc, StatsLogger statsLogger) throws ReplicationException.UnavailableException {
        this(bookieIdentifier, conf, bkc, ownBkc, new BookKeeperAdmin(bkc, statsLogger, new ClientConfiguration(conf)), true, statsLogger);
    }

    public Auditor(final String bookieIdentifier, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc, BookKeeperAdmin admin, boolean ownAdmin, StatsLogger statsLogger) throws ReplicationException.UnavailableException {
        this.conf = conf;
        this.bookieIdentifier = bookieIdentifier;
        this.auditorStats = new AuditorStats(statsLogger);
        this.bkc = bkc;
        this.ownBkc = ownBkc;
        this.admin = admin;
        this.ownAdmin = ownAdmin;
        this.initialize(conf, bkc);
        AuditorTask.ShutdownTaskHandler shutdownTaskHandler = this::submitShutdownTask;
        BiConsumer<Void, Throwable> submitBookieCheckTask = (ignore, throwable) -> this.submitBookieCheckTask();
        BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask = (flag, throwable) -> flag.set(this.auditTask != null);
        this.auditorBookieCheckTask = new AuditorBookieCheckTask(conf, this.auditorStats, admin, this.ledgerManager, this.ledgerUnderreplicationManager, shutdownTaskHandler, this.bookieLedgerIndexer, hasAuditCheckTask, submitBookieCheckTask);
        this.allAuditorTasks.add(this.auditorBookieCheckTask);
        this.auditorCheckAllLedgersTask = new AuditorCheckAllLedgersTask(conf, this.auditorStats, admin, this.ledgerManager, this.ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
        this.allAuditorTasks.add(this.auditorCheckAllLedgersTask);
        this.auditorPlacementPolicyCheckTask = new AuditorPlacementPolicyCheckTask(conf, this.auditorStats, admin, this.ledgerManager, this.ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
        this.allAuditorTasks.add(this.auditorPlacementPolicyCheckTask);
        this.auditorReplicasCheckTask = new AuditorReplicasCheckTask(conf, this.auditorStats, admin, this.ledgerManager, this.ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
        this.allAuditorTasks.add(this.auditorReplicasCheckTask);
        this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "AuditorBookie-" + bookieIdentifier);
                t.setDaemon(true);
                return t;
            }
        });
    }

    private void initialize(ServerConfiguration conf, BookKeeper bkc) throws ReplicationException.UnavailableException {
        try {
            LedgerManagerFactory ledgerManagerFactory = bkc.getLedgerManagerFactory();
            this.ledgerManager = ledgerManagerFactory.newLedgerManager();
            this.bookieLedgerIndexer = new BookieLedgerIndexer(this.ledgerManager);
            this.ledgerUnderreplicationManager = ledgerManagerFactory.newLedgerUnderreplicationManager();
            LOG.info("AuthProvider used by the Auditor is {}", (Object)this.admin.getConf().getClientAuthProviderFactoryClass());
            if (this.ledgerUnderreplicationManager.initializeLostBookieRecoveryDelay(conf.getLostBookieRecoveryDelay())) {
                LOG.info("Initializing lostBookieRecoveryDelay zNode to the conf value: {}", (Object)conf.getLostBookieRecoveryDelay());
            } else {
                LOG.info("Valid lostBookieRecoveryDelay zNode is available, so not creating lostBookieRecoveryDelay zNode as part of Auditor initialization ");
            }
            this.lostBookieRecoveryDelayBeforeChange = this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
        }
        catch (ReplicationException.CompatibilityException ce) {
            throw new ReplicationException.UnavailableException("CompatibilityException while initializing Auditor", ce);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while initializing Auditor", ie);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void submitShutdownTask() {
        Auditor auditor = this;
        synchronized (auditor) {
            LOG.info("Executing submitShutdownTask");
            if (this.executor.isShutdown()) {
                LOG.info("executor is already shutdown");
                return;
            }
            this.executor.submit(() -> {
                Auditor auditor = this;
                synchronized (auditor) {
                    LOG.info("Shutting down Auditor's Executor");
                    this.executor.shutdown();
                }
            });
        }
    }

    @VisibleForTesting
    synchronized Future<?> submitAuditTask() {
        if (this.executor.isShutdown()) {
            SettableFuture f = SettableFuture.create();
            f.setException(new ReplicationException.BKAuditException("Auditor shutting down"));
            return f;
        }
        return this.executor.submit(() -> {
            try {
                this.waitIfLedgerReplicationDisabled();
                int lostBookieRecoveryDelay = this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
                List<String> availableBookies = this.getAvailableBookies();
                Collection newBookies = CollectionUtils.subtract(availableBookies, this.knownBookies);
                this.knownBookies.addAll(newBookies);
                if (!this.bookiesToBeAudited.isEmpty() && this.knownBookies.containsAll(this.bookiesToBeAudited)) {
                    if (this.auditTask != null && this.auditTask.cancel(false)) {
                        this.auditTask = null;
                        this.auditorStats.getNumDelayedBookieAuditsCancelled().inc();
                    }
                    this.bookiesToBeAudited.clear();
                }
                this.bookiesToBeAudited.addAll(CollectionUtils.subtract(this.knownBookies, availableBookies));
                if (this.bookiesToBeAudited.size() == 0) {
                    return;
                }
                this.knownBookies.removeAll(this.bookiesToBeAudited);
                if (lostBookieRecoveryDelay == 0) {
                    this.auditorBookieCheckTask.startAudit(false);
                    this.bookiesToBeAudited.clear();
                    return;
                }
                if (this.bookiesToBeAudited.size() > 1) {
                    LOG.info("Multiple bookie failure; not delaying bookie audit. Bookies lost now: {}; All lost bookies: {}", (Object)CollectionUtils.subtract(this.knownBookies, availableBookies), this.bookiesToBeAudited);
                    if (this.auditTask != null && this.auditTask.cancel(false)) {
                        this.auditTask = null;
                        this.auditorStats.getNumDelayedBookieAuditsCancelled().inc();
                    }
                    this.auditorBookieCheckTask.startAudit(false);
                    this.bookiesToBeAudited.clear();
                    return;
                }
                if (this.auditTask == null) {
                    this.auditTask = this.executor.schedule(() -> {
                        this.auditorBookieCheckTask.startAudit(false);
                        this.auditTask = null;
                        this.bookiesToBeAudited.clear();
                    }, (long)lostBookieRecoveryDelay, TimeUnit.SECONDS);
                    this.auditorStats.getNumBookieAuditsDelayed().inc();
                    LOG.info("Delaying bookie audit by {} secs for {}", (Object)lostBookieRecoveryDelay, this.bookiesToBeAudited);
                }
            }
            catch (BKException bke) {
                LOG.error("Exception getting bookie list", (Throwable)bke);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                LOG.error("Interrupted while watching available bookies ", (Throwable)ie);
            }
            catch (ReplicationException.UnavailableException ue) {
                LOG.error("Exception while watching available bookies", (Throwable)ue);
            }
        });
    }

    synchronized Future<?> submitLostBookieRecoveryDelayChangedEvent() {
        if (this.executor.isShutdown()) {
            SettableFuture f = SettableFuture.create();
            f.setException(new ReplicationException.BKAuditException("Auditor shutting down"));
            return f;
        }
        return this.executor.submit(() -> {
            int lostBookieRecoveryDelay = -1;
            try {
                this.waitIfLedgerReplicationDisabled();
                lostBookieRecoveryDelay = this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
                if (this.auditTask != null) {
                    LOG.info("lostBookieRecoveryDelay period has been changed so canceling the pending AuditTask");
                    this.auditTask.cancel(false);
                    this.auditorStats.getNumDelayedBookieAuditsCancelled().inc();
                }
                if (lostBookieRecoveryDelay == 0 || lostBookieRecoveryDelay == this.lostBookieRecoveryDelayBeforeChange) {
                    LOG.info("lostBookieRecoveryDelay has been set to 0 or reset to its previous value, so starting AuditTask. Current lostBookieRecoveryDelay: {}, previous lostBookieRecoveryDelay: {}", (Object)lostBookieRecoveryDelay, (Object)this.lostBookieRecoveryDelayBeforeChange);
                    this.auditorBookieCheckTask.startAudit(false);
                    this.auditTask = null;
                    this.bookiesToBeAudited.clear();
                } else if (this.auditTask != null) {
                    LOG.info("lostBookieRecoveryDelay has been set to {}, so rescheduling AuditTask accordingly", (Object)lostBookieRecoveryDelay);
                    this.auditTask = this.executor.schedule(() -> {
                        this.auditorBookieCheckTask.startAudit(false);
                        this.auditTask = null;
                        this.bookiesToBeAudited.clear();
                    }, (long)lostBookieRecoveryDelay, TimeUnit.SECONDS);
                    this.auditorStats.getNumBookieAuditsDelayed().inc();
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                LOG.error("Interrupted while for LedgersReplication to be enabled ", (Throwable)ie);
            }
            catch (ReplicationException.NonRecoverableReplicationException nre) {
                LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
                this.submitShutdownTask();
            }
            catch (ReplicationException.UnavailableException ue) {
                LOG.error("Exception while reading from ZK", (Throwable)ue);
            }
            finally {
                if (lostBookieRecoveryDelay != -1) {
                    this.lostBookieRecoveryDelayBeforeChange = lostBookieRecoveryDelay;
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        LOG.info("I'm starting as Auditor Bookie. ID: {}", (Object)this.bookieIdentifier);
        Auditor auditor = this;
        synchronized (auditor) {
            if (this.executor.isShutdown()) {
                return;
            }
            try {
                this.watchBookieChanges();
                this.knownBookies = this.getAvailableBookies();
                this.ledgerUnderreplicationManager.notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb());
                this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged(new UnderReplicatedLedgersChangedCb());
            }
            catch (BKException bke) {
                LOG.error("Couldn't get bookie list, so exiting", (Throwable)bke);
                this.submitShutdownTask();
                return;
            }
            catch (ReplicationException.UnavailableException ue) {
                LOG.error("Exception while registering for change notification, so exiting", (Throwable)ue);
                this.submitShutdownTask();
                return;
            }
            this.scheduleBookieCheckTask();
            this.scheduleCheckAllLedgersTask();
            this.schedulePlacementPolicyCheckTask();
            this.scheduleReplicasCheckTask();
        }
    }

    protected void submitBookieCheckTask() {
        this.executor.submit(this.auditorBookieCheckTask);
    }

    private void scheduleBookieCheckTask() {
        long bookieCheckInterval = this.conf.getAuditorPeriodicBookieCheckInterval();
        if (bookieCheckInterval == 0L) {
            LOG.info("Auditor periodic bookie checking disabled, running once check now anyhow");
            this.submitBookieCheckTask();
        } else {
            LOG.info("Auditor periodic bookie checking enabled 'auditorPeriodicBookieCheckInterval' {} seconds", (Object)bookieCheckInterval);
            this.executor.scheduleAtFixedRate(this.auditorBookieCheckTask, 0L, bookieCheckInterval, TimeUnit.SECONDS);
        }
    }

    private void scheduleCheckAllLedgersTask() {
        long interval = this.conf.getAuditorPeriodicCheckInterval();
        if (interval > 0L) {
            long initialDelay;
            long durationSinceLastExecutionInSecs;
            long checkAllLedgersLastExecutedCTime;
            LOG.info("Auditor periodic ledger checking enabled 'auditorPeriodicCheckInterval' {} seconds", (Object)interval);
            try {
                checkAllLedgersLastExecutedCTime = this.ledgerUnderreplicationManager.getCheckAllLedgersCTime();
            }
            catch (ReplicationException.NonRecoverableReplicationException nre) {
                LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
                this.submitShutdownTask();
                return;
            }
            catch (ReplicationException.UnavailableException ue) {
                LOG.error("Got UnavailableException while trying to get checkAllLedgersCTime", (Throwable)ue);
                checkAllLedgersLastExecutedCTime = -1L;
            }
            if (checkAllLedgersLastExecutedCTime == -1L) {
                durationSinceLastExecutionInSecs = -1L;
                initialDelay = 0L;
            } else {
                durationSinceLastExecutionInSecs = (System.currentTimeMillis() - checkAllLedgersLastExecutedCTime) / 1000L;
                if (durationSinceLastExecutionInSecs < 0L) {
                    durationSinceLastExecutionInSecs = 0L;
                }
                initialDelay = durationSinceLastExecutionInSecs > interval ? 0L : interval - durationSinceLastExecutionInSecs;
            }
            LOG.info("checkAllLedgers scheduling info.  checkAllLedgersLastExecutedCTime: {} durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}", new Object[]{checkAllLedgersLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval});
            this.executor.scheduleAtFixedRate(this.auditorCheckAllLedgersTask, initialDelay, interval, TimeUnit.SECONDS);
        } else {
            LOG.info("Periodic checking disabled");
        }
    }

    private void schedulePlacementPolicyCheckTask() {
        long interval = this.conf.getAuditorPeriodicPlacementPolicyCheckInterval();
        if (interval > 0L) {
            long initialDelay;
            long durationSinceLastExecutionInSecs;
            long placementPolicyCheckLastExecutedCTime;
            LOG.info("Auditor periodic placement policy check enabled 'auditorPeriodicPlacementPolicyCheckInterval' {} seconds", (Object)interval);
            try {
                placementPolicyCheckLastExecutedCTime = this.ledgerUnderreplicationManager.getPlacementPolicyCheckCTime();
            }
            catch (ReplicationException.NonRecoverableReplicationException nre) {
                LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
                this.submitShutdownTask();
                return;
            }
            catch (ReplicationException.UnavailableException ue) {
                LOG.error("Got UnavailableException while trying to get placementPolicyCheckCTime", (Throwable)ue);
                placementPolicyCheckLastExecutedCTime = -1L;
            }
            if (placementPolicyCheckLastExecutedCTime == -1L) {
                durationSinceLastExecutionInSecs = -1L;
                initialDelay = 0L;
            } else {
                durationSinceLastExecutionInSecs = (System.currentTimeMillis() - placementPolicyCheckLastExecutedCTime) / 1000L;
                if (durationSinceLastExecutionInSecs < 0L) {
                    durationSinceLastExecutionInSecs = 0L;
                }
                initialDelay = durationSinceLastExecutionInSecs > interval ? 0L : interval - durationSinceLastExecutionInSecs;
            }
            LOG.info("placementPolicyCheck scheduling info.  placementPolicyCheckLastExecutedCTime: {} durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}", new Object[]{placementPolicyCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval});
            this.executor.scheduleAtFixedRate(this.auditorPlacementPolicyCheckTask, initialDelay, interval, TimeUnit.SECONDS);
        } else {
            LOG.info("Periodic placementPolicy check disabled");
        }
    }

    private void scheduleReplicasCheckTask() {
        long initialDelay;
        long durationSinceLastExecutionInSecs;
        long replicasCheckLastExecutedCTime;
        long interval = this.conf.getAuditorPeriodicReplicasCheckInterval();
        if (interval <= 0L) {
            LOG.info("Periodic replicas check disabled");
            return;
        }
        LOG.info("Auditor periodic replicas check enabled 'auditorReplicasCheckInterval' {} seconds", (Object)interval);
        try {
            replicasCheckLastExecutedCTime = this.ledgerUnderreplicationManager.getReplicasCheckCTime();
        }
        catch (ReplicationException.NonRecoverableReplicationException nre) {
            LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
            this.submitShutdownTask();
            return;
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Got UnavailableException while trying to get replicasCheckCTime", (Throwable)ue);
            replicasCheckLastExecutedCTime = -1L;
        }
        if (replicasCheckLastExecutedCTime == -1L) {
            durationSinceLastExecutionInSecs = -1L;
            initialDelay = 0L;
        } else {
            durationSinceLastExecutionInSecs = (System.currentTimeMillis() - replicasCheckLastExecutedCTime) / 1000L;
            if (durationSinceLastExecutionInSecs < 0L) {
                durationSinceLastExecutionInSecs = 0L;
            }
            initialDelay = durationSinceLastExecutionInSecs > interval ? 0L : interval - durationSinceLastExecutionInSecs;
        }
        LOG.info("replicasCheck scheduling info. replicasCheckLastExecutedCTime: {} durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}", new Object[]{replicasCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval});
        this.executor.scheduleAtFixedRate(this.auditorReplicasCheckTask, initialDelay, interval, TimeUnit.SECONDS);
    }

    private void waitIfLedgerReplicationDisabled() throws ReplicationException.UnavailableException, InterruptedException {
        if (!this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
            ReplicationEnableCb cb = new ReplicationEnableCb();
            LOG.info("LedgerReplication is disabled externally through Zookeeper, since DISABLE_NODE ZNode is created, so waiting untill it is enabled");
            this.ledgerUnderreplicationManager.notifyLedgerReplicationEnabled(cb);
            cb.await();
        }
    }

    protected List<String> getAvailableBookies() throws BKException {
        Collection<BookieId> availableBkAddresses = this.admin.getAvailableBookies();
        Collection<BookieId> readOnlyBkAddresses = this.admin.getReadOnlyBookies();
        availableBkAddresses.addAll(readOnlyBkAddresses);
        ArrayList<String> availableBookies = new ArrayList<String>();
        for (BookieId addr : availableBkAddresses) {
            availableBookies.add(addr.toString());
        }
        return availableBookies;
    }

    private void watchBookieChanges() throws BKException {
        this.admin.watchWritableBookiesChanged(bookies -> this.submitAuditTask());
        this.admin.watchReadOnlyBookiesChanged(bookies -> this.submitAuditTask());
    }

    public void shutdown() {
        LOG.info("Shutting down auditor");
        this.executor.shutdown();
        try {
            while (!this.executor.awaitTermination(30L, TimeUnit.SECONDS)) {
                LOG.warn("Executor not shutting down, interrupting");
                this.executor.shutdownNow();
            }
            this.allAuditorTasks.forEach(AuditorTask::shutdown);
            this.allAuditorTasks.clear();
            if (this.ownAdmin) {
                this.admin.close();
            }
            if (this.ownBkc) {
                this.bkc.close();
            }
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while shutting down auditor bookie", (Throwable)ie);
        }
        catch (BKException bke) {
            LOG.warn("Exception while shutting down auditor bookie", (Throwable)bke);
        }
    }

    @Override
    public void close() {
        this.shutdown();
    }

    public boolean isRunning() {
        return !this.executor.isShutdown();
    }

    int getLostBookieRecoveryDelayBeforeChange() {
        return this.lostBookieRecoveryDelayBeforeChange;
    }

    Future<?> getAuditTask() {
        return this.auditTask;
    }

    private class LostBookieRecoveryDelayChangedCb
    implements BookkeeperInternalCallbacks.GenericCallback<Void> {
        private LostBookieRecoveryDelayChangedCb() {
        }

        @Override
        public void operationComplete(int rc, Void result) {
            try {
                Auditor.this.ledgerUnderreplicationManager.notifyLostBookieRecoveryDelayChanged(this);
            }
            catch (ReplicationException.NonRecoverableReplicationException nre) {
                LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
                Auditor.this.submitShutdownTask();
            }
            catch (ReplicationException.UnavailableException ae) {
                LOG.error("Exception while registering for a LostBookieRecoveryDelay notification", (Throwable)ae);
            }
            Auditor.this.submitLostBookieRecoveryDelayChangedEvent();
        }
    }

    private class UnderReplicatedLedgersChangedCb
    implements BookkeeperInternalCallbacks.GenericCallback<Void> {
        private UnderReplicatedLedgersChangedCb() {
        }

        @Override
        public void operationComplete(int rc, Void result) {
            Iterator<UnderreplicatedLedger> underreplicatedLedgersInfo = Auditor.this.ledgerUnderreplicationManager.listLedgersToRereplicate(null);
            Auditor.this.auditorStats.getUnderReplicatedLedgersGuageValue().set(Iterators.size(underreplicatedLedgersInfo));
            Auditor.this.auditorStats.getNumReplicatedLedgers().inc();
        }
    }
}

