/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.service.client;

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.query.QueryLocation;
import org.apache.paimon.service.exceptions.UnknownPartitionBucketException;
import org.apache.paimon.service.messages.KvRequest;
import org.apache.paimon.service.messages.KvResponse;
import org.apache.paimon.service.network.NetworkClient;
import org.apache.paimon.service.network.messages.MessageSerializer;
import org.apache.paimon.service.network.stats.DisabledServiceRequestStats;
import org.apache.paimon.utils.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KvQueryClient {
    private static final Logger LOG = LoggerFactory.getLogger(KvQueryClient.class);
    private final NetworkClient<KvRequest, KvResponse> networkClient;
    private final QueryLocation queryLocation;

    public KvQueryClient(QueryLocation queryLocation, int numEventLoopThreads) {
        this.queryLocation = queryLocation;
        MessageSerializer<KvRequest, KvResponse> messageSerializer = new MessageSerializer<KvRequest, KvResponse>(new KvRequest.KvRequestDeserializer(), new KvResponse.KvResponseDeserializer());
        this.networkClient = new NetworkClient<KvRequest, KvResponse>("Kv Query Client", numEventLoopThreads, messageSerializer, new DisabledServiceRequestStats());
    }

    public CompletableFuture<BinaryRow[]> getValues(BinaryRow partition, int bucket, BinaryRow[] keys) {
        CompletableFuture<BinaryRow[]> response = new CompletableFuture<BinaryRow[]>();
        this.executeActionAsync(response, new KvRequest(partition, bucket, keys), false);
        return response;
    }

    private void executeActionAsync(CompletableFuture<BinaryRow[]> result, KvRequest request, boolean update) {
        if (!result.isDone()) {
            CompletableFuture<KvResponse> operationFuture = this.getResponse(request, update);
            operationFuture.whenCompleteAsync((t, throwable) -> {
                if (throwable != null) {
                    if (throwable instanceof UnknownPartitionBucketException || throwable.getCause() instanceof ConnectException) {
                        LOG.debug("Retrying after failing to retrieve state due to: {}.", (Object)throwable.getMessage());
                        this.executeActionAsync(result, request, true);
                    } else {
                        result.completeExceptionally((Throwable)throwable);
                    }
                } else {
                    result.complete(t.values());
                }
            });
            result.whenComplete((t, throwable) -> operationFuture.cancel(false));
        }
    }

    private CompletableFuture<KvResponse> getResponse(KvRequest request, boolean forceUpdate) {
        InetSocketAddress serverAddress = this.queryLocation.getLocation(request.partition(), request.bucket(), forceUpdate);
        if (serverAddress == null) {
            return FutureUtils.completedExceptionally((Throwable)new RuntimeException("Cannot find address for bucket: " + request.bucket()));
        }
        return this.networkClient.sendRequest(serverAddress, request);
    }

    public void shutdown() {
        try {
            this.shutdownFuture().get(60L, TimeUnit.SECONDS);
            LOG.info("{} was shutdown successfully.", (Object)this.networkClient.getClientName());
        }
        catch (Exception e) {
            LOG.warn(String.format("%s shutdown failed.", this.networkClient.getClientName()), (Throwable)e);
        }
    }

    public CompletableFuture<Void> shutdownFuture() {
        return this.networkClient.shutdown();
    }
}

