/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.folsom.ketama;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.spotify.folsom.AbstractRawMemcacheClient;
import com.spotify.folsom.ConnectionChangeListener;
import com.spotify.folsom.ObservableClient;
import com.spotify.folsom.RawMemcacheClient;
import com.spotify.folsom.Resolver;
import com.spotify.folsom.client.NotConnectedClient;
import com.spotify.folsom.client.Request;
import com.spotify.folsom.guava.HostAndPort;
import com.spotify.folsom.ketama.AddressAndClient;
import com.spotify.folsom.ketama.KetamaMemcacheClient;
import com.spotify.folsom.ketama.NodeLocator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Memcache client that periodically asks a {@link Resolver} for the current set of
 * node addresses and routes requests through a {@link KetamaMemcacheClient} built
 * over the clients connected to those addresses.
 *
 * <p>When the resolved address set changes, clients are created for the new addresses
 * and a new ketama client is prepared as the <em>pending</em> client. It replaces the
 * current client once its connect future completes; clients for removed addresses are
 * then shut down after the configured shutdown delay.
 *
 * <p>All mutable bookkeeping ({@code clients}, {@code shutdownQueue}, {@code refreshJob},
 * {@code shutdown}) is guarded by {@code sync}. {@code currentClient} is volatile so the
 * delegating methods can read it without taking the lock.
 */
