/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.java.cluster;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.config.BucketsConfigRequest;
import com.couchbase.client.core.message.config.BucketsConfigResponse;
import com.couchbase.client.core.message.config.ClusterConfigRequest;
import com.couchbase.client.core.message.config.ClusterConfigResponse;
import com.couchbase.client.core.message.config.InsertBucketRequest;
import com.couchbase.client.core.message.config.InsertBucketResponse;
import com.couchbase.client.core.message.config.RemoveBucketRequest;
import com.couchbase.client.core.message.config.RemoveBucketResponse;
import com.couchbase.client.core.message.config.UpdateBucketRequest;
import com.couchbase.client.core.message.config.UpdateBucketResponse;
import com.couchbase.client.core.message.internal.AddNodeRequest;
import com.couchbase.client.core.message.internal.AddNodeResponse;
import com.couchbase.client.core.message.internal.AddServiceRequest;
import com.couchbase.client.core.message.internal.AddServiceResponse;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.java.ConnectionString;
import com.couchbase.client.java.CouchbaseAsyncBucket;
import com.couchbase.client.java.bucket.BucketType;
import com.couchbase.client.java.cluster.AsyncClusterManager;
import com.couchbase.client.java.cluster.BucketSettings;
import com.couchbase.client.java.cluster.ClusterInfo;
import com.couchbase.client.java.cluster.DefaultBucketSettings;
import com.couchbase.client.java.cluster.DefaultClusterInfo;
import com.couchbase.client.java.cluster.api.AsyncClusterApiClient;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.error.BucketAlreadyExistsException;
import com.couchbase.client.java.error.BucketDoesNotExistException;
import com.couchbase.client.java.error.InvalidPasswordException;
import com.couchbase.client.java.error.TranscodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

/**
 * Default {@link AsyncClusterManager} implementation.
 *
 * <p>Every operation first makes sure the {@link ServiceType#CONFIG} service is registered for the
 * first host of the connection string (see {@link #ensureServiceEnabled()}) and then sends the
 * matching configuration request through the {@link ClusterFacade}, authenticating with the
 * username and password supplied at construction time.
 */
