/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.conductor;

import com.couchbase.client.dcp.buffer.DcpBucketConfig;
import com.couchbase.client.dcp.conductor.BucketConfigArbiter;
import com.couchbase.client.dcp.conductor.DcpChannel;
import com.couchbase.client.dcp.conductor.NotMyVbucketException;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.core.config.NodeInfo;
import com.couchbase.client.dcp.core.logging.RedactableArgument;
import com.couchbase.client.dcp.core.state.LifecycleState;
import com.couchbase.client.dcp.core.state.NotConnectedException;
import com.couchbase.client.dcp.core.time.Delay;
import com.couchbase.client.dcp.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.events.FailedToAddNodeEvent;
import com.couchbase.client.dcp.events.FailedToMovePartitionEvent;
import com.couchbase.client.dcp.events.FailedToRemoveNodeEvent;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.highlevel.internal.CollectionsManifest;
import com.couchbase.client.dcp.highlevel.internal.KeyExtractor;
import com.couchbase.client.dcp.metrics.DcpClientMetrics;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.dcp.util.retry.RetryBuilder;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Observable;
import rx.Single;
import rx.Subscription;
import rx.functions.Action4;
import rx.functions.Func1;

public class Conductor {
    private static final Logger LOGGER = LoggerFactory.getLogger(Conductor.class);
    private final BucketConfigArbiter bucketConfigArbiter;
    private final Set<DcpChannel> channels = ConcurrentHashMap.newKeySet();
    private volatile boolean stopped = true;
    private final ClientEnvironment env;
    private final AtomicReference<DcpBucketConfig> currentConfig = new AtomicReference();
    private final SessionState sessionState = new SessionState();
    private final DcpClientMetrics metrics;

    public Conductor(ClientEnvironment env, DcpClientMetrics metrics) {
        this.metrics = Objects.requireNonNull(metrics);
        this.env = env;
        this.bucketConfigArbiter = new BucketConfigArbiter(env);
        this.bucketConfigArbiter.configs().forEach(config -> {
            LOGGER.trace("Applying new configuration, new rev is {}.", (Object)config.rev());
            this.currentConfig.set((DcpBucketConfig)config);
            this.reconfigure((DcpBucketConfig)config);
        });
    }

    public SessionState sessionState() {
        return this.sessionState;
    }

    public Completable connect() {
        this.stopped = false;
        this.env.clusterAt().forEach(h -> this.add(h.toAddress()));
        long bootstrapTimeoutMillis = this.env.bootstrapTimeout().toMillis() + this.env.configRefreshInterval().toMillis();
        Completable atLeastOneConfig = this.bucketConfigArbiter.configs().filter(config -> config.numberOfPartitions() != 0).first().toCompletable().timeout(bootstrapTimeoutMillis, TimeUnit.MILLISECONDS).doOnError(throwable -> LOGGER.warn("Did not receive initial configuration from cluster within {}ms", (Object)bootstrapTimeoutMillis));
        return atLeastOneConfig;
    }

    BucketConfigArbiter bucketConfigArbiter() {
        return this.bucketConfigArbiter;
    }

    public boolean disconnected() {
        return this.channels.stream().allMatch(c -> c.isState(LifecycleState.DISCONNECTED));
    }

    public Completable stop() {
        LOGGER.debug("Instructed to shutdown.");
        this.stopped = true;
        Completable channelShutdown = Observable.from(this.channels).flatMapCompletable(DcpChannel::disconnect).toCompletable();
        return channelShutdown.doOnCompleted(() -> LOGGER.info("Shutdown complete."));
    }

    public int numberOfPartitions() {
        return this.currentConfig.get().numberOfPartitions();
    }

    public Observable<ByteBuf> getSeqnos() {
        return Observable.from(this.channels).flatMap(this::getSeqnosForChannel);
    }

