/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.servicediscovery.zookeeper;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.servicediscovery.Record;
import io.vertx.servicediscovery.spi.ServiceImporter;
import io.vertx.servicediscovery.spi.ServicePublisher;
import io.vertx.servicediscovery.zookeeper.JsonObjectSerializer;
import io.vertx.servicediscovery.zookeeper.RegistrationHolder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.zookeeper.KeeperException;

public class ZookeeperServiceImporter
implements ServiceImporter,
TreeCacheListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperServiceImporter.class);
    private ServicePublisher publisher;
    private CuratorFramework client;
    private ServiceDiscovery<JsonObject> discovery;
    private TreeCache cache;
    private volatile boolean started;
    private Set<RegistrationHolder<ServiceInstance<JsonObject>>> registrations = new ConcurrentHashSet();

    public void start(Vertx vertx, ServicePublisher publisher, JsonObject configuration, Promise<Void> future) {
        this.publisher = publisher;
        String connection = Objects.requireNonNull(configuration.getString("connection"));
        int maxRetries = configuration.getInteger("maxRetries", Integer.valueOf(3));
        int baseGraceBetweenRetries = configuration.getInteger("baseSleepTimeBetweenRetries", Integer.valueOf(1000));
        String basePath = configuration.getString("basePath", "/discovery");
        boolean canBeReadOnly = configuration.getBoolean("canBeReadOnly", Boolean.valueOf(true));
        int connectionTimeoutMs = configuration.getInteger("connectionTimeoutMs", Integer.valueOf(1000));
        vertx.executeBlocking(f -> {
            try {
                this.client = CuratorFrameworkFactory.builder().canBeReadOnly(canBeReadOnly).connectString(connection).connectionTimeoutMs(connectionTimeoutMs).retryPolicy((RetryPolicy)new ExponentialBackoffRetry(baseGraceBetweenRetries, maxRetries)).build();
                this.client.start();
                this.discovery = ServiceDiscoveryBuilder.builder(JsonObject.class).client(this.client).basePath(basePath).serializer((InstanceSerializer)new JsonObjectSerializer()).watchInstances(true).build();
                this.discovery.start();
                this.cache = TreeCache.newBuilder((CuratorFramework)this.client, (String)basePath).build();
                this.cache.start();
                this.cache.getListenable().addListener((Object)this);
                f.complete();
            }
            catch (Exception e) {
                future.fail((Throwable)e);
            }
        }, ar -> {
            if (ar.failed()) {
                future.fail(ar.cause());
            } else {
                Promise p = Promise.promise();
                p.future().setHandler(x -> {
                    if (x.failed()) {
                        future.fail(x.cause());
                    } else {
                        future.complete(null);
                    }
                });
                this.compute((Promise<Void>)p);
            }
        });
    }

    private synchronized void compute(Promise<Void> done) {
        ArrayList instances = new ArrayList();
        try {
            Collection names = this.discovery.queryForNames();
            for (String name : names) {
                instances.addAll(this.discovery.queryForInstances(name));
            }
        }
        catch (KeeperException.NoNodeException names) {
        }
        catch (Exception e) {
            if (done != null) {
                done.fail((Throwable)e);
            }
            LOGGER.error((Object)"Unable to retrieve service instances from Zookeeper", (Throwable)e);
            return;
        }
        HashSet registered = new HashSet(this.registrations);
        HashSet remote = new HashSet(instances);
        ArrayList actions = new ArrayList();
        RegistrationHolder.filter(registered, instances).stream().map(reg -> {
            Promise promise = Promise.promise();
            this.publisher.unpublish(reg.record().getRegistration(), v -> {
                this.registrations.remove(reg);
                if (v.succeeded()) {
                    promise.complete(null);
                } else {
                    promise.fail(v.cause());
                }
            });
            return promise.future();
        }).forEach(actions::add);
        RegistrationHolder.filter(remote, this.registrations).stream().map(instance -> {
            Promise promise = Promise.promise();
            this.publisher.publish(ZookeeperServiceImporter.createRecordForInstance((ServiceInstance<JsonObject>)instance), v -> {
                if (v.succeeded()) {
                    this.registrations.add(new RegistrationHolder<ServiceInstance>((Record)v.result(), (ServiceInstance)instance));
                    promise.complete(null);
                } else {
                    promise.fail(v.cause());
                }
            });
            return promise.future();
        }).forEach(actions::add);
        if (done != null) {
            CompositeFuture.all(actions).setHandler(ar -> {
                if (ar.succeeded()) {
                    done.complete(null);
                } else {
                    done.fail(ar.cause());
                }
            });
        }
    }

    static Record createRecordForInstance(ServiceInstance<JsonObject> instance) {
        Record record = new Record();
        record.setName(instance.getName());
        JsonObject payload = (JsonObject)instance.getPayload();
        record.setMetadata(payload);
        record.getMetadata().put("zookeeper-service-type", instance.getServiceType().toString());
        record.getMetadata().put("zookeeper-address", instance.getAddress());
        record.getMetadata().put("zookeeper-registration-time", Long.valueOf(instance.getRegistrationTimeUTC()));
        record.getMetadata().put("zookeeper-port", instance.getPort());
        record.getMetadata().put("zookeeper-ssl-port", instance.getSslPort());
        record.getMetadata().put("zookeeper-id", instance.getId());
        record.setLocation(new JsonObject());
        if (instance.getUriSpec() != null) {
            String uri = instance.buildUriSpec();
            record.getLocation().put("endpoint", uri);
        } else {
            String uri = "http";
            uri = instance.getSslPort() != null ? uri + "s://" + instance.getAddress() + ":" + instance.getSslPort() : (instance.getPort() != null ? uri + "s://" + instance.getAddress() + ":" + instance.getPort() : uri + "://" + instance.getAddress());
            record.getLocation().put("endpoint", uri);
        }
        if (instance.getPort() != null) {
            record.getLocation().put("port", instance.getPort());
        }
        if (instance.getSslPort() != null) {
            record.getLocation().put("ssl-port", instance.getSslPort());
        }
        if (instance.getAddress() != null) {
            record.getLocation().put("address", instance.getAddress());
        }
        record.setType(payload.getString("service-type", "unknown"));
        return record;
    }

    public void close(Handler<Void> closeHandler) {
        Promise done = Promise.promise();
        this.unregisterAllServices((Promise<Void>)done);
        done.future().setHandler(v -> {
            try {
                this.cache.close();
                this.discovery.close();
                this.client.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
            closeHandler.handle(null);
        });
    }

    public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) {
        if (treeCacheEvent.getType() == TreeCacheEvent.Type.INITIALIZED) {
            this.started = true;
        } else if (this.started) {
            this.compute(null);
        }
    }

    private synchronized void unregisterAllServices(Promise<Void> done) {
        ArrayList list = new ArrayList();
        new HashSet<RegistrationHolder<ServiceInstance<JsonObject>>>(this.registrations).forEach(reg -> {
            Promise unreg = Promise.promise();
            this.publisher.unpublish(reg.record().getRegistration(), (Handler)unreg);
        });
        this.registrations.clear();
        CompositeFuture.all(list).setHandler(x -> {
            if (x.failed()) {
                done.fail(x.cause());
            } else {
                done.complete();
            }
        });
    }
}

