/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka2.client.interest;

import com.netflix.eureka2.client.interest.BatchFunctions;
import com.netflix.eureka2.client.interest.BatchingRegistry;
import com.netflix.eureka2.client.interest.BatchingRegistryImpl;
import com.netflix.eureka2.interests.ChangeNotification;
import com.netflix.eureka2.interests.ChangeNotifications;
import com.netflix.eureka2.interests.Index;
import com.netflix.eureka2.interests.IndexRegistry;
import com.netflix.eureka2.interests.Interest;
import com.netflix.eureka2.interests.MultipleInterests;
import com.netflix.eureka2.interests.SourcedChangeNotification;
import com.netflix.eureka2.interests.SourcedModifyNotification;
import com.netflix.eureka2.interests.StreamStateNotification;
import com.netflix.eureka2.registry.SourcedEurekaRegistry;
import java.util.concurrent.atomic.AtomicBoolean;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.ConnectableObservable;
import rx.subjects.PublishSubject;

public class BatchAwareIndexRegistry<T>
implements IndexRegistry<T> {
    private static final ChangeNotification<?> FINISH_BATCHING_NOTIFICATION = new ChangeNotification<Object>(ChangeNotification.Kind.BufferSentinel, null);
    private final IndexRegistry<T> delegateRegistry;
    private final BatchingRegistry<T> remoteBatchingRegistry;

    public BatchAwareIndexRegistry(IndexRegistry<T> delegateRegistry, BatchingRegistry<T> remoteBatchingRegistry) {
        this.delegateRegistry = delegateRegistry;
        this.remoteBatchingRegistry = remoteBatchingRegistry;
    }

    @Override
    public Observable<ChangeNotification<T>> forInterest(Interest<T> interest, Observable<ChangeNotification<T>> dataSource, Index.InitStateHolder<T> initStateHolder) {
        return this.mergeWithBatchRegistryHints(interest, this.delegateRegistry.forInterest(interest, dataSource, initStateHolder));
    }

    @Override
    public Observable<ChangeNotification<T>> forCompositeInterest(MultipleInterests<T> interest, SourcedEurekaRegistry<T> registry) {
        return this.mergeWithBatchRegistryHints(interest, this.delegateRegistry.forCompositeInterest(interest, registry));
    }

    @Override
    public Observable<Void> shutdown() {
        this.remoteBatchingRegistry.shutdown();
        return this.delegateRegistry.shutdown();
    }

    @Override
    public Observable<Void> shutdown(Throwable cause) {
        this.remoteBatchingRegistry.shutdown();
        return this.delegateRegistry.shutdown(cause);
    }

    private Observable<ChangeNotification<T>> mergeWithBatchRegistryHints(final Interest<T> interest, final Observable<ChangeNotification<T>> changeNotifications) {
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<ChangeNotification<T>>(){

            public void call(Subscriber<? super ChangeNotification<T>> subscriber) {
                ConnectableObservable notifications = changeNotifications.publish();
                final BatchingRegistryImpl localBatchingRegistry = new BatchingRegistryImpl();
                localBatchingRegistry.connectTo(notifications);
                final AtomicBoolean batchingMode = new AtomicBoolean();
                Observable finishBatchingObservable = BatchFunctions.combine(localBatchingRegistry.forInterest(interest), (Observable<StreamStateNotification.BufferState>)BatchAwareIndexRegistry.this.remoteBatchingRegistry.forInterest(interest).doOnTerminate(new Action0(){

                    public void call() {
                        localBatchingRegistry.shutdown();
                    }
                })).doOnNext((Action1)new Action1<Boolean>(){

                    public void call(Boolean status) {
                        batchingMode.set(status);
                    }
                }).filter((Func1)new Func1<Boolean, Boolean>(){

                    public Boolean call(Boolean batching) {
                        return batching == false;
                    }
                }).map(new Func1<Boolean, ChangeNotification<T>>(){

                    public ChangeNotification<T> call(Boolean aBoolean) {
                        return FINISH_BATCHING_NOTIFICATION;
                    }
                }).doOnTerminate(new Action0(){

                    public void call() {
                        localBatchingRegistry.shutdown();
                    }
                });
                Observable dataNotifications = notifications.filter(ChangeNotifications.dataOnlyFilter()).flatMap(new Func1<ChangeNotification<T>, Observable<ChangeNotification<T>>>(){

                    public Observable<ChangeNotification<T>> call(ChangeNotification<T> notification) {
                        ChangeNotification result = notification;
                        if (notification instanceof SourcedChangeNotification) {
                            result = ((SourcedChangeNotification)notification).toBaseNotification();
                        }
                        if (notification instanceof SourcedModifyNotification) {
                            result = ((SourcedModifyNotification)notification).toBaseNotification();
                        }
                        if (batchingMode.get()) {
                            return Observable.just(result);
                        }
                        return Observable.just(result, (Object)FINISH_BATCHING_NOTIFICATION);
                    }
                });
                PublishSubject resultSubject = PublishSubject.create();
                resultSubject.subscribe(subscriber);
                finishBatchingObservable.mergeWith(dataNotifications).subscribe((Observer)resultSubject);
                notifications.connect();
            }
        });
    }
}

