/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.core.deps.io.netty.util.concurrent.DefaultThreadFactory;
import com.couchbase.client.core.util.ConnectionString;
import com.couchbase.client.core.util.HostAndPort;
import com.couchbase.client.core.util.NanoTimestamp;
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.buffer.DcpBucketConfig;
import com.couchbase.client.dcp.conductor.BucketConfigArbiter;
import com.couchbase.client.dcp.conductor.DcpChannel;
import com.couchbase.client.dcp.conductor.NotMyVbucketException;
import com.couchbase.client.dcp.core.config.BucketCapability;
import com.couchbase.client.dcp.core.config.NodeInfo;
import com.couchbase.client.dcp.core.logging.RedactableArgument;
import com.couchbase.client.dcp.core.state.LifecycleState;
import com.couchbase.client.dcp.core.state.NotConnectedException;
import com.couchbase.client.dcp.core.utils.CbCollections;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.events.FailedToAddNodeEvent;
import com.couchbase.client.dcp.events.FailedToMovePartitionEvent;
import com.couchbase.client.dcp.events.FailedToRemoveNodeEvent;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.highlevel.internal.CollectionsManifest;
import com.couchbase.client.dcp.highlevel.internal.KeyExtractor;
import com.couchbase.client.dcp.message.PartitionAndSeqno;
import com.couchbase.client.dcp.metrics.DcpClientMetrics;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

