/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka2.client.interest;

import com.netflix.eureka2.client.interest.BatchingRegistry;
import com.netflix.eureka2.interests.ChangeNotification;
import com.netflix.eureka2.interests.ChangeNotifications;
import com.netflix.eureka2.interests.Interest;
import com.netflix.eureka2.interests.MultipleInterests;
import com.netflix.eureka2.interests.StreamStateNotification;
import com.netflix.eureka2.utils.rx.PauseableSubject;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Notification;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/**
 * {@link BatchingRegistry} implementation that remembers the most recent
 * {@link StreamStateNotification.BufferState} reported for each interest on the
 * currently connected notification stream, and lets callers observe that state.
 *
 * <p>Notification streams are supplied through {@link #connectTo(Observable)}. Only the
 * latest supplied stream is observed (the sources are combined with {@code switchMap}).
 * Every stream-state notification updates the per-interest state map and then emits a
 * "tick" on an internal subject; observers created by {@link #forInterest(Interest)}
 * recompute their buffer state on each tick.
 *
 * @param <T> type of the data items carried by the change notifications
 */
public class BatchingRegistryImpl<T>
implements BatchingRegistry<T> {
    private static final Logger logger = LoggerFactory.getLogger(BatchingRegistryImpl.class);

    /** Last buffer state seen for each interest on the current notification stream. */
    private final Map<Interest<T>, StreamStateNotification.BufferState> interestsBufferingState = new ConcurrentHashMap<Interest<T>, StreamStateNotification.BufferState>();

    /** Emits a tick whenever the state map may have changed; the tick value itself is ignored by consumers. */
    private final PauseableSubject<Boolean> updatesSubject = PauseableSubject.create();

    /** Stream of notification streams; only the most recently published one is observed. */
    private final PublishSubject<Observable<ChangeNotification<T>>> notificationSources = PublishSubject.create();

    /**
     * Wires the internal pipeline: each connected notification stream is filtered down to
     * stream-state notifications, which are recorded in the state map and converted to
     * ticks forwarded to {@link #updatesSubject}.
     */
    public BatchingRegistryImpl() {
        this.notificationSources.switchMap(new Func1<Observable<ChangeNotification<T>>, Observable<Boolean>>() {

            @Override
            public Observable<Boolean> call(Observable<ChangeNotification<T>> notifications) {
                return notifications
                        .filter(ChangeNotifications.streamStateFilter())
                        .map(new Func1<ChangeNotification<T>, Boolean>() {

                            @Override
                            public Boolean call(ChangeNotification<T> notification) {
                                // The filter above is expected to let through only stream-state notifications.
                                StreamStateNotification<T> stateNotification = (StreamStateNotification<T>) notification;
                                BatchingRegistryImpl.this.interestsBufferingState.put(stateNotification.getInterest(), stateNotification.getBufferState());
                                return true;
                            }
                        })
                        // Materialize so that an error of one source is turned into a regular item
                        // and does not terminate the subscription to updatesSubject.
                        .materialize()
                        .map(new Func1<Notification<Boolean>, Boolean>() {

                            @Override
                            public Boolean call(Notification<Boolean> materialized) {
                                switch (materialized.getKind()) {
                                    case OnNext: {
                                        return materialized.getValue();
                                    }
                                    case OnError: {
                                        logger.debug("Swallowed observable error", materialized.getThrowable());
                                        break;
                                    }
                                    case OnCompleted: {
                                        break;
                                    }
                                }
                                // Terminal notifications (completed / error) are reported as a false tick.
                                return false;
                            }
                        })
                        // NOTE(review): the false tick for a terminal notification appears to be emitted
                        // before this action clears the map, so observers re-evaluate against the
                        // pre-clear state - confirm this ordering is intended.
                        .doOnTerminate(new Action0() {

                            @Override
                            public void call() {
                                BatchingRegistryImpl.this.interestsBufferingState.clear();
                            }
                        });
            }
        }).subscribe(this.updatesSubject);
    }

    /**
     * Starts observing the given notification stream; it replaces any previously
     * connected stream as the source of buffer-state updates.
     *
     * @param changeNotifications stream of change notifications to observe
     */
    @Override
    public void connectTo(Observable<ChangeNotification<T>> changeNotifications) {
        this.notificationSources.onNext(changeNotifications);
    }

    /**
     * Returns an observable of the buffer state for the given interest. On subscription the
     * current state is emitted first, followed by the state recomputed on every update tick;
     * consecutive duplicates are suppressed.
     *
     * @param interest interest (atomic or composite) to observe
     * @return observable of distinct consecutive buffer states for {@code interest}
     */
    @Override
    public Observable<StreamStateNotification.BufferState> forInterest(final Interest<T> interest) {
        return Observable.create(new Observable.OnSubscribe<StreamStateNotification.BufferState>() {

            @Override
            public void call(Subscriber<? super StreamStateNotification.BufferState> subscriber) {
                // Updates are paused while the initial value is emitted and the subscriber is
                // attached; presumably the subject holds back ticks until resume() - TODO confirm.
                BatchingRegistryImpl.this.updatesSubject.pause();
                try {
                    StreamStateNotification.BufferState current = BatchingRegistryImpl.this.shouldBatch(interest);
                    subscriber.onNext(current);
                    BatchingRegistryImpl.this.updatesSubject.map(new Func1<Boolean, StreamStateNotification.BufferState>() {

                        @Override
                        public StreamStateNotification.BufferState call(Boolean tick) {
                            return BatchingRegistryImpl.this.shouldBatch(interest);
                        }
                    }).subscribe(subscriber);
                }
                finally {
                    BatchingRegistryImpl.this.updatesSubject.resume();
                }
            }
        }).distinctUntilChanged();
    }

    /**
     * Drops the recorded state of every interest that is not part of the given interest.
     *
     * @param interest interest whose (flattened) members should be kept
     */
    @Override
    public void retainAll(Interest<T> interest) {
        Set<Interest<T>> toKeep;
        if (interest instanceof MultipleInterests) {
            toKeep = ((MultipleInterests<T>) interest).flatten();
        } else {
            toKeep = Collections.singleton(interest);
        }
        this.interestsBufferingState.keySet().retainAll(toKeep);
    }

    /**
     * Computes the buffer state for the given interest.
     *
     * <p>For a {@link MultipleInterests}: {@code BufferStart} if any member is in
     * {@code BufferStart}; otherwise {@code Unknown} if every member is {@code Unknown};
     * otherwise {@code BufferEnd}. For any other interest, the recorded state or
     * {@code Unknown} when none is recorded.
     *
     * @param interest interest (atomic or composite) to evaluate
     * @return the resulting buffer state, never {@code null}
     */
    @Override
    public StreamStateNotification.BufferState shouldBatch(Interest<T> interest) {
        if (!(interest instanceof MultipleInterests)) {
            return this.shouldBatchAtomic(interest);
        }
        Set<Interest<T>> interests = ((MultipleInterests<T>) interest).getInterests();
        boolean allUnknown = true;
        for (Interest<T> atomic : interests) {
            StreamStateNotification.BufferState state = this.shouldBatchAtomic(atomic);
            if (state == StreamStateNotification.BufferState.BufferStart) {
                // A single member still buffering is enough to batch the whole composite.
                return StreamStateNotification.BufferState.BufferStart;
            }
            allUnknown &= state == StreamStateNotification.BufferState.Unknown;
        }
        return allUnknown ? StreamStateNotification.BufferState.Unknown : StreamStateNotification.BufferState.BufferEnd;
    }

    /**
     * Completes the update subject and then the subject of notification sources.
     */
    @Override
    public void shutdown() {
        this.updatesSubject.onCompleted();
        this.notificationSources.onCompleted();
    }

    /**
     * Looks up the recorded state of a single interest.
     *
     * @param atomic interest to look up
     * @return the recorded state, or {@code Unknown} when nothing is recorded
     */
    private StreamStateNotification.BufferState shouldBatchAtomic(Interest<T> atomic) {
        StreamStateNotification.BufferState state = this.interestsBufferingState.get(atomic);
        return state == null ? StreamStateNotification.BufferState.Unknown : state;
    }
}

