/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp;

import com.couchbase.client.dcp.ConnectionNameGenerator;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.Credentials;
import com.couchbase.client.dcp.CredentialsProvider;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.DefaultConnectionNameGenerator;
import com.couchbase.client.dcp.StaticCredentialsProvider;
import com.couchbase.client.dcp.StreamFrom;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.SystemEventHandler;
import com.couchbase.client.dcp.conductor.Conductor;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.config.CompressionMode;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.config.HostAndPort;
import com.couchbase.client.dcp.core.env.NetworkResolution;
import com.couchbase.client.dcp.core.event.EventBus;
import com.couchbase.client.dcp.core.logging.RedactableArgument;
import com.couchbase.client.dcp.core.time.Delay;
import com.couchbase.client.dcp.core.utils.CbCollections;
import com.couchbase.client.dcp.core.utils.ConnectionString;
import com.couchbase.client.dcp.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.dcp.deps.io.netty.channel.EventLoopGroup;
import com.couchbase.client.dcp.deps.io.netty.channel.epoll.Epoll;
import com.couchbase.client.dcp.deps.io.netty.channel.epoll.EpollEventLoopGroup;
import com.couchbase.client.dcp.deps.io.netty.channel.kqueue.KQueue;
import com.couchbase.client.dcp.deps.io.netty.channel.kqueue.KQueueEventLoopGroup;
import com.couchbase.client.dcp.deps.io.netty.channel.nio.NioEventLoopGroup;
import com.couchbase.client.dcp.deps.io.netty.util.concurrent.DefaultThreadFactory;
import com.couchbase.client.dcp.error.BootstrapException;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.events.StreamEndEvent;
import com.couchbase.client.dcp.highlevel.DatabaseChangeListener;
import com.couchbase.client.dcp.highlevel.FlowControlMode;
import com.couchbase.client.dcp.highlevel.SnapshotMarker;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.highlevel.internal.AsyncEventDispatcher;
import com.couchbase.client.dcp.highlevel.internal.CollectionsManifest;
import com.couchbase.client.dcp.highlevel.internal.EventHandlerAdapter;
import com.couchbase.client.dcp.highlevel.internal.ImmediateEventDispatcher;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSeqnoAdvancedRequest;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.DcpSystemEvent;
import com.couchbase.client.dcp.message.DcpSystemEventRequest;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.message.StreamEndReason;
import com.couchbase.client.dcp.metrics.DcpClientMetrics;
import com.couchbase.client.dcp.metrics.MetricsContext;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.dcp.state.StateFormat;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.dcp.util.MathUtils;
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.security.KeyStore;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Observable;
import rx.Single;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

