/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.lookup.LookupFileStoreTable;
import org.apache.paimon.flink.lookup.LookupTable;
import org.apache.paimon.flink.query.RemoteTableQuery;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.ProjectedRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrimaryKeyPartialLookupTable
implements LookupTable {
    private final QueryExecutorFactory executorFactory;
    @Nullable
    private final ProjectedRow keyRearrange;
    @Nullable
    private final ProjectedRow trimmedKeyRearrange;
    private Predicate specificPartition;
    @Nullable
    private Filter<InternalRow> cacheRowFilter;
    private QueryExecutor queryExecutor;
    private final Projection partitionFromPk;
    private final Projection bucketKeyFromPk;

    /**
     * Builds a partial lookup table on top of {@code table}.
     *
     * <p>The constructor validates that the table can be served by a partial lookup, prepares the
     * projections that extract the partition and the bucket key from a primary-key row, and
     * prepares the optional re-ordering from the join key layout to the (trimmed) primary-key
     * layout.
     *
     * @param executorFactory creates the {@link QueryExecutor} when {@link #open()} is called
     * @param table the table to look up
     * @param joinKey the join key field names, in the order the lookup keys are supplied
     * @throws UnsupportedOperationException if the bucket mode is neither {@code HASH_FIXED} nor
     *     {@code POSTPONE_MODE}, if the table does not need lookup and its merge engine is not
     *     deduplicate, or if a deduplicate table defines sequence fields
     */
    private PrimaryKeyPartialLookupTable(QueryExecutorFactory executorFactory, FileStoreTable table, List<String> joinKey) {
        this.executorFactory = executorFactory;

        // Only two bucket modes are accepted for partial lookup.
        BucketMode mode = table.bucketMode();
        boolean supportedMode = mode == BucketMode.HASH_FIXED || mode == BucketMode.POSTPONE_MODE;
        if (!supportedMode) {
            throw new UnsupportedOperationException("Unsupported mode for partial lookup: " + mode);
        }

        CoreOptions options = CoreOptions.fromMap(table.options());
        if (!options.needLookup()) {
            if (options.mergeEngine() != CoreOptions.MergeEngine.DEDUPLICATE) {
                throw new UnsupportedOperationException(
                        "Only support deduplicate merge engine when table does not need lookup, but merge engine is:  "
                                + options.mergeEngine());
            }
        }
        if (options.mergeEngine() == CoreOptions.MergeEngine.DEDUPLICATE) {
            if (!options.sequenceField().isEmpty()) {
                throw new UnsupportedOperationException(
                        "Unsupported sequence fields definition for partial lookup when use deduplicate merge engine, but sequence fields are:  "
                                + options.sequenceField());
            }
        }

        // Projections that pick the partition fields / bucket key fields out of a primary-key row.
        TableSchema tableSchema = table.schema();
        this.partitionFromPk =
                CodeGenUtils.newProjection(
                        tableSchema.logicalPrimaryKeysType(),
                        tableSchema.partitionKeys().stream()
                                .mapToInt(tableSchema.primaryKeys()::indexOf)
                                .toArray());
        this.bucketKeyFromPk =
                CodeGenUtils.newProjection(
                        tableSchema.logicalPrimaryKeysType(),
                        tableSchema.bucketKeys().stream()
                                .mapToInt(tableSchema.primaryKeys()::indexOf)
                                .toArray());

        // A re-ordering is only needed when the join key layout differs from the primary keys.
        if (table.primaryKeys().equals(joinKey)) {
            this.keyRearrange = null;
        } else {
            int[] pkPositions = table.primaryKeys().stream().mapToInt(joinKey::indexOf).toArray();
            this.keyRearrange = ProjectedRow.from(pkPositions);
        }

        // Same idea for the trimmed primary keys, which are what the executor lookup receives.
        List<String> trimmedPks = tableSchema.trimmedPrimaryKeys();
        if (trimmedPks.equals(joinKey)) {
            this.trimmedKeyRearrange = null;
        } else {
            int[] trimmedPositions = trimmedPks.stream().mapToInt(joinKey::indexOf).toArray();
            this.trimmedKeyRearrange = ProjectedRow.from(trimmedPositions);
        }
    }

    /**
     * Exposes the current executor for tests.
     *
     * @return the executor created by {@link #open()}, or {@code null} if the table has not been
     *     opened yet
     */
    @VisibleForTesting
    QueryExecutor queryExecutor() {
        return queryExecutor;
    }

    /**
     * Remembers the partition predicate that is handed to the executor factory in
     * {@link #open()}.
     *
     * @param filter the partition predicate to store
     */
    @Override
    public void specificPartitionFilter(Predicate filter) {
        specificPartition = filter;
    }

    /**
     * Creates the query executor from the stored partition predicate and cache row filter, then
     * performs an initial {@link #refresh()}.
     *
     * @throws Exception declared by the {@code LookupTable} contract
     */
    @Override
    public void open() throws Exception {
        queryExecutor = executorFactory.create(specificPartition, cacheRowFilter);
        refresh();
    }

    /**
     * Looks up the row for the given join key.
     *
     * @param key the lookup key, laid out in join-key order
     * @return a single-element list holding the matching row, or an empty list when the executor
     *     reports no bucket count for the key's partition or the lookup finds nothing
     * @throws IOException if the underlying executor lookup fails
     */
    @Override
    public List<InternalRow> get(InternalRow key) throws IOException {
        // Bring the key into primary-key order when the join key layout differs.
        InternalRow pkOrderedKey = keyRearrange == null ? key : keyRearrange.replaceRow(key);

        BinaryRow partition = partitionFromPk.apply(pkOrderedKey);
        Integer bucketCount = queryExecutor.numBuckets(partition);
        if (bucketCount == null) {
            // The executor knows nothing about this partition, so there is nothing to look up.
            return Collections.emptyList();
        }
        int targetBucket = bucket(bucketCount, pkOrderedKey);

        // The executor is queried with the trimmed primary key, derived from the original key.
        InternalRow trimmedKey =
                trimmedKeyRearrange == null ? key : trimmedKeyRearrange.replaceRow(key);

        InternalRow found = queryExecutor.lookup(partition, targetBucket, trimmedKey);
        if (found != null) {
            return Collections.singletonList(found);
        }
        return Collections.emptyList();
    }

    /**
     * Computes the bucket a primary-key row belongs to.
     *
     * @param numBuckets the bucket count to distribute over
     * @param primaryKey the key in primary-key order
     * @return the bucket derived from the hash code of the projected bucket key
     */
    private int bucket(int numBuckets, InternalRow primaryKey) {
        int bucketKeyHash =
                KeyAndBucketExtractor.bucketKeyHashCode(bucketKeyFromPk.apply(primaryKey));
        return KeyAndBucketExtractor.bucket(bucketKeyHash, numBuckets);
    }

    /** Delegates the refresh to the current query executor. */
    @Override
    public void refresh() {
        queryExecutor.refresh();
    }

    /**
     * Remembers the cache row filter that is handed to the executor factory in {@link #open()}.
     *
     * @param filter the row filter to store
     */
    @Override
    public void specifyCacheRowFilter(Filter<InternalRow> filter) {
        cacheRowFilter = filter;
    }

    /**
     * Closes the query executor if one has been created; does nothing otherwise.
     *
     * @throws IOException if closing the executor fails
     */
    @Override
    public void close() throws IOException {
        if (queryExecutor == null) {
            // open() was never called (or never got as far as creating the executor).
            return;
        }
        queryExecutor.close();
    }

    /**
     * Creates a lookup table backed by a {@link LocalQueryExecutor}.
     *
     * @param table the table to look up
     * @param projection the value projection passed to the local table query
     * @param tempPath the directory handed to the executor's IO manager
     * @param joinKey the join key field names
     * @param requireCachedBucketIds the bucket ids used as scan bucket filter, or {@code null} for
     *     no bucket filter
     * @return the new, not yet opened lookup table
     */
    public static PrimaryKeyPartialLookupTable createLocalTable(FileStoreTable table, int[] projection, File tempPath, List<String> joinKey, Set<Integer> requireCachedBucketIds) {
        // The executor is created lazily on open(), once the partition/cache filters are known.
        QueryExecutorFactory localFactory =
                (partitionFilter, rowFilter) -> {
                    LookupFileStoreTable lookupTable = new LookupFileStoreTable(table, joinKey);
                    return new LocalQueryExecutor(
                            lookupTable,
                            projection,
                            tempPath,
                            partitionFilter,
                            requireCachedBucketIds,
                            rowFilter);
                };
        return new PrimaryKeyPartialLookupTable(localFactory, table, joinKey);
    }

    /**
     * Creates a lookup table backed by a {@link RemoteQueryExecutor}.
     *
     * <p>The partition predicate and cache row filter supplied at open time are not used by the
     * remote executor.
     *
     * @param table the table to look up
     * @param projection the value projection passed to the remote table query
     * @param joinKey the join key field names
     * @return the new, not yet opened lookup table
     */
    public static PrimaryKeyPartialLookupTable createRemoteTable(FileStoreTable table, int[] projection, List<String> joinKey) {
        QueryExecutorFactory remoteFactory =
                (ignoredPartitionFilter, ignoredRowFilter) ->
                        new RemoteQueryExecutor(table, projection);
        return new PrimaryKeyPartialLookupTable(remoteFactory, table, joinKey);
    }

    /**
     * {@link QueryExecutor} that forwards lookups to a {@link RemoteTableQuery} and reports the
     * table-level bucket count for every partition.
     */
    static class RemoteQueryExecutor
    implements QueryExecutor {
        /** Bucket count value rejected by this executor; named after the error message below. */
        private static final int POSTPONE_BUCKET = -2;

        private final RemoteTableQuery tableQuery;
        private final Integer numBuckets;

        /**
         * @param table the table to query
         * @param projection the value projection applied to the remote query
         * @throws UnsupportedOperationException if the table's bucket count is the postpone
         *     sentinel
         */
        private RemoteQueryExecutor(FileStoreTable table, int[] projection) {
            // Validate before creating the closeable remote query: if the constructor throws
            // after the query exists, nobody holds a reference to close it.
            int configuredBuckets = table.bucketSpec().getNumBuckets();
            if (configuredBuckets == POSTPONE_BUCKET) {
                throw new UnsupportedOperationException("Remote query does not support POSTPONE_BUCKET.");
            }
            this.numBuckets = configuredBuckets;
            this.tableQuery = new RemoteTableQuery(table).withValueProjection(projection);
        }

        /**
         * @param partition ignored; the same bucket count is used for all partitions
         * @return the bucket count read from the table's bucket spec at construction time
         */
        @Override
        @Nullable
        public Integer numBuckets(BinaryRow partition) {
            return numBuckets;
        }

        /** Delegates the lookup to the remote table query. */
        @Override
        public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {
            return tableQuery.lookup(partition, bucket, key);
        }

        /** No-op: this executor keeps no local state to refresh. */
        @Override
        public void refresh() {
        }

        /** Closes the remote table query. */
        @Override
        public void close() throws IOException {
            tableQuery.close();
        }
    }

    /**
     * {@link QueryExecutor} that serves lookups from a {@link LocalTableQuery}, kept up to date by
     * repeatedly planning a {@link StreamTableScan} and feeding the resulting file changes into
     * the query.
     */
    static class LocalQueryExecutor
    implements QueryExecutor {
        private static final Logger LOG = LoggerFactory.getLogger(LocalQueryExecutor.class);
        private final LocalTableQuery tableQuery;
        private final StreamTableScan scan;
        private final String tableName;
        /** Table-level bucket count, used when a split does not carry its own total. */
        private final Integer defaultNumBuckets;
        /** Bucket count per partition, filled in as splits are refreshed. */
        private final Map<BinaryRow, Integer> numBuckets;

        /**
         * @param table the table to query and scan
         * @param projection the value projection applied to the local query
         * @param tempPath directory handed to the IO manager of the local query
         * @param filter optional predicate applied to the scan
         * @param requireCachedBucketIds bucket ids to scan, or {@code null} for no bucket filter
         * @param cacheRowFilter optional row filter installed on the local query
         */
        private LocalQueryExecutor(FileStoreTable table, int[] projection, File tempPath, @Nullable Predicate filter, Set<Integer> requireCachedBucketIds, @Nullable Filter<InternalRow> cacheRowFilter) {
            this.tableQuery = table.newLocalTableQuery().withValueProjection(projection).withIOManager(new IOManagerImpl(tempPath.toString()));
            if (cacheRowFilter != null) {
                this.tableQuery.withCacheRowFilter(cacheRowFilter);
            }
            this.scan = table.newReadBuilder().dropStats().withFilter(filter).withBucketFilter(requireCachedBucketIds == null ? null : requireCachedBucketIds::contains).newStreamScan();
            this.tableName = table.name();
            this.defaultNumBuckets = table.bucketSpec().getNumBuckets();
            this.numBuckets = new HashMap<>();
        }

        /**
         * @return the bucket count recorded for {@code partition} by a previous refresh, or
         *     {@code null} if no split of that partition has been seen
         */
        @Override
        @Nullable
        public Integer numBuckets(BinaryRow partition) {
            return numBuckets.get(partition);
        }

        /** Delegates the lookup to the local table query. */
        @Override
        public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {
            return tableQuery.lookup(partition, bucket, key);
        }

        /**
         * Plans the scan over and over, applying every returned split, until a plan comes back
         * with no splits.
         */
        @Override
        public void refresh() {
            while (true) {
                List<Split> splits = scan.plan().splits();
                log(splits);
                if (splits.isEmpty()) {
                    // Nothing new to consume; the local state is caught up.
                    return;
                }
                for (Split split : splits) {
                    refreshSplit((DataSplit) split);
                }
            }
        }

        /**
         * Applies one split to the local query and records the bucket count of its partition.
         *
         * @param split the split whose before/after files are pushed into the local query
         */
        @VisibleForTesting
        void refreshSplit(DataSplit split) {
            BinaryRow partition = split.partition();
            int bucket = split.bucket();
            List<DataFileMeta> before = split.beforeFiles();
            List<DataFileMeta> after = split.dataFiles();
            tableQuery.refreshFiles(partition, bucket, before, after);

            Integer totalBuckets = split.totalBuckets();
            if (totalBuckets == null) {
                // The split carries no total; fall back to the table-level bucket count.
                Preconditions.checkArgument(defaultNumBuckets > 0, "This is a bug, old version table numBuckets should be greater than 0.");
                totalBuckets = defaultNumBuckets;
            }
            numBuckets.put(partition, totalBuckets);
        }

        /** Closes the local table query. */
        @Override
        public void close() throws IOException {
            tableQuery.close();
        }

        /** Logs whether the plan produced splits and, if so, the snapshot id of the first one. */
        private void log(List<Split> splits) {
            if (splits.isEmpty()) {
                LOG.info("LocalQueryExecutor didn't get splits from {}.", tableName);
                return;
            }
            DataSplit dataSplit = (DataSplit) splits.get(0);
            LOG.info("LocalQueryExecutor get splits from {} with snapshotId {}.", tableName, dataSplit.snapshotId());
        }
    }

    /** Abstraction over the component that actually answers bucket-level lookups. */
    interface QueryExecutor extends Closeable {
        /**
         * @param partition the partition to ask about
         * @return the bucket count for the partition, or {@code null} when it is unknown
         */
        @Nullable
        Integer numBuckets(BinaryRow partition);

        /**
         * @param partition the partition to search
         * @param bucket the bucket to search
         * @param key the lookup key
         * @return the matching row; callers in this file treat {@code null} as "not found"
         * @throws IOException if the lookup fails
         */
        InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException;

        /** Brings the executor's view of the table up to date. */
        void refresh();
    }

    /** Creates the {@link QueryExecutor} once the filters are known, i.e. when the table opens. */
    interface QueryExecutorFactory {
        /**
         * @param partitionFilter the partition predicate stored on the lookup table; it is
         *     {@code null} when no predicate was set before opening
         * @param cacheRowFilter the optional cache row filter stored on the lookup table
         * @return a new executor
         */
        QueryExecutor create(Predicate partitionFilter, @Nullable Filter<InternalRow> cacheRowFilter);
    }
}