public class ResolvingKetamaClient
extends AbstractRawMemcacheClient {
    private static final Logger log = LoggerFactory.getLogger(ResolvingKetamaClient.class);

    /** Lower bound, in seconds, for the delay between two resolve attempts. */
    public static final int MIN_RESOLVE_WAIT_TIME = 10;

    /** Upper bound, in seconds, for the delay between two resolve attempts. */
    public static final int MAX_RESOLVE_WAIT_TIME = 3600;

    private final ScheduledExecutorService executor;
    private final Resolver resolver;
    /** Configured refresh period, converted to seconds. */
    private final long ttl;
    private final Connector connector;
    private final long shutdownDelay;
    private final TimeUnit shutdownUnit;
    private final MyConnectionChangeListener listener = new MyConnectionChangeListener();

    /** Guards {@code refreshJob}, {@code clients}, {@code shutdownQueue} and {@code shutdown}. */
    private final Object sync = new Object();
    private ScheduledFuture<?> refreshJob;
    private final Map<HostAndPort, RawMemcacheClient> clients = new HashMap<>();
    /** Clients removed by a resolve that are waiting for the pending client to take over. */
    private final Collection<RawMemcacheClient> shutdownQueue = new ArrayList<>();
    private boolean shutdown = false;

    private volatile RawMemcacheClient currentClient;
    private volatile RawMemcacheClient pendingClient = null;
    private final Function<Collection<AddressAndClient>, NodeLocator> nodeLocator;

    /**
     * Creates a client that is not yet resolving; call {@link #start()} to begin.
     *
     * @param resolver source of the node addresses
     * @param executor executor used for the periodic resolve and for delayed client shutdowns
     * @param period maximum time between two resolve attempts (further clamped to
     *     [{@link #MIN_RESOLVE_WAIT_TIME}, {@link #MAX_RESOLVE_WAIT_TIME}] seconds)
     * @param periodUnit unit of {@code period}
     * @param connector factory creating a client for a single address
     * @param shutdownDelay how long to wait before shutting down clients of removed addresses
     * @param shutdownUnit unit of {@code shutdownDelay}
     * @param nodeLocator factory building the node locator handed to each new ketama client
     */
    public ResolvingKetamaClient(Resolver resolver, ScheduledExecutorService executor, long period, TimeUnit periodUnit, Connector connector, long shutdownDelay, TimeUnit shutdownUnit, Function<Collection<AddressAndClient>, NodeLocator> nodeLocator) {
        this.resolver = resolver;
        this.connector = connector;
        this.shutdownDelay = shutdownDelay;
        this.shutdownUnit = shutdownUnit;
        this.executor = executor;
        this.currentClient = NotConnectedClient.INSTANCE;
        this.ttl = TimeUnit.SECONDS.convert(period, periodUnit);
        this.nodeLocator = nodeLocator;
    }

    /**
     * Schedules the first resolve to run immediately on the executor.
     *
     * @throws IllegalStateException if this client has already been started
     */
    public void start() {
        // Taken under the lock so the zero-delay resolve cannot store its follow-up job
        // before this assignment and then have it overwritten with the finished one.
        synchronized (sync) {
            if (refreshJob != null) {
                throw new IllegalStateException("You may only start this once");
            }
            refreshJob = executor.schedule(this::resolve, 0L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Resolves the current addresses, reconciles the set of per-address clients with the
     * result and schedules the next resolve.
     *
     * <p>Does nothing once {@link #shutdown()} has been called. An empty resolve result
     * leaves the existing clients untouched. The next resolve is always scheduled, even if
     * resolving throws; in that case the exception still propagates to the caller.
     */
    public void resolve() {
        synchronized (sync) {
            if (shutdown) {
                return;
            }
            long ttl = this.ttl;
            try {
                List<Resolver.ResolveResult> lookupResults = resolver.resolve();
                if (lookupResults.isEmpty()) {
                    return;
                }
                Set<HostAndPort> newAddresses = lookupResults.stream()
                        .map(result -> HostAndPort.fromParts(result.getHost(), result.getPort()))
                        .collect(Collectors.toSet());
                long resolvedTtl = lookupResults.stream()
                        .mapToLong(Resolver.ResolveResult::getTtl)
                        .min()
                        .orElse(Long.MAX_VALUE);
                // Never wait longer than the shortest TTL reported by the resolver.
                ttl = Math.min(ttl, resolvedTtl);

                Set<HostAndPort> currentAddresses = clients.keySet();
                if (!newAddresses.equals(currentAddresses)) {
                    // Snapshot both differences: currentAddresses is a live view of the
                    // map that is modified in the loops below.
                    Set<HostAndPort> toRemove = Sets.difference(currentAddresses, newAddresses).immutableCopy();
                    Set<HostAndPort> toAdd = Sets.difference(newAddresses, currentAddresses).immutableCopy();
                    if (!toAdd.isEmpty()) {
                        log.info("Connecting to {}", toAdd);
                    }
                    if (!toRemove.isEmpty()) {
                        log.info("Scheduling disconnect from {}", toRemove);
                    }
                    for (HostAndPort host : toAdd) {
                        RawMemcacheClient newClient = connector.connect(host);
                        newClient.registerForConnectionChanges(listener);
                        clients.put(host, newClient);
                    }
                    List<RawMemcacheClient> removedClients = new ArrayList<>(toRemove.size());
                    for (HostAndPort host : toRemove) {
                        RawMemcacheClient removed = clients.remove(host);
                        removed.unregisterForConnectionChanges(listener);
                        removedClients.add(removed);
                    }
                    setPendingClient(removedClients);
                }
            }
            finally {
                long delay = clamp(MIN_RESOLVE_WAIT_TIME, MAX_RESOLVE_WAIT_TIME, ttl);
                refreshJob = executor.schedule(this::resolve, delay, TimeUnit.SECONDS);
            }
        }
    }

    /** Returns {@code value} limited to the inclusive range [{@code min}, {@code max}]. */
    private long clamp(int min, int max, long value) {
        return Math.max((long) min, Math.min((long) max, value));
    }

    /** Delegates the request to the currently active client. */
    @Override
    public <T> CompletionStage<T> send(Request<T> request) {
        return currentClient.send(request);
    }

    /**
     * Stops resolving and shuts down every client this instance still owns: the clients
     * for the current addresses as well as removed clients that were still queued for a
     * delayed shutdown.
     */
    @Override
    public void shutdown() {
        synchronized (sync) {
            shutdown = true;
            if (refreshJob != null) {
                refreshJob.cancel(false);
            }
            clients.values().forEach(RawMemcacheClient::shutdown);
            // These are no longer in `clients`; without this they would only be shut down
            // if the pending client later connected, which may never happen.
            shutdownQueue.forEach(RawMemcacheClient::shutdown);
            shutdownQueue.clear();
        }
    }

    @Override
    public boolean isConnected() {
        return currentClient.isConnected();
    }

    @Override
    public Throwable getConnectionFailure() {
        return currentClient.getConnectionFailure();
    }

    @Override
    public int numTotalConnections() {
        return currentClient.numTotalConnections();
    }

    @Override
    public int numActiveConnections() {
        return currentClient.numActiveConnections();
    }

    @Override
    public int numPendingRequests() {
        return currentClient.numPendingRequests();
    }

    @Override
    public Stream<AddressAndClient> streamNodes() {
        return currentClient.streamNodes();
    }

    /**
     * Builds a ketama client over the current {@code clients} map and installs it as the
     * pending client. Must be called while holding {@code sync}.
     *
     * <p>Once the new client's connect future completes, and provided it is still the
     * pending client and this instance has not been shut down, it becomes the current
     * client and all queued removed clients are shut down after the shutdown delay.
     *
     * @param removedClients clients whose addresses disappeared in the latest resolve
     */
    private void setPendingClient(Collection<RawMemcacheClient> removedClients) {
        shutdownQueue.addAll(removedClients);
        List<AddressAndClient> addressAndClients = clients.entrySet().stream()
                .map(e -> new AddressAndClient(e.getKey(), e.getValue()))
                .collect(Collectors.toList());
        KetamaMemcacheClient newClient = new KetamaMemcacheClient(addressAndClients, nodeLocator.apply(addressAndClients));
        pendingClient = newClient;
        newClient.connectFuture().thenRun(() -> {
            final List<RawMemcacheClient> retired;
            synchronized (sync) {
                // Superseded by a newer pending client, or shut down in the meantime.
                if (shutdown || pendingClient != newClient) {
                    return;
                }
                currentClient = newClient;
                pendingClient = null;
                retired = ImmutableList.copyOf(shutdownQueue);
                shutdownQueue.clear();
            }
            executor.schedule(() -> retired.forEach(RawMemcacheClient::shutdown), shutdownDelay, shutdownUnit);
            notifyConnectionChange();
        });
    }

    /** Forwards connection changes of the per-address clients to this client's listeners. */
    private class MyConnectionChangeListener
    implements ConnectionChangeListener {
        @Override
        public void connectionChanged(ObservableClient client) {
            ResolvingKetamaClient.this.notifyConnectionChange();
        }
    }

    /** Factory creating a client for a single resolved address. */
    public static interface Connector {
        /**
         * Creates a client for the given address.
         *
         * @param address the address to connect to
         * @return the client for that address
         */
        public RawMemcacheClient connect(HostAndPort address);
    }
}

