/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.event.CouchbaseEvent;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.core.state.NotConnectedException;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.dcp.conductor.ConfigProvider;
import com.couchbase.client.dcp.conductor.DcpChannel;
import com.couchbase.client.dcp.conductor.HttpStreamingConfigProvider;
import com.couchbase.client.dcp.conductor.NotMyVbucketException;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.events.FailedToAddNodeEvent;
import com.couchbase.client.dcp.events.FailedToMovePartitionEvent;
import com.couchbase.client.dcp.events.FailedToRemoveNodeEvent;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.dcp.util.retry.RetryBuilder;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.util.internal.ConcurrentSet;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Observable;
import rx.Single;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Action4;
import rx.functions.Func1;

public class Conductor {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Conductor.class);
    private final ConfigProvider configProvider;
    private final Set<DcpChannel> channels;
    private volatile long configRev = -1L;
    private volatile boolean stopped = true;
    private final ClientEnvironment env;
    private final AtomicReference<CouchbaseBucketConfig> currentConfig;
    private final boolean ownsConfigProvider;
    private final SessionState sessionState;

    public Conductor(ClientEnvironment env, ConfigProvider cp) {
        this.env = env;
        this.currentConfig = new AtomicReference();
        this.sessionState = new SessionState();
        this.configProvider = cp == null ? new HttpStreamingConfigProvider(env) : cp;
        this.ownsConfigProvider = cp == null;
        this.configProvider.configs().forEach((Action1)new Action1<CouchbaseBucketConfig>(){

            public void call(CouchbaseBucketConfig config) {
                if (config.rev() > Conductor.this.configRev) {
                    Conductor.this.configRev = config.rev();
                    LOGGER.trace("Applying new configuration, rev is now {}.", (Object)Conductor.this.configRev);
                    Conductor.this.currentConfig.set(config);
                    Conductor.this.reconfigure(config);
                } else {
                    LOGGER.trace("Ignoring config, since rev has not changed.");
                }
            }
        });
        this.channels = new ConcurrentSet();
    }

    public SessionState sessionState() {
        return this.sessionState;
    }

    public Completable connect() {
        this.stopped = false;
        Completable atLeastOneConfig = this.configProvider.configs().first().toCompletable().timeout(this.env.bootstrapTimeout(), TimeUnit.SECONDS).doOnError((Action1)new Action1<Throwable>(){

            public void call(Throwable throwable) {
                LOGGER.warn("Did not receive initial configuration from provider.");
            }
        });
        return this.configProvider.start().timeout(this.env.connectTimeout(), TimeUnit.SECONDS).doOnError((Action1)new Action1<Throwable>(){

            public void call(Throwable throwable) {
                LOGGER.warn("Cannot connect configuration provider.");
            }
        }).concatWith(atLeastOneConfig);
    }

    public boolean disconnected() {
        if (!this.configProvider.isState((Enum)LifecycleState.DISCONNECTED)) {
            return false;
        }
        for (DcpChannel channel : this.channels) {
            if (channel.isState((Enum)LifecycleState.DISCONNECTED)) continue;
            return false;
        }
        return true;
    }

    public Completable stop() {
        LOGGER.debug("Instructed to shutdown.");
        this.stopped = true;
        Completable channelShutdown = Observable.from(this.channels).flatMap(new Func1<DcpChannel, Observable<?>>(){

            public Observable<?> call(DcpChannel dcpChannel) {
                return dcpChannel.disconnect().toObservable();
            }
        }).toCompletable();
        if (this.ownsConfigProvider) {
            channelShutdown = channelShutdown.andThen(this.configProvider.stop());
        }
        return channelShutdown.doOnCompleted(new Action0(){

            public void call() {
                LOGGER.info("Shutdown complete.");
            }
        });
    }

    public int numberOfPartitions() {
        CouchbaseBucketConfig config = this.currentConfig.get();
        return config.numberOfPartitions();
    }

    public Observable<ByteBuf> getSeqnos() {
        return Observable.from(this.channels).flatMap((Func1)new Func1<DcpChannel, Observable<ByteBuf>>(){

            public Observable<ByteBuf> call(DcpChannel channel) {
                return Conductor.this.getSeqnosForChannel(channel);
            }
        });
    }

    private Observable<ByteBuf> getSeqnosForChannel(final DcpChannel channel) {
        return Observable.just((Object)((Object)channel)).flatMap((Func1)new Func1<DcpChannel, Observable<ByteBuf>>(){

            public Observable<ByteBuf> call(DcpChannel channel) {
                return channel.getSeqnos().toObservable();
            }
        }).retryWhen((Func1)RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed((long)200L, (TimeUnit)TimeUnit.MILLISECONDS)).doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>(){

            public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
                LOGGER.debug("Rescheduling get Seqnos for channel {}, not connected (yet).", (Object)channel);
            }
        }).build());
    }

    public Single<ByteBuf> getFailoverLog(final short partition) {
        return Observable.just((Object)partition).map((Func1)new Func1<Short, DcpChannel>(){

            public DcpChannel call(Short aShort) {
                return Conductor.this.masterChannelByPartition(partition);
            }
        }).flatMap((Func1)new Func1<DcpChannel, Observable<ByteBuf>>(){

            public Observable<ByteBuf> call(DcpChannel channel) {
                return channel.getFailoverLog(partition).toObservable();
            }
        }).retryWhen((Func1)RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed((long)200L, (TimeUnit)TimeUnit.MILLISECONDS)).doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>(){

            public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
                LOGGER.debug("Rescheduling Get Failover Log for vbid {}, not connected (yet).", (Object)partition);
            }
        }).build()).toSingle();
    }

    public Completable startStreamForPartition(final short partition, final long vbuuid, final long startSeqno, final long endSeqno, final long snapshotStartSeqno, final long snapshotEndSeqno) {
        return Observable.just((Object)partition).map((Func1)new Func1<Short, DcpChannel>(){

            public DcpChannel call(Short aShort) {
                return Conductor.this.masterChannelByPartition(partition);
            }
        }).flatMap(new Func1<DcpChannel, Observable<?>>(){

            public Observable<?> call(DcpChannel channel) {
                return channel.openStream(partition, vbuuid, startSeqno, endSeqno, snapshotStartSeqno, snapshotEndSeqno).toObservable();
            }
        }).retryWhen((Func1)RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed((long)200L, (TimeUnit)TimeUnit.MILLISECONDS)).doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>(){

            public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
                LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", (Object)partition);
            }
        }).build()).toCompletable();
    }

    public Completable stopStreamForPartition(short partition) {
        if (this.streamIsOpen(partition)) {
            DcpChannel channel = this.masterChannelByPartition(partition);
            return channel.closeStream(partition);
        }
        return Completable.complete();
    }

    public boolean streamIsOpen(short partition) {
        DcpChannel channel = this.masterChannelByPartition(partition);
        return channel.streamIsOpen(partition);
    }

    private DcpChannel masterChannelByPartition(short partition) {
        CouchbaseBucketConfig config = this.currentConfig.get();
        short index = config.nodeIndexForMaster((int)partition, false);
        NodeInfo node = config.nodeAtIndex((int)index);
        for (DcpChannel ch : this.channels) {
            InetSocketAddress address = new InetSocketAddress(node.hostname().nameOrAddress(), (int)((Integer)(this.env.sslEnabled() ? node.sslServices() : node.services()).get(ServiceType.BINARY)));
            if (!ch.address().equals(address)) continue;
            return ch;
        }
        throw new IllegalStateException("No DcpChannel found for partition " + partition);
    }

    private void reconfigure(CouchbaseBucketConfig config) {
        ArrayList<InetSocketAddress> toAdd = new ArrayList<InetSocketAddress>();
        ArrayList<DcpChannel> toRemove = new ArrayList<DcpChannel>();
        for (NodeInfo node : config.nodes()) {
            if (!node.services().containsKey(ServiceType.BINARY) && !node.sslServices().containsKey(ServiceType.BINARY) || !config.hasPrimaryPartitionsOnNode(node.hostname())) continue;
            InetSocketAddress address = new InetSocketAddress(node.hostname().nameOrAddress(), (int)((Integer)(this.env.sslEnabled() ? node.sslServices() : node.services()).get(ServiceType.BINARY)));
            boolean found = false;
            for (DcpChannel chan : this.channels) {
                if (!chan.address().equals(address)) continue;
                found = true;
                break;
            }
            if (found) continue;
            toAdd.add(address);
            LOGGER.debug("Planning to add {}", (Object)address);
        }
        for (DcpChannel chan : this.channels) {
            boolean found = false;
            for (NodeInfo node : config.nodes()) {
                InetSocketAddress address = new InetSocketAddress(node.hostname().nameOrAddress(), (int)((Integer)(this.env.sslEnabled() ? node.sslServices() : node.services()).get(ServiceType.BINARY)));
                if (!address.equals(chan.address())) continue;
                found = true;
                break;
            }
            if (found) continue;
            LOGGER.debug("Planning to remove {}", (Object)chan);
            toRemove.add(chan);
        }
        for (InetSocketAddress add : toAdd) {
            this.add(add);
        }
        for (DcpChannel remove : toRemove) {
            this.remove(remove);
        }
    }

    private void add(final InetSocketAddress node) {
        if (this.channels.contains(node)) {
            return;
        }
        LOGGER.debug("Adding DCP Channel against {}", (Object)node);
        DcpChannel channel = new DcpChannel(node, this.env, this);
        this.channels.add(channel);
        channel.connect().retryWhen((Func1)RetryBuilder.anyMatches(new Func1<Throwable, Boolean>(){

            public Boolean call(Throwable t) {
                return !Conductor.this.stopped;
            }
        }).max(this.env.dcpChannelsReconnectMaxAttempts()).delay(this.env.dcpChannelsReconnectDelay()).doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>(){

            public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
                LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", (Object)node);
            }
        }).build()).subscribe(new CompletableSubscriber(){

            public void onCompleted() {
                LOGGER.debug("Completed Node connect for DCP channel {}", (Object)node);
            }

            public void onError(Throwable e) {
                LOGGER.warn("Got error during connect (maybe retried) for node {}" + node, e);
                if (Conductor.this.env.eventBus() != null) {
                    Conductor.this.env.eventBus().publish((CouchbaseEvent)new FailedToAddNodeEvent(node, e));
                }
            }

            public void onSubscribe(Subscription d) {
            }
        });
    }

    private void remove(final DcpChannel node) {
        if (this.channels.remove((Object)node)) {
            LOGGER.debug("Removing DCP Channel against {}", (Object)node);
            node.disconnect().subscribe(new CompletableSubscriber(){

                public void onCompleted() {
                    LOGGER.debug("Channel remove notified as complete for {}", (Object)node.address());
                }

                public void onError(Throwable e) {
                    LOGGER.warn("Got error during Node removal for node {}" + node.address(), e);
                    if (Conductor.this.env.eventBus() != null) {
                        Conductor.this.env.eventBus().publish((CouchbaseEvent)new FailedToRemoveNodeEvent(node.address(), e));
                    }
                }

                public void onSubscribe(Subscription d) {
                }
            });
        }
    }

    void maybeMovePartition(final short partition) {
        Observable.timer((long)50L, (TimeUnit)TimeUnit.MILLISECONDS).filter((Func1)new Func1<Long, Boolean>(){

            public Boolean call(Long aLong) {
                PartitionState ps = Conductor.this.sessionState.get(partition);
                boolean desiredSeqnoReached = ps.isAtEnd();
                if (desiredSeqnoReached) {
                    LOGGER.debug("Reached desired high seqno {} for vbucket {}, not reopening stream.", (Object)ps.getEndSeqno(), (Object)partition);
                }
                return !desiredSeqnoReached;
            }
        }).flatMap(new Func1<Long, Observable<?>>(){

            public Observable<?> call(Long aLong) {
                PartitionState ps = Conductor.this.sessionState.get(partition);
                return Conductor.this.startStreamForPartition(partition, ps.getLastUuid(), ps.getStartSeqno(), ps.getEndSeqno(), ps.getSnapshotStartSeqno(), ps.getSnapshotEndSeqno()).retryWhen((Func1)RetryBuilder.anyOf(NotMyVbucketException.class).max(Integer.MAX_VALUE).delay(Delay.fixed((long)200L, (TimeUnit)TimeUnit.MILLISECONDS)).build()).toObservable();
            }
        }).toCompletable().subscribe(new CompletableSubscriber(){

            public void onCompleted() {
                LOGGER.trace("Completed Partition Move for partition {}", (Object)partition);
            }

            public void onError(Throwable e) {
                LOGGER.warn("Error during Partition Move for partition " + partition, e);
                if (Conductor.this.env.eventBus() != null) {
                    Conductor.this.env.eventBus().publish((CouchbaseEvent)new FailedToMovePartitionEvent(partition, e));
                }
            }

            public void onSubscribe(Subscription d) {
                LOGGER.debug("Subscribing for Partition Move for partition {}", (Object)partition);
            }
        });
    }
}

