/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.config.etcd.internal.client.v2;

import io.helidon.common.reactive.Flow;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.etcd.internal.client.EtcdClient;
import io.helidon.config.etcd.internal.client.EtcdClientException;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import mousio.client.promises.ResponsePromise;
import mousio.client.retry.RetryPolicy;
import mousio.client.retry.RetryWithTimeout;
import mousio.etcd4j.responses.EtcdAuthenticationException;
import mousio.etcd4j.responses.EtcdException;
import mousio.etcd4j.responses.EtcdKeysResponse;

public class EtcdV2Client
implements EtcdClient {
    private static final Logger LOGGER = Logger.getLogger(EtcdV2Client.class.getName());
    private final Map<String, SubmissionPublisher<Long>> publishers = new ConcurrentHashMap<String, SubmissionPublisher<Long>>();
    private final mousio.etcd4j.EtcdClient etcd;

    EtcdV2Client(URI uri) {
        this.etcd = new mousio.etcd4j.EtcdClient(new URI[]{uri});
        this.etcd.setRetryHandler((RetryPolicy)new RetryWithTimeout(100, 2000));
    }

    @Override
    public Long revision(String key) throws EtcdClientException {
        try {
            return ((EtcdKeysResponse)this.etcd.get((String)key).send().get()).getNode().modifiedIndex;
        }
        catch (IOException | TimeoutException | EtcdAuthenticationException e) {
            throw new EtcdClientException("Cannot retrieve modifiedIndex for key " + key);
        }
        catch (EtcdException e) {
            if (e.errorCode == 100) {
                return null;
            }
            throw new EtcdClientException("Cannot retrieve modifiedIndex for key " + key);
        }
    }

    @Override
    public String get(String key) throws EtcdClientException {
        try {
            return ((EtcdKeysResponse)this.etcd.get(key).send().get()).getNode().getValue();
        }
        catch (IOException | TimeoutException | EtcdAuthenticationException e) {
            throw new EtcdClientException("Cannot retrieve key " + key);
        }
        catch (EtcdException e) {
            if (e.errorCode == 100) {
                return null;
            }
            throw new EtcdClientException("Cannot retrieve key " + key);
        }
    }

    @Override
    public void put(String key, String value) throws EtcdClientException {
        try {
            this.etcd.put(key, value).timeout(1L, TimeUnit.SECONDS).send().get();
        }
        catch (IOException | TimeoutException | EtcdAuthenticationException | EtcdException e) {
            throw new EtcdClientException("Cannot put KV pair under " + key, e);
        }
    }

    @Override
    public Flow.Publisher<Long> watch(String key, Executor executor) throws EtcdClientException {
        SubmissionPublisher publisher = this.publishers.computeIfAbsent(key, k -> new SubmissionPublisher(executor, Flow.defaultBufferSize()));
        WatchResponseHandler handler = new WatchResponseHandler(publisher, this.etcd, key);
        handler.waitForChange();
        return publisher;
    }

    @Override
    public Flow.Publisher<Long> watch(String key) throws EtcdClientException {
        return this.watch(key, Runnable::run);
    }

    @Override
    public void close() throws EtcdClientException {
        this.publishers.values().forEach(SubmissionPublisher::close);
        try {
            this.etcd.close();
        }
        catch (IOException ex) {
            throw new EtcdClientException("Error closing gRPC channel, reason: " + ex.getLocalizedMessage(), ex);
        }
    }

    private static class WatchResponseHandler
    implements ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse> {
        private final SubmissionPublisher<Long> publisher;
        private final mousio.etcd4j.EtcdClient etcd;
        private final String key;

        private WatchResponseHandler(SubmissionPublisher<Long> publisher, mousio.etcd4j.EtcdClient etcd, String key) {
            this.publisher = publisher;
            this.etcd = etcd;
            this.key = key;
        }

        public void onResponse(ResponsePromise<EtcdKeysResponse> responsePromise) {
            try {
                long modifiedIndex = ((EtcdKeysResponse)responsePromise.get()).getNode().getModifiedIndex();
                this.publisher.submit((Object)modifiedIndex);
                this.waitForChange(modifiedIndex + 1L);
            }
            catch (Exception e) {
                LOGGER.log(Level.CONFIG, "Cannot read changed value.", e);
            }
        }

        private void waitForChange() throws EtcdClientException {
            try {
                this.etcd.get(this.key).waitForChange().send().addListener((ResponsePromise.IsSimplePromiseResponseHandler)this);
            }
            catch (IOException e) {
                throw new EtcdClientException("Cannot register listener on key " + this.key, e);
            }
        }

        private void waitForChange(long waitIndex) throws EtcdClientException {
            try {
                this.etcd.get(this.key).waitForChange(waitIndex).send().addListener((ResponsePromise.IsSimplePromiseResponseHandler)this);
            }
            catch (IOException e) {
                throw new EtcdClientException("Cannot register listener on key " + this.key + " and index " + waitIndex + ".", e);
            }
        }
    }
}

