/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.query;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.query.QueryLocationImpl;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.service.client.KvQueryClient;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.TypeUtils;

public class RemoteTableQuery
implements TableQuery {
    private final FileStoreTable table;
    private final KvQueryClient client;
    private final InternalRowSerializer keySerializer;
    @Nullable
    private int[] projection;

    public RemoteTableQuery(Table table) {
        this.table = (FileStoreTable)table;
        ServiceManager manager = this.table.store().newServiceManager();
        this.client = new KvQueryClient(new QueryLocationImpl(manager), 1);
        this.keySerializer = InternalSerializers.create(TypeUtils.project(table.rowType(), table.primaryKeys()));
    }

    public static boolean isRemoteServiceAvailable(FileStoreTable table) {
        return table.store().newServiceManager().service("primary-key-lookup").isPresent();
    }

    @Override
    @Nullable
    public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {
        BinaryRow row;
        try {
            row = this.client.getValues(partition, bucket, new BinaryRow[]{this.keySerializer.toBinaryRow(key)}).get()[0];
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
        catch (ExecutionException e) {
            throw new IOException(e.getCause());
        }
        if (this.projection == null) {
            return row;
        }
        if (row == null) {
            return null;
        }
        return ProjectedRow.from(this.projection).replaceRow(row);
    }

    @Override
    public RemoteTableQuery withValueProjection(int[] projection) {
        return this.withValueProjection(Projection.of(projection).toNestedIndexes());
    }

    @Override
    public RemoteTableQuery withValueProjection(int[][] projection) {
        this.projection = Projection.of(projection).toTopLevelIndexes();
        return this;
    }

    @Override
    public InternalRowSerializer createValueSerializer() {
        return InternalSerializers.create(TypeUtils.project(this.table.rowType(), this.projection));
    }

    @Override
    public void close() throws IOException {
        this.client.shutdown();
    }
}

