/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.config.etcd.internal.client.v3;

import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.helidon.common.reactive.Flow;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.etcd.internal.client.EtcdClient;
import io.helidon.config.etcd.internal.client.EtcdClientException;
import io.helidon.config.etcd.internal.client.proto.KVGrpc;
import io.helidon.config.etcd.internal.client.proto.PutRequest;
import io.helidon.config.etcd.internal.client.proto.RangeRequest;
import io.helidon.config.etcd.internal.client.proto.RangeResponse;
import io.helidon.config.etcd.internal.client.proto.WatchCreateRequest;
import io.helidon.config.etcd.internal.client.proto.WatchGrpc;
import io.helidon.config.etcd.internal.client.proto.WatchRequest;
import io.helidon.config.etcd.internal.client.proto.WatchResponse;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class EtcdV3Client
implements EtcdClient {
    private static final Logger LOGGER = Logger.getLogger(EtcdV3Client.class.getName());
    private final Map<String, SubmissionPublisher<Long>> publishers = new ConcurrentHashMap<String, SubmissionPublisher<Long>>();
    private final ManagedChannel channel;
    private final KVGrpc.KVBlockingStub kvStub;
    private final WatchGrpc.WatchStub watchStub;

    public EtcdV3Client(URI uri) {
        ManagedChannelBuilder mcb = ManagedChannelBuilder.forAddress((String)uri.getHost(), (int)uri.getPort());
        this.channel = mcb.usePlaintext(true).build();
        this.kvStub = KVGrpc.newBlockingStub((Channel)this.channel);
        this.watchStub = WatchGrpc.newStub((Channel)this.channel);
    }

    @Override
    public Long revision(String key) throws EtcdClientException {
        RangeRequest.Builder builder = RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8((String)key));
        try {
            RangeResponse rangeResponse = this.kvStub.range(builder.build());
            return rangeResponse.getCount() == 0L ? null : Long.valueOf(rangeResponse.getKvs(0).getModRevision());
        }
        catch (StatusRuntimeException e) {
            throw new EtcdClientException("Cannot retrieve a value for the key: " + key, e);
        }
    }

    @Override
    public String get(String key) throws EtcdClientException {
        RangeRequest.Builder builder = RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8((String)key));
        try {
            RangeResponse rangeResponse = this.kvStub.range(builder.build());
            return rangeResponse.getCount() == 0L ? null : rangeResponse.getKvs(0).getValue().toStringUtf8();
        }
        catch (StatusRuntimeException e) {
            throw new EtcdClientException("Cannot retrieve a value for the key: " + key, e);
        }
    }

    @Override
    public void put(String key, String value) throws EtcdClientException {
        PutRequest.Builder builder = PutRequest.newBuilder().setKey(ByteString.copyFromUtf8((String)key)).setValue(ByteString.copyFromUtf8((String)value));
        this.kvStub.put(builder.build());
    }

    @Override
    public Flow.Publisher<Long> watch(String key, Executor executor) throws EtcdClientException {
        final SubmissionPublisher publisher = this.publishers.computeIfAbsent(key, k -> new SubmissionPublisher(executor, Flow.defaultBufferSize()));
        StreamObserver<WatchResponse> responseObserver = new StreamObserver<WatchResponse>(){

            public void onNext(WatchResponse value) {
                value.getEventsList().forEach(e -> publisher.submit((Object)e.getKv().getVersion()));
            }

            public void onError(Throwable t) {
                publisher.closeExceptionally(t);
            }

            public void onCompleted() {
            }
        };
        WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder().setKey(ByteString.copyFromUtf8((String)key));
        WatchRequest watchRequest = WatchRequest.newBuilder().setCreateRequest(builder).build();
        StreamObserver<WatchRequest> requestObserver = this.watchStub.watch(responseObserver);
        requestObserver.onNext((Object)watchRequest);
        return publisher;
    }

    @Override
    public Flow.Publisher<Long> watch(String key) throws EtcdClientException {
        return this.watch(key, Runnable::run);
    }

    @Override
    public void close() throws EtcdClientException {
        this.publishers.values().forEach(SubmissionPublisher::close);
        if (!this.channel.isShutdown() && !this.channel.isTerminated()) {
            try {
                this.channel.awaitTermination(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                LOGGER.log(Level.CONFIG, "Error closing gRPC channel, reason: " + e.getLocalizedMessage(), e);
            }
            finally {
                this.channel.shutdown();
            }
        }
    }
}

