/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka2.client.resolver;

import com.netflix.eureka2.Server;
import com.netflix.eureka2.client.resolver.ServerResolver;
import com.netflix.eureka2.interests.ChangeNotification;
import com.netflix.eureka2.interests.ChangeNotifications;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Notification;
import rx.Observable;
import rx.functions.Func1;

public class RoundRobinServerResolver
implements ServerResolver {
    private static final Logger logger = LoggerFactory.getLogger(RoundRobinServerResolver.class);
    private static final Exception SERVER_CACHE_EMPTY_EXCEPTION = new NoSuchElementException("No servers available for this resolver");
    private final int cacheRefreshTimeout;
    private final TimeUnit timeUnit;
    private final Observable<ChangeNotification<Server>> serverSource;
    private final AtomicReference<List<Server>> serverCacheRef;
    private final AtomicInteger positionRef;

    protected RoundRobinServerResolver(Server ... servers) {
        this((Observable<ChangeNotification<Server>>)Observable.from((Object[])servers).map(ChangeNotifications.toAddChangeNotification()), 10, TimeUnit.SECONDS);
    }

    protected RoundRobinServerResolver(Observable<ChangeNotification<Server>> serverSource) {
        this(serverSource, 10, TimeUnit.SECONDS);
    }

    protected RoundRobinServerResolver(Observable<ChangeNotification<Server>> serverSource, int cacheRefreshTimeout, TimeUnit timeUnit) {
        this.serverSource = serverSource;
        this.cacheRefreshTimeout = cacheRefreshTimeout;
        this.timeUnit = timeUnit;
        this.serverCacheRef = new AtomicReference(new ArrayList());
        this.positionRef = new AtomicInteger(new Random().nextInt(1000));
    }

    public RoundRobinServerResolver withWarmUpConfiguration(int newWarmUpTimeout, TimeUnit newTimeUnit) {
        return new RoundRobinServerResolver(this.serverSource, newWarmUpTimeout, newTimeUnit);
    }

    @Override
    public void close() {
        this.serverCacheRef.get().clear();
    }

    @Override
    public Observable<Server> resolve() {
        return this.refreshServerCache().concatMap((Func1)new Func1<List<Server>, Observable<? extends Server>>(){

            public Observable<? extends Server> call(List<Server> servers) {
                if (servers.isEmpty()) {
                    return Observable.error((Throwable)SERVER_CACHE_EMPTY_EXCEPTION);
                }
                int currentPos = Math.abs(RoundRobinServerResolver.this.positionRef.getAndIncrement());
                Server toReturn = servers.get(currentPos % servers.size());
                return Observable.just((Object)toReturn);
            }
        });
    }

    private Observable<List<Server>> refreshServerCache() {
        return this.serverSource.compose(ChangeNotifications.buffers()).compose(ChangeNotifications.snapshots()).materialize().concatMap((Func1)new Func1<Notification<LinkedHashSet<Server>>, Observable<? extends List<Server>>>(){

            public Observable<? extends List<Server>> call(Notification<LinkedHashSet<Server>> rxNotification) {
                switch (rxNotification.getKind()) {
                    case OnNext: {
                        LinkedHashSet servers = (LinkedHashSet)rxNotification.getValue();
                        if (servers.isEmpty()) {
                            return Observable.never();
                        }
                        logger.info("Populating the serverCache with {} servers", (Object)servers.size());
                        ArrayList newServerCache = new ArrayList(servers);
                        Collections.sort(newServerCache);
                        RoundRobinServerResolver.this.serverCacheRef.set(newServerCache);
                        break;
                    }
                    case OnCompleted: {
                        break;
                    }
                    case OnError: {
                        return Observable.error((Throwable)rxNotification.getThrowable());
                    }
                }
                return Observable.just(RoundRobinServerResolver.this.serverCacheRef.get());
            }
        }).timeout((long)this.cacheRefreshTimeout, this.timeUnit).onErrorResumeNext((Func1)new Func1<Throwable, Observable<? extends List<Server>>>(){

            public Observable<? extends List<Server>> call(Throwable throwable) {
                if (!(throwable instanceof TimeoutException)) {
                    logger.warn("Exception thrown when connecting the serverCache to the serverSource, using backup values", throwable);
                }
                return Observable.just(RoundRobinServerResolver.this.serverCacheRef.get());
            }
        }).take(1);
    }
}

