/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.service.server;

import java.util.concurrent.CompletableFuture;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.service.exceptions.UnknownPartitionBucketException;
import org.apache.paimon.service.messages.KvRequest;
import org.apache.paimon.service.messages.KvResponse;
import org.apache.paimon.service.network.AbstractServerHandler;
import org.apache.paimon.service.network.messages.MessageSerializer;
import org.apache.paimon.service.network.stats.ServiceRequestStats;
import org.apache.paimon.service.server.KvQueryServer;
import org.apache.paimon.shade.netty4.io.netty.channel.ChannelHandler;
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.utils.ExceptionUtils;
import org.apache.paimon.utils.Preconditions;

@ChannelHandler.Sharable
public class KvServerHandler
extends AbstractServerHandler<KvRequest, KvResponse> {
    private final int serverId;
    private final int numServers;
    private final TableQuery lookup;
    private final InternalRowSerializer valueSerializer;

    public KvServerHandler(KvQueryServer server, int serverId, int numServers, TableQuery lookup, MessageSerializer<KvRequest, KvResponse> serializer, ServiceRequestStats stats) {
        super(server, serializer, stats);
        this.serverId = serverId;
        this.numServers = numServers;
        this.lookup = Preconditions.checkNotNull(lookup);
        this.valueSerializer = lookup.createValueSerializer();
    }

    @Override
    public CompletableFuture<KvResponse> handleRequest(long requestId, KvRequest request) {
        CompletableFuture<KvResponse> responseFuture = new CompletableFuture<KvResponse>();
        int selectServerId = ChannelComputer.select(request.partition(), request.bucket(), this.numServers);
        if (selectServerId != this.serverId) {
            responseFuture.completeExceptionally(new UnknownPartitionBucketException(this.getServerName()));
            return responseFuture;
        }
        try {
            BinaryRow[] keys2 = request.keys();
            BinaryRow[] values2 = new BinaryRow[keys2.length];
            for (int i = 0; i < values2.length; ++i) {
                InternalRow value = this.lookup.lookup(request.partition(), request.bucket(), keys2[i]);
                if (value == null) continue;
                values2[i] = this.valueSerializer.toBinaryRow(value).copy();
            }
            responseFuture.complete(new KvResponse(values2));
            return responseFuture;
        }
        catch (Throwable t) {
            String errMsg = "Error while processing request with ID " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
            responseFuture.completeExceptionally(new RuntimeException(errMsg));
            return responseFuture;
        }
    }

    @Override
    public CompletableFuture<Void> shutdown() {
        return CompletableFuture.completedFuture(null);
    }
}

