/*
 * Decompiled with CFR 0.152.
 */
package alluxio.membership;

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.runtime.UnavailableRuntimeException;
import alluxio.membership.ServiceDiscoveryRecipe;
import alluxio.membership.StateListener;
import alluxio.resource.LockResource;
import alluxio.retry.ExponentialBackoffRetry;
import alluxio.retry.RetryPolicy;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.common.base.MoreObjects;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.io.etcd.jetcd.ByteSequence;
import alluxio.shaded.client.io.etcd.jetcd.Client;
import alluxio.shaded.client.io.etcd.jetcd.ClientBuilder;
import alluxio.shaded.client.io.etcd.jetcd.KeyValue;
import alluxio.shaded.client.io.etcd.jetcd.Watch;
import alluxio.shaded.client.io.etcd.jetcd.kv.GetResponse;
import alluxio.shaded.client.io.etcd.jetcd.lease.LeaseGrantResponse;
import alluxio.shaded.client.io.etcd.jetcd.lease.LeaseRevokeResponse;
import alluxio.shaded.client.io.etcd.jetcd.lease.LeaseTimeToLiveResponse;
import alluxio.shaded.client.io.etcd.jetcd.options.DeleteOption;
import alluxio.shaded.client.io.etcd.jetcd.options.GetOption;
import alluxio.shaded.client.io.etcd.jetcd.options.LeaseOption;
import alluxio.shaded.client.io.etcd.jetcd.options.WatchOption;
import alluxio.shaded.client.io.etcd.jetcd.watch.WatchEvent;
import alluxio.shaded.client.io.etcd.jetcd.watch.WatchResponse;
import alluxio.shaded.client.io.netty.util.internal.StringUtil;
import alluxio.shaded.client.javax.annotation.Nullable;
import alluxio.shaded.client.javax.annotation.concurrent.GuardedBy;
import alluxio.util.io.PathUtils;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wrapper around a jetcd {@link Client} used by the membership / service discovery code.
 *
 * <p>Blocking etcd calls issued by this class go through {@link #retryInternal}, which drives
 * the call with a {@link RetryPolicy} and turns a final failure into an
 * {@link UnavailableRuntimeException}. A shared instance is lazily created by
 * {@link #getInstance(AlluxioConfiguration)} and released by {@link #destroy()}.
 */
public class AlluxioEtcdClient {
    private static final Logger LOG = LoggerFactory.getLogger(AlluxioEtcdClient.class);
    /** Guards creation and destruction of the shared instance. */
    private static final Lock INSTANCE_LOCK = new ReentrantLock();
    /** Root key prefix; the cluster name is appended to it for the service discovery recipe. */
    public static final String BASE_PATH = "/ServiceDiscovery";
    /** TTL, in seconds, used by {@link #createLease()}. */
    public static final long DEFAULT_LEASE_TTL_IN_SEC = 5L;
    /** Timeout, in seconds, applied when waiting on an etcd response future. */
    public static final long DEFAULT_TIMEOUT_IN_SEC = 5L;
    /** Retry count handed to the backoff policy for most operations. */
    public static final int RETRY_TIMES = 5;
    /** Base sleep, in milliseconds, handed to the backoff policy. */
    private static final int RETRY_SLEEP_IN_MS = 500;
    /** Maximum sleep, in milliseconds, handed to the backoff policy. */
    private static final int MAX_RETRY_SLEEP_IN_MS = 5000;
    /** Separator between path and watch type in the keys of {@link #mRegisteredWatchers}. */
    private static final String WATCHER_KEY_SEPARATOR = "$$@@$$";

    @Nullable
    @GuardedBy("INSTANCE_LOCK")
    private static volatile AlluxioEtcdClient sAlluxioEtcdClient;

    /** Service discovery recipe rooted at {@code BASE_PATH + "/" + clusterName}. */
    public final ServiceDiscoveryRecipe mServiceDiscovery;
    /** Active watchers, keyed by {@link #getRegisterWatcherKey(String, WatchType)}. */
    private final ConcurrentHashMap<String, Watch.Watcher> mRegisteredWatchers =
        new ConcurrentHashMap<>();
    private Client mClient;
    private final String[] mEndpoints;

    /**
     * Builds a client from the given configuration.
     *
     * <p>Reads the cluster name and etcd endpoints, and configures user/password authentication
     * when both credentials are set.
     *
     * @param conf the configuration to read etcd settings from
     * @throws IllegalArgumentException if exactly one of username / password is set
     */
    @VisibleForTesting
    public AlluxioEtcdClient(AlluxioConfiguration conf) {
        String clusterName = conf.getString(PropertyKey.ALLUXIO_CLUSTER_NAME);
        List<String> endpointsList = conf.getList(PropertyKey.ETCD_ENDPOINTS);
        this.mEndpoints = endpointsList.toArray(new String[0]);
        this.mServiceDiscovery = new ServiceDiscoveryRecipe(this,
            String.format("%s%s%s", BASE_PATH, "/", clusterName));
        ClientBuilder jetcdClientBuilder = Client.builder().endpoints(this.mEndpoints);
        boolean hasUsername = conf.isSet(PropertyKey.ETCD_USERNAME);
        boolean hasPassword = conf.isSet(PropertyKey.ETCD_PASSWORD);
        // Credentials are all-or-nothing: reject a half-configured pair up front.
        Preconditions.checkArgument(hasUsername == hasPassword,
            "Need to set both username/password for etcd connection, only one is set.");
        if (hasUsername) {
            jetcdClientBuilder
                .user(ByteSequence.from(
                    conf.getString(PropertyKey.ETCD_USERNAME), StandardCharsets.UTF_8))
                .password(ByteSequence.from(
                    conf.getString(PropertyKey.ETCD_PASSWORD), StandardCharsets.UTF_8));
        }
        this.mClient = jetcdClientBuilder.build();
    }

    /**
     * Returns the shared client, creating it on first use.
     *
     * <p>When a new instance is created, a JVM shutdown hook that calls {@link #destroy()} is
     * registered as well. The configuration is only consulted when an instance has to be created.
     *
     * @param conf the configuration used if a new instance must be built
     * @return the shared client instance
     */
    public static AlluxioEtcdClient getInstance(AlluxioConfiguration conf) {
        if (sAlluxioEtcdClient == null) {
            try (LockResource lock = new LockResource(INSTANCE_LOCK)) {
                // Re-check under the lock: another thread may have created it meanwhile.
                if (sAlluxioEtcdClient == null) {
                    LOG.debug("Creating ETCD client");
                    sAlluxioEtcdClient = new AlluxioEtcdClient(conf);
                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                        try {
                            AlluxioEtcdClient.destroy();
                        } catch (Throwable t) {
                            LOG.error("Failed to destroy ETCD client", t);
                        }
                    }, "alluxio-etcd-client-shutdown-hook"));
                    LOG.debug("ETCD client created");
                }
            }
        }
        return sAlluxioEtcdClient;
    }

    /**
     * Runs {@code etcdCallable} until it succeeds or {@code retryPolicy} stops granting attempts.
     *
     * <p>An {@link InterruptedException} is not retried: the thread's interrupt status is
     * restored and the call fails immediately, so an interrupted caller is not kept blocked
     * through further attempts.
     *
     * @param description human readable description of the operation, used in logs and errors
     * @param retryPolicy policy deciding whether another attempt is allowed
     * @param etcdCallable the operation to run
     * @param <V> result type of the operation
     * @return the value returned by the first successful call
     * @throws UnavailableRuntimeException if no attempt succeeds or the thread is interrupted;
     *         the cause is the last failure seen (possibly {@code null} if no attempt was made)
     */
    private <V> V retryInternal(String description, RetryPolicy retryPolicy,
            EtcdUtilCallable<V> etcdCallable) {
        Exception lastFailure = null;
        while (retryPolicy.attempt()) {
            try {
                return etcdCallable.call();
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack and stop retrying.
                Thread.currentThread().interrupt();
                throw new UnavailableRuntimeException(
                    String.format("Interrupted while trying to (%s), attempts:%s",
                        description, retryPolicy.getAttemptCount()), e);
            } catch (Exception e) {
                LOG.warn("Failed to {} (attempt {}): {}",
                    description, retryPolicy.getAttemptCount(), e.toString());
                lastFailure = e;
                LOG.debug("AlluxioEtcdClient call failed ({}): ",
                    retryPolicy.getAttemptCount(), lastFailure);
            }
        }
        throw new UnavailableRuntimeException(
            String.format("Exhausted retry for (%s), retries:%s, last exception:",
                description, retryPolicy.getAttemptCount()), lastFailure);
    }

    /**
     * Grants a new lease from etcd.
     *
     * @param ttlInSec time-to-live of the lease, in seconds
     * @param timeout timeout used both for the grant request and for waiting on its response
     * @param timeUnit unit of {@code timeout}
     * @return a {@link Lease} carrying the granted lease id and the requested TTL
     * @throws UnavailableRuntimeException if the lease cannot be granted within the retries
     */
    public Lease createLease(long ttlInSec, long timeout, TimeUnit timeUnit) {
        return this.retryInternal(
            String.format("Creating Lease with ttl:%s", ttlInSec),
            new ExponentialBackoffRetry(RETRY_SLEEP_IN_MS, MAX_RETRY_SLEEP_IN_MS, RETRY_TIMES),
            () -> {
                CompletableFuture<LeaseGrantResponse> leaseGrantFut =
                    this.getEtcdClient().getLeaseClient().grant(ttlInSec, timeout, timeUnit);
                LeaseGrantResponse resp = leaseGrantFut.get(timeout, timeUnit);
                return new Lease(resp.getID(), ttlInSec);
            });
    }

    /**
     * Grants a new lease using {@link #DEFAULT_LEASE_TTL_IN_SEC} and
     * {@link #DEFAULT_TIMEOUT_IN_SEC}.
     *
     * @return the granted lease
     */
    public Lease createLease() {
        return this.createLease(DEFAULT_LEASE_TTL_IN_SEC, DEFAULT_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
    }

    /**
     * Revokes the given lease.
     *
     * @param lease the lease to revoke
     * @throws UnavailableRuntimeException if the revoke does not succeed within the retries
     */
    public void revokeLease(Lease lease) {
        this.retryInternal(
            String.format("Revoking Lease:%s", lease.toString()),
            new ExponentialBackoffRetry(RETRY_SLEEP_IN_MS, MAX_RETRY_SLEEP_IN_MS, RETRY_TIMES),
            () -> {
                CompletableFuture<LeaseRevokeResponse> leaseRevokeFut =
                    this.getEtcdClient().getLeaseClient().revoke(lease.mLeaseId);
                leaseRevokeFut.get(DEFAULT_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
                return null;
            });
    }

    /**
     * Checks whether etcd reports a non-positive remaining TTL for the lease.
     *
     * @param lease the lease to check
     * @return {@code true} if the remaining TTL reported by etcd is {@code <= 0}
     * @throws UnavailableRuntimeException if the query does not succeed within the retries
     */
    public boolean isLeaseExpired(Lease lease) {
        return this.retryInternal(
            String.format("Checking IsLeaseExpired, lease:%s", lease.toString()),
            new ExponentialBackoffRetry(RETRY_SLEEP_IN_MS, MAX_RETRY_SLEEP_IN_MS, RETRY_TIMES),
            () -> {
                LeaseTimeToLiveResponse leaseResp = this.mClient.getLeaseClient()
                    .timeToLive(lease.mLeaseId, LeaseOption.DEFAULT)
                    .get(DEFAULT_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
                return leaseResp.getTTl() <= 0L;
            });
    }

    /**
     * Puts {@code value} under the key formed by joining {@code parentPath} and
     * {@code childPath}.
     *
     * <p>The put is issued with a retry count of zero.
     *
     * @param parentPath parent key, must not be null or empty
     * @param childPath child key component, must not be null or empty
     * @param value the bytes to store
     * @throws IllegalArgumentException if a path argument is null or empty
     * @throws UnavailableRuntimeException if the put fails
     */
    public void addChildren(String parentPath, String childPath, byte[] value) {
        Preconditions.checkArgument(!StringUtil.isNullOrEmpty(parentPath));
        Preconditions.checkArgument(!StringUtil.isNullOrEmpty(childPath));
        String fullPath = PathUtils.concatPath(parentPath, childPath);
        Preconditions.checkArgument(!StringUtil.isNullOrEmpty(fullPath));
        this.retryInternal(
            String.format("Adding child for parentPath:%s, childPath:%s", parentPath, childPath),
            new ExponentialBackoffRetry(RETRY_SLEEP_IN_MS, MAX_RETRY_SLEEP_IN_MS, 0),
            () -> this.mClient.getKVClient()
                .put(ByteSequence.from(fullPath, StandardCharsets.UTF_8), ByteSequence.from(value))
                .get(DEFAULT_TIMEOUT_IN_SEC, TimeUnit.SECONDS));
    }

    /**
     * Lists all key-values whose key starts with {@code parentPath}.
     *
     * @param parentPath key prefix to query, must not be null or empty
     * @return the key-values returned by the prefix query
     * @throws IllegalArgumentException if {@code parentPath} is null or empty
     * @throws UnavailableRuntimeException if the query does not succeed within the retries
     */
    public List<KeyValue> getChildren(String parentPath) {
        Preconditions.checkArgument(!StringUtil.isNullOrEmpty(parentPath));
        return this.retryInternal(
            String.format("Getting children for path:%s", parentPath),
            new ExponentialBackoffRetry(RETRY_SLEEP_IN_MS, MAX_RETRY_SLEEP_IN_MS, RETRY_TIMES),
            () -> {
                GetResponse getResponse = this.mClient.getKVClient()
                    .get(ByteSequence.from(parentPath, StandardCharsets.UTF_8),
                        GetOption.newBuilder().isPrefix(true).build())
                    .get(DEFAULT_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
                return getResponse.getKvs();
            });
    }

    /**
     * Registers a watch of the given type on {@code parentPath} and records it in
     * {@link #mRegisteredWatchers}.
     *
     * <p>If a watcher is already registered for the same path and type, nothing is done. If
     * another thread registers one concurrently, the watcher created here is closed again.
     * The watch is created with a retry count of zero.
     *
     * @param parentPath the key (or key prefix for {@link WatchType#CHILDREN}) to watch
     * @param listener callback notified of put / delete events
     * @param watchType whether to watch a single key or all keys under the prefix
     */
    private void addListenerInternal(final String parentPath, final StateListener listener,
            final WatchType watchType) {
        final String watcherKey = AlluxioEtcdClient.getRegisterWatcherKey(parentPath, watchType);
        if (this.mRegisteredWatchers.containsKey(watcherKey)) {
            LOG.warn("Watcher already there for path:{}, type:{}.", parentPath, watchType);
            return;
        }
        final WatchOption.Builder watchOptBuilder = WatchOption.newBuilder();
        if (watchType == WatchType.CHILDREN) {
            // Range end is the path with its last character incremented by one.
            int lastIndex = parentPath.length() - 1;
            String keyRangeEnd = parentPath.substring(0, lastIndex)
                + (char) (parentPath.charAt(lastIndex) + 1);
            watchOptBuilder.isPrefix(true)
                .withRange(ByteSequence.from(keyRangeEnd, StandardCharsets.UTF_8));
        }
        Watch.Watcher watcher = this.retryInternal(
            String.format("Adding listener for path:%s, type:%s", parentPath, watchType),
            new ExponentialBackoffRetry(RETRY_SLEEP_IN_MS, MAX_RETRY_SLEEP_IN_MS, 0),
            () -> this.mClient.getWatchClient().watch(
                ByteSequence.from(parentPath, StandardCharsets.UTF_8),
                watchOptBuilder.build(),
                newWatchListener(parentPath, listener, watchType)));
        Watch.Watcher prevWatcher = this.mRegisteredWatchers.putIfAbsent(watcherKey, watcher);
        if (prevWatcher != null) {
            // Lost the race against a concurrent registration; keep the existing watcher.
            watcher.close();
        }
    }

    /**
     * Builds the jetcd listener that forwards put / delete events to {@code listener} and
     * unregisters the watch of the same path and type when the watch fails or completes.
     */
    private Watch.Listener newWatchListener(final String parentPath, final StateListener listener,
            final WatchType watchType) {
        return new Watch.Listener() {
            @Override
            public void onNext(WatchResponse response) {
                for (WatchEvent event : response.getEvents()) {
                    switch (event.getEventType()) {
                        case PUT:
                            listener.onNewPut(
                                event.getKeyValue().getKey().toString(StandardCharsets.UTF_8),
                                event.getKeyValue().getValue().getBytes());
                            break;
                        case DELETE:
                            listener.onNewDelete(
                                event.getKeyValue().getKey().toString(StandardCharsets.UTF_8));
                            break;
                        default:
                            LOG.info("Unrecognized event:{} on watch path of:{}",
                                event.getEventType(), parentPath);
                            break;
                    }
                }
            }

            @Override
            public void onError(Throwable throwable) {
                LOG.warn("Error occurred on {} watch for path:{}, removing the watch.",
                    watchType, parentPath, throwable);
                // Remove the entry for THIS watch type, not unconditionally the CHILDREN one.
                AlluxioEtcdClient.this.removeListenerInternal(parentPath, watchType);
            }

            @Override
            public void onCompleted() {
                LOG.warn("Watch for path onCompleted:{}, type:{}, removing the watch.",
                    parentPath, watchType);
                AlluxioEtcdClient.this.removeListenerInternal(parentPath, watchType);
            }
        };
    }

    /**
     * Builds the {@link #mRegisteredWatchers} key for a path / watch type pair.
     */
    private static String getRegisterWatcherKey(String path, WatchType type) {
        return path + WATCHER_KEY_SEPARATOR + type.toString();
    }

    /**
     * Registers a listener for changes of the single key {@code path}.
     *
     * @param path the key to watch
     * @param listener callback notified of put / delete events
     */
    public void addStateListener(String path, StateListener listener) {
        this.addListenerInternal(path, listener, WatchType.SINGLE_PATH);
    }

    /**
     * Removes the single-key listener registered for {@code path}, if any.
     *
     * @param path the watched key
     */
    public void removeStateListener(String path) {
        this.removeListenerInternal(path, WatchType.SINGLE_PATH);
    }

    /**
     * Registers a listener for changes of keys under the prefix {@code parentPath}.
     *
     * @param parentPath the key prefix to watch
     * @param listener callback notified of put / delete events
     */
    public void addChildrenListener(String parentPath, StateListener listener) {
        this.addListenerInternal(parentPath, listener, WatchType.CHILDREN);
    }

    /**
     * Removes the children listener registered for {@code parentPath}, if any.
     *
     * @param parentPath the watched key prefix
     */
    public void removeChildrenListener(String parentPath) {
        this.removeListenerInternal(parentPath, WatchType.CHILDREN);
    }

    /**
     * Reads the value stored at {@code path}.
     *
     * @param path the key to read
     * @return the value of the returned key-value with the highest mod revision, or
     *         {@code null} if the query returns no key-values
     * @throws UnavailableRuntimeException if the read does not succeed within the retries
     */
    public byte[] getForPath(String path) {
        return this.retryInternal(
            String.format("Get for path:%s", path),
            new ExponentialBackoffRetry(RETRY_SLEEP_IN_MS, MAX_RETRY_SLEEP_IN_MS, RETRY_TIMES),
            () -> {
                CompletableFuture<GetResponse> getResponse = this.getEtcdClient().getKVClient()
                    .get(ByteSequence.from(path, StandardCharsets.UTF_8));
                List<KeyValue> kvs =
                    getResponse.get(DEFAULT_TIMEOUT_IN_SEC, TimeUnit.SECONDS).getKvs();
                if (kvs.isEmpty()) {
                    return null;
                }
                KeyValue latestKv =
                    Collections.max(kvs, Comparator.comparing(KeyValue::getModRevision));
                return latestKv.getValue().getBytes();
            });
    }

    /**
     * Checks whether a get on {@code path} returns at least one key-value.
     *
     * @param path the key to look up
     * @return {@code true} if the query result is non-empty
     * @throws UnavailableRuntimeException if the query does not succeed within the retries
     */
    public boolean checkExistsForPath(String path) {
        return this.retryInternal(
            String.format("Check exists for path:%s", path),
            new ExponentialBackoffRetry(RETRY_SLEEP_IN_MS, MAX_RETRY_SLEEP_IN_MS, RETRY_TIMES),
            () -> {
                CompletableFuture<GetResponse> getResponse = this.getEtcdClient().getKVClient()
                    .get(ByteSequence.from(path, StandardCharsets.UTF_8));
                return !getResponse.get(DEFAULT_TIMEOUT_IN_SEC, TimeUnit.SECONDS)
                    .getKvs().isEmpty();
            });
    }

    /**
     * Puts a value at {@code path}.
     *
     * <p>When {@code value} is empty, an empty byte array is stored. Previously an absent
     * value made every attempt fail with {@code NoSuchElementException}.
     *
     * @param path the key to write
     * @param value the bytes to store, or an empty {@link Optional} to store an empty value
     * @throws UnavailableRuntimeException if the put does not succeed within the retries
     */
    public void createForPath(String path, Optional<byte[]> value) {
        final byte[] payload = value.orElse(new byte[0]);
        this.retryInternal(
            String.format("Create for path:%s, value bytes len:%s", path,
                value.isPresent() ? String.valueOf(value.get().length) : "null"),
            new ExponentialBackoffRetry(RETRY_SLEEP_IN_MS, MAX_RETRY_SLEEP_IN_MS, RETRY_TIMES),
            () -> {
                this.mClient.getKVClient()
                    .put(ByteSequence.from(path, StandardCharsets.UTF_8),
                        ByteSequence.from(payload))
                    .get(DEFAULT_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
                return null;
            });
    }

    /**
     * Deletes {@code path}, optionally together with every key it is a prefix of.
     *
     * @param path the key (or key prefix) to delete
     * @param recursive if {@code true}, the delete is issued as a prefix delete
     * @throws UnavailableRuntimeException if the delete does not succeed within the retries
     */
    public void deleteForPath(String path, boolean recursive) {
        this.retryInternal(
            String.format("Delete for path:%s", path),
            new ExponentialBackoffRetry(RETRY_SLEEP_IN_MS, MAX_RETRY_SLEEP_IN_MS, RETRY_TIMES),
            () -> {
                this.mClient.getKVClient()
                    .delete(ByteSequence.from(path, StandardCharsets.UTF_8),
                        DeleteOption.newBuilder().isPrefix(recursive).build())
                    .get(DEFAULT_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
                return null;
            });
    }

    /**
     * Unregisters and closes the watcher for the given path and type; no-op if none exists.
     *
     * @param path the watched key or key prefix
     * @param watchType the type the watcher was registered with
     */
    public void removeListenerInternal(String path, WatchType watchType) {
        Watch.Watcher watcher =
            this.mRegisteredWatchers.remove(AlluxioEtcdClient.getRegisterWatcherKey(path, watchType));
        if (watcher != null) {
            watcher.close();
        }
    }

    /**
     * @return the underlying jetcd client
     */
    public Client getEtcdClient() {
        return this.mClient;
    }

    /**
     * Closes the shared instance, if one exists, and clears the static reference.
     *
     * <p>The jetcd client is closed and the reference cleared even when closing the service
     * discovery recipe throws, so a failed shutdown does not leave a half-closed instance
     * reachable through {@link #getInstance(AlluxioConfiguration)}.
     */
    public static void destroy() {
        LOG.debug("Destroying ETCD client {}", sAlluxioEtcdClient);
        try (LockResource lock = new LockResource(INSTANCE_LOCK)) {
            AlluxioEtcdClient instance = sAlluxioEtcdClient;
            if (instance != null) {
                try {
                    instance.mServiceDiscovery.close();
                } finally {
                    try {
                        Client client = instance.getEtcdClient();
                        if (client != null) {
                            client.close();
                        }
                    } finally {
                        sAlluxioEtcdClient = null;
                    }
                }
            }
        }
        LOG.debug("ETCD client destroyed");
    }

    /**
     * Value holder for an etcd lease: the lease id and the TTL it was requested with.
     */
    public static class Lease {
        public long mLeaseId = -1L;
        public long mTtlInSec = -1L;

        /**
         * @param leaseId the lease id
         * @param ttlInSec the lease TTL in seconds
         */
        public Lease(long leaseId, long ttlInSec) {
            this.mLeaseId = leaseId;
            this.mTtlInSec = ttlInSec;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                .add("leaseId", this.mLeaseId)
                .add("ttl", this.mTtlInSec)
                .toString();
        }
    }

    /**
     * Kind of watch registered on a path.
     */
    static enum WatchType {
        /** Watch keys under a prefix. */
        CHILDREN,
        /** Watch one key. */
        SINGLE_PATH;
    }

    /**
     * An etcd operation that may throw a checked exception; executed by
     * {@link #retryInternal}.
     *
     * @param <V> result type of the operation
     */
    @FunctionalInterface
    protected static interface EtcdUtilCallable<V> {
        /**
         * Performs the operation.
         *
         * @return the operation result
         * @throws Exception if the operation fails
         */
        public V call() throws Exception;
    }
}

