/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp;

import com.couchbase.client.dcp.Authenticator;
import com.couchbase.client.dcp.ConnectionNameGenerator;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.CredentialsProvider;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.DefaultConnectionNameGenerator;
import com.couchbase.client.dcp.PasswordAuthenticator;
import com.couchbase.client.dcp.SecurityConfig;
import com.couchbase.client.dcp.StaticCredentialsProvider;
import com.couchbase.client.dcp.StreamFrom;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.SystemEventHandler;
import com.couchbase.client.dcp.buffer.PersistedSeqnos;
import com.couchbase.client.dcp.buffer.StreamEventBuffer;
import com.couchbase.client.dcp.conductor.Conductor;
import com.couchbase.client.dcp.config.CompressionMode;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.config.HostAndPort;
import com.couchbase.client.dcp.core.env.NetworkResolution;
import com.couchbase.client.dcp.core.event.EventBus;
import com.couchbase.client.dcp.core.event.EventType;
import com.couchbase.client.dcp.core.logging.RedactableArgument;
import com.couchbase.client.dcp.core.time.Delay;
import com.couchbase.client.dcp.core.utils.CbCollections;
import com.couchbase.client.dcp.core.utils.ConnectionString;
import com.couchbase.client.dcp.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.dcp.deps.io.netty.channel.EventLoopGroup;
import com.couchbase.client.dcp.deps.io.netty.channel.epoll.Epoll;
import com.couchbase.client.dcp.deps.io.netty.channel.epoll.EpollEventLoopGroup;
import com.couchbase.client.dcp.deps.io.netty.channel.kqueue.KQueue;
import com.couchbase.client.dcp.deps.io.netty.channel.kqueue.KQueueEventLoopGroup;
import com.couchbase.client.dcp.deps.io.netty.channel.nio.NioEventLoopGroup;
import com.couchbase.client.dcp.deps.io.netty.util.concurrent.DefaultThreadFactory;
import com.couchbase.client.dcp.error.BootstrapException;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.events.DefaultDcpEventBus;
import com.couchbase.client.dcp.events.StreamEndEvent;
import com.couchbase.client.dcp.highlevel.DatabaseChangeListener;
import com.couchbase.client.dcp.highlevel.FlowControlMode;
import com.couchbase.client.dcp.highlevel.SnapshotMarker;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.highlevel.internal.AsyncEventDispatcher;
import com.couchbase.client.dcp.highlevel.internal.CollectionsManifest;
import com.couchbase.client.dcp.highlevel.internal.EventHandlerAdapter;
import com.couchbase.client.dcp.highlevel.internal.ImmediateEventDispatcher;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSeqnoAdvancedRequest;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.DcpSystemEvent;
import com.couchbase.client.dcp.message.DcpSystemEventRequest;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.OpenConnectionFlag;
import com.couchbase.client.dcp.message.PartitionAndSeqno;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.message.StreamEndReason;
import com.couchbase.client.dcp.metrics.DcpClientMetrics;
import com.couchbase.client.dcp.metrics.MetricsContext;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.dcp.state.StateFormat;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.dcp.util.MathUtils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

