/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.conductor;

import com.couchbase.client.dcp.buffer.DcpBucketConfig;
import com.couchbase.client.dcp.conductor.BucketConfigSink;
import com.couchbase.client.dcp.conductor.BucketConfigSource;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.config.HostAndPort;
import com.couchbase.client.dcp.core.config.AlternateAddress;
import com.couchbase.client.dcp.core.config.BucketConfig;
import com.couchbase.client.dcp.core.config.CouchbaseBucketConfig;
import com.couchbase.client.dcp.core.config.NodeInfo;
import com.couchbase.client.dcp.core.config.parser.BucketConfigParser;
import com.couchbase.client.dcp.core.env.NetworkResolution;
import com.couchbase.client.dcp.core.logging.RedactableArgument;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.subjects.BehaviorSubject;
import rx.subjects.Subject;

public class BucketConfigArbiter
implements BucketConfigSink,
BucketConfigSource {
    private static final Logger log = LoggerFactory.getLogger(BucketConfigArbiter.class);
    private final Subject<DcpBucketConfig, DcpBucketConfig> configStream = BehaviorSubject.create().toSerialized();
    private final Object revLock = new Object();
    private long currentRev = -1L;
    private boolean hasDeterminedAlternateNetworkName = false;
    private String alternateNetworkName;
    private final ClientEnvironment environment;
    private static final Pattern REV_PATTERN = Pattern.compile("\"rev\"\\s*:\\s*(-?\\d+)");

    public BucketConfigArbiter(ClientEnvironment environment) {
        this.environment = Objects.requireNonNull(environment);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void accept(HostAndPort origin, String rawConfig, long rev) {
        Object object = this.revLock;
        synchronized (object) {
            RedactableArgument formattedOrigin = RedactableArgument.system(origin.host() + ":" + origin.port());
            if (rev <= this.currentRev) {
                log.debug("Ignoring bucket config revision {} from {}; not newer than current revision {}", new Object[]{formattedOrigin, rev, this.currentRev});
                return;
            }
            log.debug("Received bucket config revision {} from {} -> {}", new Object[]{rev, formattedOrigin, RedactableArgument.system(rawConfig)});
            try {
                this.currentRev = rev;
                CouchbaseBucketConfig config = (CouchbaseBucketConfig)BucketConfigParser.parse(rawConfig, origin.host());
                this.selectAlternateNetwork(config);
                this.configStream.onNext((Object)new DcpBucketConfig(config, this.environment.sslEnabled()));
            }
            catch (Exception e) {
                log.error("Failed to parse bucket config", (Throwable)e);
            }
        }
    }

    @Override
    public void accept(HostAndPort origin, String rawConfig) {
        try {
            this.accept(origin, rawConfig, BucketConfigArbiter.getRev(rawConfig));
        }
        catch (Exception e) {
            log.error("Failed to parse bucket config", (Throwable)e);
        }
    }

    @Override
    public Observable<DcpBucketConfig> configs() {
        return this.configStream;
    }

    private void selectAlternateNetwork(CouchbaseBucketConfig config) {
        if (!Thread.holdsLock(this.revLock)) {
            throw new IllegalStateException("Must hold revLock");
        }
        if (!this.hasDeterminedAlternateNetworkName) {
            String displayName;
            Set<String> seedHosts = this.environment.clusterAt().stream().map(HostAndPort::host).collect(Collectors.toSet());
            this.alternateNetworkName = BucketConfigArbiter.determineNetworkResolution(config, this.environment.networkResolution(), seedHosts);
            this.hasDeterminedAlternateNetworkName = true;
            String string = displayName = this.alternateNetworkName == null ? "<default>" : this.alternateNetworkName;
            if (NetworkResolution.AUTO.equals(this.environment.networkResolution())) {
                displayName = "auto -> " + displayName;
            }
            log.info("Selected network: {}", (Object)displayName);
        }
        config.useAlternateNetwork(this.alternateNetworkName);
    }

    private static String determineNetworkResolution(BucketConfig config, NetworkResolution nr, Set<String> seedHosts) {
        if (nr.equals(NetworkResolution.DEFAULT)) {
            return null;
        }
        if (nr.equals(NetworkResolution.AUTO)) {
            for (NodeInfo info : config.nodes()) {
                if (seedHosts.contains(info.hostname())) {
                    return null;
                }
                Map<String, AlternateAddress> aa = info.alternateAddresses();
                if (aa == null || aa.isEmpty()) continue;
                for (Map.Entry<String, AlternateAddress> entry : aa.entrySet()) {
                    AlternateAddress alternateAddress = entry.getValue();
                    if (alternateAddress == null || !seedHosts.contains(alternateAddress.hostname())) continue;
                    return entry.getKey();
                }
            }
            return null;
        }
        return nr.name();
    }

    private static long getRev(String rawConfig) {
        Matcher m = REV_PATTERN.matcher(rawConfig);
        if (m.find()) {
            return Long.parseLong(m.group(1));
        }
        throw new IllegalArgumentException("Failed to locate revision property in " + RedactableArgument.system(rawConfig));
    }
}

