/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.qdrant.source;

import io.qdrant.client.QdrantClient;
import io.qdrant.client.WithPayloadSelectorFactory;
import io.qdrant.client.WithVectorsSelectorFactory;
import io.qdrant.client.grpc.JsonWithInt;
import io.qdrant.client.grpc.Points;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.BufferUtils;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantParameters;
import org.apache.seatunnel.connectors.seatunnel.qdrant.exception.QdrantConnectorException;

/**
 * Single-split source reader that scrolls through one Qdrant collection and emits every
 * retrieved point as an {@link RowKind#INSERT} {@link SeaTunnelRow}.
 *
 * <p>Each row is laid out according to the physical row type of the catalog table passed to the
 * constructor: primary-key columns receive the point id, vector-typed columns are looked up by
 * name among the point's vectors, and every other column is looked up by name in the point's
 * payload.
 */
public class QdrantSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
    /** Number of points requested per scroll call. */
    private static final int SCROLL_SIZE = 64;

    /** Column name under which a point's single unnamed vector is exposed. */
    private static final String DEFAULT_VECTOR_KEY = "default_vector";

    private final QdrantParameters qdrantParameters;
    private final SingleSplitReaderContext context;
    private final TableSchema tableSchema;
    private final TablePath tablePath;
    /** Physical row layout, derived once from {@link #tableSchema} instead of once per point. */
    private final SeaTunnelRowType rowType;
    /** Primary key of {@link #tableSchema}, cached alongside {@link #rowType}. */
    private final PrimaryKey primaryKey;
    private QdrantClient qdrantClient;

    /**
     * Creates a reader for the given collection settings and table description.
     *
     * @param qdrantParameters connection settings and collection name
     * @param context reader context used to signal the end of the data
     * @param catalogTable supplies the table schema and the table path stamped on every row
     */
    public QdrantSourceReader(QdrantParameters qdrantParameters, SingleSplitReaderContext context, CatalogTable catalogTable) {
        this.qdrantParameters = qdrantParameters;
        this.context = context;
        this.tableSchema = catalogTable.getTableSchema();
        this.tablePath = catalogTable.getTablePath();
        this.rowType = this.tableSchema.toPhysicalRowDataType();
        this.primaryKey = this.tableSchema.getPrimaryKey();
    }

    /**
     * Builds the Qdrant client and waits for a health check to complete.
     *
     * @throws Exception if the client cannot be built or the health check fails; in the latter
     *     case the freshly built client is closed before the exception propagates
     */
    public void open() throws Exception {
        QdrantClient client = this.qdrantParameters.buildQdrantClient();
        try {
            client.healthCheckAsync().get();
        } catch (Exception e) {
            // Do not keep a half-initialised client around when the server is unreachable.
            client.close();
            throw e;
        }
        this.qdrantClient = client;
    }

    /** Closes the Qdrant client if one was opened; safe to call more than once. */
    public void close() {
        if (Objects.nonNull(this.qdrantClient)) {
            this.qdrantClient.close();
            this.qdrantClient = null;
        }
    }

    /**
     * Scrolls through the whole collection in pages of {@link #SCROLL_SIZE} points, emitting one
     * row per point, then signals that no more elements will follow.
     *
     * @param output collector receiving the converted rows
     * @throws Exception if a scroll request fails or is interrupted, or if a point cannot be
     *     converted
     */
    @Override
    public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {
        Points.ScrollPoints request =
                Points.ScrollPoints.newBuilder()
                        .setCollectionName(this.qdrantParameters.getCollectionName())
                        .setLimit(SCROLL_SIZE)
                        .setWithPayload(WithPayloadSelectorFactory.enable(true))
                        .setWithVectors(WithVectorsSelectorFactory.enable(true))
                        .build();
        while (true) {
            Points.ScrollResponse response = this.qdrantClient.scrollAsync(request).get();
            for (Points.RetrievedPoint point : response.getResultList()) {
                output.collect(this.convertToSeaTunnelRow(point));
            }
            Points.PointId offset = response.getNextPageOffset();
            // An offset carrying neither a numeric nor a UUID id ends the scroll.
            if (!offset.hasNum() && !offset.hasUuid()) {
                break;
            }
            request = request.toBuilder().setOffset(offset).build();
        }
        this.context.signalNoMoreElement();
    }

    /**
     * Converts one retrieved point into a row matching {@link #rowType}.
     *
     * @param point point returned by a scroll request
     * @return an INSERT row tagged with this reader's table path
     * @throws QdrantConnectorException if a column has a SQL type this reader does not handle
     */
    private SeaTunnelRow convertToSeaTunnelRow(Points.RetrievedPoint point) {
        Map<String, JsonWithInt.Value> payloadMap = point.getPayloadMap();
        Map<String, Points.Vector> vectorsMap = collectVectors(point.getVectors());

        int fieldCount = this.rowType.getTotalFields();
        String[] fieldNames = this.rowType.getFieldNames();
        Object[] fields = new Object[fieldCount];
        for (int fieldIndex = 0; fieldIndex < fieldCount; ++fieldIndex) {
            String fieldName = fieldNames[fieldIndex];
            if (PrimaryKey.isPrimaryKeyField(this.primaryKey, fieldName)) {
                fields[fieldIndex] = readPointId(point.getId());
            } else {
                fields[fieldIndex] =
                        convertField(
                                this.rowType.getFieldType(fieldIndex),
                                payloadMap.get(fieldName),
                                vectorsMap.get(fieldName));
            }
        }

        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
        seaTunnelRow.setTableId(this.tablePath.getFullName());
        seaTunnelRow.setRowKind(RowKind.INSERT);
        return seaTunnelRow;
    }

    /**
     * Returns the point's vectors keyed by name; a single unnamed vector is keyed by
     * {@link #DEFAULT_VECTOR_KEY}, and a point without vectors yields an empty map.
     */
    private static Map<String, Points.Vector> collectVectors(Points.Vectors vectors) {
        if (vectors.hasVector()) {
            Map<String, Points.Vector> single = new HashMap<String, Points.Vector>();
            single.put(DEFAULT_VECTOR_KEY, vectors.getVector());
            return single;
        }
        if (vectors.hasVectors()) {
            return vectors.getVectors().getVectorsMap();
        }
        return new HashMap<String, Points.Vector>();
    }

    /** Returns the numeric id as a {@code Long}, the UUID id as a {@code String}, or null if neither is set. */
    private static Object readPointId(Points.PointId id) {
        if (id.hasNum()) {
            return id.getNum();
        }
        if (id.hasUuid()) {
            return id.getUuid();
        }
        return null;
    }

    /**
     * Converts the payload value or vector found for one column into the object stored in the row.
     *
     * @param fieldType declared type of the column
     * @param value payload entry with the column's name, or null if the point has none
     * @param vector vector with the column's name, or null if the point has none
     * @return the converted value; null when the entry the column needs is missing from the point
     * @throws QdrantConnectorException if the column's SQL type is not handled
     */
    private static Object convertField(SeaTunnelDataType<?> fieldType, JsonWithInt.Value value, Points.Vector vector) {
        // NOTE(review): integral columns are all emitted as Long and floating/decimal columns as
        // Double, whatever the declared width - confirm downstream consumers accept that.
        switch (fieldType.getSqlType()) {
            case NULL:
                return null;
            case STRING:
                return value == null ? null : value.getStringValue();
            case BOOLEAN:
                return value == null ? null : Boolean.valueOf(value.getBoolValue());
            case TINYINT:
            case SMALLINT:
            case INT:
            case BIGINT:
                return value == null ? null : Long.valueOf(value.getIntegerValue());
            case FLOAT:
            case DECIMAL:
            case DOUBLE:
                return value == null ? null : Double.valueOf(value.getDoubleValue());
            case BINARY_VECTOR:
            case FLOAT_VECTOR:
            case FLOAT16_VECTOR:
            case BFLOAT16_VECTOR:
                if (vector == null) {
                    return null;
                }
                List<Float> data = vector.getDataList();
                return BufferUtils.toByteBuffer(data.toArray(new Float[0]));
            default:
                throw new QdrantConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unexpected value: " + fieldType.getSqlType().name());
        }
    }
}