public class DefaultAsyncClusterManager
implements AsyncClusterManager {
    /** Delay, in milliseconds, applied to every cluster-info result while waiting for healthy nodes. */
    private static final long HEALTH_POLL_DELAY_MILLIS = 100L;

    final ClusterFacade core;
    final String username;
    final String password;
    final CouchbaseEnvironment environment;
    private final ConnectionString connectionString;

    /**
     * Creates a new manager. Prefer {@link #create} from outside this package.
     *
     * @param username the user name sent with every management request.
     * @param password the password sent with every management request.
     * @param connectionString the connection string; its first host is used to enable the config service.
     * @param environment the environment providing the SSL flag and bootstrap HTTP ports.
     * @param core the facade all requests are sent through.
     */
    DefaultAsyncClusterManager(String username, String password, ConnectionString connectionString, CouchbaseEnvironment environment, ClusterFacade core) {
        this.username = username;
        this.password = password;
        this.core = core;
        this.environment = environment;
        this.connectionString = connectionString;
    }

    /**
     * Static factory for a {@link DefaultAsyncClusterManager}.
     *
     * @param username the user name sent with every management request.
     * @param password the password sent with every management request.
     * @param connectionString the connection string; its first host is used to enable the config service.
     * @param environment the environment providing the SSL flag and bootstrap HTTP ports.
     * @param core the facade all requests are sent through.
     * @return a new manager instance.
     */
    public static DefaultAsyncClusterManager create(String username, String password, ConnectionString connectionString, CouchbaseEnvironment environment, ClusterFacade core) {
        return new DefaultAsyncClusterManager(username, password, connectionString, environment, core);
    }

    /**
     * Provides an {@link AsyncClusterApiClient} once the config service has been enabled.
     *
     * @return an observable emitting a client built from this manager's credentials and core.
     */
    @Override
    @InterfaceStability.Experimental
    public Observable<AsyncClusterApiClient> apiClient() {
        return ensureServiceEnabled().map(new Func1<Boolean, AsyncClusterApiClient>() {
            @Override
            public AsyncClusterApiClient call(Boolean enabled) {
                return new AsyncClusterApiClient(username, password, core);
            }
        });
    }

    /**
     * Fetches the cluster configuration and exposes it as {@link ClusterInfo}.
     *
     * <p>The observable fails with {@link InvalidPasswordException} when an unsuccessful response
     * body contains "Unauthorized", with {@link CouchbaseException} for any other unsuccessful
     * response, and with {@link TranscodingException} when the body cannot be decoded as JSON.
     *
     * @return an observable emitting the decoded cluster info.
     */
    @Override
    public Observable<ClusterInfo> info() {
        return ensureServiceEnabled()
            .flatMap(new Func1<Boolean, Observable<ClusterConfigResponse>>() {
                @Override
                public Observable<ClusterConfigResponse> call(Boolean enabled) {
                    return core.<ClusterConfigResponse>send(new ClusterConfigRequest(username, password));
                }
            })
            .doOnNext(new Action1<ClusterConfigResponse>() {
                @Override
                public void call(ClusterConfigResponse response) {
                    failIfUnsuccessful(response.status().isSuccess(), response.status(), response.config());
                }
            })
            .map(new Func1<ClusterConfigResponse, ClusterInfo>() {
                @Override
                public ClusterInfo call(ClusterConfigResponse response) {
                    try {
                        return new DefaultClusterInfo(CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER.stringToJsonObject(response.config()));
                    } catch (Exception e) {
                        throw new TranscodingException("Could not decode cluster info.", e);
                    }
                }
            });
    }

    /**
     * Fetches the configuration of all buckets and emits one {@link BucketSettings} per bucket.
     *
     * <p>The observable fails with {@link InvalidPasswordException} when an unsuccessful response
     * body contains "Unauthorized", with {@link CouchbaseException} for any other unsuccessful
     * response, and with {@link TranscodingException} when the body cannot be decoded.
     *
     * @return an observable emitting the settings of every bucket in the response.
     */
    @Override
    public Observable<BucketSettings> getBuckets() {
        return ensureServiceEnabled()
            .flatMap(new Func1<Boolean, Observable<BucketsConfigResponse>>() {
                @Override
                public Observable<BucketsConfigResponse> call(Boolean enabled) {
                    return core.<BucketsConfigResponse>send(new BucketsConfigRequest(username, password));
                }
            })
            .doOnNext(new Action1<BucketsConfigResponse>() {
                @Override
                public void call(BucketsConfigResponse response) {
                    failIfUnsuccessful(response.status().isSuccess(), response.status(), response.config());
                }
            })
            .flatMap(new Func1<BucketsConfigResponse, Observable<BucketSettings>>() {
                @Override
                public Observable<BucketSettings> call(BucketsConfigResponse response) {
                    try {
                        JsonArray decoded = CouchbaseAsyncBucket.JSON_ARRAY_TRANSCODER.stringToJsonArray(response.config());
                        ArrayList<BucketSettings> settings = new ArrayList<BucketSettings>();
                        for (Object item : decoded) {
                            settings.add(toBucketSettings((JsonObject) item));
                        }
                        return Observable.from(settings);
                    } catch (Exception e) {
                        // Any malformed or missing field surfaces as a decoding failure.
                        throw new TranscodingException("Could not decode bucket info.", e);
                    }
                }
            });
    }

    /**
     * Looks up a single bucket by name.
     *
     * @param name the bucket name to match exactly.
     * @return an observable emitting the matching settings, or completing empty if none match.
     */
    @Override
    public Observable<BucketSettings> getBucket(final String name) {
        return getBuckets().filter(new Func1<BucketSettings, Boolean>() {
            @Override
            public Boolean call(BucketSettings bucketSettings) {
                return bucketSettings.name().equals(name);
            }
        });
    }

    /**
     * Checks whether a bucket with the given name exists.
     *
     * @param name the bucket name to look for.
     * @return an observable emitting {@code true} if {@link #getBucket(String)} emits anything.
     */
    @Override
    public Observable<Boolean> hasBucket(String name) {
        return getBucket(name).isEmpty().map(new Func1<Boolean, Boolean>() {
            @Override
            public Boolean call(Boolean notFound) {
                return !notFound;
            }
        });
    }

    /**
     * Removes the bucket with the given name.
     *
     * @param name the name of the bucket to remove.
     * @return an observable emitting whether the response status was a success.
     */
    @Override
    public Observable<Boolean> removeBucket(final String name) {
        return ensureServiceEnabled()
            .flatMap(new Func1<Boolean, Observable<RemoveBucketResponse>>() {
                @Override
                public Observable<RemoveBucketResponse> call(Boolean enabled) {
                    return core.<RemoveBucketResponse>send(new RemoveBucketRequest(name, username, password));
                }
            })
            .map(new Func1<RemoveBucketResponse, Boolean>() {
                @Override
                public Boolean call(RemoveBucketResponse response) {
                    return response.status().isSuccess();
                }
            });
    }

    /**
     * Creates a new bucket and waits until all cluster nodes report as healthy.
     *
     * <p>The observable fails with {@link BucketAlreadyExistsException} if a bucket of that name
     * already exists and with {@link CouchbaseException} if the insert response is unsuccessful.
     *
     * @param settings the settings of the bucket to create.
     * @return an observable emitting the given settings once the cluster is healthy.
     */
    @Override
    public Observable<BucketSettings> insertBucket(final BucketSettings settings) {
        final String payload = getConfigureBucketPayload(settings, true);
        Observable<BucketSettings> inserted = hasBucket(settings.name())
            .doOnNext(new Action1<Boolean>() {
                @Override
                public void call(Boolean exists) {
                    if (exists) {
                        throw new BucketAlreadyExistsException("Bucket " + settings.name() + " already exists!");
                    }
                }
            })
            .flatMap(new Func1<Boolean, Observable<InsertBucketResponse>>() {
                @Override
                public Observable<InsertBucketResponse> call(Boolean exists) {
                    return core.<InsertBucketResponse>send(new InsertBucketRequest(payload, username, password));
                }
            })
            .map(new Func1<InsertBucketResponse, BucketSettings>() {
                @Override
                public BucketSettings call(InsertBucketResponse response) {
                    if (!response.status().isSuccess()) {
                        throw new CouchbaseException("Could not insert bucket: " + response.config());
                    }
                    return settings;
                }
            });
        return ensureBucketIsHealthy(inserted);
    }

    /**
     * Updates an existing bucket and waits until all cluster nodes report as healthy.
     *
     * <p>The observable fails with {@link BucketDoesNotExistException} if no bucket of that name
     * exists and with {@link CouchbaseException} if the update response is unsuccessful.
     *
     * @param settings the new settings; {@code settings.name()} selects the bucket to update.
     * @return an observable emitting the given settings once the cluster is healthy.
     */
    @Override
    public Observable<BucketSettings> updateBucket(final BucketSettings settings) {
        final String payload = getConfigureBucketPayload(settings, false);
        Observable<BucketSettings> updated = hasBucket(settings.name())
            .doOnNext(new Action1<Boolean>() {
                @Override
                public void call(Boolean exists) {
                    if (!exists) {
                        throw new BucketDoesNotExistException("Bucket " + settings.name() + " does not exist!");
                    }
                }
            })
            .flatMap(new Func1<Boolean, Observable<UpdateBucketResponse>>() {
                @Override
                public Observable<UpdateBucketResponse> call(Boolean exists) {
                    return core.<UpdateBucketResponse>send(new UpdateBucketRequest(settings.name(), payload, username, password));
                }
            })
            .map(new Func1<UpdateBucketResponse, BucketSettings>() {
                @Override
                public BucketSettings call(UpdateBucketResponse response) {
                    if (!response.status().isSuccess()) {
                        throw new CouchbaseException("Could not update bucket: " + response.config());
                    }
                    return settings;
                }
            });
        return ensureBucketIsHealthy(updated);
    }

    /**
     * Builds the {@code key=value&key=value} payload used to insert or update a bucket.
     *
     * <p>The built-in keys are written first, in a fixed order; custom settings follow in their own
     * iteration order. A custom setting never overrides a built-in key, and a custom "name" is
     * dropped when {@code includeName} is {@code false}.
     *
     * @param settings the bucket settings to serialize.
     * @param includeName whether the "name" key is part of the payload.
     * @return the serialized payload, without a leading separator.
     */
    protected String getConfigureBucketPayload(BucketSettings settings, boolean includeName) {
        Map<String, Object> customSettings = settings.customSettings();
        Map<String, Object> actual = new LinkedHashMap<String, Object>(8 + customSettings.size());
        if (includeName) {
            actual.put("name", settings.name());
        }
        actual.put("ramQuotaMB", settings.quota());
        actual.put("authType", "sasl");
        actual.put("saslPassword", settings.password());
        actual.put("replicaNumber", settings.replicas());
        actual.put("proxyPort", settings.port());
        actual.put("bucketType", settings.type() == BucketType.COUCHBASE ? "membase" : "memcached");
        actual.put("flushEnabled", settings.enableFlush() ? "1" : "0");
        for (Map.Entry<String, Object> customSetting : customSettings.entrySet()) {
            String key = customSetting.getKey();
            boolean suppressedName = !includeName && "name".equals(key);
            if (actual.containsKey(key) || suppressedName) {
                continue;
            }
            actual.put(key, customSetting.getValue());
        }
        // NOTE(review): keys and values are written verbatim, without percent-encoding; a value
        // containing '&' or '=' would presumably corrupt the payload - confirm against the server API.
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Object> setting : actual.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(setting.getKey()).append('=').append(setting.getValue());
        }
        return sb.toString();
    }

    /**
     * Throws if a configuration response was not successful.
     *
     * @param success whether the response status signals success.
     * @param status the response status, only used for the error message.
     * @param config the response body; may be {@code null}.
     * @throws InvalidPasswordException if the body of a failed response contains "Unauthorized".
     * @throws CouchbaseException for every other failed response.
     */
    private static void failIfUnsuccessful(boolean success, Object status, String config) {
        if (success) {
            return;
        }
        if (config != null && config.contains("Unauthorized")) {
            throw new InvalidPasswordException();
        }
        throw new CouchbaseException(status + ": " + config);
    }

    /**
     * Converts one entry of the bucket configuration array into {@link BucketSettings}.
     *
     * @param bucket the JSON description of a single bucket.
     * @return the settings built from that description, keeping {@code bucket} as raw data.
     */
    private static BucketSettings toBucketSettings(JsonObject bucket) {
        JsonObject controllers = bucket.getObject("controllers");
        boolean enableFlush = controllers != null && controllers.getString("flush") != null;
        Boolean replicaIndex = bucket.getBoolean("replicaIndex");
        boolean indexReplicas = replicaIndex != null && replicaIndex;
        // The decoder may hand back the quota as Integer or Long depending on its magnitude, so
        // read it as a Number. Dividing by 1024 * 1024 presumably converts bytes to the megabyte
        // quota that is later sent as "ramQuotaMB".
        Number ram = (Number) bucket.getObject("quota").get("ram");
        int ramQuota = (int) (ram.longValue() / 1024L / 1024L);
        BucketType bucketType = "membase".equalsIgnoreCase(bucket.getString("bucketType")) ? BucketType.COUCHBASE : BucketType.MEMCACHED;
        return DefaultBucketSettings.builder()
            .name(bucket.getString("name"))
            .enableFlush(enableFlush)
            .type(bucketType)
            .replicas(bucket.getInt("replicaNumber"))
            .quota(ramQuota)
            .indexReplicas(indexReplicas)
            .port(bucket.getInt("proxyPort"))
            .password(bucket.getString("saslPassword"))
            .build(bucket);
    }

    /**
     * Holds back every emitted {@link BucketSettings} until all cluster nodes report "healthy".
     *
     * <p>For each input item, {@link #info()} is queried repeatedly (every result delayed by
     * {@link #HEALTH_POLL_DELAY_MILLIS} ms) until one result shows all nodes healthy. There is no
     * upper bound on the number of repetitions.
     *
     * @param input the observable whose items should be delayed.
     * @return an observable emitting the same items once the cluster is healthy.
     */
    private Observable<BucketSettings> ensureBucketIsHealthy(Observable<BucketSettings> input) {
        return input.flatMap(new Func1<BucketSettings, Observable<BucketSettings>>() {
            @Override
            public Observable<BucketSettings> call(final BucketSettings bucketSettings) {
                return info()
                    .delay(HEALTH_POLL_DELAY_MILLIS, TimeUnit.MILLISECONDS)
                    .filter(new Func1<ClusterInfo, Boolean>() {
                        @Override
                        public Boolean call(ClusterInfo clusterInfo) {
                            return allNodesHealthy(clusterInfo);
                        }
                    })
                    .repeat()
                    .take(1)
                    .map(new Func1<ClusterInfo, BucketSettings>() {
                        @Override
                        public BucketSettings call(ClusterInfo clusterInfo) {
                            return bucketSettings;
                        }
                    });
            }
        });
    }

    /**
     * Checks the "nodes" array of the raw cluster info.
     *
     * @param clusterInfo the cluster info to inspect.
     * @return {@code true} only if every node has the status "healthy"; a node without a status
     *         counts as not healthy.
     */
    private static boolean allNodesHealthy(ClusterInfo clusterInfo) {
        for (Object n : clusterInfo.raw().getArray("nodes")) {
            JsonObject node = (JsonObject) n;
            // Constant-first comparison: a missing status must not raise a NullPointerException.
            if (!"healthy".equals(node.getString("status"))) {
                return false;
            }
        }
        return true;
    }

    /**
     * Registers the first host of the connection string as a node and enables the
     * {@link ServiceType#CONFIG} service on it.
     *
     * <p>The service port is the SSL bootstrap HTTP port when SSL is enabled in the environment,
     * otherwise the direct bootstrap HTTP port. The observable fails with
     * {@link CouchbaseException} if the host name cannot be resolved or the service could not be
     * added.
     *
     * @return an observable emitting {@code true} once the service has been enabled.
     */
    private Observable<Boolean> ensureServiceEnabled() {
        return Observable.just(connectionString.hosts().get(0).getHostName())
            .map(new Func1<String, InetAddress>() {
                @Override
                public InetAddress call(String hostname) {
                    try {
                        return InetAddress.getByName(hostname);
                    } catch (UnknownHostException e) {
                        throw new CouchbaseException(e);
                    }
                }
            })
            .flatMap(new Func1<InetAddress, Observable<AddServiceResponse>>() {
                @Override
                public Observable<AddServiceResponse> call(final InetAddress hostname) {
                    return core.<AddNodeResponse>send(new AddNodeRequest(hostname))
                        .flatMap(new Func1<AddNodeResponse, Observable<AddServiceResponse>>() {
                            @Override
                            public Observable<AddServiceResponse> call(AddNodeResponse response) {
                                int port = environment.sslEnabled()
                                    ? environment.bootstrapHttpSslPort()
                                    : environment.bootstrapHttpDirectPort();
                                return core.<AddServiceResponse>send(new AddServiceRequest(ServiceType.CONFIG, username, password, port, hostname));
                            }
                        });
                }
            })
            .map(new Func1<AddServiceResponse, Boolean>() {
                @Override
                public Boolean call(AddServiceResponse addServiceResponse) {
                    if (!addServiceResponse.status().isSuccess()) {
                        throw new CouchbaseException("Could not enable ClusterManager service to function properly.");
                    }
                    return true;
                }
            });
    }
}