    private Observable<ByteBuf> getSeqnosForChannel(DcpChannel channel) {
        return Observable.just((Object)channel).flatMapSingle(DcpChannel::getSeqnos).retryWhen((Func1)RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed(200L, TimeUnit.MILLISECONDS)).doOnRetry((Action4<Integer, Throwable, Long, TimeUnit>)((Action4)(retry, cause, delay, delayUnit) -> LOGGER.debug("Rescheduling get Seqnos for channel {}, not connected (yet).", (Object)channel))).build());
    }

    public Single<ByteBuf> getFailoverLog(short partition) {
        return Observable.just((Object)partition).map(ignored -> this.activeChannelByPartition(partition)).flatMapSingle(channel -> channel.getFailoverLog(partition)).retryWhen((Func1)RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed(200L, TimeUnit.MILLISECONDS)).doOnRetry((Action4<Integer, Throwable, Long, TimeUnit>)((Action4)(retry, cause, delay, delayUnit) -> LOGGER.debug("Rescheduling Get Failover Log for vbid {}, not connected (yet).", (Object)partition))).build()).toSingle();
    }

    public Completable startStreamForPartition(short partition, StreamOffset startOffset, long endSeqno) {
        return Observable.just((Object)partition).map(ignored -> this.activeChannelByPartition(partition)).flatMapCompletable(channel -> channel.getCollectionsManifest().flatMapCompletable(manifest -> {
            CollectionsManifest m = manifest.orElse(CollectionsManifest.DEFAULT);
            PartitionState ps = this.sessionState.get(partition);
            ps.setCollectionsManifest(m);
            ps.setKeyExtractor(manifest.isPresent() ? KeyExtractor.COLLECTIONS : KeyExtractor.NO_COLLECTIONS);
            return channel.openStream(partition, startOffset, endSeqno, m);
        })).retryWhen((Func1)RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed(200L, TimeUnit.MILLISECONDS)).doOnRetry((Action4<Integer, Throwable, Long, TimeUnit>)((Action4)(retry, cause, delay, delayUnit) -> LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", (Object)partition))).build()).toCompletable();
    }

    public Completable stopStreamForPartition(short partition) {
        if (this.streamIsOpen(partition)) {
            DcpChannel channel = this.activeChannelByPartition(partition);
            return channel.closeStream(partition);
        }
        return Completable.complete();
    }

    public boolean streamIsOpen(short partition) {
        DcpChannel channel = this.activeChannelByPartition(partition);
        return channel.streamIsOpen(partition);
    }

    private DcpChannel activeChannelByPartition(short partition) {
        InetSocketAddress address = this.currentConfig.get().getActiveNodeKvAddress(partition).toAddress();
        for (DcpChannel ch : this.channels) {
            if (!ch.address().equals(address)) continue;
            return ch;
        }
        throw new IllegalStateException("No DcpChannel found for partition " + partition);
    }

    private void reconfigure(DcpBucketConfig configHelper) {
        this.metrics.incrementReconfigure();
        List<NodeInfo> nodes = configHelper.getDataNodes();
        if (nodes.isEmpty()) {
            throw new IllegalStateException("Bucket config helper returned no data nodes");
        }
        Map<InetSocketAddress, DcpChannel> existingChannelsByAddress = this.channels.stream().collect(Collectors.toMap(DcpChannel::address, c -> c));
        Set nodeAddresses = nodes.stream().map(configHelper::getAddress).collect(Collectors.toSet());
        for (InetSocketAddress inetSocketAddress : nodeAddresses) {
            if (existingChannelsByAddress.containsKey(inetSocketAddress)) continue;
            this.metrics.incrementAddChannel();
            this.add(inetSocketAddress);
        }
        for (Map.Entry entry : existingChannelsByAddress.entrySet()) {
            if (nodeAddresses.contains(entry.getKey())) continue;
            this.metrics.incrementRemoveChannel();
            this.remove((DcpChannel)entry.getValue());
        }
    }

    private void add(final InetSocketAddress node) {
        LOGGER.info("Adding DCP Channel against {}", (Object)RedactableArgument.system(node));
        DcpChannel channel = new DcpChannel(node, this.env, this);
        if (!this.channels.add(channel)) {
            throw new IllegalStateException("Tried to add duplicate channel: " + RedactableArgument.system(channel));
        }
        channel.connect().retryWhen((Func1)RetryBuilder.anyMatches((Func1<Throwable, Boolean>)((Func1)t -> !this.stopped)).max(this.env.dcpChannelsReconnectMaxAttempts()).delay(this.env.dcpChannelsReconnectDelay()).doOnRetry((Action4<Integer, Throwable, Long, TimeUnit>)((Action4)(retry, cause, delay, delayUnit) -> LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", (Object)node))).build()).subscribe(new CompletableSubscriber(){

            public void onCompleted() {
                LOGGER.debug("Completed Node connect for DCP channel {}", (Object)node);
            }

            public void onError(Throwable e) {
                LOGGER.warn("Got error during connect (maybe retried) for node {}", (Object)RedactableArgument.system(node), (Object)e);
                if (Conductor.this.env.eventBus() != null) {
                    Conductor.this.env.eventBus().publish(new FailedToAddNodeEvent(node, e));
                }
            }

            public void onSubscribe(Subscription d) {
            }
        });
    }

    private void remove(final DcpChannel node) {
        if (!this.channels.remove(node)) {
            throw new IllegalStateException("Tried to remove unknown channel: " + RedactableArgument.system(node));
        }
        LOGGER.info("Removing DCP Channel against {}", (Object)RedactableArgument.system(node));
        for (short partition = 0; partition < node.streamIsOpen.length(); partition = (short)(partition + 1)) {
            if (!node.streamIsOpen(partition)) continue;
            this.maybeMovePartition(partition);
        }
        node.disconnect().subscribe(new CompletableSubscriber(){

            public void onCompleted() {
                LOGGER.debug("Channel remove notified as complete for {}", (Object)node.address());
            }

            public void onError(Throwable e) {
                LOGGER.warn("Got error during Node removal for node {}", (Object)RedactableArgument.system(node.address()), (Object)e);
                if (Conductor.this.env.eventBus() != null) {
                    Conductor.this.env.eventBus().publish(new FailedToRemoveNodeEvent(node.address(), e));
                }
            }

            public void onSubscribe(Subscription d) {
            }
        });
    }

    void maybeMovePartition(final short partition) {
        Observable.timer((long)50L, (TimeUnit)TimeUnit.MILLISECONDS).filter(ignored -> {
            PartitionState ps = this.sessionState.get(partition);
            boolean desiredSeqnoReached = ps.isAtEnd();
            if (desiredSeqnoReached) {
                LOGGER.debug("Reached desired high seqno {} for vbucket {}, not reopening stream.", (Object)ps.getEndSeqno(), (Object)partition);
            }
            return !desiredSeqnoReached;
        }).flatMapCompletable(ignored -> {
            PartitionState ps = this.sessionState.get(partition);
            return this.startStreamForPartition(partition, ps.getOffset(), ps.getEndSeqno()).retryWhen((Func1)RetryBuilder.anyOf(NotMyVbucketException.class).max(Integer.MAX_VALUE).delay(Delay.fixed(200L, TimeUnit.MILLISECONDS)).build());
        }).toCompletable().subscribe(new CompletableSubscriber(){

            public void onCompleted() {
                LOGGER.trace("Completed Partition Move for partition {}", (Object)partition);
            }

            public void onError(Throwable e) {
                if (e instanceof RollbackException) {
                    LOGGER.warn("Rollback during Partition Move for partition {}", (Object)partition);
                } else {
                    LOGGER.warn("Error during Partition Move for partition {}", (Object)partition, (Object)e);
                }
                if (Conductor.this.env.eventBus() != null) {
                    Conductor.this.env.eventBus().publish(new FailedToMovePartitionEvent(partition, e));
                }
            }

            public void onSubscribe(Subscription d) {
                LOGGER.debug("Subscribing for Partition Move for partition {}", (Object)partition);
            }
        });
    }
}

