/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.inventory.base;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.hawkular.inventory.api.Action;
import org.hawkular.inventory.api.Interest;
import org.hawkular.inventory.api.Log;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.observers.SafeSubscriber;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * Keeps one subject/observable pair per {@link Interest} and hands the observable side out to
 * subscribers.
 *
 * <p>A pair is created the first time an observable is requested for an interest and is removed
 * from the registry again once the number of active subscriptions on its observable drops back
 * to zero.
 */
final class ObservableContext {
    /** Registry of the currently known interests and their subject/observable pairs. */
    private final Map<Interest<?, ?>, SubjectAndWrapper<?>> observables = new ConcurrentHashMap<>();

    ObservableContext() {
    }

    /**
     * Returns the observable registered for the given interest, registering a new one if none
     * exists yet.
     *
     * @param interest the interest to observe
     * @param <C> the type of the values emitted by the observable
     * @return the observable side of the pair registered for {@code interest}
     */
    public <C> Observable<C> getObservableFor(Interest<C, ?> interest) {
        SubjectAndWrapper<C> sub = getSubjectAndWrapper(interest, true);
        return sub.wrapper;
    }

    /**
     * Tells whether a subject/observable pair is currently registered for the given interest.
     *
     * @param interest the interest to look up
     * @return {@code true} if the registry contains {@code interest}
     */
    public boolean isObserved(Interest<?, ?> interest) {
        return observables.containsKey(interest);
    }

    /**
     * Returns the subjects of all registered interests that match the given action and object.
     *
     * @param action the action passed to {@code Interest.matches}
     * @param object the object passed to {@code Interest.matches}
     * @param <C> the type of the values the returned subjects accept
     * @param <T> the type of the object the action is applied to
     * @return an iterator over the subjects of the matching interests
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <C, T> Iterator<Subject<C, C>> matchingSubjects(Action<C, T> action, T object) {
        // The registry stores wildcard-typed entries, so the key is consulted through its raw type
        // and the subject of a matching entry is cast to the caller's element type.
        return observables.entrySet().stream()
                .filter(e -> ((Interest) e.getKey()).matches(action, object))
                .map(e -> (Subject<C, C>) e.getValue().subject)
                .iterator();
    }

    /**
     * Looks up the pair registered for the given interest, optionally registering a new one.
     *
     * @param interest the interest to look up
     * @param initialize whether to register a new pair when none exists
     * @param <C> the type of the values flowing through the pair
     * @return the registered pair, or {@code null} if none exists and {@code initialize} is
     *     {@code false}
     */
    @SuppressWarnings("unchecked")
    private <C> SubjectAndWrapper<C> getSubjectAndWrapper(Interest<C, ?> interest, boolean initialize) {
        if (!initialize) {
            return (SubjectAndWrapper<C>) observables.get(interest);
        }
        // computeIfAbsent makes lookup-and-register a single atomic step. A separate get followed
        // by put would let two concurrent callers each create a pair, with the later put silently
        // replacing the pair the other caller is about to hand out.
        return (SubjectAndWrapper<C>) observables.computeIfAbsent(interest, key -> newSubjectAndWrapper(interest));
    }

    /**
     * Creates a fresh pair for the given interest. The pair is NOT put into the registry here, so
     * this method is safe to call from within {@code computeIfAbsent}.
     *
     * @param interest the interest the pair is created for
     * @param <C> the type of the values flowing through the pair
     * @return the new, not yet registered pair
     */
    private <C> SubjectAndWrapper<C> newSubjectAndWrapper(Interest<C, ?> interest) {
        // Drop the registry entry again once the last subscription on the wrapper is gone.
        SubscriptionTracker tracker = new SubscriptionTracker(() -> observables.remove(interest));
        Subject<C, C> subject = PublishSubject.<C>create().toSerialized();
        Observable<C> wrapper = subject.lift(new OperatorIgnoreError<C>())
                .doOnSubscribe(tracker.onSubscribe())
                .doOnUnsubscribe(tracker.onUnsubscribe());
        return new SubjectAndWrapper<>(subject, wrapper);
    }

    /**
     * Operator that delivers values directly to the innermost non-{@link SafeSubscriber}
     * subscriber and logs, rather than propagates, exceptions thrown while it processes a value.
     */
    private static final class OperatorIgnoreError<T>
    implements Observable.Operator<T, T> {
        private OperatorIgnoreError() {
        }

        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
            return new SafeSubscriber<T>(subscriber) {
                /** Set once an error has been signalled; later values are dropped. */
                private boolean done;
                /** The downstream subscriber with all {@link SafeSubscriber} layers unwrapped. */
                private final Subscriber<? super T> actual;

                {
                    Subscriber<? super T> s = subscriber;
                    while (s instanceof SafeSubscriber) {
                        s = ((SafeSubscriber<? super T>) s).getActual();
                    }
                    this.actual = s;
                }

                @Override
                public void onNext(T t) {
                    if (this.done) {
                        return;
                    }
                    try {
                        this.actual.onNext(t);
                    } catch (Exception e) {
                        // Deliberately best-effort: the failure is only logged and not rethrown.
                        Log.LOGGER.debugf(e, "Subscriber %s failed to process %s.", this.actual, t);
                    }
                }

                @Override
                protected void _onError(Throwable e) {
                    this.done = true;
                    super._onError(e);
                }
            };
        }
    }

    /** Pairs a subject with the observable built on top of it. */
    private static class SubjectAndWrapper<T> {
        /** The subject values are pushed into. */
        final Subject<T, T> subject;
        /** The observable derived from {@link #subject} that is handed out to subscribers. */
        final Observable<T> wrapper;

        private SubjectAndWrapper(Subject<T, T> subject, Observable<T> wrapper) {
            this.subject = subject;
            this.wrapper = wrapper;
        }
    }

    /** Counts subscriptions and runs an action whenever the count returns to zero. */
    private static class SubscriptionTracker {
        private final AtomicLong counter = new AtomicLong(0L);
        private final Runnable action;

        /**
         * @param action the action to run when an unsubscription brings the count to zero
         */
        public SubscriptionTracker(Runnable action) {
            this.action = action;
        }

        /**
         * @return a callback that increments the subscription count
         */
        public Action0 onSubscribe() {
            return this.counter::incrementAndGet;
        }

        /**
         * @return a callback that decrements the subscription count and runs the action when the
         *     count reaches zero
         */
        public Action0 onUnsubscribe() {
            return () -> {
                if (this.counter.decrementAndGet() == 0L) {
                    this.action.run();
                }
            };
        }
    }
}

