/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka2.utils;

import com.netflix.eureka2.interests.ChangeNotification;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;

public abstract class StreamedDataCollector<R> {
    public abstract List<R> latestSnapshot();

    public abstract void close();

    public static <R> StreamedDataCollector<R> from(Collection<R> collection) {
        final List<R> copy = Collections.unmodifiableList(new ArrayList<R>(collection));
        return new StreamedDataCollector<R>(){

            @Override
            public List<R> latestSnapshot() {
                return copy;
            }

            @Override
            public void close() {
            }
        };
    }

    public static <E, R> StreamedDataCollector<R> from(Observable<ChangeNotification<E>> notifications, Func1<E, R> converter) {
        return new ChangeNotificationCollector<E, R>(notifications, converter);
    }

    static class ChangeNotificationCollector<E, R>
    extends StreamedDataCollector<R> {
        private static final Logger logger = LoggerFactory.getLogger(ChangeNotificationCollector.class);
        private final ConcurrentSkipListSet<R> servers = new ConcurrentSkipListSet();
        private final Subscription subscription;

        ChangeNotificationCollector(Observable<ChangeNotification<E>> notifications, final Func1<E, R> converter) {
            this.subscription = notifications.subscribe(new Subscriber<ChangeNotification<E>>(){

                public void onCompleted() {
                }

                public void onError(Throwable e) {
                    logger.error("Change notification stream terminated with error", e);
                }

                public void onNext(ChangeNotification<E> notification) {
                    Object converted = converter.call(notification.getData());
                    switch (notification.getKind()) {
                        case Add: 
                        case Modify: {
                            ChangeNotificationCollector.this.servers.add(converted);
                            break;
                        }
                        case Delete: {
                            ChangeNotificationCollector.this.servers.remove(converted);
                        }
                    }
                }
            });
        }

        @Override
        public List<R> latestSnapshot() {
            if (this.subscription.isUnsubscribed()) {
                throw new IllegalStateException("change notification stream is closed");
            }
            return new ArrayList<R>(this.servers);
        }

        @Override
        public void close() {
            this.subscription.unsubscribe();
        }
    }
}

