/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.remote.observable.reconciliator;

import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.network.Endpoint;
import io.reactivex.mantis.remote.observable.DynamicConnectionSet;
import io.reactivex.mantis.remote.observable.EndpointChange;
import io.reactivex.mantis.remote.observable.EndpointInjector;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;

/**
 * Keeps the active connections of a {@link DynamicConnectionSet} aligned with the
 * endpoints announced by an {@link EndpointInjector}.
 *
 * <p>The reconciliator wraps the injector handed to the builder: every {@code add} /
 * {@code complete} change emitted by the injector is recorded in an "expected" endpoint set.
 * While reconciliation is running, each new expected set or active-connection set is
 * compared against the other, and any difference is turned into a corrective
 * {@link EndpointChange} that is merged into the stream of changes delivered to the
 * connection set.
 *
 * @param <T> element type of the observables produced by the underlying connection set
 */
public class Reconciliator<T> {
    private static final Logger logger = LoggerFactory.getLogger(Reconciliator.class);

    // Per-instance guard: each reconciliator starts and stops its own check independently.
    // (A JVM-wide flag would let one instance block or stop another instance's reconciliation.)
    private final AtomicBoolean startedReconciliation = new AtomicBoolean(false);
    private final String name;
    private final DynamicConnectionSet<T> connectionSet;
    private final EndpointInjector injector;
    private final PublishSubject<Set<Endpoint>> currentExpectedSet = PublishSubject.create();
    private final PublishSubject<EndpointChange> reconciledChanges = PublishSubject.create();
    private final Metrics metrics;
    private final Counter reconciliationCheck;
    private final Gauge running;
    private final Gauge expectedSetSize;
    // Written by startReconciliation() and read by stopReconciliation(), which may be
    // invoked from different Rx callbacks; volatile keeps the reference visible across them.
    private volatile Subscription subscription;

    /**
     * Creates a reconciliator from the builder's settings and registers its metrics
     * ({@code reconciliationCheck} counter, {@code expectedSetSize} and {@code running} gauges)
     * under the name {@code "Reconciliator_" + name}.
     *
     * @param builder source of the name, injector and connection set
     */
    Reconciliator(Builder<T> builder) {
        this.name = builder.name;
        this.injector = builder.injector;
        this.connectionSet = builder.connectionSet;
        this.metrics = new Metrics.Builder()
                .name("Reconciliator_" + this.name)
                .addCounter("reconciliationCheck")
                .addGauge("expectedSetSize")
                .addGauge("running")
                .build();
        this.reconciliationCheck = this.metrics.getCounter("reconciliationCheck");
        this.running = this.metrics.getGauge("running");
        this.expectedSetSize = this.metrics.getGauge("expectedSetSize");
    }

    /**
     * Returns the metrics group created for this reconciliator.
     *
     * @return the metrics registered in the constructor
     */
    public Metrics getMetrics() {
        return this.metrics;
    }

    /**
     * Builds the stream of endpoint changes handed to the connection set: the injector's
     * own changes merged with the corrective changes produced by reconciliation.
     *
     * <p>Each call creates fresh expected-endpoint state, so every subscription tracks the
     * injector's changes from scratch. Subscribing resets the connection set's active
     * connections and starts reconciliation; completion of the injector stops it.
     *
     * @return merged stream of injector changes and reconciled corrections
     */
    private Observable<EndpointChange> deltas() {
        // Expected endpoints keyed by their unique host id; only touched from the injector's doOnNext.
        final HashMap<String, Endpoint> expectedById = new HashMap<>();
        // Signals the corrective stream to terminate once the injector has completed.
        final PublishSubject<Integer> stopReconciliator = PublishSubject.create();

        Observable<EndpointChange> corrections = this.reconciledChanges
                .takeUntil(stopReconciliator)
                .doOnCompleted(() -> logger.info("onComplete triggered for reconciledChanges"))
                .doOnError(e -> logger.error("caught exception for reconciledChanges {}", e.getMessage(), e));

        Observable<EndpointChange> injected = this.injector.deltas()
                .doOnCompleted(() -> {
                    logger.info("Stopping reconciliator, injector completed.");
                    stopReconciliator.onNext(1);
                    stopReconciliation();
                })
                .doOnError(e -> logger.error("caught exception for injector deltas {}", e.getMessage(), e))
                .doOnNext(change -> trackExpected(expectedById, change));

        return Observable.merge(corrections, injected)
                .doOnError(t -> logger.error("caught error processing reconciliator deltas {}", t.getMessage(), t))
                .doOnSubscribe(() -> {
                    logger.info("Subscribed to deltas for {}, clearing active connection set", this.name);
                    this.connectionSet.resetActiveConnections();
                    startReconciliation();
                })
                .doOnUnsubscribe(() -> logger.info("Unsubscribed from deltas for {}", this.name));
    }

    /**
     * Applies one injector change to the expected-endpoint state and, if the state changed,
     * publishes a snapshot of the new expected set.
     *
     * <p>A {@code complete} for a known endpoint removes it; an {@code add} for an unknown
     * endpoint inserts it. Any other combination leaves the state untouched.
     *
     * @param expectedById expected endpoints keyed by unique host id (mutated in place)
     * @param change       the change emitted by the injector
     */
    private void trackExpected(HashMap<String, Endpoint> expectedById, EndpointChange change) {
        Endpoint endpoint = change.getEndpoint();
        String id = Endpoint.uniqueHost(endpoint.getHost(), endpoint.getPort(), endpoint.getSlotId());
        if (expectedById.containsKey(id)) {
            if (change.getType() == EndpointChange.Type.complete) {
                this.expectedSetSize.decrement();
                expectedById.remove(id);
                this.currentExpectedSet.onNext(new HashSet<>(expectedById.values()));
            }
        } else if (change.getType() == EndpointChange.Type.add) {
            this.expectedSetSize.increment();
            expectedById.put(id, new Endpoint(endpoint.getHost(), endpoint.getPort(), endpoint.getSlotId()));
            this.currentExpectedSet.onNext(new HashSet<>(expectedById.values()));
        }
    }

