/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BookieWatcher {
    private static final Logger log = LoggerFactory.getLogger(BookieWatcher.class);
    private static final Function<Throwable, BKException> EXCEPTION_FUNC = cause -> {
        if (cause instanceof BKException) {
            log.error("Failed to get bookie list : ", cause);
            return (BKException)cause;
        }
        if (cause instanceof InterruptedException) {
            log.error("Interrupted reading bookie list : ", cause);
            return new BKException.BKInterruptedException();
        }
        return new BKException.MetaStoreException();
    };
    private final ClientConfiguration conf;
    private final RegistrationClient registrationClient;
    private final EnsemblePlacementPolicy placementPolicy;
    private final OpStatsLogger newEnsembleTimer;
    private final OpStatsLogger replaceBookieTimer;
    final Cache<BookieSocketAddress, Boolean> quarantinedBookies;
    private volatile Set<BookieSocketAddress> writableBookies = Collections.emptySet();
    private volatile Set<BookieSocketAddress> readOnlyBookies = Collections.emptySet();
    private CompletableFuture<?> initialWritableBookiesFuture = null;
    private CompletableFuture<?> initialReadonlyBookiesFuture = null;

    public BookieWatcher(ClientConfiguration conf, EnsemblePlacementPolicy placementPolicy, RegistrationClient registrationClient, StatsLogger statsLogger) {
        this.conf = conf;
        this.placementPolicy = placementPolicy;
        this.registrationClient = registrationClient;
        this.quarantinedBookies = CacheBuilder.newBuilder().expireAfterWrite((long)conf.getBookieQuarantineTimeSeconds(), TimeUnit.SECONDS).removalListener((RemovalListener)new RemovalListener<BookieSocketAddress, Boolean>(){

            public void onRemoval(RemovalNotification<BookieSocketAddress, Boolean> bookie) {
                log.info("Bookie {} is no longer quarantined", bookie.getKey());
            }
        }).build();
        this.newEnsembleTimer = statsLogger.getOpStatsLogger("NEW_ENSEMBLE_TIME");
        this.replaceBookieTimer = statsLogger.getOpStatsLogger("REPLACE_BOOKIE_TIME");
    }

    public Set<BookieSocketAddress> getBookies() throws BKException {
        try {
            return (Set)((Versioned)FutureUtils.result(this.registrationClient.getWritableBookies(), EXCEPTION_FUNC)).getValue();
        }
        catch (BKException.BKInterruptedException ie) {
            Thread.currentThread().interrupt();
            throw ie;
        }
    }

    public Set<BookieSocketAddress> getReadOnlyBookies() throws BKException {
        try {
            return (Set)((Versioned)FutureUtils.result(this.registrationClient.getReadOnlyBookies(), EXCEPTION_FUNC)).getValue();
        }
        catch (BKException.BKInterruptedException ie) {
            Thread.currentThread().interrupt();
            throw ie;
        }
    }

    private synchronized void processWritableBookiesChanged(Set<BookieSocketAddress> newBookieAddrs) {
        this.writableBookies = newBookieAddrs;
        this.placementPolicy.onClusterChanged(newBookieAddrs, this.readOnlyBookies);
    }

    private synchronized void processReadOnlyBookiesChanged(Set<BookieSocketAddress> readOnlyBookies) {
        this.readOnlyBookies = readOnlyBookies;
        this.placementPolicy.onClusterChanged(this.writableBookies, readOnlyBookies);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void initialBlockingBookieRead() throws BKException {
        CompletableFuture<Object> readonly;
        CompletableFuture<Object> writable;
        BookieWatcher bookieWatcher = this;
        synchronized (bookieWatcher) {
            if (this.initialReadonlyBookiesFuture == null) {
                assert (this.initialWritableBookiesFuture == null);
                writable = this.registrationClient.watchWritableBookies(bookies -> this.processWritableBookiesChanged((Set)bookies.getValue()));
                readonly = this.registrationClient.watchReadOnlyBookies(bookies -> this.processReadOnlyBookiesChanged((Set)bookies.getValue()));
                this.initialWritableBookiesFuture = writable;
                this.initialReadonlyBookiesFuture = readonly;
            } else {
                writable = this.initialWritableBookiesFuture;
                readonly = this.initialReadonlyBookiesFuture;
            }
        }
        try {
            FutureUtils.result(writable, EXCEPTION_FUNC);
        }
        catch (BKException.BKInterruptedException ie) {
            Thread.currentThread().interrupt();
            throw ie;
        }
        try {
            FutureUtils.result(readonly, EXCEPTION_FUNC);
        }
        catch (BKException.BKInterruptedException ie) {
            Thread.currentThread().interrupt();
            throw ie;
        }
        catch (Exception e) {
            log.error("Failed getReadOnlyBookies: ", (Throwable)e);
        }
    }

    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map<String, byte[]> customMetadata) throws BKException.BKNotEnoughBookiesException {
        ArrayList<BookieSocketAddress> socketAddresses;
        long startTime = MathUtils.nowInNano();
        try {
            socketAddresses = this.placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<BookieSocketAddress>(this.quarantinedBookies.asMap().keySet()));
            this.newEnsembleTimer.registerSuccessfulEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
        }
        catch (BKException.BKNotEnoughBookiesException e) {
            if (log.isDebugEnabled()) {
                log.debug("Not enough healthy bookies available, using quarantined bookies");
            }
            socketAddresses = this.placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<BookieSocketAddress>());
            this.newEnsembleTimer.registerFailedEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
        }
        return socketAddresses;
    }

    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map<String, byte[]> customMetadata, List<BookieSocketAddress> existingBookies, int bookieIdx, Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
        BookieSocketAddress socketAddress;
        long startTime = MathUtils.nowInNano();
        BookieSocketAddress addr = existingBookies.get(bookieIdx);
        try {
            HashSet<BookieSocketAddress> existingAndQuarantinedBookies = new HashSet<BookieSocketAddress>(existingBookies);
            existingAndQuarantinedBookies.addAll(this.quarantinedBookies.asMap().keySet());
            socketAddress = this.placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, existingAndQuarantinedBookies, addr, excludeBookies);
            this.replaceBookieTimer.registerSuccessfulEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
        }
        catch (BKException.BKNotEnoughBookiesException e) {
            if (log.isDebugEnabled()) {
                log.debug("Not enough healthy bookies available, using quarantined bookies");
            }
            socketAddress = this.placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<BookieSocketAddress>(existingBookies), addr, excludeBookies);
            this.replaceBookieTimer.registerFailedEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
        }
        return socketAddress;
    }

    public void quarantineBookie(BookieSocketAddress bookie) {
        if (this.quarantinedBookies.getIfPresent((Object)bookie) == null) {
            this.quarantinedBookies.put((Object)bookie, (Object)Boolean.TRUE);
            log.warn("Bookie {} has been quarantined because of read/write errors.", (Object)bookie);
        }
    }
}

