/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup.partitioner;

import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;

/**
 * Computes the bucket id of a lookup join key row.
 *
 * <p>The join key row is projected onto the bucket key fields, the projected row is hashed with
 * {@link KeyAndBucketExtractor#bucketKeyHashCode}, and the hash is mapped onto one of
 * {@code numBuckets} buckets with {@link KeyAndBucketExtractor#bucket}.
 *
 * <p>The class is {@link Serializable}. The projection is built lazily on the first call to
 * {@link #extractBucketId(RowData)} and is kept out of the serialized form.
 */
public class BucketIdExtractor implements Serializable {

    private static final long serialVersionUID = 1L;

    private final int numBuckets;
    private final TableSchema tableSchema;
    private final List<String> joinKeyFieldNames;
    private final List<String> bucketKeyFieldNames;

    /**
     * Lazily created projection from a join key row to its bucket key row.
     *
     * <p>Marked {@code transient}: it is only a cache that {@link #extractBucketId(RowData)}
     * rebuilds whenever it is {@code null}, and the projection instance is not guaranteed to be
     * serializable.
     */
    private transient Projection bucketKeyProjection;

    /**
     * Creates an extractor.
     *
     * @param numBuckets number of buckets; must be positive
     * @param tableSchema schema used to resolve the data fields of the join keys
     * @param joinKeyFieldNames names of the join key fields, in the order they appear in the rows
     *     passed to {@link #extractBucketId(RowData)}
     * @param bucketKeyFieldNames names of the bucket key fields; every one of them must also be a
     *     join key
     * @throws IllegalStateException if the join keys do not contain all bucket keys or if
     *     {@code numBuckets} is not positive
     */
    public BucketIdExtractor(int numBuckets, TableSchema tableSchema, List<String> joinKeyFieldNames, List<String> bucketKeyFieldNames) {
        Preconditions.checkState(new HashSet<>(joinKeyFieldNames).containsAll(bucketKeyFieldNames), "The join keys must contain all bucket keys.");
        Preconditions.checkState(numBuckets > 0, "Number of buckets should be positive.");
        this.numBuckets = numBuckets;
        this.joinKeyFieldNames = joinKeyFieldNames;
        this.bucketKeyFieldNames = bucketKeyFieldNames;
        this.tableSchema = tableSchema;
    }

    /**
     * Returns the bucket id for the given join key row.
     *
     * @param joinKeyRow row holding exactly one value per join key field
     * @return the bucket id, checked to be smaller than {@code numBuckets}
     * @throws IllegalStateException if the arity of {@code joinKeyRow} differs from the number of
     *     join key fields, or if the computed bucket is not smaller than {@code numBuckets}
     */
    public int extractBucketId(RowData joinKeyRow) {
        Preconditions.checkState(joinKeyRow.getArity() == this.joinKeyFieldNames.size());
        // Built on first use; also rebuilt after deserialization because the field is transient.
        if (this.bucketKeyProjection == null) {
            this.bucketKeyProjection = this.generateBucketKeyProjection();
        }
        FlinkRowWrapper internalRow = new FlinkRowWrapper(joinKeyRow);
        BinaryRow bucketKey = this.bucketKeyProjection.apply(internalRow);
        int bucket = KeyAndBucketExtractor.bucket(KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), this.numBuckets);
        // Sanity check on the value returned by KeyAndBucketExtractor.bucket.
        Preconditions.checkState(bucket < this.numBuckets);
        return bucket;
    }

    /**
     * Builds the projection that selects the bucket key fields out of a join key row.
     *
     * @return a projection over the row type formed by the join key fields
     * @throws IllegalStateException if a join key field is not part of the table schema
     */
    private Projection generateBucketKeyProjection() {
        // Position of every bucket key inside the join key row; the constructor has already
        // verified that each bucket key is one of the join keys.
        int[] bucketKeyIndexes = this.bucketKeyFieldNames.stream().mapToInt(this.joinKeyFieldNames::indexOf).toArray();
        // Fetch the schema views once instead of once per join key.
        List<String> schemaFieldNames = this.tableSchema.fieldNames();
        List<DataField> schemaFields = this.tableSchema.fields();
        List<DataField> joinKeyDataFields = this.joinKeyFieldNames.stream().map(joinKeyFieldName -> {
            int fieldIndex = schemaFieldNames.indexOf(joinKeyFieldName);
            // Fail with a descriptive message instead of an index-out-of-bounds error on -1.
            Preconditions.checkState(fieldIndex >= 0, "Join key field '" + joinKeyFieldName + "' does not exist in the table schema.");
            return schemaFields.get(fieldIndex);
        }).collect(Collectors.toList());
        return CodeGenUtils.newProjection(new RowType(joinKeyDataFields), bucketKeyIndexes);
    }
}

