/*
 * Decompiled with CFR 0.152.
 */
package alluxio.membership;

import alluxio.exception.status.AlreadyExistsException;
import alluxio.exception.status.NotFoundException;
import alluxio.membership.AlluxioEtcdClient;
import alluxio.membership.ServiceEntity;
import alluxio.resource.LockResource;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.io.etcd.jetcd.ByteSequence;
import alluxio.shaded.client.io.etcd.jetcd.KeyValue;
import alluxio.shaded.client.io.etcd.jetcd.Txn;
import alluxio.shaded.client.io.etcd.jetcd.kv.TxnResponse;
import alluxio.shaded.client.io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import alluxio.shaded.client.io.etcd.jetcd.op.Cmp;
import alluxio.shaded.client.io.etcd.jetcd.op.CmpTarget;
import alluxio.shaded.client.io.etcd.jetcd.op.Op;
import alluxio.shaded.client.io.etcd.jetcd.options.GetOption;
import alluxio.shaded.client.io.etcd.jetcd.options.PutOption;
import alluxio.shaded.client.io.etcd.jetcd.support.CloseableClient;
import alluxio.shaded.client.io.grpc.stub.StreamObserver;
import alluxio.util.ThreadFactoryUtils;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServiceDiscoveryRecipe {
    private static final Logger LOG = LoggerFactory.getLogger(ServiceDiscoveryRecipe.class);
    private static final String BASE_PATH = "/ServiceDiscovery";
    final AlluxioEtcdClient mAlluxioEtcdClient;
    private final ScheduledExecutorService mExecutor;
    private final String mClusterIdentifier;
    private final String mRegisterPathPrefix;
    private final ConcurrentHashMap<String, ServiceEntity> mRegisteredServices = new ConcurrentHashMap();

    public ServiceDiscoveryRecipe(AlluxioEtcdClient client, String clusterIdentifier) {
        this.mAlluxioEtcdClient = client;
        this.mClusterIdentifier = clusterIdentifier;
        this.mRegisterPathPrefix = String.format("%s%s%s", BASE_PATH, "/", this.mClusterIdentifier);
        this.mExecutor = Executors.newSingleThreadScheduledExecutor(ThreadFactoryUtils.build("service-discovery-checker", false));
        this.mExecutor.scheduleWithFixedDelay(this::checkAllForReconnect, 2L, 2L, TimeUnit.SECONDS);
    }

    private void newLeaseInternal(ServiceEntity service) throws IOException {
        try (LockResource lockResource = new LockResource(service.mLock);){
            if (service.mLease != null && !this.mAlluxioEtcdClient.isLeaseExpired(service.mLease)) {
                LOG.info("Lease attached with service:{} is not expired, bail from here.");
                return;
            }
            String path = service.mServiceEntityName;
            String fullPath = new StringBuffer().append(this.mRegisterPathPrefix).append("/").append(path).toString();
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();){
                long latestRevision;
                AlluxioEtcdClient.Lease lease = this.mAlluxioEtcdClient.createLease();
                Txn txn = this.mAlluxioEtcdClient.getEtcdClient().getKVClient().txn();
                ByteSequence keyToPut = ByteSequence.from(fullPath, StandardCharsets.UTF_8);
                DataOutputStream dos = new DataOutputStream(baos);
                service.serialize(dos);
                ByteSequence valToPut = ByteSequence.from(baos.toByteArray());
                CompletableFuture<TxnResponse> txnResponseFut = txn.If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.version(0L))).Then(Op.put(keyToPut, valToPut, PutOption.newBuilder().withLeaseId(lease.mLeaseId).build())).Then(Op.get(keyToPut, GetOption.DEFAULT)).Else(Op.get(keyToPut, GetOption.DEFAULT)).commit();
                TxnResponse txnResponse = txnResponseFut.get();
                ArrayList kvs = new ArrayList();
                txnResponse.getGetResponses().stream().map(r -> kvs.addAll(r.getKvs())).collect(Collectors.toList());
                if (!txnResponse.isSucceeded()) {
                    if (!kvs.isEmpty()) {
                        throw new AlreadyExistsException("Same service kv pair is there but attached lease is expired, this should not happen");
                    }
                    throw new IOException("Failed to new a lease for service:" + service.toString());
                }
                Preconditions.checkState(!kvs.isEmpty(), "No such service entry found.");
                service.mRevision = latestRevision = kvs.stream().mapToLong(kv -> kv.getModRevision()).max().getAsLong();
                service.mLease = lease;
                this.startHeartBeat(service);
            }
            catch (InterruptedException | ExecutionException ex) {
                throw new IOException("Exception in new-ing lease for service:" + service, ex);
            }
        }
    }

    public void registerAndStartSync(ServiceEntity service) throws IOException {
        LOG.info("registering service : {}", (Object)service);
        if (this.mRegisteredServices.containsKey(service.getServiceEntityName())) {
            throw new AlreadyExistsException("Service " + service.mServiceEntityName + " already registered.");
        }
        this.newLeaseInternal(service);
        ServiceEntity existEntity = this.mRegisteredServices.putIfAbsent(service.getServiceEntityName(), service);
        if (existEntity != null) {
            ServiceEntity entity = service;
            Throwable throwable = null;
            if (entity != null) {
                if (throwable != null) {
                    try {
                        entity.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    entity.close();
                }
            }
            throw new AlreadyExistsException("Service " + service.mServiceEntityName + " already registered.");
        }
    }

    public void unregisterService(String serviceIdentifier) throws IOException {
        ServiceEntity entity = this.mRegisteredServices.remove(serviceIdentifier);
        if (entity != null) {
            try (ServiceEntity service = entity;){
                LOG.info("Service unregistered:{}", (Object)service);
            }
        } else {
            LOG.info("Service already unregistered:{}", (Object)serviceIdentifier);
        }
    }

    public void unregisterAll() {
        for (Map.Entry<String, ServiceEntity> entry : this.mRegisteredServices.entrySet()) {
            try {
                this.unregisterService(entry.getKey());
            }
            catch (IOException ex) {
                LOG.error("Unregister all services failed unregistering for:{}.", (Object)entry.getKey(), (Object)ex);
            }
        }
    }

    public ByteBuffer getRegisteredServiceDetail(String serviceEntityName) throws IOException {
        String fullPath = new StringBuffer().append(this.mRegisterPathPrefix).append("/").append(serviceEntityName).toString();
        byte[] val = this.mAlluxioEtcdClient.getForPath(fullPath);
        return ByteBuffer.wrap(val);
    }

    public void updateService(ServiceEntity service) throws IOException {
        LOG.info("Updating service : {}", (Object)service);
        if (!this.mRegisteredServices.containsKey(service.mServiceEntityName)) {
            Preconditions.checkNotNull(service.mLease, "Service not attach with lease");
            throw new NoSuchElementException("Service " + service.mServiceEntityName + " not registered, please register first.");
        }
        String fullPath = new StringBuffer().append(this.mRegisterPathPrefix).append("/").append(service.mServiceEntityName).toString();
        try (LockResource lockResource = new LockResource(service.mLock);
             ByteArrayOutputStream baos = new ByteArrayOutputStream();){
            long latestRevision;
            Txn txn = this.mAlluxioEtcdClient.getEtcdClient().getKVClient().txn();
            ByteSequence keyToPut = ByteSequence.from(fullPath, StandardCharsets.UTF_8);
            DataOutputStream dos = new DataOutputStream(baos);
            service.serialize(dos);
            ByteSequence valToPut = ByteSequence.from(baos.toByteArray());
            CompletableFuture<TxnResponse> txnResponseFut = txn.If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.modRevision(service.mRevision))).Then(Op.put(keyToPut, valToPut, PutOption.newBuilder().withLeaseId(service.mLease.mLeaseId).build())).Then(Op.get(keyToPut, GetOption.DEFAULT)).Else(Op.get(keyToPut, GetOption.DEFAULT)).commit();
            TxnResponse txnResponse = txnResponseFut.get();
            ArrayList kvs = new ArrayList();
            txnResponse.getGetResponses().stream().map(r -> kvs.addAll(r.getKvs())).collect(Collectors.toList());
            if (!txnResponse.isSucceeded()) {
                if (kvs.isEmpty()) {
                    throw new NotFoundException("Such service kv pair is not in etcd anymore.");
                }
                throw new IOException("Failed to update service:" + service.toString());
            }
            service.mRevision = latestRevision = kvs.stream().mapToLong(kv -> kv.getModRevision()).max().getAsLong();
            if (service.getKeepAliveClient() == null) {
                this.startHeartBeat(service);
            }
            this.mRegisteredServices.put(service.getServiceEntityName(), service);
        }
        catch (ExecutionException ex) {
            throw new IOException("ExecutionException in registering service:" + service, ex);
        }
        catch (InterruptedException ex) {
            LOG.info("InterruptedException caught, bail.");
        }
    }

    private void startHeartBeat(ServiceEntity service) {
        CloseableClient keepAliveClient = this.mAlluxioEtcdClient.getEtcdClient().getLeaseClient().keepAlive(service.mLease.mLeaseId, new RetryKeepAliveObserver(service));
        service.setKeepAliveClient(keepAliveClient);
    }

    public Map<String, ByteBuffer> getAllLiveServices() throws IOException {
        HashMap<String, ByteBuffer> ret = new HashMap<String, ByteBuffer>();
        List<KeyValue> children = this.mAlluxioEtcdClient.getChildren(this.mRegisterPathPrefix);
        for (KeyValue kv : children) {
            ret.put(kv.getKey().toString(StandardCharsets.UTF_8), ByteBuffer.wrap(kv.getValue().getBytes()));
        }
        return ret;
    }

    private void checkAllForReconnect() {
        for (Map.Entry<String, ServiceEntity> entry : this.mRegisteredServices.entrySet()) {
            ServiceEntity entity = entry.getValue();
            LockResource lockResource = new LockResource(entry.getValue().mLock);
            Throwable throwable = null;
            try {
                if (!entity.mNeedReconnect.get()) continue;
                try {
                    LOG.info("Start reconnect for service:{}", (Object)entity.getServiceEntityName());
                    this.newLeaseInternal(entity);
                    entity.mNeedReconnect.set(false);
                }
                catch (IOException e) {
                    LOG.info("Failed trying to new the lease for service:{}", (Object)entity, (Object)e);
                }
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (lockResource == null) continue;
                if (throwable != null) {
                    try {
                        lockResource.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                lockResource.close();
            }
        }
    }

    class RetryKeepAliveObserver
    implements StreamObserver<LeaseKeepAliveResponse> {
        public ServiceEntity mService;

        public RetryKeepAliveObserver(ServiceEntity service) {
            this.mService = service;
        }

        @Override
        public void onNext(LeaseKeepAliveResponse value) {
            LOG.debug("onNext keepalive response:id:{}:ttl:{}", (Object)value.getID(), (Object)value.getTTL());
        }

        @Override
        public void onError(Throwable t) {
            LOG.error("onError for Lease for service:{}, leaseId:{}. Setting status to reconnect", new Object[]{this.mService, this.mService.mLease.mLeaseId, t});
            this.mService.mNeedReconnect.compareAndSet(false, true);
        }

        @Override
        public void onCompleted() {
            LOG.warn("onCompleted for Lease for service:{}, leaseId:{}. Setting status to reconnect", (Object)this.mService, (Object)this.mService.mLease.mLeaseId);
            this.mService.mNeedReconnect.compareAndSet(false, true);
        }
    }
}

