/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.java.cluster;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.config.BucketsConfigRequest;
import com.couchbase.client.core.message.config.BucketsConfigResponse;
import com.couchbase.client.core.message.config.ClusterConfigRequest;
import com.couchbase.client.core.message.config.ClusterConfigResponse;
import com.couchbase.client.core.message.config.InsertBucketRequest;
import com.couchbase.client.core.message.config.InsertBucketResponse;
import com.couchbase.client.core.message.config.RemoveBucketRequest;
import com.couchbase.client.core.message.config.RemoveBucketResponse;
import com.couchbase.client.core.message.config.UpdateBucketRequest;
import com.couchbase.client.core.message.config.UpdateBucketResponse;
import com.couchbase.client.core.message.internal.AddNodeRequest;
import com.couchbase.client.core.message.internal.AddNodeResponse;
import com.couchbase.client.core.message.internal.AddServiceRequest;
import com.couchbase.client.core.message.internal.AddServiceResponse;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.java.ConnectionString;
import com.couchbase.client.java.CouchbaseAsyncBucket;
import com.couchbase.client.java.bucket.BucketType;
import com.couchbase.client.java.cluster.AsyncClusterManager;
import com.couchbase.client.java.cluster.BucketSettings;
import com.couchbase.client.java.cluster.ClusterInfo;
import com.couchbase.client.java.cluster.DefaultBucketSettings;
import com.couchbase.client.java.cluster.DefaultClusterInfo;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.error.BucketAlreadyExistsException;
import com.couchbase.client.java.error.BucketDoesNotExistException;
import com.couchbase.client.java.error.TranscodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