    /**
     * Starts comparing the expected endpoint set with the connection set's active
     * connections, unless this instance is already reconciling.
     *
     * <p>Errors in the comparison chain are logged and converted into completion, which in
     * turn stops reconciliation.
     */
    private void startReconciliation() {
        if (!this.startedReconciliation.compareAndSet(false, true)) {
            logger.info("reconciliation already started for {}", this.name);
            return;
        }
        logger.info("Starting reconciliation for name: {}", this.name);
        this.running.increment();
        this.subscription = Observable.combineLatest(
                this.currentExpectedSet,
                this.connectionSet.activeConnections(),
                new Func2<Set<Endpoint>, Set<Endpoint>, Void>() {
                    @Override
                    public Void call(Set<Endpoint> expected, Set<Endpoint> actual) {
                        reconcile(expected, actual);
                        return null;
                    }
                })
                .onErrorResumeNext(new Func1<Throwable, Observable<? extends Void>>() {
                    @Override
                    public Observable<? extends Void> call(Throwable throwable) {
                        logger.error("caught error in Reconciliation for {}", Reconciliator.this.name, throwable);
                        return Observable.<Void>empty();
                    }
                })
                .doOnCompleted(() -> {
                    logger.error("onComplete in Reconciliation observable chain for {}", this.name);
                    stopReconciliation();
                })
                .subscribe();
    }

    /**
     * Compares the expected and active endpoint sets and emits a corrective change for
     * every difference: {@code add} for endpoints that are expected but not active,
     * {@code complete} for endpoints that are active but not expected.
     *
     * @param expected endpoints currently announced by the injector
     * @param actual   endpoints the connection set currently reports as active
     */
    private void reconcile(Set<Endpoint> expected, Set<Endpoint> actual) {
        this.reconciliationCheck.increment();
        boolean matches = expected.equals(actual);
        // Parameterized so the (potentially large) sets are only stringified when debug is enabled.
        logger.debug("Check result: {}, size expected: {} actual: {}, for values expected: {} versus actual: {}",
                matches, expected.size(), actual.size(), expected, actual);
        if (matches) {
            return;
        }

        Set<Endpoint> missing = new HashSet<>(expected);
        missing.removeAll(actual);
        for (Endpoint endpoint : missing) {
            logger.info("Connection missing from expected set, adding missing connection: {}", endpoint);
            this.reconciledChanges.onNext(new EndpointChange(EndpointChange.Type.add, endpoint));
        }

        Set<Endpoint> unexpected = new HashSet<>(actual);
        unexpected.removeAll(expected);
        for (Endpoint endpoint : unexpected) {
            logger.info("Unexpected connection in active set, removing connection: {}", endpoint);
            this.reconciledChanges.onNext(new EndpointChange(EndpointChange.Type.complete, endpoint));
        }
    }

    /**
     * Stops the reconciliation check if this instance is currently running one.
     */
    private void stopReconciliation() {
        if (!this.startedReconciliation.compareAndSet(true, false)) {
            logger.info("reconciliation already stopped for name: {}", this.name);
            return;
        }
        logger.info("Stopping reconciliation for name: {}", this.name);
        this.running.decrement();
        // The subscription may not be assigned yet if the chain terminated while subscribe()
        // was still executing; in that case there is nothing left to unsubscribe.
        Subscription active = this.subscription;
        if (active != null) {
            active.unsubscribe();
        }
    }

    /**
     * Installs this reconciliator's merged change stream as the connection set's endpoint
     * injector and returns the connection set's observables.
     *
     * @return the observables exposed by the underlying connection set
     */
    public Observable<Observable<T>> observables() {
        this.connectionSet.setEndpointInjector(new EndpointInjector() {

            @Override
            public Observable<EndpointChange> deltas() {
                return Reconciliator.this.deltas();
            }
        });
        return this.connectionSet.observables();
    }

    /**
     * Fluent builder for {@link Reconciliator}.
     *
     * @param <T> element type of the observables produced by the connection set
     */
    public static class Builder<T> {
        private String name;
        private EndpointInjector injector;
        private DynamicConnectionSet<T> connectionSet;

        /**
         * Sets the connection set whose active connections are reconciled.
         *
         * @param connectionSet the connection set to manage
         * @return this builder
         */
        public Builder<T> connectionSet(DynamicConnectionSet<T> connectionSet) {
            this.connectionSet = connectionSet;
            return this;
        }

        /**
         * Sets the name used in log messages and as the metrics group suffix.
         *
         * @param name the reconciliator name
         * @return this builder
         */
        public Builder<T> name(String name) {
            this.name = name;
            return this;
        }

        /**
         * Sets the injector whose endpoint changes define the expected endpoint set.
         *
         * @param injector the source of endpoint changes
         * @return this builder
         */
        public Builder<T> injector(EndpointInjector injector) {
            this.injector = injector;
            return this;
        }

        /**
         * Creates the reconciliator from the values set so far.
         *
         * @return a new {@link Reconciliator}
         */
        public Reconciliator<T> build() {
            return new Reconciliator<>(this);
        }
    }
}