public class Client
implements Closeable {
    /** Logger for this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(Client.class);
    /** Thread factory (constructed with the name "dcp-io") passed to every event loop group built by {@code newEventLoopGroup()}. */
    private static final ThreadFactory threadFactory = new DefaultThreadFactory("dcp-io");
    /** Created in the constructor; connection, streaming and failover-log calls are delegated to it. */
    private final Conductor conductor;
    /** Configuration built from the builder; also holds the registered event handlers. */
    private final Environment env;
    /** Value of {@code env.dcpControl().bufferAckEnabled()} captured at construction time. */
    private final boolean bufferAckEnabled;
    /** Dispatcher created by {@code listener(...)}; remains {@code null} until that method is called. */
    private volatile AsyncEventDispatcher listenerDispatcher;
    /** Flipped to {@code true} by the first high-level listener registration so later attempts are rejected. */
    private final AtomicBoolean hasHighLevelListener = new AtomicBoolean();

    /**
     * Creates a client from the given builder.
     * <p>
     * Builds the environment, validates the flow control settings, installs default
     * control/data event handlers and finally creates the conductor. The default
     * control handler acknowledges snapshot markers and releases every event; the
     * default data handler acknowledges and releases every event.
     *
     * @param builder the source of all configuration values
     * @throws IllegalArgumentException if buffer acknowledgement is enabled while
     *         the buffer ack watermark is 0
     */
    private Client(Builder builder) {
        this.env = new Environment(builder);
        this.bufferAckEnabled = this.env.dcpControl().bufferAckEnabled();
        if (this.bufferAckEnabled && this.env.bufferAckWatermark() == 0) {
            throw new IllegalArgumentException("The bufferAckWatermark needs to be set if bufferAck is enabled.");
        }

        // Default control handler: ack snapshot markers only, always release the buffer.
        ControlEventHandler defaultControlHandler = (controller, buf) -> {
            try {
                if (DcpSnapshotMarkerRequest.is(buf)) {
                    controller.ack(buf);
                }
            } finally {
                buf.release();
            }
        };
        this.controlEventHandler(defaultControlHandler);

        // Default data handler: ack everything, always release the buffer.
        DataEventHandler defaultDataHandler = (controller, buf) -> {
            try {
                controller.ack(buf);
            } finally {
                buf.release();
            }
        };
        this.dataEventHandler(defaultDataHandler);

        DcpClientMetrics metrics = new DcpClientMetrics(new MetricsContext(builder.meterRegistry, "dcp"));
        this.conductor = new Conductor(this.env, metrics);
        LOGGER.info("Environment Configuration Used: {}", RedactableArgument.system(this.env));
    }

    /**
     * Creates an event loop group using the first available transport, checked in
     * the order epoll, kqueue, NIO. The chosen transport is logged at INFO level.
     *
     * @return a new event loop group backed by the shared "dcp-io" thread factory
     */
    private static EventLoopGroup newEventLoopGroup() {
        final EventLoopGroup group;
        if (Epoll.isAvailable()) {
            LOGGER.info("Using Netty epoll native transport.");
            group = new EpollEventLoopGroup(threadFactory);
        } else if (KQueue.isAvailable()) {
            LOGGER.info("Using Netty kqueue native transport.");
            group = new KQueueEventLoopGroup(threadFactory);
        } else {
            LOGGER.info("Using Netty NIO transport.");
            group = new NioEventLoopGroup(threadFactory);
        }
        return group;
    }

    /**
     * Returns a new builder; simply forwards to {@link #builder()}.
     *
     * @return a new builder
     * @deprecated use {@link #builder()} instead
     */
    @Deprecated
    public static Builder configure() {
        return builder();
    }

    /**
     * Creates a new {@code Builder} instance.
     *
     * @return a fresh builder
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Returns the partition/sequence-number pairs reported by the conductor.
     *
     * @return the flux produced by the conductor's {@code getSeqnos()}
     */
    public Flux<PartitionAndSeqno> getSeqnos() {
        return conductor.getSeqnos();
    }

    /**
     * Returns the session state held by the conductor.
     *
     * @return the conductor's session state
     */
    public SessionState sessionState() {
        return conductor.sessionState();
    }

    /**
     * Registers a high-level listener whose events are delivered through a new
     * {@link AsyncEventDispatcher}.
     * <p>
     * Only one high-level listener (registered here or through
     * {@code nonBlockingListener}) may be set per client. The flow control
     * precondition is checked <em>before</em> the one-time registration slot is
     * claimed, so a call rejected for configuration reasons does not prevent a
     * later, valid registration.
     *
     * @param listener the listener to notify
     * @param flowControlMode how flow control acknowledgements are handled
     * @throws IllegalStateException if automatic flow control is requested but the
     *         client was built without flow control, or if a high-level listener
     *         has already been registered
     */
    public void listener(DatabaseChangeListener listener, FlowControlMode flowControlMode) {
        // Validate first: a failed precondition must not consume the one-shot flag.
        if (flowControlMode == FlowControlMode.AUTOMATIC && !this.bufferAckEnabled) {
            throw new IllegalStateException("Can't register listener in automatic flow control mode because the DCP client was not configured for flow control. Make sure to call flowControl(bufferSizeInBytes) when building the DCP client.");
        }
        if (!this.hasHighLevelListener.compareAndSet(false, true)) {
            throw new IllegalStateException("Listener may only be set once.");
        }
        this.listenerDispatcher = new AsyncEventDispatcher(flowControlMode, listener);
        EventHandlerAdapter.register(this, this.listenerDispatcher);
    }

    /**
     * Registers a high-level listener that is invoked through an
     * {@link ImmediateEventDispatcher}.
     *
     * @param listener the listener to notify
     * @throws IllegalStateException if a high-level listener was already registered
     */
    public void nonBlockingListener(DatabaseChangeListener listener) {
        boolean firstRegistration = this.hasHighLevelListener.compareAndSet(false, true);
        if (firstRegistration) {
            EventHandlerAdapter.register(this, new ImmediateEventDispatcher(listener));
            return;
        }
        throw new IllegalStateException("Listener may only be set once.");
    }

    /**
     * Installs the handler for control events.
     * <p>
     * The given handler is wrapped so that the session state is updated before the
     * event is passed on: snapshot markers, failover logs, seqno-advanced messages
     * and system events are all recorded on the affected partition's state.
     * Rollback messages are only logged here.
     * <p>
     * Failover log responses are forwarded to the given handler only when it is an
     * {@link EventHandlerAdapter}; otherwise the wrapper releases the event itself
     * and returns without forwarding.
     *
     * @param controlEventHandler the handler that receives the events after the
     *        session state has been updated
     */
    public void controlEventHandler(final ControlEventHandler controlEventHandler) {
        // Only the high-level adapter is handed failover log responses.
        final boolean userHandlerWantsFailoverLogs = controlEventHandler instanceof EventHandlerAdapter;
        this.env.setControlEventHandler(new ControlEventHandler(){

            @Override
            public void onEvent(ChannelFlowController flowController, ByteBuf event) {
                if (DcpSnapshotMarkerRequest.is(event)) {
                    // Remember the snapshot boundaries for the partition.
                    int partition = DcpSnapshotMarkerRequest.partition(event);
                    Client.this.sessionState().get(partition).setSnapshot(new SnapshotMarker(DcpSnapshotMarkerRequest.startSeqno(event), DcpSnapshotMarkerRequest.endSeqno(event)));
                } else if (DcpFailoverLogResponse.is(event)) {
                    Client.this.handleFailoverLogResponse(event);
                    if (!userHandlerWantsFailoverLogs) {
                        // Not forwarded, so the buffer must be released here.
                        event.release();
                        return;
                    }
                } else if (RollbackMessage.is(event)) {
                    LOGGER.warn("Received rollback for vbucket {} to seqno {}", (Object)RollbackMessage.vbucket(event), (Object)RollbackMessage.seqno(event));
                } else if (DcpSeqnoAdvancedRequest.is(event)) {
                    this.handleSeqnoAdvanced(event);
                } else if (DcpSystemEventRequest.is(event)) {
                    this.handleDcpSystemEvent(event);
                }
                // Releasing the event is left to the wrapped handler from here on.
                controlEventHandler.onEvent(flowController, event);
            }

            /** Moves the partition's start seqno and snapshot to the advanced seqno. */
            private void handleSeqnoAdvanced(ByteBuf event) {
                int vbucket = MessageUtil.getVbucket(event);
                long seqno = DcpSeqnoAdvancedRequest.getSeqno(event);
                LOGGER.debug("Seqno for vbucket {} advanced to {}", (Object)vbucket, (Object)seqno);
                PartitionState ps = Client.this.sessionState().get(vbucket);
                ps.setStartSeqno(seqno);
                ps.setSnapshot(new SnapshotMarker(seqno, seqno));
            }

            /**
             * Records the event's seqno as the partition's start seqno and, for
             * collections manifest events, updates the stored manifest UID and manifest.
             */
            private void handleDcpSystemEvent(ByteBuf event) {
                long seqno = DcpSystemEventRequest.getSeqno(event);
                int vbucket = MessageUtil.getVbucket(event);
                PartitionState ps = Client.this.sessionState().get(vbucket);
                ps.setStartSeqno(seqno);
                DcpSystemEvent sysEvent = DcpSystemEvent.parse(event);
                if (!(sysEvent instanceof DcpSystemEvent.CollectionsManifestEvent)) {
                    LOGGER.warn("Ignoring unrecognized DCP system event!\n{}", (Object)RedactableArgument.meta(MessageUtil.humanize(event)));
                } else {
                    DcpSystemEvent.CollectionsManifestEvent manifestEvent = (DcpSystemEvent.CollectionsManifestEvent)((Object)sysEvent);
                    CollectionsManifest existingManifest = ps.getCollectionsManifest();
                    // The UID is stored unconditionally, even when the manifest change below is skipped.
                    ps.setCollectionsManifestUid(manifestEvent.getManifestId());
                    // Unsigned comparison: only events at or after the current manifest UID are applied.
                    if (MathUtils.lessThanUnsigned(manifestEvent.getManifestId(), existingManifest.getId())) {
                        LOGGER.debug("Ignoring collection manifest event; UID {} is < current manifest UID {}", (Object)manifestEvent.getManifestId(), (Object)existingManifest.getId());
                    } else {
                        LOGGER.debug("Applying collection manifest change; UID {} is >= current manifest UID {}", (Object)manifestEvent.getManifestId(), (Object)existingManifest.getId());
                        ps.setCollectionsManifest(manifestEvent.apply(existingManifest));
                    }
                }
            }
        });
    }

    /**
     * Installs the handler for system events by passing it to the environment.
     *
     * @param systemEventHandler the handler to install
     */
    public void systemEventHandler(SystemEventHandler systemEventHandler) {
        env.setSystemEventHandler(systemEventHandler);
    }

    /**
     * Stores the failover log entries carried by the response on the state of the
     * partition the response belongs to. The buffer is not released here.
     *
     * @param event a failover log response
     */
    private void handleFailoverLogResponse(ByteBuf event) {
        int vbucket = DcpFailoverLogResponse.vbucket(event);
        PartitionState state = sessionState().get(vbucket);
        state.setFailoverLog(DcpFailoverLogResponse.entries(event));
    }

    /**
     * Installs the handler for data events.
     * <p>
     * The handler is wrapped so that, for mutation, deletion and expiration
     * messages, the partition's start seqno in the session state is updated to the
     * message's seqno before the event is passed on.
     *
     * @param dataEventHandler the handler that receives every data event
     */
    public void dataEventHandler(DataEventHandler dataEventHandler) {
        DataEventHandler trackingHandler = (controller, buf) -> {
            boolean isDocumentChange = DcpMutationMessage.is(buf)
                    || DcpDeletionMessage.is(buf)
                    || DcpExpirationMessage.is(buf);
            if (isDocumentChange) {
                int vbucket = MessageUtil.getVbucket(buf);
                long bySeqno = DcpMutationMessage.bySeqno(buf);
                sessionState().get(vbucket).setStartSeqno(bySeqno);
            }
            dataEventHandler.onEvent(controller, buf);
        };
        this.env.setDataEventHandler(trackingHandler);
    }

    /**
     * Connects the conductor unless it is already connecting or connected.
     * <p>
     * If the connection attempt fails, the client is disconnected and the failure
     * is reported as a {@link BootstrapException} wrapping the original cause.
     *
     * @return a Mono completing once connected; completes immediately for a duplicate attempt
     * @throws IllegalArgumentException if no data or control event handler is set
     */
    public Mono<Void> connect() {
        boolean alreadyActive = !this.conductor.disconnected();
        if (alreadyActive) {
            LOGGER.debug("Ignoring duplicate connect attempt, already connecting/connected.");
            return Mono.empty();
        }
        if (null == this.env.dataEventHandler()) {
            throw new IllegalArgumentException("A DataEventHandler needs to be provided!");
        }
        if (null == this.env.controlEventHandler()) {
            throw new IllegalArgumentException("A ControlEventHandler needs to be provided!");
        }

        LOGGER.info("Connecting to seed nodes and bootstrapping bucket {}.", RedactableArgument.meta(this.env.bucket()));
        return this.conductor.connect().onErrorResume(cause ->
                this.disconnect().then(Mono.<Void>error(new BootstrapException("Could not connect to Cluster/Bucket", cause))));
    }

    /**
     * Shuts the client down in four sequential steps: ask the listener dispatcher
     * to shut down, stop the conductor, shut down the environment, then wait for
     * the dispatcher to terminate.
     *
     * @return a Mono completing when all four steps have finished
     */
    public Mono<Void> disconnect() {
        return dispatcherGracefulShutdown()
                .then(conductor.stop())
                .then(env.shutdown())
                .then(dispatcherAwaitShutdown());
    }

    /**
     * Disconnects, blocking the calling thread for up to 60 seconds.
     */
    @Override
    public void close() {
        Duration timeout = Duration.ofSeconds(60L);
        disconnect().block(timeout);
    }

    /**
     * Returns a Mono that, when subscribed, asks the listener dispatcher (if one
     * was created) to shut down gracefully.
     *
     * @return a Mono completing after the shutdown request was issued
     */
    private Mono<Void> dispatcherGracefulShutdown() {
        return Mono.fromRunnable(() -> {
            if (this.listenerDispatcher == null) {
                return;
            }
            LOGGER.info("Asking event dispatcher to shut down.");
            this.listenerDispatcher.gracefulShutdown();
        });
    }

    /**
     * Returns a Mono that waits for the listener dispatcher (if any) to terminate.
     * <p>
     * Waits up to 30 seconds; if the dispatcher is still running it is shut down
     * forcibly and given another 10 seconds, after which a warning is logged.
     * The work is subscribed on the elastic scheduler.
     *
     * @return a Mono completing when the wait is over
     */
    private Mono<Void> dispatcherAwaitShutdown() {
        return Mono.fromCallable(() -> {
            long startedAt = System.nanoTime();
            if (this.listenerDispatcher == null || this.listenerDispatcher.awaitTermination(Duration.ofSeconds(30L))) {
                return null;
            }
            LOGGER.info("Forcing event dispatcher to shut down.");
            this.listenerDispatcher.shutdownNow();
            boolean terminated = this.listenerDispatcher.awaitTermination(Duration.ofSeconds(10L));
            if (!terminated) {
                long elapsedSeconds = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startedAt);
                LOGGER.warn("Event dispatcher still hasn't terminated after {} seconds.", elapsedSeconds);
            }
            return null;
        }).then().subscribeOn(Schedulers.elastic());
    }

    /**
     * Replaces the session state of each given partition with a state built from
     * its offset, then starts streaming those partitions.
     * <p>
     * The session state is updated when this method is called, not when the
     * returned Mono is subscribed.
     *
     * @param vbucketToOffset offset to resume from, keyed by partition
     * @return the Mono from {@code startStreaming}; an empty Mono if the map is empty
     */
    public Mono<Void> resumeStreaming(Map<Integer, StreamOffset> vbucketToOffset) {
        if (vbucketToOffset.isEmpty()) {
            return Mono.empty();
        }
        for (Map.Entry<Integer, StreamOffset> entry : vbucketToOffset.entrySet()) {
            int vbucket = entry.getKey();
            sessionState().set(vbucket, PartitionState.fromOffset(entry.getValue()));
        }
        return startStreaming(vbucketToOffset.keySet());
    }

    /**
     * Widens each given {@code Short} to an {@code Integer}, keeping the order.
     *
     * @param shorts the values to convert
     * @return a new mutable list with the converted values
     */
    private static List<Integer> toIntList(Short ... shorts) {
        List<Integer> widened = new ArrayList<>();
        for (int i = 0; i < shorts.length; i++) {
            widened.add(shorts[i].intValue());
        }
        return widened;
    }

    /**
     * Starts streaming for the given partition ids.
     *
     * @param vbids the partition ids
     * @return the Mono from {@link #startStreaming(Collection)}
     * @deprecated use {@link #startStreaming(Collection)} instead
     */
    @Deprecated
    public Mono<Void> startStreaming(Short ... vbids) {
        List<Integer> ids = toIntList(vbids);
        return startStreaming(ids);
    }

    /**
     * Starts streaming with an empty partition selection, which
     * {@link #startStreaming(Collection)} treats as "all partitions".
     *
     * @return the Mono from {@link #startStreaming(Collection)}
     */
    public Mono<Void> startStreaming() {
        return startStreaming(Collections.<Integer>emptyList());
    }

    /**
     * Opens streams for the given partitions, or for all partitions if
     * {@code vbids} is empty.
     * <p>
     * Partitions whose session state is missing or whose start seqno is not below
     * the end seqno are not streamed; a {@link StreamEndEvent} with reason
     * {@code OK} is published for each of them right away. Partition selection and
     * the publishing of those events happen when this method is called, not when
     * the returned Mono is subscribed.
     * <p>
     * A {@link RollbackException} from an individual stream is swallowed; any
     * other error fails the returned Mono.
     *
     * @param vbids the partition ids to stream, or an empty collection for all
     * @return a Mono completing once every selected stream has been started
     */
    public Mono<Void> startStreaming(Collection<Integer> vbids) {
        int numPartitions = this.numPartitions();
        List<Integer> partitions = Client.partitionsForVbids(numPartitions, vbids);
        List<Integer> initializedPartitions = this.selectInitializedPartitions(numPartitions, partitions);

        // Hash lookup instead of List.contains keeps this pass linear in the partition count.
        Set<Integer> initializedLookup = new HashSet<>(initializedPartitions);
        List<Integer> noopPartitions = new ArrayList<>();
        for (Integer partition : partitions) {
            if (!initializedLookup.contains(partition)) {
                noopPartitions.add(partition);
            }
        }

        if (!noopPartitions.isEmpty()) {
            LOGGER.info("Immediately sending stream end events for {} partitions already at desired end.", noopPartitions.size());
            LOGGER.debug("Immediately sending stream end events for partitions already at desired end: {}", noopPartitions);
            noopPartitions.forEach(p -> this.env.eventBus().publish(new StreamEndEvent(p, StreamEndReason.OK)));
        }
        if (initializedPartitions.isEmpty()) {
            LOGGER.info("The configured session state does not require any streams to be opened. Completing immediately.");
            return Mono.empty();
        }

        LOGGER.info("Starting to Stream for {} partitions", initializedPartitions.size());
        LOGGER.debug("Stream start against partitions: {}", initializedPartitions);
        return Flux.fromIterable(initializedPartitions).flatMap(partition -> {
            // State is read per partition at subscription time.
            PartitionState partitionState = this.sessionState().get(partition);
            return this.conductor.startStreamForPartition(partition, partitionState.getOffset(), partitionState.getEndSeqno())
                    .onErrorResume(throwable -> throwable instanceof RollbackException ? Mono.empty() : Mono.error(throwable));
        }).then();
    }

    /**
     * Filters the given partitions down to those that still have something to
     * stream: the partition must have a session state and its start seqno must be
     * below its end seqno (unsigned comparison). Skipped partitions are logged at
     * debug level.
     *
     * @param clusterPartitions number of partitions the cluster has
     * @param partitions candidate partition ids
     * @return the partitions that qualify, in the order given
     * @throws IllegalStateException if more partitions qualify than the cluster has
     */
    private List<Integer> selectInitializedPartitions(int clusterPartitions, List<Integer> partitions) {
        List<Integer> initializedPartitions = new ArrayList<>();
        SessionState state = this.sessionState();
        for (int partition : partitions) {
            PartitionState ps = state.get(partition);
            if (ps == null) {
                LOGGER.debug("Skipping partition {}, because its state is null", partition);
                continue;
            }
            if (MathUtils.lessThanUnsigned(ps.getStartSeqno(), ps.getEndSeqno())) {
                initializedPartitions.add(partition);
            } else {
                LOGGER.debug("Skipping partition {}, because startSeqno({}) >= endSeqno({})", partition, ps.getStartSeqno(), ps.getEndSeqno());
            }
        }
        if (initializedPartitions.size() > clusterPartitions) {
            // Report the count; the message previously embedded the whole partition list.
            throw new IllegalStateException("Session State has " + initializedPartitions.size() + " partitions while the cluster has " + clusterPartitions + "!");
        }
        return initializedPartitions;
    }

    /**
     * Stops streaming for the given partition ids.
     *
     * @param vbids the partition ids
     * @return the Mono from {@link #stopStreaming(Collection)}
     * @deprecated use {@link #stopStreaming(Collection)} instead
     */
    @Deprecated
    public Mono<Void> stopStreaming(Short ... vbids) {
        List<Integer> ids = toIntList(vbids);
        return stopStreaming(ids);
    }

    /**
     * Stops the streams of the given partitions, or of all partitions if
     * {@code vbids} is empty. The stop is logged when the returned Mono is
     * subscribed.
     *
     * @param vbids the partition ids to stop, or an empty collection for all
     * @return a Mono completing once every stop request has completed
     */
    public Mono<Void> stopStreaming(Collection<Integer> vbids) {
        List<Integer> targets = partitionsForVbids(numPartitions(), vbids);
        return Flux.fromIterable(targets)
                .doOnSubscribe(ignored -> {
                    LOGGER.info("Stopping stream for {} partitions", targets.size());
                    LOGGER.debug("Stream stop against partitions: {}", targets);
                })
                .flatMap(this.conductor::stopStreamForPartition)
                .then();
    }

    /**
     * Resolves a partition selection to a concrete list.
     * <p>
     * A non-empty selection is copied and sorted ascending. An empty selection
     * means "all partitions" and yields {@code 0 .. numPartitions - 1}.
     *
     * @param numPartitions total number of partitions
     * @param vbids the requested partition ids, possibly empty
     * @return a new mutable list of partition ids in ascending order
     */
    private static List<Integer> partitionsForVbids(int numPartitions, Collection<Integer> vbids) {
        List<Integer> resolved;
        if (vbids.isEmpty()) {
            resolved = new ArrayList<>();
            int next = 0;
            while (next < numPartitions) {
                resolved.add(next);
                next++;
            }
        } else {
            resolved = new ArrayList<>(vbids);
            Collections.sort(resolved);
        }
        return resolved;
    }

    /**
     * Requests the failover logs of the given partition ids.
     *
     * @param vbids the partition ids
     * @return the Flux from {@link #failoverLogs(Collection)}
     * @deprecated use {@link #failoverLogs(Collection)} instead
     */
    @Deprecated
    public Flux<ByteBuf> failoverLogs(Short ... vbids) {
        List<Integer> ids = toIntList(vbids);
        return failoverLogs(ids);
    }

    /**
     * Requests the failover logs of the given partitions, or of all partitions if
     * {@code vbids} is empty.
     *
     * @param vbids the partition ids, or an empty collection for all
     * @return a Flux emitting the conductor's failover log response per partition
     */
    public Flux<ByteBuf> failoverLogs(Collection<Integer> vbids) {
        List<Integer> targets = partitionsForVbids(numPartitions(), vbids);
        LOGGER.debug("Asking for failover logs on partitions {}", targets);
        Flux<Integer> partitionIds = Flux.fromIterable(targets);
        return partitionIds.flatMap(this.conductor::getFailoverLog);
    }

    /**
     * Requests the failover log of a single partition. The conductor is only
     * asked once the returned Mono is subscribed.
     *
     * @param vbid the partition id
     * @return a Mono emitting the conductor's failover log response
     */
    public Mono<ByteBuf> failoverLog(int vbid) {
        Mono<Integer> partitionId = Mono.just(vbid);
        return partitionId.flatMap(this.conductor::getFailoverLog);
    }

    /**
     * Stops the stream of the given partition, rolls its session state back to
     * the given seqno and starts the stream again.
     * <p>
     * The restart is deferred until the stop and the rollback have completed.
     * {@code startStreaming} selects partitions and publishes stream-end events
     * as soon as it is invoked, so calling it while this pipeline is assembled
     * would evaluate the partition against its pre-rollback state.
     *
     * @param partition the partition to roll back
     * @param seqno the seqno to roll back to
     * @return a Mono completing once the stream has been restarted
     */
    public Mono<Void> rollbackAndRestartStream(int partition, long seqno) {
        List<Integer> target = Collections.singletonList(partition);
        return this.stopStreaming(target)
                .then(Mono.fromRunnable(() -> this.sessionState().rollbackToPosition(partition, seqno)))
                .then(Mono.defer(() -> this.startStreaming(target)));
    }

    /**
     * Returns the partition count reported by the conductor.
     *
     * @return the conductor's {@code numberOfPartitions()}
     */
    public int numPartitions() {
        return conductor.numberOfPartitions();
    }

    /**
     * Tells whether the conductor reports an open stream for the partition.
     *
     * @param vbid the partition id
     * @return the conductor's {@code streamIsOpen(vbid)}
     */
    public boolean streamIsOpen(int vbid) {
        return conductor.streamIsOpen(vbid);
    }

    /**
     * Initializes the session state for one of the supported from/to
     * combinations: BEGINNING to INFINITY, BEGINNING to NOW, or NOW to INFINITY.
     *
     * @param from where streaming should start
     * @param to where streaming should end
     * @return a Mono performing the initialization
     * @throws IllegalStateException for any other combination
     */
    public Mono<Void> initializeState(StreamFrom from, StreamTo to) {
        boolean fromBeginning = from == StreamFrom.BEGINNING;
        boolean toInfinity = to == StreamTo.INFINITY;

        if (toInfinity && (fromBeginning || from == StreamFrom.NOW)) {
            buzzMe();
            return fromBeginning ? initFromBeginningToInfinity() : initFromNowToInfinity();
        }
        if (fromBeginning && to == StreamTo.NOW) {
            return initFromBeginningToNow();
        }
        throw new IllegalStateException("Unsupported FROM/TO combination: " + from + " -> " + to);
    }

    /**
     * Restores the session state from a persisted representation. Only
     * {@code StateFormat.JSON} is supported.
     *
     * @param format the format of {@code persistedState}
     * @param persistedState the persisted bytes (logged at debug level as UTF-8 text)
     * @return a Mono that completes on success; it errors with an
     *         {@link IllegalStateException} for an unsupported format, or with
     *         whatever exception restoring the state raised
     */
    public Mono<Void> recoverState(StateFormat format, byte[] persistedState) {
        return Mono.create(sink -> {
            LOGGER.info("Recovering state from format {}", format);
            LOGGER.debug("PersistedState on recovery is: {}", new String(persistedState, StandardCharsets.UTF_8));
            try {
                if (format != StateFormat.JSON) {
                    sink.error(new IllegalStateException("Unsupported StateFormat " + format));
                    return;
                }
                sessionState().setFromJson(persistedState);
                sink.success();
            } catch (Exception failure) {
                sink.error(failure);
            }
        });
    }

    /**
     * Recovers the session state from {@code persistedState} when it is present,
     * otherwise initializes it from the given from/to combination.
     *
     * @param format the format of {@code persistedState}
     * @param persistedState the persisted bytes; {@code null} or empty means "nothing persisted"
     * @param from where to start if there is nothing to recover
     * @param to where to end if there is nothing to recover
     * @return the Mono from {@code recoverState} or {@code initializeState}
     */
    public Mono<Void> recoverOrInitializeState(StateFormat format, byte[] persistedState, StreamFrom from, StreamTo to) {
        boolean hasPersistedState = persistedState != null && persistedState.length != 0;
        return hasPersistedState
                ? recoverState(format, persistedState)
                : initializeState(from, to);
    }

    /**
     * Returns a Mono that, when subscribed, resets the session state of every
     * partition to "beginning with no end". A failure is logged and propagated
     * as an error signal.
     *
     * @return a Mono completing once the state has been reset
     */
    private Mono<Void> initFromBeginningToInfinity() {
        return Mono.create(sink -> {
            LOGGER.info("Initializing state from beginning to no end.");
            try {
                int partitionCount = numPartitions();
                sessionState().setToBeginningWithNoEnd(partitionCount);
                sink.success();
            } catch (Exception failure) {
                LOGGER.warn("Failed to initialize state from beginning to no end.", failure);
                sink.error(failure);
            }
        });
    }

    /**
     * Initializes the state so that each partition starts at its current seqno:
     * the start seqno and both snapshot bounds are set to the seqno reported for
     * the partition.
     *
     * @return the Mono from {@code initWithCallback}
     */
    private Mono<Void> initFromNowToInfinity() {
        Consumer<PartitionAndSeqno> startAtCurrentSeqno = current -> {
            int vbucket = current.partition();
            long now = current.seqno();
            PartitionState state = sessionState().get(vbucket);
            state.setStartSeqno(now);
            state.setSnapshot(new SnapshotMarker(now, now));
            sessionState().set(vbucket, state);
        };
        return initWithCallback(startAtCurrentSeqno);
    }

    /**
     * Initializes the state so that each partition ends at its current seqno:
     * the end seqno is set to the seqno reported for the partition.
     *
     * @return the Mono from {@code initWithCallback}
     */
    private Mono<Void> initFromBeginningToNow() {
        Consumer<PartitionAndSeqno> endAtCurrentSeqno = current -> {
            int vbucket = current.partition();
            long now = current.seqno();
            PartitionState state = sessionState().get(vbucket);
            state.setEndSeqno(now);
            sessionState().set(vbucket, state);
        };
        return initWithCallback(endAtCurrentSeqno);
    }

    /**
     * Resets the session state to "beginning with no end", then, for every
     * partition/seqno pair reported by {@code getSeqnos()}, runs the callback and
     * fetches and stores the partition's failover log.
     * <p>
     * The state reset happens when this method is called; the rest runs when the
     * returned Mono is subscribed.
     *
     * @param callback invoked with each partition/seqno pair before its failover
     *        log is requested
     * @return a Mono completing once every failover log has been stored
     */
    private Mono<Void> initWithCallback(Consumer<PartitionAndSeqno> callback) {
        this.sessionState().setToBeginningWithNoEnd(this.numPartitions());
        return this.getSeqnos()
                .doOnNext(callback)
                .map(PartitionAndSeqno::partition)
                .flatMap(this::failoverLog)
                .map(buf -> {
                    // Release in finally so the buffer is not leaked if reading or storing the log throws.
                    try {
                        int partition = DcpFailoverLogResponse.vbucket(buf);
                        this.handleFailoverLogResponse(buf);
                        return partition;
                    } finally {
                        buf.release();
                    }
                })
                .then();
    }

    /**
     * Logs a fixed debug message; {@code initializeState} calls it for the two
     * combinations that stream to INFINITY.
     */
    private static void buzzMe() {
        LOGGER.debug("To Infinity... AND BEYOND!");
    }

    public static class Environment {
        /** Logger for the environment. */
        private static final Logger log = LoggerFactory.getLogger(Environment.class);
        /** Default bootstrap timeout: 5 seconds. */
        public static final Duration DEFAULT_BOOTSTRAP_TIMEOUT = Duration.ofSeconds(5L);
        /** Default config refresh interval: 2 seconds. */
        public static final Duration DEFAULT_CONFIG_REFRESH_INTERVAL = Duration.ofSeconds(2L);
        /** Default socket connect timeout in milliseconds (1 second). */
        public static final long DEFAULT_SOCKET_CONNECT_TIMEOUT = TimeUnit.SECONDS.toMillis(1L);
        /** Default reconnect policy: fixed 200 ms delay with {@code Long.MAX_VALUE} attempts. */
        public static final Retry DEFAULT_DCP_CHANNELS_RECONNECT_DELAY = Retry.fixedDelay((long)Long.MAX_VALUE, (Duration)Duration.ofMillis(200L));
        /** Default maximum number of reconnect attempts. */
        public static final int DEFAULT_DCP_CHANNELS_RECONNECT_MAX_ATTEMPTS = Integer.MAX_VALUE;
        /** Port assumed for seed nodes without an explicit port when TLS is disabled. */
        private static final int DEFAULT_KV_PORT = 11210;
        /** Port assumed for seed nodes without an explicit port when TLS is enabled. */
        private static final int DEFAULT_KV_TLS_PORT = 11207;
        /** Seed nodes from the builder after {@code makeDefaultPortsExplicit} was applied. */
        private final List<HostAndPort> seedNodes;
        private final NetworkResolution networkResolution;
        private final ConnectionNameGenerator connectionNameGenerator;
        private final String bucket;
        private final boolean collectionsAware;
        private final OptionalLong scopeId;
        private final Optional<String> scopeName;
        /** Unmodifiable view of the builder's collection ids. */
        private final Set<Long> collectionIds;
        /** Unmodifiable view of the builder's collection names. */
        private final Set<String> collectionNames;
        private final Authenticator authenticator;
        private final Duration bootstrapTimeout;
        private final Duration configRefreshInterval;
        private final DcpControl dcpControl;
        /** Unmodifiable copy of the builder's connection flags. */
        private final Set<OpenConnectionFlag> connectionFlags;
        /** The builder's event loop group, or a newly created one if the builder had none. */
        private final EventLoopGroup eventLoopGroup;
        /** {@code true} when {@code eventLoopGroup} was created by this environment rather than supplied. */
        private final boolean eventLoopGroupIsPrivate;
        private final boolean poolBuffers;
        private final int bufferAckWatermark;
        private final long socketConnectTimeout;
        /** Persistence polling is considered enabled when this is greater than 0. */
        private final long persistencePollingIntervalMillis;
        /** The installed data handler, or the stream event buffer when persistence polling is enabled. */
        private volatile DataEventHandler dataEventHandler;
        /** The installed control handler, or the stream event buffer when persistence polling is enabled. */
        private volatile ControlEventHandler controlEventHandler;
        private final PersistedSeqnos persistedSeqnos = PersistedSeqnos.uninitialized();
        private final Retry dcpChannelsReconnectDelay;
        private final int dcpChannelsReconnectMaxAttempts;
        /** The builder's event bus, or a {@code DefaultDcpEventBus} if the builder had none. */
        private final EventBus eventBus;
        /** Scheduler backing the default event bus; {@code null} when the builder supplied an event bus. */
        private final Scheduler scheduler;
        /** Subscription created by {@code setSystemEventHandler}; disposed when a new handler is set. */
        private Disposable systemEventSubscription;
        private final SecurityConfig securityConfig;

        private Environment(Builder builder) {
            this.connectionNameGenerator = builder.connectionNameGenerator;
            this.bucket = builder.bucket;
            this.authenticator = builder.authenticator;
            this.bootstrapTimeout = builder.bootstrapTimeout;
            this.configRefreshInterval = builder.configRefreshInterval;
            this.dcpControl = builder.dcpControl;
            this.connectionFlags = Collections.unmodifiableSet(EnumSet.copyOf(builder.connectionFlags));
            this.eventLoopGroup = Optional.ofNullable(builder.eventLoopGroup).orElseGet(() -> Client.newEventLoopGroup());
            this.eventLoopGroupIsPrivate = builder.eventLoopGroup == null;
            this.bufferAckWatermark = builder.bufferAckWatermark;
            this.poolBuffers = builder.poolBuffers;
            this.socketConnectTimeout = builder.socketConnectTimeout;
            this.dcpChannelsReconnectDelay = builder.dcpChannelsReconnectDelay;
            this.dcpChannelsReconnectMaxAttempts = builder.dcpChannelsReconnectMaxAttempts;
            this.collectionsAware = builder.collectionsAware;
            this.collectionIds = Collections.unmodifiableSet(builder.collectionIds);
            this.collectionNames = Collections.unmodifiableSet(builder.collectionNames);
            this.scopeId = builder.scopeId;
            this.scopeName = builder.scopeName;
            if (builder.eventBus != null) {
                this.eventBus = builder.eventBus;
                this.scheduler = null;
            } else {
                this.scheduler = Schedulers.parallel();
                this.eventBus = new DefaultDcpEventBus(this.scheduler);
            }
            this.securityConfig = builder.securityConfig;
            this.seedNodes = Environment.makeDefaultPortsExplicit(builder.seedNodes, this.securityConfig.tlsEnabled());
            this.networkResolution = builder.networkResolution;
            this.persistencePollingIntervalMillis = builder.persistencePollingIntervalMillis;
            if (this.persistencePollingIntervalMillis > 0L) {
                if (this.bufferAckWatermark == 0) {
                    throw new IllegalArgumentException("Rollback mitigation requires flow control.");
                }
                StreamEventBuffer buffer = new StreamEventBuffer(this.eventBus);
                this.dataEventHandler = buffer;
                this.controlEventHandler = buffer;
            }
        }

        /**
         * Returns the seed nodes, with default ports already made explicit.
         *
         * @return the seed node addresses
         */
        public List<HostAndPort> clusterAt() {
            return seedNodes;
        }

        /**
         * Returns the network resolution taken from the builder.
         *
         * @return the configured network resolution
         */
        public NetworkResolution networkResolution() {
            return networkResolution;
        }

        /**
         * Returns the current data event handler field. With persistence polling
         * enabled this is the stream event buffer.
         *
         * @return the data event handler, or {@code null} if none is set
         */
        public DataEventHandler dataEventHandler() {
            return dataEventHandler;
        }

        /**
         * Returns the data event handler viewed as a {@link StreamEventBuffer}.
         *
         * @return the stream event buffer, or {@code null} if no data event
         *         handler is set at all
         * @throws IllegalStateException if the data event handler is set but is
         *         not a stream event buffer
         */
        public StreamEventBuffer streamEventBuffer() {
            DataEventHandler handler = this.dataEventHandler;
            // A null handler passes through unchanged, exactly as a cast of null would.
            if (handler == null || handler instanceof StreamEventBuffer) {
                return (StreamEventBuffer) handler;
            }
            throw new IllegalStateException("Stream event buffer not configured");
        }

        /**
         * Returns the persisted seqnos holder owned by this environment.
         *
         * @return the persisted seqnos instance
         */
        public PersistedSeqnos persistedSeqnos() {
            return persistedSeqnos;
        }

        /**
         * Returns the persistence polling interval taken from the builder.
         *
         * @return the interval in milliseconds
         */
        public long persistencePollingIntervalMillis() {
            return persistencePollingIntervalMillis;
        }

        /**
         * Tells whether persistence polling is enabled, i.e. whether the polling
         * interval is greater than zero.
         *
         * @return {@code true} if the polling interval is positive
         */
        public boolean persistencePollingEnabled() {
            return 0L < persistencePollingIntervalMillis;
        }

        /**
         * Returns the current control event handler field. With persistence
         * polling enabled this is the stream event buffer.
         *
         * @return the control event handler, or {@code null} if none is set
         */
        public ControlEventHandler controlEventHandler() {
            return controlEventHandler;
        }

        /**
         * Returns the connection name generator taken from the builder.
         *
         * @return the configured connection name generator
         */
        public ConnectionNameGenerator connectionNameGenerator() {
            return connectionNameGenerator;
        }

        /**
         * Returns the bucket name taken from the builder.
         *
         * @return the configured bucket name
         */
        public String bucket() {
            return bucket;
        }

        /**
         * Returns the collections-aware flag taken from the builder.
         *
         * @return the configured flag
         */
        public boolean collectionsAware() {
            return collectionsAware;
        }

        /**
         * Returns the configured collection ids.
         *
         * @return an unmodifiable view of the collection ids
         */
        public Set<Long> collectionIds() {
            return collectionIds;
        }

        /**
         * Returns the configured collection names.
         *
         * @return an unmodifiable view of the collection names
         */
        public Set<String> collectionNames() {
            return collectionNames;
        }

        /**
         * Returns the scope id taken from the builder.
         *
         * @return the configured scope id
         */
        public OptionalLong scopeId() {
            return scopeId;
        }

        /**
         * Returns the scope name taken from the builder.
         *
         * @return the configured scope name
         */
        public Optional<String> scopeName() {
            return scopeName;
        }

        /**
         * Returns the authenticator taken from the builder.
         *
         * @return the configured authenticator
         */
        public Authenticator authenticator() {
            return authenticator;
        }

        /**
         * Returns the DCP control settings taken from the builder.
         *
         * @return the configured DCP control object
         */
        public DcpControl dcpControl() {
            return dcpControl;
        }

        /**
         * Returns the configured open-connection flags.
         *
         * @return an unmodifiable set of flags
         */
        public Set<OpenConnectionFlag> connectionFlags() {
            return connectionFlags;
        }

        /**
         * Returns the buffer ack watermark taken from the builder.
         *
         * @return the configured watermark
         */
        public int bufferAckWatermark() {
            return bufferAckWatermark;
        }

        /**
         * Returns the event loop group in use: either the one supplied through
         * the builder or the one created by this environment.
         *
         * @return the event loop group
         */
        public EventLoopGroup eventLoopGroup() {
            return eventLoopGroup;
        }

        /**
         * Returns the config refresh interval.
         *
         * @return the value of the {@code configRefreshInterval} field
         */
        public Duration configRefreshInterval() {
            return configRefreshInterval;
        }

        /**
         * Returns the bootstrap timeout.
         *
         * @return the value of the {@code bootstrapTimeout} field
         */
        public Duration bootstrapTimeout() {
            return bootstrapTimeout;
        }

        /**
         * Installs the handler for data events.
         * <p>
         * When persistence polling is enabled the handler is passed to the stream event
         * buffer; otherwise it is stored directly on this environment.
         *
         * @param dataEventHandler the handler to install
         */
        public void setDataEventHandler(DataEventHandler dataEventHandler) {
            if (!persistencePollingEnabled()) {
                this.dataEventHandler = dataEventHandler;
                return;
            }
            streamEventBuffer().setDataEventHandler(dataEventHandler);
        }

        /**
         * Installs the handler for control events.
         * <p>
         * When persistence polling is enabled the handler is passed to the stream event
         * buffer; otherwise it is stored directly on this environment.
         *
         * @param controlEventHandler the handler to install
         */
        public void setControlEventHandler(ControlEventHandler controlEventHandler) {
            if (!persistencePollingEnabled()) {
                this.controlEventHandler = controlEventHandler;
                return;
            }
            streamEventBuffer().setControlEventHandler(controlEventHandler);
        }

        /**
         * Replaces the handler for system events.
         * <p>
         * Any existing subscription is disposed first. If the new handler is non-null, it is
         * subscribed to the events on the event bus whose type equals {@code EventType.SYSTEM}.
         *
         * @param systemEventHandler the new handler, or {@code null} to only dispose the
         *        existing subscription
         */
        public void setSystemEventHandler(SystemEventHandler systemEventHandler) {
            // Tear down the previous subscription before (possibly) creating a new one.
            if (this.systemEventSubscription != null) {
                this.systemEventSubscription.dispose();
            }
            if (systemEventHandler == null) {
                return;
            }
            this.systemEventSubscription = eventBus()
                    .get()
                    .filter(event -> event.type().equals(EventType.SYSTEM))
                    .subscribe(systemEventHandler::onEvent);
        }

        /**
         * Returns the buffer pooling flag.
         *
         * @return the value of the {@code poolBuffers} field
         */
        public boolean poolBuffers() {
            return poolBuffers;
        }

        /**
         * Returns the socket connect timeout.
         *
         * @return the value of the {@code socketConnectTimeout} field
         */
        public long socketConnectTimeout() {
            return socketConnectTimeout;
        }

        /**
         * Returns the event bus held by this environment.
         *
         * @return the value of the {@code eventBus} field
         */
        public EventBus eventBus() {
            return eventBus;
        }

        /**
         * Returns the security configuration held by this environment.
         *
         * @return the value of the {@code securityConfig} field
         */
        public SecurityConfig securityConfig() {
            return securityConfig;
        }

        /**
         * Returns a copy of the given addresses in which every address with port 0 has
         * been given a concrete port.
         * <p>
         * The substituted port is 11207 when {@code sslEnabled} is true and 11210 otherwise.
         * Addresses using port 8091 or 18091 are kept as they are, but a warning is logged
         * for each of them.
         *
         * @param addresses the addresses to process
         * @param sslEnabled selects which port replaces port 0
         * @return a new list with the same number of entries, in the same order
         */
        private static List<HostAndPort> makeDefaultPortsExplicit(List<HostAndPort> addresses, boolean sslEnabled) {
            final int fallbackPort;
            if (sslEnabled) {
                fallbackPort = 11207;
            } else {
                fallbackPort = 11210;
            }

            final List<HostAndPort> explicit = new ArrayList<>();
            for (HostAndPort address : addresses) {
                final int port = address.port();
                final boolean looksLikeManagerPort = port == 8091 || port == 18091;
                if (looksLikeManagerPort) {
                    log.warn("Seed node '{}' uses port '{}' which is likely incorrect. This should be the port of the KV service, not the Manager service. If the connection fails, omit the port so the client can supply the correct default.", address.host(), port);
                }
                if (port == 0) {
                    explicit.add(address.withPort(fallbackPort));
                } else {
                    explicit.add(address);
                }
            }
            return explicit;
        }

        /**
         * Builds the {@code Mono} that shuts this environment down.
         * <p>
         * The event loop group is shut down only when {@code eventLoopGroupIsPrivate} is set.
         * Otherwise the returned {@code Mono} is an empty one.
         *
         * @return a {@code Mono} that signals success when the event loop group's shutdown
         *         future succeeds, and signals that future's cause as an error when it fails
         */
        public Mono<Void> shutdown() {
            if (!this.eventLoopGroupIsPrivate) {
                return Mono.<Void>empty().then();
            }
            // Typed as Mono<Void> (instead of a raw Mono) so the return needs no unchecked conversion.
            final Mono<Void> loopShutdown = Mono.create(sink ->
                    this.eventLoopGroup.shutdownGracefully(0L, 10L, TimeUnit.MILLISECONDS).addListener(future -> {
                        // Relay the outcome of the shutdown future to the subscriber.
                        if (future.isSuccess()) {
                            sink.success();
                        } else {
                            sink.error(future.cause());
                        }
                    }));
            return loopShutdown.then();
        }

        /**
         * Renders a subset of this environment's settings as a single-line string.
         * <p>
         * The event loop group is shown by its simple class name and the security
         * configuration by the result of {@code exportAsMap()}.
         *
         * @return a string of the form {@code ClientEnvironment{...}}
         */
        @Override
        public String toString() {
            return "ClientEnvironment{"
                    + "seedNodes=" + seedNodes
                    + ", connectionNameGenerator=" + connectionNameGenerator
                    + ", bucket='" + bucket + '\''
                    + ", collectionsAware=" + collectionsAware
                    + ", collectionIds=" + collectionIds
                    + ", collectionNames=" + collectionNames
                    + ", scopeId=" + scopeId
                    + ", scopeName=" + scopeName
                    + ", dcpControl=" + dcpControl
                    + ", eventLoopGroup=" + eventLoopGroup.getClass().getSimpleName()
                    + ", eventLoopGroupIsPrivate=" + eventLoopGroupIsPrivate
                    + ", poolBuffers=" + poolBuffers
                    + ", bufferAckWatermark=" + bufferAckWatermark
                    + ", bootstrapTimeout=" + bootstrapTimeout
                    + ", configRefreshInterval=" + configRefreshInterval
                    + ", securityConfig=" + securityConfig.exportAsMap()
                    + '}';
        }

        /**
         * Returns the maximum number of DCP channel reconnect attempts.
         *
         * @return the value of the {@code dcpChannelsReconnectMaxAttempts} field
         */
        public int dcpChannelsReconnectMaxAttempts() {
            return dcpChannelsReconnectMaxAttempts;
        }
    }

    /**
     * Fluent builder that collects the settings for a {@link Client}.
     * <p>
     * Every setter returns this builder so calls can be chained. Cross-setting
     * validation happens in {@link #build()}.
     */
    public static class Builder {
        // Default seed node is localhost with port 0 (no explicit port).
        private List<HostAndPort> seedNodes = Collections.singletonList(new HostAndPort("127.0.0.1", 0));
        private NetworkResolution networkResolution = NetworkResolution.AUTO;
        private EventLoopGroup eventLoopGroup;
        private String bucket = "default";
        private boolean collectionsAware;
        private Set<Long> collectionIds = new HashSet<Long>();
        private Set<String> collectionNames = new HashSet<String>();
        private OptionalLong scopeId = OptionalLong.empty();
        private Optional<String> scopeName = Optional.empty();
        // Must be set before build(); there is no default authenticator.
        private Authenticator authenticator = null;
        private ConnectionNameGenerator connectionNameGenerator = DefaultConnectionNameGenerator.INSTANCE;
        // ENABLE_NOOP starts out as "true".
        private final DcpControl dcpControl = new DcpControl().put(DcpControl.Names.ENABLE_NOOP, "true");
        private final EnumSet<OpenConnectionFlag> connectionFlags = EnumSet.noneOf(OpenConnectionFlag.class);
        private int bufferAckWatermark;
        private boolean poolBuffers = true;
        private Duration bootstrapTimeout = Environment.DEFAULT_BOOTSTRAP_TIMEOUT;
        private Duration configRefreshInterval = Environment.DEFAULT_CONFIG_REFRESH_INTERVAL;
        private long socketConnectTimeout = Environment.DEFAULT_SOCKET_CONNECT_TIMEOUT;
        private int dcpChannelsReconnectMaxAttempts = Integer.MAX_VALUE;
        private Retry dcpChannelsReconnectDelay = Environment.DEFAULT_DCP_CHANNELS_RECONNECT_DELAY;
        private EventBus eventBus;
        private SecurityConfig securityConfig = SecurityConfig.builder().build();
        private long persistencePollingIntervalMillis;
        private MeterRegistry meterRegistry = Metrics.globalRegistry;

        /**
         * Adds or removes the {@code NO_VALUE} open-connection flag.
         *
         * @param noValue {@code true} to add the flag, {@code false} to remove it
         * @return this builder
         */
        public Builder noValue(boolean noValue) {
            return this.setConnectionFlag(OpenConnectionFlag.NO_VALUE, noValue);
        }

        /**
         * Adds or removes the {@code INCLUDE_XATTRS} open-connection flag.
         *
         * @param xattrs {@code true} to add the flag, {@code false} to remove it
         * @return this builder
         */
        public Builder xattrs(boolean xattrs) {
            return this.setConnectionFlag(OpenConnectionFlag.INCLUDE_XATTRS, xattrs);
        }

        /** Adds {@code flag} to the connection flags when {@code value} is true, removes it otherwise. */
        private Builder setConnectionFlag(OpenConnectionFlag flag, boolean value) {
            if (value) {
                this.connectionFlags.add(flag);
            } else {
                this.connectionFlags.remove(flag);
            }
            return this;
        }

        /**
         * Sets the buffer acknowledgement watermark.
         *
         * @param watermark a percentage between 0 and 100 inclusive
         * @return this builder
         * @throws IllegalArgumentException if {@code watermark} is outside 0..100
         */
        public Builder bufferAckWatermark(int watermark) {
            if (watermark > 100 || watermark < 0) {
                throw new IllegalArgumentException("The bufferAckWatermark is percents, so it needs to be between 0 and 100");
            }
            this.bufferAckWatermark = watermark;
            return this;
        }

        /**
         * Sets the seed nodes from a collection of address strings.
         * <p>
         * Duplicate addresses are dropped. The remaining addresses keep the iteration
         * order of {@code addresses}, matching how {@link #connectionString(String)}
         * keeps the order of its hosts.
         *
         * @param addresses the addresses, parsed by {@code ConnectionString.fromHostnames}
         * @return this builder
         */
        public Builder seedNodes(Collection<String> addresses) {
            // distinct() removes duplicates without scrambling the caller's order
            // (a HashSet round-trip would reorder the nodes arbitrarily).
            ArrayList<String> distinctAddresses = addresses.stream()
                    .distinct()
                    .collect(Collectors.toCollection(ArrayList::new));
            this.seedNodes = Builder.getSeedNodes(ConnectionString.fromHostnames(distinctAddresses));
            return this;
        }

        /**
         * Varargs form of {@link #seedNodes(Collection)}.
         *
         * @param addresses the addresses
         * @return this builder
         */
        public Builder seedNodes(String ... addresses) {
            return this.seedNodes(Arrays.asList(addresses));
        }

        /**
         * Delegates to {@link #seedNodes(Collection)}.
         *
         * @param addresses the addresses
         * @return this builder
         * @deprecated use {@link #seedNodes(Collection)}
         */
        @Deprecated
        public Builder hostnames(List<String> addresses) {
            return this.seedNodes(addresses);
        }

        /**
         * Delegates to {@link #seedNodes(String...)}.
         *
         * @param addresses the addresses
         * @return this builder
         * @deprecated use {@link #seedNodes(String...)}
         */
        @Deprecated
        public Builder hostnames(String ... addresses) {
            return this.seedNodes(addresses);
        }

        /**
         * Sets the seed nodes from the hosts of a connection string.
         *
         * @param connectionString the string handed to {@code ConnectionString.create}
         * @return this builder
         */
        public Builder connectionString(String connectionString) {
            this.seedNodes = Builder.getSeedNodes(ConnectionString.create(connectionString));
            return this;
        }

        /** Converts each host of the connection string into a {@code HostAndPort}, preserving order. */
        private static List<HostAndPort> getSeedNodes(ConnectionString cs) {
            return cs.hosts().stream().map(s -> new HostAndPort(s.hostname(), s.port())).collect(Collectors.toList());
        }

        /**
         * Sets the network resolution.
         *
         * @param nr the network resolution; must not be null
         * @return this builder
         * @throws NullPointerException if {@code nr} is null
         */
        public Builder networkResolution(NetworkResolution nr) {
            this.networkResolution = Objects.requireNonNull(nr);
            return this;
        }

        /**
         * Sets the event loop group.
         *
         * @param eventLoopGroup the event loop group to store
         * @return this builder
         */
        public Builder eventLoopGroup(EventLoopGroup eventLoopGroup) {
            this.eventLoopGroup = eventLoopGroup;
            return this;
        }

        /**
         * Sets the bucket name. The initial value is {@code "default"}.
         *
         * @param bucket the bucket name
         * @return this builder
         */
        public Builder bucket(String bucket) {
            this.bucket = bucket;
            return this;
        }

        /**
         * Sets the collections-awareness flag.
         *
         * @param enable the new flag value
         * @return this builder
         */
        public Builder collectionsAware(boolean enable) {
            this.collectionsAware = enable;
            return this;
        }

        /**
         * Replaces the collection names with a copy of the given names.
         *
         * @param qualifiedCollectionNames names that each contain exactly one dot,
         *        like {@code myScope.myCollection}
         * @return this builder
         * @throws IllegalArgumentException if a name is null or does not split into
         *         exactly two parts around a dot
         */
        public Builder collectionNames(Collection<String> qualifiedCollectionNames) {
            for (String name : qualifiedCollectionNames) {
                if (name == null) {
                    throw new IllegalArgumentException("Collection name must not be null");
                }
                // Limit -1 keeps trailing empty parts, so "a.b." counts as three parts.
                if (name.split("\\.", -1).length != 2) {
                    throw new IllegalArgumentException("Collection name '" + name + "' must be qualified by a scope name, like: myScope.myCollection");
                }
            }
            this.collectionNames = new HashSet<String>(qualifiedCollectionNames);
            return this;
        }

        /**
         * Varargs form of {@link #collectionNames(Collection)}.
         *
         * @param qualifiedCollectionNames the qualified names
         * @return this builder
         */
        public Builder collectionNames(String ... qualifiedCollectionNames) {
            return this.collectionNames(Arrays.asList(qualifiedCollectionNames));
        }

        /**
         * Replaces the collection IDs with a copy of the given IDs.
         *
         * @param collectionIds the IDs; no element may be null
         * @return this builder
         * @throws IllegalArgumentException if any element is null
         */
        public Builder collectionIds(Collection<Long> collectionIds) {
            if (collectionIds.stream().anyMatch(Objects::isNull)) {
                throw new IllegalArgumentException("Collection ID must not be null");
            }
            this.collectionIds = new HashSet<Long>(collectionIds);
            return this;
        }

        /**
         * Varargs form of {@link #collectionIds(Collection)}.
         *
         * @param collectionIds the IDs
         * @return this builder
         */
        public Builder collectionIds(long ... collectionIds) {
            return this.collectionIds(Arrays.stream(collectionIds).boxed().collect(Collectors.toList()));
        }

        /**
         * Sets the scope name.
         *
         * @param scopeName the scope name; a value for which
         *        {@code CbCollections.isNullOrEmpty} returns true clears the setting
         * @return this builder
         */
        public Builder scopeName(String scopeName) {
            this.scopeName = CbCollections.isNullOrEmpty(scopeName) ? Optional.empty() : Optional.of(scopeName);
            return this;
        }

        /**
         * Sets the scope ID.
         *
         * @param scopeId the scope ID
         * @return this builder
         */
        public Builder scopeId(long scopeId) {
            this.scopeId = OptionalLong.of(scopeId);
            return this;
        }

        /**
         * Sets the authenticator to a password authenticator backed by fixed credentials.
         *
         * @param username the username
         * @param password the password
         * @return this builder
         */
        public Builder credentials(String username, String password) {
            return this.credentialsProvider(new StaticCredentialsProvider(username, password));
        }

        /**
         * Sets the authenticator to a password authenticator backed by the given provider.
         *
         * @param credentialsProvider the provider wrapped in a {@code PasswordAuthenticator}
         * @return this builder
         */
        public Builder credentialsProvider(CredentialsProvider credentialsProvider) {
            return this.authenticator(new PasswordAuthenticator(credentialsProvider));
        }

        /**
         * Sets the authenticator.
         *
         * @param authenticator the authenticator; must not be null
         * @return this builder
         * @throws NullPointerException if {@code authenticator} is null
         */
        public Builder authenticator(Authenticator authenticator) {
            this.authenticator = Objects.requireNonNull(authenticator);
            return this;
        }

        /**
         * Sets the connection name generator to the one returned by
         * {@code DefaultConnectionNameGenerator.forProduct}.
         *
         * @param productName passed through to {@code forProduct}
         * @param productVersion passed through to {@code forProduct}
         * @param comments passed through to {@code forProduct}
         * @return this builder
         */
        public Builder userAgent(String productName, String productVersion, String ... comments) {
            return this.connectionNameGenerator(DefaultConnectionNameGenerator.forProduct(productName, productVersion, comments));
        }

        /**
         * Sets the connection name generator.
         *
         * @param connectionNameGenerator the generator to store
         * @return this builder
         */
        public Builder connectionNameGenerator(ConnectionNameGenerator connectionNameGenerator) {
            this.connectionNameGenerator = connectionNameGenerator;
            return this;
        }

        /**
         * Stores a DCP control parameter.
         *
         * @param name the parameter name
         * @param value the value; stored as {@code value.toString()}, so it must not be null
         * @return this builder
         */
        public Builder controlParam(DcpControl.Names name, Object value) {
            this.dcpControl.put(name, value.toString());
            return this;
        }

        /**
         * Passes the compression mode to the DCP control settings.
         *
         * @param compressionMode the mode handed to {@code DcpControl.compression}
         * @return this builder
         */
        public Builder compression(CompressionMode compressionMode) {
            this.dcpControl.compression(compressionMode);
            return this;
        }

        /**
         * Sets the buffer pooling flag. The initial value is {@code true}.
         *
         * @param pool the new flag value
         * @return this builder
         */
        public Builder poolBuffers(boolean pool) {
            this.poolBuffers = pool;
            return this;
        }

        /**
         * Sets the socket connect timeout.
         *
         * @param socketConnectTimeout the timeout value to store
         * @return this builder
         */
        public Builder socketConnectTimeout(long socketConnectTimeout) {
            this.socketConnectTimeout = socketConnectTimeout;
            return this;
        }

        /**
         * Sets the bootstrap timeout.
         *
         * @param bootstrapTimeout the timeout to store
         * @return this builder
         */
        public Builder bootstrapTimeout(Duration bootstrapTimeout) {
            this.bootstrapTimeout = bootstrapTimeout;
            return this;
        }

        /**
         * Sets the config refresh interval.
         *
         * @param configRefreshInterval an interval between 1 second and 2 minutes inclusive
         * @return this builder
         * @throws NullPointerException if {@code configRefreshInterval} is null
         * @throws IllegalArgumentException if the interval is outside the allowed range
         */
        public Builder configRefreshInterval(Duration configRefreshInterval) {
            // Check for null before the range checks dereference the argument.
            Objects.requireNonNull(configRefreshInterval);
            if (configRefreshInterval.compareTo(Duration.ofSeconds(1L)) < 0) {
                throw new IllegalArgumentException("Minimum config refresh interval is 1 second.");
            }
            if (configRefreshInterval.compareTo(Duration.ofMinutes(2L)) > 0) {
                throw new IllegalArgumentException("Maximum config refresh interval is 2 minutes.");
            }
            this.configRefreshInterval = configRefreshInterval;
            return this;
        }

        /**
         * Sets the maximum number of DCP channel reconnect attempts.
         * The initial value is {@code Integer.MAX_VALUE}.
         *
         * @param dcpChannelsReconnectMaxAttempts the maximum to store
         * @return this builder
         */
        public Builder dcpChannelsReconnectMaxAttempts(int dcpChannelsReconnectMaxAttempts) {
            this.dcpChannelsReconnectMaxAttempts = dcpChannelsReconnectMaxAttempts;
            return this;
        }

        /**
         * Does nothing; the argument is ignored.
         *
         * @param ignored not used
         * @return this builder
         * @deprecated this setter has no effect
         */
        @Deprecated
        public Builder dcpChannelsReconnectDelay(Delay ignored) {
            return this;
        }

        /**
         * Sets the event bus.
         *
         * @param eventBus the event bus to store
         * @return this builder
         */
        public Builder eventBus(EventBus eventBus) {
            this.eventBus = eventBus;
            return this;
        }

        /**
         * Sets the security configuration.
         *
         * @param securityConfig the configuration; must not be null
         * @return this builder
         * @throws NullPointerException if {@code securityConfig} is null
         */
        public Builder securityConfig(SecurityConfig securityConfig) {
            this.securityConfig = Objects.requireNonNull(securityConfig);
            return this;
        }

        /**
         * Builds the given security configuration builder and stores the result.
         *
         * @param securityConfigBuilder the builder whose {@code build()} result is stored
         * @return this builder
         */
        public Builder securityConfig(SecurityConfig.Builder securityConfigBuilder) {
            return this.securityConfig(securityConfigBuilder.build());
        }

        /**
         * Stores the persistence polling interval, converted to milliseconds.
         *
         * @param persistencePollingInterval the interval, expressed in {@code unit}
         * @param unit the unit of {@code persistencePollingInterval}
         * @return this builder
         */
        public Builder mitigateRollbacks(long persistencePollingInterval, TimeUnit unit) {
            this.persistencePollingIntervalMillis = unit.toMillis(persistencePollingInterval);
            return this;
        }

        /**
         * Sets the {@code CONNECTION_BUFFER_SIZE} control parameter.
         * <p>
         * If the buffer acknowledgement watermark is still 0, it is set to 80.
         *
         * @param bufferSizeInBytes the buffer size to store as the control parameter
         * @return this builder
         */
        public Builder flowControl(int bufferSizeInBytes) {
            this.controlParam(DcpControl.Names.CONNECTION_BUFFER_SIZE, bufferSizeInBytes);
            if (this.bufferAckWatermark == 0) {
                this.bufferAckWatermark = 80;
            }
            return this;
        }

        /**
         * Sets the meter registry. The initial value is {@code Metrics.globalRegistry}.
         *
         * @param meterRegistry the registry; must not be null
         * @return this builder
         * @throws NullPointerException if {@code meterRegistry} is null
         */
        public Builder meterRegistry(MeterRegistry meterRegistry) {
            this.meterRegistry = Objects.requireNonNull(meterRegistry);
            return this;
        }

        /**
         * Validates the collected settings and creates the client.
         *
         * @return a new {@code Client} constructed from this builder
         * @throws IllegalStateException if no authenticator was provided; if the
         *         authenticator requires TLS but TLS is not enabled; if collections
         *         awareness is on while NOOPs are disabled; if both a scope name and a
         *         scope ID are set; if both a scope and collections are set; or if a
         *         scope or collections are set without collections awareness
         */
        public Client build() {
            if (this.authenticator == null) {
                throw new IllegalStateException("Must provide authenticator. Simplest way is to call credentials(username, password).");
            }
            if (this.authenticator.requiresTls() && !this.securityConfig.tlsEnabled()) {
                throw new IllegalStateException("The provided authenticator requires TLS, but TLS was not enabled in the SecurityConfig");
            }
            if (this.collectionsAware && !this.dcpControl.noopEnabled()) {
                throw new IllegalStateException("Collections awareness requires NOOPs; must not disable NOOPs.");
            }
            if (this.scopeName.isPresent() && this.scopeId.isPresent()) {
                throw new IllegalStateException("May specify scope name or ID, but not both.");
            }
            final boolean hasCollections = !this.collectionIds.isEmpty() || !this.collectionNames.isEmpty();
            final boolean hasScope = this.scopeId.isPresent() || this.scopeName.isPresent();
            if (hasCollections && hasScope) {
                throw new IllegalStateException("May specify scope or collections, but not both.");
            }
            if ((hasCollections || hasScope) && !this.collectionsAware) {
                throw new IllegalStateException("Must call collectionsAware(true) when specifying scope or collections.");
            }
            return new Client(this);
        }
    }
}