public class DefaultAsyncClusterManager
implements AsyncClusterManager {
    private final ClusterFacade core;
    private final String username;
    private final String password;
    private final CouchbaseEnvironment environment;
    private final ConnectionString connectionString;

    DefaultAsyncClusterManager(String username, String password, ConnectionString connectionString, CouchbaseEnvironment environment, ClusterFacade core) {
        this.username = username;
        this.password = password;
        this.core = core;
        this.environment = environment;
        this.connectionString = connectionString;
    }

    public static DefaultAsyncClusterManager create(String username, String password, ConnectionString connectionString, CouchbaseEnvironment environment, ClusterFacade core) {
        return new DefaultAsyncClusterManager(username, password, connectionString, environment, core);
    }

    @Override
    public Observable<ClusterInfo> info() {
        return this.ensureServiceEnabled().flatMap((Func1)new Func1<Boolean, Observable<ClusterConfigResponse>>(){

            public Observable<ClusterConfigResponse> call(Boolean aBoolean) {
                return DefaultAsyncClusterManager.this.core.send((CouchbaseRequest)new ClusterConfigRequest(DefaultAsyncClusterManager.this.username, DefaultAsyncClusterManager.this.password));
            }
        }).map((Func1)new Func1<ClusterConfigResponse, ClusterInfo>(){

            public ClusterInfo call(ClusterConfigResponse response) {
                try {
                    return new DefaultClusterInfo(CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER.stringToJsonObject(response.config()));
                }
                catch (Exception e) {
                    throw new TranscodingException("Could not decode cluster info.", e);
                }
            }
        });
    }

    @Override
    public Observable<BucketSettings> getBuckets() {
        return this.ensureServiceEnabled().flatMap((Func1)new Func1<Boolean, Observable<BucketsConfigResponse>>(){

            public Observable<BucketsConfigResponse> call(Boolean aBoolean) {
                return DefaultAsyncClusterManager.this.core.send((CouchbaseRequest)new BucketsConfigRequest(DefaultAsyncClusterManager.this.username, DefaultAsyncClusterManager.this.password));
            }
        }).flatMap((Func1)new Func1<BucketsConfigResponse, Observable<BucketSettings>>(){

            public Observable<BucketSettings> call(BucketsConfigResponse response) {
                try {
                    JsonArray decoded = CouchbaseAsyncBucket.JSON_ARRAY_TRANSCODER.stringToJsonArray(response.config());
                    ArrayList<DefaultBucketSettings> settings = new ArrayList<DefaultBucketSettings>();
                    for (Object item : decoded) {
                        JsonObject bucket = (JsonObject)item;
                        settings.add(DefaultBucketSettings.builder().name(bucket.getString("name")).enableFlush(bucket.getObject("controllers").getString("flush") != null).type(bucket.getString("bucketType").equals("membase") ? BucketType.COUCHBASE : BucketType.MEMCACHED).replicas(bucket.getInt("replicaNumber")).quota(bucket.getObject("quota").getInt("ram")).indexReplicas(bucket.getBoolean("replicaIndex")).port(bucket.getInt("proxyPort")).password(bucket.getString("saslPassword")).build());
                    }
                    return Observable.from(settings);
                }
                catch (Exception e) {
                    throw new TranscodingException("Could not decode cluster info.", e);
                }
            }
        });
    }

    @Override
    public Observable<BucketSettings> getBucket(final String name) {
        return this.getBuckets().filter((Func1)new Func1<BucketSettings, Boolean>(){

            public Boolean call(BucketSettings bucketSettings) {
                return bucketSettings.name().equals(name);
            }
        });
    }

    @Override
    public Observable<Boolean> hasBucket(String name) {
        return this.getBucket(name).isEmpty().map((Func1)new Func1<Boolean, Boolean>(){

            public Boolean call(Boolean notFound) {
                return notFound == false;
            }
        });
    }

    @Override
    public Observable<Boolean> removeBucket(final String name) {
        return this.ensureServiceEnabled().flatMap((Func1)new Func1<Boolean, Observable<RemoveBucketResponse>>(){

            public Observable<RemoveBucketResponse> call(Boolean aBoolean) {
                return DefaultAsyncClusterManager.this.core.send((CouchbaseRequest)new RemoveBucketRequest(name, DefaultAsyncClusterManager.this.username, DefaultAsyncClusterManager.this.password));
            }
        }).map((Func1)new Func1<RemoveBucketResponse, Boolean>(){

            public Boolean call(RemoveBucketResponse response) {
                return response.status().isSuccess();
            }
        });
    }

    @Override
    public Observable<BucketSettings> insertBucket(final BucketSettings settings) {
        final StringBuilder sb = new StringBuilder();
        sb.append("name=").append(settings.name());
        sb.append("&ramQuotaMB=").append(settings.quota());
        sb.append("&authType=").append("sasl");
        sb.append("&saslPassword=").append(settings.password());
        sb.append("&replicaNumber=").append(settings.replicas());
        sb.append("&proxyPort=").append(settings.port());
        sb.append("&bucketType=").append(settings.type() == BucketType.COUCHBASE ? "membase" : "memcached");
        sb.append("&flushEnabled=").append(settings.enableFlush() ? "1" : "0");
        return this.hasBucket(settings.name()).doOnNext((Action1)new Action1<Boolean>(){

            public void call(Boolean exists) {
                if (exists.booleanValue()) {
                    throw new BucketAlreadyExistsException("Bucket " + settings.name() + " already exists!");
                }
            }
        }).flatMap((Func1)new Func1<Boolean, Observable<InsertBucketResponse>>(){

            public Observable<InsertBucketResponse> call(Boolean exists) {
                return DefaultAsyncClusterManager.this.core.send((CouchbaseRequest)new InsertBucketRequest(sb.toString(), DefaultAsyncClusterManager.this.username, DefaultAsyncClusterManager.this.password));
            }
        }).map((Func1)new Func1<InsertBucketResponse, BucketSettings>(){

            public BucketSettings call(InsertBucketResponse response) {
                if (!response.status().isSuccess()) {
                    throw new CouchbaseException("Could not insert bucket: " + response.config());
                }
                return settings;
            }
        });
    }

    @Override
    public Observable<BucketSettings> updateBucket(final BucketSettings settings) {
        final StringBuilder sb = new StringBuilder();
        sb.append("ramQuotaMB=").append(settings.quota());
        sb.append("&authType=").append("sasl");
        sb.append("&saslPassword=").append(settings.password());
        sb.append("&replicaNumber=").append(settings.replicas());
        sb.append("&proxyPort=").append(settings.port());
        sb.append("&bucketType=").append(settings.type() == BucketType.COUCHBASE ? "membase" : "memcached");
        sb.append("&flushEnabled=").append(settings.enableFlush() ? "1" : "0");
        return this.hasBucket(settings.name()).doOnNext((Action1)new Action1<Boolean>(){

            public void call(Boolean exists) {
                if (!exists.booleanValue()) {
                    throw new BucketDoesNotExistException("Bucket " + settings.name() + " does not exist!");
                }
            }
        }).flatMap((Func1)new Func1<Boolean, Observable<UpdateBucketResponse>>(){

            public Observable<UpdateBucketResponse> call(Boolean exists) {
                return DefaultAsyncClusterManager.this.core.send((CouchbaseRequest)new UpdateBucketRequest(settings.name(), sb.toString(), DefaultAsyncClusterManager.this.username, DefaultAsyncClusterManager.this.password));
            }
        }).map((Func1)new Func1<UpdateBucketResponse, BucketSettings>(){

            public BucketSettings call(UpdateBucketResponse response) {
                if (!response.status().isSuccess()) {
                    throw new CouchbaseException("Could not update bucket: " + response.config());
                }
                return settings;
            }
        });
    }

    private Observable<Boolean> ensureServiceEnabled() {
        return Observable.just((Object)this.connectionString.hosts().get(0).getHostName()).map((Func1)new Func1<String, InetAddress>(){

            public InetAddress call(String hostname) {
                try {
                    return InetAddress.getByName(hostname);
                }
                catch (UnknownHostException e) {
                    throw new CouchbaseException((Throwable)e);
                }
            }
        }).flatMap((Func1)new Func1<InetAddress, Observable<AddServiceResponse>>(){

            public Observable<AddServiceResponse> call(final InetAddress hostname) {
                return DefaultAsyncClusterManager.this.core.send((CouchbaseRequest)new AddNodeRequest(hostname)).flatMap((Func1)new Func1<AddNodeResponse, Observable<AddServiceResponse>>(){

                    public Observable<AddServiceResponse> call(AddNodeResponse response) {
                        int port = DefaultAsyncClusterManager.this.environment.sslEnabled() ? DefaultAsyncClusterManager.this.environment.bootstrapHttpSslPort() : DefaultAsyncClusterManager.this.environment.bootstrapHttpDirectPort();
                        return DefaultAsyncClusterManager.this.core.send((CouchbaseRequest)new AddServiceRequest(ServiceType.CONFIG, DefaultAsyncClusterManager.this.username, DefaultAsyncClusterManager.this.password, port, hostname));
                    }
                });
            }
        }).map((Func1)new Func1<AddServiceResponse, Boolean>(){

            public Boolean call(AddServiceResponse addServiceResponse) {
                if (!addServiceResponse.status().isSuccess()) {
                    throw new CouchbaseException("Could not enable ClusterManager service to function properly.");
                }
                return true;
            }
        });
    }
}