public class Conductor {
    private static final Logger LOGGER = LoggerFactory.getLogger(Conductor.class);
    private final BucketConfigArbiter bucketConfigArbiter;
    private final Set<DcpChannel> channels = ConcurrentHashMap.newKeySet();
    private volatile boolean stopped = true;
    private final Client.Environment env;
    private final AtomicReference<DcpBucketConfig> currentConfig = new AtomicReference();
    private final SessionState sessionState = new SessionState();
    private final DcpClientMetrics metrics;
    private final Duration dnsSrvRefreshCheckInterval = Duration.ofSeconds(2L);
    private final Duration dnsSrvRefreshThrottle = Duration.ofSeconds(15L);
    private final Duration shutdownTimeout = Duration.ofSeconds(30L);
    private final Disposable configSubscription;
    private final ScheduledExecutorService configUpdateExecutor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DefaultThreadFactory("couchbase-dcp-reconfigure", true));
    private final Thread configUpdateThread;
    private final CountDownLatch configurationApplied;
    private NanoTimestamp lastDnsSrvRefresh;

    private void requireConfigUpdateThread() {
        if (Thread.currentThread() != this.configUpdateThread) {
            throw new IllegalStateException("This method may only be called on the config update thread, but was called on: " + Thread.currentThread());
        }
    }

    public Conductor(Client.Environment env, DcpClientMetrics metrics) {
        try {
            this.configUpdateThread = this.configUpdateExecutor.submit(Thread::currentThread).get();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        this.configurationApplied = new CountDownLatch(1);
        this.metrics = Objects.requireNonNull(metrics);
        this.env = env;
        this.bucketConfigArbiter = new BucketConfigArbiter(env);
        this.configSubscription = this.bucketConfigArbiter.configs().publishOn(Schedulers.fromExecutor((Executor)this.configUpdateExecutor)).subscribe(config -> {
            if (config.numberOfPartitions() == 0 && this.currentConfig.get() == null) {
                LOGGER.debug("Skipping initial config (rev {}) because it has invalid partition count.", (Object)config.rev());
                return;
            }
            LOGGER.trace("Applying new configuration, new rev is {}.", (Object)config.rev());
            this.currentConfig.set((DcpBucketConfig)config);
            this.reconfigure((DcpBucketConfig)config);
            this.configurationApplied.countDown();
        });
    }

    public SessionState sessionState() {
        return this.sessionState;
    }

    public Mono<Void> connect() {
        Duration bootstrapTimeout = this.env.bootstrapTimeout().plus(this.env.configRefreshInterval());
        return Mono.fromRunnable(() -> {
            boolean usedDnsSrvForBootstrap;
            this.stopped = false;
            HashSet<HostAndPort> addresses = new HashSet<HostAndPort>(this.env.clusterAt());
            this.configUpdateExecutor.execute(() -> this.add(addresses));
            Set<String> connectionStringHosts = CbCollections.setCopyOf(CbCollections.transform(this.env.connectionString().hosts(), ConnectionString.UnresolvedSocket::host));
            Set<String> resolvedHosts = CbCollections.setCopyOf(CbCollections.transform(addresses, HostAndPort::host));
            boolean bl = usedDnsSrvForBootstrap = !resolvedHosts.equals(connectionStringHosts);
            if (usedDnsSrvForBootstrap) {
                this.startDnsSrvRefreshWatchdog();
            }
        }).then(Conductor.await(this.configurationApplied, bootstrapTimeout).doOnError(throwable -> LOGGER.warn("Did not receive initial configuration from cluster within {}", (Object)bootstrapTimeout)));
    }

    private void startDnsSrvRefreshWatchdog() {
        LOGGER.info("Scheduling DNS SRV re-bootstrap check at interval {}", (Object)this.dnsSrvRefreshCheckInterval);
        long intervalMillis = this.dnsSrvRefreshCheckInterval.toMillis();
        long initialDelayMillis = this.env.bootstrapTimeout().toMillis();
        this.configUpdateExecutor.execute(() -> {
            this.lastDnsSrvRefresh = NanoTimestamp.never();
        });
        this.configUpdateExecutor.scheduleWithFixedDelay(this::maybeBootstrapAgain, initialDelayMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    private void maybeBootstrapAgain() {
        this.requireConfigUpdateThread();
        try {
            if (this.stopped || !this.lastDnsSrvRefresh.hasElapsed(this.dnsSrvRefreshThrottle) || this.channels.stream().anyMatch(it -> it.state() == LifecycleState.CONNECTED)) {
                return;
            }
            this.lastDnsSrvRefresh = NanoTimestamp.now();
            LOGGER.info("Attempting DNS SRV refresh because the client is currently connected to zero nodes.");
            HashSet<HostAndPort> nodesToAdd = new HashSet<HostAndPort>(this.env.clusterAt());
            nodesToAdd.removeAll(this.channelsByAddress().keySet());
            if (nodesToAdd.isEmpty()) {
                LOGGER.info("DNS SRV record has no new nodes.");
            } else {
                LOGGER.info("Adding new nodes from DNS SRV record: {}", (Object)RedactableArgument.system(nodesToAdd));
                this.add(nodesToAdd);
            }
        }
        catch (Throwable t) {
            LOGGER.error("Exception in DNS SRV refresh watchdog task.", t);
        }
    }

    private static Mono<Void> await(CountDownLatch latch, Duration timeout) {
        return Mono.fromRunnable(() -> {
            try {
                if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                    throw new RuntimeException(new TimeoutException("Timed out after waiting " + timeout + " for latch."));
                }
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }

    BucketConfigArbiter bucketConfigArbiter() {
        return this.bucketConfigArbiter;
    }

    public boolean disconnected() {
        return this.channels.stream().allMatch(c -> c.isState(LifecycleState.DISCONNECTED));
    }

    public Mono<Void> stop() {
        return Mono.fromCallable(() -> {
            LOGGER.debug("Shutting down...");
            this.stopped = true;
            this.configSubscription.dispose();
            this.configUpdateExecutor.shutdown();
            if (!this.configUpdateExecutor.awaitTermination(this.shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                LOGGER.error("Config updater executor failed to terminate within {}", (Object)this.shutdownTimeout);
            }
            return null;
        }).subscribeOn(Schedulers.boundedElastic()).then(Flux.fromIterable(this.channels).flatMap(DcpChannel::disconnect).then()).doOnSuccess(ignore -> LOGGER.info("Shutdown complete."));
    }

    public int numberOfPartitions() {
        return this.currentConfig.get().numberOfPartitions();
    }

    public Flux<PartitionAndSeqno> getSeqnos() {
        return Flux.fromIterable(this.channels).flatMap(this::getSeqnosForChannel).flatMap(Flux::fromIterable);
    }

    private Mono<List<PartitionAndSeqno>> getSeqnosForChannel(DcpChannel channel) {
        return Mono.just((Object)channel).flatMap(DcpChannel::getSeqnos).retryWhen((Retry)Retry.fixedDelay((long)Long.MAX_VALUE, (Duration)Duration.ofMillis(200L)).filter(e -> e instanceof NotConnectedException).doAfterRetry(retrySignal -> LOGGER.debug("Rescheduling get Seqnos for channel {}, not connected (yet).", (Object)channel)));
    }

    public Mono<ByteBuf> getFailoverLog(int partition) {
        return Mono.just((Object)partition).map(ignored -> this.activeChannelByPartition(partition)).flatMap(channel -> channel.getFailoverLog(partition)).retryWhen((Retry)Retry.fixedDelay((long)Long.MAX_VALUE, (Duration)Duration.ofMillis(200L)).filter(e -> e instanceof NotConnectedException).doAfterRetry(retrySignal -> LOGGER.debug("Rescheduling Get Failover Log for vbid {}, not connected (yet).", (Object)partition)));
    }

    public Mono<Void> startStreamForPartition(int partition, StreamOffset startOffset, long endSeqno) {
        return Mono.just((Object)partition).map(this::activeChannelByPartition).flatMap(channel -> channel.getCollectionsManifest().flatMap(manifest -> {
            CollectionsManifest m = manifest.orElse(CollectionsManifest.DEFAULT);
            PartitionState ps = this.sessionState.get(partition);
            ps.setCollectionsManifest(m);
            ps.setKeyExtractor(manifest.isPresent() ? KeyExtractor.COLLECTIONS : KeyExtractor.NO_COLLECTIONS);
            ps.setMostRecentOpenStreamOffset(startOffset);
            return channel.openStream(partition, startOffset, endSeqno, m, this.env.streamFlags());
        })).retryWhen((Retry)Retry.fixedDelay((long)Long.MAX_VALUE, (Duration)Duration.ofMillis(200L)).filter(e -> e instanceof NotConnectedException).doAfterRetry(retrySignal -> LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", (Object)partition)));
    }

    public Mono<Void> stopStreamForPartition(int partition) {
        if (this.streamIsOpen(partition)) {
            DcpChannel channel = this.activeChannelByPartition(partition);
            return channel.closeStream(partition);
        }
        return Mono.empty();
    }

    public boolean streamIsOpen(int partition) {
        DcpChannel channel = this.activeChannelByPartition(partition);
        return channel.streamIsOpen(partition);
    }

    private DcpChannel activeChannelByPartition(int partition) {
        HostAndPort address = this.currentConfig.get().getActiveNodeKvAddress(partition);
        for (DcpChannel ch : this.channels) {
            if (!ch.address().equals((Object)address)) continue;
            return ch;
        }
        throw new NotConnectedException("No DcpChannel found for partition " + partition);
    }

    public boolean hasCapability(BucketCapability capability) {
        DcpBucketConfig config = this.currentConfig.get();
        return config != null && config.hasCapability(capability);
    }

    public boolean hasCapabilities(Collection<BucketCapability> capabilities) {
        return capabilities.stream().allMatch(this::hasCapability);
    }

    private Map<HostAndPort, DcpChannel> channelsByAddress() {
        this.requireConfigUpdateThread();
        return this.channels.stream().collect(Collectors.toMap(DcpChannel::address, c -> c));
    }

    private void reconfigure(DcpBucketConfig configHelper) {
        this.requireConfigUpdateThread();
        if (this.stopped) {
            return;
        }
        this.metrics.incrementReconfigure();
        List<NodeInfo> nodes = configHelper.getKvNodes();
        if (nodes.isEmpty()) {
            throw new IllegalStateException("Bucket config helper returned no data nodes");
        }
        Map<HostAndPort, DcpChannel> existingChannelsByAddress = this.channelsByAddress();
        Set nodeAddresses = nodes.stream().map(configHelper::getAddress).collect(Collectors.toSet());
        boolean nodesChanged = false;
        for (HostAndPort hostAndPort : nodeAddresses) {
            if (existingChannelsByAddress.containsKey(hostAndPort)) continue;
            this.metrics.incrementAddChannel();
            this.add(hostAndPort);
            nodesChanged = true;
        }
        for (Map.Entry entry : existingChannelsByAddress.entrySet()) {
            if (nodeAddresses.contains(entry.getKey())) continue;
            this.metrics.incrementRemoveChannel();
            this.remove((DcpChannel)entry.getValue());
            nodesChanged = true;
        }
        if (nodesChanged) {
            this.updateChannelGauges();
        }
    }

    private void updateChannelGauges() {
        this.metrics.registerConnectionStatusGauges(this.channels);
    }

    private void add(Collection<HostAndPort> nodes) {
        this.requireConfigUpdateThread();
        nodes.forEach(this::add);
        this.updateChannelGauges();
    }

    private void add(HostAndPort node) {
        this.requireConfigUpdateThread();
        LOGGER.info("Adding DCP Channel against {}", (Object)RedactableArgument.system(node));
        DcpChannel channel = new DcpChannel(node, this.env, this, this.metrics);
        if (!this.channels.add(channel)) {
            throw new IllegalStateException("Tried to add duplicate channel: " + RedactableArgument.system(channel));
        }
        channel.connect().retryWhen((Retry)Retry.fixedDelay((long)Long.MAX_VALUE, (Duration)Duration.ofSeconds(1L)).filter(t -> !this.stopped).doAfterRetry(retrySignal -> LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", (Object)node))).doOnSuccess(ignored -> LOGGER.debug("Completed Node connect for DCP channel {}", (Object)node)).onErrorResume(e -> {
            LOGGER.warn("Got error during connect (maybe retried) for node {}", (Object)RedactableArgument.system(node), e);
            if (this.env.eventBus() != null) {
                this.env.eventBus().publish(new FailedToAddNodeEvent(node, (Throwable)e));
            }
            return Mono.empty();
        }).subscribe();
    }

    private void remove(DcpChannel node) {
        this.requireConfigUpdateThread();
        if (!this.channels.remove(node)) {
            throw new IllegalStateException("Tried to remove unknown channel: " + RedactableArgument.system(node));
        }
        LOGGER.info("Removing DCP Channel against {}", (Object)RedactableArgument.system(node));
        for (int partition = 0; partition < node.streamIsOpen.length(); ++partition) {
            if (!node.streamIsOpen(partition)) continue;
            this.maybeMovePartition(partition);
        }
        node.disconnect().doOnSuccess(ignored -> LOGGER.debug("Channel remove notified as complete for {}", (Object)node.address())).onErrorResume(e -> {
            LOGGER.warn("Got error during Node removal for node {}", (Object)RedactableArgument.system(node.address()), e);
            if (this.env.eventBus() != null) {
                this.env.eventBus().publish(new FailedToRemoveNodeEvent(node.address(), (Throwable)e));
            }
            return Mono.empty();
        }).subscribe();
    }

    void maybeMovePartition(int partition) {
        Mono.just((Object)partition).delayElement(Duration.ofMillis(50L)).filter(ignored -> {
            PartitionState ps = this.sessionState.get(partition);
            boolean desiredSeqnoReached = ps.isAtEnd();
            if (desiredSeqnoReached) {
                LOGGER.debug("Reached desired high seqno {} for vbucket {}, not reopening stream.", (Object)ps.getEndSeqno(), (Object)partition);
            }
            return !desiredSeqnoReached;
        }).flatMap(ignored -> {
            PartitionState ps = this.sessionState.get(partition);
            return this.startStreamForPartition(partition, ps.getOffset(), ps.getEndSeqno()).retryWhen((Retry)Retry.fixedDelay((long)Long.MAX_VALUE, (Duration)Duration.ofMillis(200L)).filter(e -> e instanceof NotMyVbucketException));
        }).doOnSubscribe(subscription -> LOGGER.debug("Subscribing for Partition Move for partition {}", (Object)partition)).doOnSuccess(ignored -> LOGGER.trace("Completed Partition Move for partition {}", (Object)partition)).onErrorResume(e -> {
            if (e instanceof RollbackException) {
                LOGGER.warn("Rollback during Partition Move for partition {}", (Object)partition);
            } else {
                LOGGER.warn("Error during Partition Move for partition {}", (Object)partition, e);
                if (this.env.eventBus() != null) {
                    this.env.eventBus().publish(new FailedToMovePartitionEvent(partition, (Throwable)e));
                }
            }
            return Mono.empty();
        }).subscribe();
    }
}