public class Client
implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Client.class);
    private static final ThreadFactory threadFactory = new DefaultThreadFactory("dcp-io");
    private final Conductor conductor;
    private final ClientEnvironment env;
    private final boolean bufferAckEnabled;
    private volatile AsyncEventDispatcher listenerDispatcher;
    private final AtomicBoolean hasHighLevelListener = new AtomicBoolean();

    private Client(Builder builder) {
        EventLoopGroup eventLoopGroup = builder.eventLoopGroup == null ? this.newEventLoopGroup() : builder.eventLoopGroup;
        this.env = ClientEnvironment.builder().setClusterAt(builder.clusterAt).setNetworkResolution(builder.networkResolution).setConnectionNameGenerator(builder.connectionNameGenerator).setBucket(builder.bucket).setCollectionsAware(builder.collectionsAware).setScopeId(builder.scopeId).setScopeName(builder.scopeName).setCollectionIds(builder.collectionIds).setCollectionNames(builder.collectionNames).setCredentialsProvider(builder.credentialsProvider).setDcpControl(builder.dcpControl).setEventLoopGroup(eventLoopGroup, builder.eventLoopGroup == null).setBufferAckWatermark(builder.bufferAckWatermark).setBufferPooling(builder.poolBuffers).setConnectTimeout(builder.connectTimeout).setConfigRefreshInterval(builder.configRefreshInterval).setBootstrapTimeout(builder.bootstrapTimeout).setSocketConnectTimeout(builder.socketConnectTimeout).setDcpChannelsReconnectDelay(builder.dcpChannelsReconnectDelay).setDcpChannelsReconnectMaxAttempts(builder.dcpChannelsReconnectMaxAttempts).setEventBus(builder.eventBus).setSslEnabled(builder.sslEnabled).setSslKeystoreFile(builder.sslKeystoreFile).setSslKeystorePassword(builder.sslKeystorePassword).setSslKeystore(builder.sslKeystore).setPersistencePollingIntervalMillis(builder.persistencePollingIntervalMillis).build();
        this.bufferAckEnabled = this.env.dcpControl().bufferAckEnabled();
        if (this.bufferAckEnabled && this.env.bufferAckWatermark() == 0) {
            throw new IllegalArgumentException("The bufferAckWatermark needs to be set if bufferAck is enabled.");
        }
        this.controlEventHandler((flowController, event) -> {
            try {
                if (DcpSnapshotMarkerRequest.is(event)) {
                    flowController.ack(event);
                }
            }
            finally {
                event.release();
            }
        });
        this.dataEventHandler((flowController, event) -> {
            try {
                flowController.ack(event);
            }
            finally {
                event.release();
            }
        });
        MetricsContext metricsContext = new MetricsContext("dcp");
        this.conductor = new Conductor(this.env, new DcpClientMetrics(metricsContext));
        LOGGER.info("Environment Configuration Used: {}", (Object)RedactableArgument.system(this.env));
    }

    private EventLoopGroup newEventLoopGroup() {
        if (Epoll.isAvailable()) {
            LOGGER.info("Using Netty epoll native transport.");
            return new EpollEventLoopGroup(threadFactory);
        }
        if (KQueue.isAvailable()) {
            LOGGER.info("Using Netty kqueue native transport.");
            return new KQueueEventLoopGroup(threadFactory);
        }
        LOGGER.info("Using Netty NIO transport.");
        return new NioEventLoopGroup(threadFactory);
    }

    @Deprecated
    public static Builder configure() {
        return Client.builder();
    }

    public static Builder builder() {
        return new Builder();
    }

    private Observable<PartitionAndSeqno> getSeqnos() {
        return this.conductor.getSeqnos().flatMap((Func1)new Func1<ByteBuf, Observable<PartitionAndSeqno>>(){

            public Observable<PartitionAndSeqno> call(ByteBuf buf) {
                int numPairs = buf.readableBytes() / 10;
                ArrayList<PartitionAndSeqno> pairs = new ArrayList<PartitionAndSeqno>(numPairs);
                for (int i = 0; i < numPairs; ++i) {
                    pairs.add(new PartitionAndSeqno(buf.getShort(10 * i), buf.getLong(10 * i + 2)));
                }
                buf.release();
                return Observable.from(pairs);
            }
        });
    }

    public SessionState sessionState() {
        return this.conductor.sessionState();
    }

    public void listener(DatabaseChangeListener listener, FlowControlMode flowControlMode) {
        if (!this.hasHighLevelListener.compareAndSet(false, true)) {
            throw new IllegalStateException("Listener may only be set once.");
        }
        if (flowControlMode == FlowControlMode.AUTOMATIC && !this.bufferAckEnabled) {
            throw new IllegalStateException("Can't register listener in automatic flow control mode because the DCP client was not configured for flow control. Make sure to call flowControl(bufferSizeInBytes) when building the DCP client.");
        }
        this.listenerDispatcher = new AsyncEventDispatcher(flowControlMode, listener);
        EventHandlerAdapter.register(this, this.listenerDispatcher);
    }

    public void nonBlockingListener(DatabaseChangeListener listener) {
        if (!this.hasHighLevelListener.compareAndSet(false, true)) {
            throw new IllegalStateException("Listener may only be set once.");
        }
        EventHandlerAdapter.register(this, new ImmediateEventDispatcher(listener));
    }

    public void controlEventHandler(final ControlEventHandler controlEventHandler) {
        final boolean userHandlerWantsFailoverLogs = controlEventHandler instanceof EventHandlerAdapter;
        this.env.setControlEventHandler(new ControlEventHandler(){

            @Override
            public void onEvent(ChannelFlowController flowController, ByteBuf event) {
                if (DcpSnapshotMarkerRequest.is(event)) {
                    short partition = DcpSnapshotMarkerRequest.partition(event);
                    PartitionState ps = Client.this.sessionState().get(partition);
                    ps.setSnapshot(new SnapshotMarker(DcpSnapshotMarkerRequest.startSeqno(event), DcpSnapshotMarkerRequest.endSeqno(event)));
                    Client.this.sessionState().set(partition, ps);
                } else if (DcpFailoverLogResponse.is(event)) {
                    Client.this.handleFailoverLogResponse(event);
                    if (!userHandlerWantsFailoverLogs) {
                        event.release();
                        return;
                    }
                } else if (RollbackMessage.is(event)) {
                    LOGGER.warn("Received rollback for vbucket {} to seqno {}", (Object)RollbackMessage.vbucket(event), (Object)RollbackMessage.seqno(event));
                } else if (DcpSeqnoAdvancedRequest.is(event)) {
                    this.handleSeqnoAdvanced(event);
                } else if (DcpSystemEventRequest.is(event)) {
                    this.handleDcpSystemEvent(event);
                }
                controlEventHandler.onEvent(flowController, event);
            }

            private void handleSeqnoAdvanced(ByteBuf event) {
                short vbucket = MessageUtil.getVbucket(event);
                long seqno = DcpSeqnoAdvancedRequest.getSeqno(event);
                LOGGER.debug("Seqno for vbucket {} advanced to {}", (Object)vbucket, (Object)seqno);
                Client.this.sessionState().get(vbucket).setStartSeqno(seqno);
            }

            private void handleDcpSystemEvent(ByteBuf event) {
                long seqno = DcpSystemEventRequest.getSeqno(event);
                short vbucket = MessageUtil.getVbucket(event);
                PartitionState ps = Client.this.sessionState().get(vbucket);
                ps.setStartSeqno(seqno);
                DcpSystemEvent sysEvent = DcpSystemEvent.parse(event);
                if (!(sysEvent instanceof DcpSystemEvent.CollectionsManifestEvent)) {
                    LOGGER.warn("Ignoring unrecognized DCP system event!\n{}", (Object)RedactableArgument.meta(MessageUtil.humanize(event)));
                } else {
                    DcpSystemEvent.CollectionsManifestEvent manifestEvent = (DcpSystemEvent.CollectionsManifestEvent)((Object)sysEvent);
                    CollectionsManifest existingManifest = ps.getCollectionsManifest();
                    ps.setCollectionsManifestUid(manifestEvent.getManifestId());
                    if (MathUtils.lessThanUnsigned(manifestEvent.getManifestId(), existingManifest.getId())) {
                        LOGGER.debug("Ignoring collection manifest event; UID {} is < current manifest UID {}", (Object)manifestEvent.getManifestId(), (Object)existingManifest.getId());
                    } else {
                        LOGGER.debug("Applying collection manifest change; UID {} is >= current manifest UID {}", (Object)manifestEvent.getManifestId(), (Object)existingManifest.getId());
                        ps.setCollectionsManifest(manifestEvent.apply(existingManifest));
                    }
                }
            }
        });
    }

    public void systemEventHandler(SystemEventHandler systemEventHandler) {
        this.env.setSystemEventHandler(systemEventHandler);
    }

    private void handleFailoverLogResponse(ByteBuf event) {
        short partition = DcpFailoverLogResponse.vbucket(event);
        PartitionState ps = this.sessionState().get(partition);
        ps.setFailoverLog(DcpFailoverLogResponse.entries(event));
        this.sessionState().set(partition, ps);
    }

    public void dataEventHandler(final DataEventHandler dataEventHandler) {
        this.env.setDataEventHandler(new DataEventHandler(){

            @Override
            public void onEvent(ChannelFlowController flowController, ByteBuf event) {
                if (DcpMutationMessage.is(event)) {
                    short partition = DcpMutationMessage.partition(event);
                    PartitionState ps = Client.this.sessionState().get(partition);
                    ps.setStartSeqno(DcpMutationMessage.bySeqno(event));
                    Client.this.sessionState().set(partition, ps);
                } else if (DcpDeletionMessage.is(event)) {
                    short partition = DcpDeletionMessage.partition(event);
                    PartitionState ps = Client.this.sessionState().get(partition);
                    ps.setStartSeqno(DcpDeletionMessage.bySeqno(event));
                    Client.this.sessionState().set(partition, ps);
                } else if (DcpExpirationMessage.is(event)) {
                    short partition = DcpExpirationMessage.partition(event);
                    PartitionState ps = Client.this.sessionState().get(partition);
                    ps.setStartSeqno(DcpExpirationMessage.bySeqno(event));
                    Client.this.sessionState().set(partition, ps);
                }
                dataEventHandler.onEvent(flowController, event);
            }
        });
    }

    public Completable connect() {
        if (!this.conductor.disconnected()) {
            LOGGER.debug("Ignoring duplicate connect attempt, already connecting/connected.");
            return Completable.complete();
        }
        if (this.env.dataEventHandler() == null) {
            throw new IllegalArgumentException("A DataEventHandler needs to be provided!");
        }
        if (this.env.controlEventHandler() == null) {
            throw new IllegalArgumentException("A ControlEventHandler needs to be provided!");
        }
        LOGGER.info("Connecting to seed nodes and bootstrapping bucket {}.", (Object)RedactableArgument.meta(this.env.bucket()));
        return this.conductor.connect().onErrorResumeNext((Func1)new Func1<Throwable, Completable>(){

            public Completable call(Throwable throwable) {
                return Client.this.disconnect().andThen(Completable.error((Throwable)new BootstrapException("Could not connect to Cluster/Bucket", throwable)));
            }
        });
    }

    public Completable disconnect() {
        return this.dispatcherGracefulShutdown().andThen(this.conductor.stop()).andThen(this.env.shutdown()).andThen(this.dispatcherAwaitShutdown());
    }

    @Override
    public void close() {
        this.disconnect().await(60L, TimeUnit.SECONDS);
    }

    private Completable dispatcherGracefulShutdown() {
        return Completable.fromAction(() -> {
            if (this.listenerDispatcher != null) {
                LOGGER.info("Asking event dispatcher to shut down.");
                this.listenerDispatcher.gracefulShutdown();
            }
        });
    }

    private Completable dispatcherAwaitShutdown() {
        return Completable.fromCallable(() -> {
            long startNanos = System.nanoTime();
            if (this.listenerDispatcher != null && !this.listenerDispatcher.awaitTermination(Duration.ofSeconds(30L))) {
                LOGGER.info("Forcing event dispatcher to shut down.");
                this.listenerDispatcher.shutdownNow();
                if (!this.listenerDispatcher.awaitTermination(Duration.ofSeconds(10L))) {
                    LOGGER.warn("Event dispatcher still hasn't terminated after {} seconds.", (Object)TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startNanos));
                }
            }
            return null;
        }).subscribeOn(Schedulers.io());
    }

    public Completable resumeStreaming(Map<Integer, StreamOffset> vbucketToOffset) {
        if (vbucketToOffset.isEmpty()) {
            return Completable.complete();
        }
        vbucketToOffset.forEach((partition, offset) -> this.sessionState().set((int)partition, PartitionState.fromOffset(offset)));
        return this.startStreaming(Client.toBoxedShortArray(vbucketToOffset.keySet()));
    }

    private static Short[] toBoxedShortArray(Iterable<? extends Number> input) {
        ArrayList shorts = new ArrayList();
        input.forEach(i -> shorts.add(i.shortValue()));
        return shorts.toArray(new Short[0]);
    }

    public Completable startStreaming(Short ... vbids) {
        int numPartitions = this.numPartitions();
        List<Short> partitions = Client.partitionsForVbids(numPartitions, vbids);
        List<Short> initializedPartitions = this.selectInitializedPartitions(numPartitions, partitions);
        ArrayList<Short> noopPartitions = new ArrayList<Short>();
        for (short p2 : partitions) {
            if (initializedPartitions.contains(p2)) continue;
            noopPartitions.add(p2);
        }
        if (!noopPartitions.isEmpty()) {
            LOGGER.info("Immediately sending stream end events for {} partitions already at desired end.", (Object)noopPartitions.size());
            LOGGER.debug("Immediately sending stream end events for partitions already at desired end: {}", noopPartitions);
            noopPartitions.forEach(p -> this.env.eventBus().publish(new StreamEndEvent((short)p, StreamEndReason.OK)));
        }
        if (initializedPartitions.isEmpty()) {
            LOGGER.info("The configured session state does not require any streams to be opened. Completing immediately.");
            return Completable.complete();
        }
        LOGGER.info("Starting to Stream for " + initializedPartitions.size() + " partitions");
        LOGGER.debug("Stream start against partitions: {}", initializedPartitions);
        return Observable.from(initializedPartitions).flatMapCompletable((Func1)new Func1<Short, Completable>(){

            public Completable call(Short partition) {
                PartitionState partitionState = Client.this.sessionState().get(partition.shortValue());
                return Client.this.conductor.startStreamForPartition(partition, partitionState.getOffset(), partitionState.getEndSeqno()).onErrorResumeNext(throwable -> throwable instanceof RollbackException ? Completable.complete() : Completable.error((Throwable)throwable));
            }
        }).toCompletable();
    }

    private List<Short> selectInitializedPartitions(int clusterPartitions, List<Short> partitions) {
        ArrayList<Short> initializedPartitions = new ArrayList<Short>();
        SessionState state = this.sessionState();
        for (short partition : partitions) {
            PartitionState ps = state.get(partition);
            if (ps != null) {
                if (MathUtils.lessThanUnsigned(ps.getStartSeqno(), ps.getEndSeqno())) {
                    initializedPartitions.add(partition);
                    continue;
                }
                LOGGER.debug("Skipping partition {}, because startSeqno({}) >= endSeqno({})", new Object[]{partition, ps.getStartSeqno(), ps.getEndSeqno()});
                continue;
            }
            LOGGER.debug("Skipping partition {}, because its state is null", (Object)partition);
        }
        if (initializedPartitions.size() > clusterPartitions) {
            throw new IllegalStateException("Session State has " + initializedPartitions + " partitions while the cluster has " + clusterPartitions + "!");
        }
        return initializedPartitions;
    }

    public Completable stopStreaming(Short ... vbids) {
        List<Short> partitions = Client.partitionsForVbids(this.numPartitions(), vbids);
        LOGGER.info("Stopping to Stream for " + partitions.size() + " partitions");
        LOGGER.debug("Stream stop against partitions: {}", partitions);
        return Observable.from(partitions).flatMapCompletable((Func1)new Func1<Short, Completable>(){

            public Completable call(Short p) {
                return Client.this.conductor.stopStreamForPartition(p);
            }
        }).toCompletable();
    }

    private static List<Short> partitionsForVbids(int numPartitions, Short ... vbids) {
        if (vbids.length > 0) {
            Arrays.sort((Object[])vbids);
            return Arrays.asList(vbids);
        }
        ArrayList<Short> partitions = new ArrayList<Short>(vbids.length);
        for (short i = 0; i < numPartitions; i = (short)(i + 1)) {
            partitions.add(i);
        }
        return partitions;
    }

    public Observable<ByteBuf> failoverLogs(Short ... vbids) {
        List<Short> partitions = Client.partitionsForVbids(this.numPartitions(), vbids);
        LOGGER.debug("Asking for failover logs on partitions {}", partitions);
        return Observable.from(partitions).flatMapSingle((Func1)new Func1<Short, Single<ByteBuf>>(){

            public Single<ByteBuf> call(Short p) {
                return Client.this.conductor.getFailoverLog(p);
            }
        });
    }

    public Completable rollbackAndRestartStream(final short partition, final long seqno) {
        return this.stopStreaming(partition).andThen(Completable.create((Completable.OnSubscribe)new Completable.OnSubscribe(){

            public void call(CompletableSubscriber subscriber) {
                Client.this.sessionState().rollbackToPosition(partition, seqno);
                subscriber.onCompleted();
            }
        })).andThen(this.startStreaming(partition));
    }

    public int numPartitions() {
        return this.conductor.numberOfPartitions();
    }

    public boolean streamIsOpen(short vbid) {
        return this.conductor.streamIsOpen(vbid);
    }

    public Completable initializeState(StreamFrom from, StreamTo to) {
        if (from == StreamFrom.BEGINNING && to == StreamTo.INFINITY) {
            Client.buzzMe();
            return this.initFromBeginningToInfinity();
        }
        if (from == StreamFrom.BEGINNING && to == StreamTo.NOW) {
            return this.initFromBeginningToNow();
        }
        if (from == StreamFrom.NOW && to == StreamTo.INFINITY) {
            Client.buzzMe();
            return this.initFromNowToInfinity();
        }
        throw new IllegalStateException("Unsupported FROM/TO combination: " + (Object)((Object)from) + " -> " + (Object)((Object)to));
    }

    public Completable recoverState(final StateFormat format, final byte[] persistedState) {
        return Completable.create((Completable.OnSubscribe)new Completable.OnSubscribe(){

            public void call(CompletableSubscriber subscriber) {
                LOGGER.info("Recovering state from format {}", (Object)format);
                LOGGER.debug("PersistedState on recovery is: {}", (Object)new String(persistedState, StandardCharsets.UTF_8));
                try {
                    if (format == StateFormat.JSON) {
                        Client.this.sessionState().setFromJson(persistedState);
                        subscriber.onCompleted();
                    } else {
                        subscriber.onError((Throwable)new IllegalStateException("Unsupported StateFormat " + (Object)((Object)format)));
                    }
                }
                catch (Exception ex) {
                    subscriber.onError((Throwable)ex);
                }
            }
        });
    }

    public Completable recoverOrInitializeState(StateFormat format, byte[] persistedState, StreamFrom from, StreamTo to) {
        if (persistedState == null || persistedState.length == 0) {
            return this.initializeState(from, to);
        }
        return this.recoverState(format, persistedState);
    }

    private Completable initFromBeginningToInfinity() {
        return Completable.create((Completable.OnSubscribe)new Completable.OnSubscribe(){

            public void call(CompletableSubscriber subscriber) {
                LOGGER.info("Initializing state from beginning to no end.");
                try {
                    Client.this.sessionState().setToBeginningWithNoEnd(Client.this.numPartitions());
                    subscriber.onCompleted();
                }
                catch (Exception ex) {
                    LOGGER.warn("Failed to initialize state from beginning to no end.", (Throwable)ex);
                    subscriber.onError((Throwable)ex);
                }
            }
        });
    }

    private Completable initFromNowToInfinity() {
        return this.initWithCallback(new Action1<PartitionAndSeqno>(){

            public void call(PartitionAndSeqno partitionAndSeqno) {
                short partition = partitionAndSeqno.partition();
                long seqno = partitionAndSeqno.seqno();
                PartitionState partitionState = Client.this.sessionState().get(partition);
                partitionState.setStartSeqno(seqno);
                partitionState.setSnapshot(new SnapshotMarker(seqno, seqno));
                Client.this.sessionState().set(partition, partitionState);
            }
        });
    }

    private Completable initFromBeginningToNow() {
        return this.initWithCallback(new Action1<PartitionAndSeqno>(){

            public void call(PartitionAndSeqno partitionAndSeqno) {
                short partition = partitionAndSeqno.partition();
                long seqno = partitionAndSeqno.seqno();
                PartitionState partitionState = Client.this.sessionState().get(partition);
                partitionState.setEndSeqno(seqno);
                Client.this.sessionState().set(partition, partitionState);
            }
        });
    }

    private Completable initWithCallback(Action1<PartitionAndSeqno> callback) {
        this.sessionState().setToBeginningWithNoEnd(this.numPartitions());
        return this.getSeqnos().doOnNext(callback).reduce(new ArrayList(), (Func2)new Func2<List<Short>, PartitionAndSeqno, List<Short>>(){

            public List<Short> call(List<Short> partitions, PartitionAndSeqno partitionAndSeqno) {
                partitions.add(partitionAndSeqno.partition());
                return partitions;
            }
        }).flatMap((Func1)new Func1<List<Short>, Observable<ByteBuf>>(){

            public Observable<ByteBuf> call(List<Short> partitions) {
                return Client.this.failoverLogs(partitions.toArray(new Short[0]));
            }
        }).map((Func1)new Func1<ByteBuf, Short>(){

            public Short call(ByteBuf buf) {
                short partition = DcpFailoverLogResponse.vbucket(buf);
                Client.this.handleFailoverLogResponse(buf);
                buf.release();
                return partition;
            }
        }).last().toCompletable();
    }

    private static void buzzMe() {
        LOGGER.debug("To Infinity... AND BEYOND!");
    }

    public static class Builder {
        private List<HostAndPort> clusterAt = Collections.singletonList(new HostAndPort("127.0.0.1", 0));
        private NetworkResolution networkResolution = NetworkResolution.AUTO;
        private EventLoopGroup eventLoopGroup;
        private String bucket = "default";
        private boolean collectionsAware;
        private Set<Long> collectionIds = new HashSet<Long>();
        private Set<String> collectionNames = new HashSet<String>();
        private OptionalLong scopeId = OptionalLong.empty();
        private Optional<String> scopeName = Optional.empty();
        private CredentialsProvider credentialsProvider = new StaticCredentialsProvider("", "");
        private ConnectionNameGenerator connectionNameGenerator = DefaultConnectionNameGenerator.INSTANCE;
        private DcpControl dcpControl = new DcpControl().put(DcpControl.Names.ENABLE_NOOP, "true");
        private int bufferAckWatermark;
        private boolean poolBuffers = true;
        private long connectTimeout = ClientEnvironment.DEFAULT_SOCKET_CONNECT_TIMEOUT;
        private Duration bootstrapTimeout = ClientEnvironment.DEFAULT_BOOTSTRAP_TIMEOUT;
        private Duration configRefreshInterval = ClientEnvironment.DEFAULT_CONFIG_REFRESH_INTERVAL;
        private long socketConnectTimeout = ClientEnvironment.DEFAULT_SOCKET_CONNECT_TIMEOUT;
        private int dcpChannelsReconnectMaxAttempts = Integer.MAX_VALUE;
        private Delay dcpChannelsReconnectDelay = ClientEnvironment.DEFAULT_DCP_CHANNELS_RECONNECT_DELAY;
        private EventBus eventBus;
        private boolean sslEnabled = false;
        private String sslKeystoreFile;
        private String sslKeystorePassword;
        private KeyStore sslKeystore;
        private long persistencePollingIntervalMillis;

        public Builder bufferAckWatermark(int watermark) {
            if (watermark > 100 || watermark < 0) {
                throw new IllegalArgumentException("The bufferAckWatermark is percents, so it needs to be between 0 and 100");
            }
            this.bufferAckWatermark = watermark;
            return this;
        }

        public Builder seedNodes(Collection<String> addresses) {
            ArrayList<String> asList = new ArrayList<String>(new HashSet<String>(addresses));
            this.clusterAt = Builder.getSeedNodes(ConnectionString.fromHostnames(asList));
            return this;
        }

        public Builder seedNodes(String ... addresses) {
            return this.seedNodes(Arrays.asList(addresses));
        }

        @Deprecated
        public Builder hostnames(List<String> addresses) {
            return this.seedNodes(addresses);
        }

        @Deprecated
        public Builder hostnames(String ... addresses) {
            return this.seedNodes(addresses);
        }

        public Builder connectionString(String connectionString) {
            this.clusterAt = Builder.getSeedNodes(ConnectionString.create(connectionString));
            return this;
        }

        private static List<HostAndPort> getSeedNodes(ConnectionString cs) {
            return cs.hosts().stream().map(s -> new HostAndPort(s.hostname(), s.port())).collect(Collectors.toList());
        }

        public Builder networkResolution(NetworkResolution nr) {
            this.networkResolution = Objects.requireNonNull(nr);
            return this;
        }

        public Builder eventLoopGroup(EventLoopGroup eventLoopGroup) {
            this.eventLoopGroup = eventLoopGroup;
            return this;
        }

        public Builder bucket(String bucket) {
            this.bucket = bucket;
            Credentials cred = this.credentialsProvider.get(null);
            if (cred.getUsername().isEmpty()) {
                this.username(bucket);
            }
            return this;
        }

        public Builder collectionsAware(boolean enable) {
            this.collectionsAware = enable;
            return this;
        }

        public Builder collectionNames(Collection<String> collectionNames) {
            if (collectionNames.stream().anyMatch(Objects::isNull)) {
                throw new IllegalArgumentException("Collection name must not be null");
            }
            this.collectionNames = new HashSet<String>(collectionNames);
            return this;
        }

        public Builder collectionNames(String ... collectionNames) {
            return this.collectionNames(Arrays.asList(collectionNames));
        }

        public Builder collectionIds(Collection<Long> collectionIds) {
            if (collectionIds.stream().anyMatch(Objects::isNull)) {
                throw new IllegalArgumentException("Collection ID must not be null");
            }
            this.collectionIds = new HashSet<Long>(collectionIds);
            return this;
        }

        public Builder collectionIds(long ... collectionIds) {
            return this.collectionIds(Arrays.stream(collectionIds).boxed().collect(Collectors.toList()));
        }

        public Builder scopeName(String scopeName) {
            this.scopeName = CbCollections.isNullOrEmpty(scopeName) ? Optional.empty() : Optional.of(scopeName);
            return this;
        }

        public Builder scopeId(long scopeId) {
            this.scopeId = OptionalLong.of(scopeId);
            return this;
        }

        public Builder credentials(String username, String password) {
            this.credentialsProvider = new StaticCredentialsProvider(username, password);
            return this;
        }

        @Deprecated
        public Builder username(String username) {
            Credentials cred = this.credentialsProvider.get(null);
            this.credentialsProvider = new StaticCredentialsProvider(username, cred.getPassword());
            return this;
        }

        public Builder credentialsProvider(CredentialsProvider credentialsProvider) {
            this.credentialsProvider = Objects.requireNonNull(credentialsProvider);
            return this;
        }

        @Deprecated
        public Builder password(String password) {
            Credentials cred = this.credentialsProvider.get(null);
            this.credentialsProvider = new StaticCredentialsProvider(cred.getUsername(), password);
            return this;
        }

        public Builder userAgent(String productName, String productVersion, String ... comments) {
            return this.connectionNameGenerator(DefaultConnectionNameGenerator.forProduct(productName, productVersion, comments));
        }

        public Builder connectionNameGenerator(ConnectionNameGenerator connectionNameGenerator) {
            this.connectionNameGenerator = connectionNameGenerator;
            return this;
        }

        public Builder controlParam(DcpControl.Names name, Object value) {
            this.dcpControl.put(name, value.toString());
            return this;
        }

        public Builder compression(CompressionMode compressionMode) {
            this.dcpControl.compression(compressionMode);
            return this;
        }

        public Builder poolBuffers(boolean pool) {
            this.poolBuffers = pool;
            return this;
        }

        public Builder socketConnectTimeout(long socketConnectTimeout) {
            this.socketConnectTimeout = socketConnectTimeout;
            return this;
        }

        public Builder bootstrapTimeout(Duration bootstrapTimeout) {
            this.bootstrapTimeout = bootstrapTimeout;
            return this;
        }

        public Builder configRefreshInterval(Duration configRefreshInterval) {
            if (configRefreshInterval.compareTo(Duration.ofSeconds(1L)) < 0) {
                throw new IllegalArgumentException("Minimum config refresh interval is 1 second.");
            }
            if (configRefreshInterval.compareTo(Duration.ofMinutes(2L)) > 0) {
                throw new IllegalArgumentException("Maximum config refresh interval is 2 minutes.");
            }
            this.configRefreshInterval = Objects.requireNonNull(configRefreshInterval);
            return this;
        }

        public Builder connectTimeout(long connectTimeout) {
            this.connectTimeout = connectTimeout;
            return this;
        }

        public Builder dcpChannelsReconnectMaxAttempts(int dcpChannelsReconnectMaxAttempts) {
            this.dcpChannelsReconnectMaxAttempts = dcpChannelsReconnectMaxAttempts;
            return this;
        }

        public Builder dcpChannelsReconnectDelay(Delay dcpChannelsReconnectDelay) {
            this.dcpChannelsReconnectDelay = dcpChannelsReconnectDelay;
            return this;
        }

        public Builder eventBus(EventBus eventBus) {
            this.eventBus = eventBus;
            return this;
        }

        public Builder sslEnabled(boolean sslEnabled) {
            this.sslEnabled = sslEnabled;
            return this;
        }

        public Builder sslKeystoreFile(String sslKeystoreFile) {
            this.sslKeystoreFile = sslKeystoreFile;
            return this;
        }

        public Builder sslKeystorePassword(String sslKeystorePassword) {
            this.sslKeystorePassword = sslKeystorePassword;
            return this;
        }

        public Builder sslKeystore(KeyStore sslKeystore) {
            this.sslKeystore = sslKeystore;
            return this;
        }

        public Builder mitigateRollbacks(long persistencePollingInterval, TimeUnit unit) {
            this.persistencePollingIntervalMillis = unit.toMillis(persistencePollingInterval);
            return this;
        }

        public Builder flowControl(int bufferSizeInBytes) {
            this.controlParam(DcpControl.Names.CONNECTION_BUFFER_SIZE, bufferSizeInBytes);
            if (this.bufferAckWatermark == 0) {
                this.bufferAckWatermark = 80;
            }
            return this;
        }

        public Client build() {
            boolean hasScope;
            if (this.collectionsAware && !this.dcpControl.noopEnabled()) {
                throw new IllegalStateException("Collections awareness requires NOOPs; must not disable NOOPs.");
            }
            if (this.scopeName.isPresent() && this.scopeId.isPresent()) {
                throw new IllegalStateException("May specify scope name or ID, but not both.");
            }
            boolean hasCollections = !this.collectionIds.isEmpty() || !this.collectionNames.isEmpty();
            boolean bl = hasScope = this.scopeId.isPresent() || this.scopeName.isPresent();
            if (hasCollections && hasScope) {
                throw new IllegalStateException("May specify scope or collections, but not both.");
            }
            if ((hasCollections || hasScope) && !this.collectionsAware) {
                throw new IllegalStateException("Must call collectionsAware(true) when specifying scope or collections.");
            }
            return new Client(this);
        }
    }

    private static class PartitionAndSeqno {
        private final short partition;
        private final long seqno;

        public PartitionAndSeqno(short partition, long seqno) {
            this.partition = partition;
            this.seqno = seqno;
        }

        public short partition() {
            return this.partition;
        }

        public long seqno() {
            return this.seqno;
        }
    }
}

