/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.lookup.FixedBucketFromPkExtractor;
import org.apache.paimon.flink.lookup.LookupFileStoreTable;
import org.apache.paimon.flink.lookup.LookupTable;
import org.apache.paimon.flink.query.RemoteTableQuery;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ProjectedRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LookupTable} that answers each lookup by delegating to a {@link QueryExecutor}
 * (created either by {@link #createLocalTable} or {@link #createRemoteTable}) for the
 * partition and bucket derived from the join key.
 *
 * <p>Only tables whose bucket mode is {@link BucketMode#HASH_FIXED} are accepted; the
 * constructor rejects every other mode.
 */
public class PrimaryKeyPartialLookupTable
implements LookupTable {
    /** Creates the {@link QueryExecutor} when {@link #open()} is called. */
    private final QueryExecutorFactory executorFactory;
    /** Derives the partition and bucket from a key laid out in primary-key order. */
    private final FixedBucketFromPkExtractor extractor;
    /**
     * Projection from join-key order to the table's primary-key order; {@code null} when the
     * join key list already equals the primary key list.
     */
    @Nullable
    private final ProjectedRow keyRearrange;
    /**
     * Projection from join-key order to the schema's trimmed-primary-key order; {@code null}
     * when the join key list already equals the trimmed primary key list.
     */
    @Nullable
    private final ProjectedRow trimmedKeyRearrange;
    /** Partition predicate set via {@link #specificPartitionFilter}; handed to the factory on open. */
    private Predicate specificPartition;
    /** Optional row filter set via {@link #specifyCacheRowFilter}; handed to the factory on open. */
    @Nullable
    private Filter<InternalRow> cacheRowFilter;
    /** Executor performing the lookups; assigned in {@link #open()} and {@code null} before that. */
    private QueryExecutor queryExecutor;

    /**
     * Creates the lookup table and precomputes the key projections used by {@link #get}.
     *
     * @param executorFactory factory used by {@link #open()} to build the query executor
     * @param table table to look up; its bucket mode must be {@link BucketMode#HASH_FIXED}
     * @param joinKey join key field names, in the order the lookup keys are supplied
     * @throws UnsupportedOperationException if the table uses any other bucket mode
     */
    private PrimaryKeyPartialLookupTable(QueryExecutorFactory executorFactory, FileStoreTable table, List<String> joinKey) {
        BucketMode bucketMode = table.bucketMode();
        if (bucketMode != BucketMode.HASH_FIXED) {
            throw new UnsupportedOperationException("Unsupported mode for partial lookup: " + bucketMode);
        }
        this.executorFactory = executorFactory;
        this.extractor = new FixedBucketFromPkExtractor(table.schema());
        this.keyRearrange = rearrangeOrNull(table.primaryKeys(), joinKey);
        this.trimmedKeyRearrange = rearrangeOrNull(table.schema().trimmedPrimaryKeys(), joinKey);
    }

    /**
     * Builds a projection that reorders a join-key row into {@code targetKeys} order, or returns
     * {@code null} when both lists are already equal and no reordering is needed.
     */
    @Nullable
    private static ProjectedRow rearrangeOrNull(List<String> targetKeys, List<String> joinKey) {
        if (targetKeys.equals(joinKey)) {
            return null;
        }
        // Position of every target key inside the join key (indexOf yields -1 when absent).
        int[] positions = targetKeys.stream().mapToInt(joinKey::indexOf).toArray();
        return ProjectedRow.from(positions);
    }

    /** Returns the current executor for tests; {@code null} until {@link #open()} has run. */
    @VisibleForTesting
    QueryExecutor queryExecutor() {
        return queryExecutor;
    }

    /**
     * Stores the partition predicate that {@link #open()} passes to the executor factory.
     *
     * @param filter partition predicate to remember
     */
    @Override
    public void specificPartitionFilter(Predicate filter) {
        specificPartition = filter;
    }

    /**
     * Creates the query executor from the stored partition predicate and cache row filter,
     * then performs an initial {@link #refresh()}.
     *
     * @throws Exception declared by the {@link LookupTable} contract
     */
    @Override
    public void open() throws Exception {
        QueryExecutor executor = executorFactory.create(specificPartition, cacheRowFilter);
        queryExecutor = executor;
        refresh();
    }

    /**
     * Looks up the row stored under the given join key.
     *
     * @param key lookup key laid out in join-key order
     * @return a singleton list holding the matching row, or an empty list when the executor
     *     returns {@code null}
     * @throws IOException if the underlying executor fails
     */
    @Override
    public List<InternalRow> get(InternalRow key) throws IOException {
        // Key in primary-key order: used only to derive partition and bucket.
        InternalRow primaryKeyOrdered = keyRearrange == null ? key : keyRearrange.replaceRow(key);
        extractor.setRecord(primaryKeyOrdered);
        int bucket = extractor.bucket();
        BinaryRow partition = extractor.partition();

        // Key in trimmed-primary-key order: the key actually handed to the executor.
        InternalRow lookupKey = trimmedKeyRearrange == null ? key : trimmedKeyRearrange.replaceRow(key);
        InternalRow found = queryExecutor.lookup(partition, bucket, lookupKey);
        if (found != null) {
            return Collections.singletonList(found);
        }
        return Collections.emptyList();
    }

    /** Delegates to the query executor's {@code refresh()}; requires {@link #open()} to have run. */
    @Override
    public void refresh() {
        queryExecutor.refresh();
    }

    /**
     * Stores the row filter that {@link #open()} passes to the executor factory.
     *
     * @param filter row filter to remember
     */
    @Override
    public void specifyCacheRowFilter(Filter<InternalRow> filter) {
        cacheRowFilter = filter;
    }

    /**
     * Closes the query executor if one was created; does nothing when {@link #open()} never ran.
     *
     * @throws IOException if closing the executor fails
     */
    @Override
    public void close() throws IOException {
        if (queryExecutor == null) {
            return;
        }
        queryExecutor.close();
    }

    /**
     * Creates a lookup table backed by a {@link LocalQueryExecutor}.
     *
     * @param table table to look up
     * @param projection value projection applied to looked-up rows
     * @param tempPath directory handed to the executor's IO manager
     * @param joinKey join key field names
     * @param requireCachedBucketIds buckets to scan, or {@code null} for no bucket filter
     * @return the lookup table; the executor itself is only created on {@code open()}
     */
    public static PrimaryKeyPartialLookupTable createLocalTable(FileStoreTable table, int[] projection, File tempPath, List<String> joinKey, Set<Integer> requireCachedBucketIds) {
        QueryExecutorFactory localFactory = (partitionFilter, rowFilter) -> {
            // A fresh wrapper table is built every time the factory is invoked.
            LookupFileStoreTable lookupTable = new LookupFileStoreTable(table, joinKey);
            return new LocalQueryExecutor(lookupTable, projection, tempPath, partitionFilter, requireCachedBucketIds, rowFilter);
        };
        return new PrimaryKeyPartialLookupTable(localFactory, table, joinKey);
    }

    /**
     * Creates a lookup table backed by a {@link RemoteQueryExecutor}. The partition predicate
     * and cache row filter supplied at open time are ignored by this executor.
     *
     * @param table table to look up
     * @param projection value projection applied to looked-up rows
     * @param joinKey join key field names
     * @return the lookup table; the executor itself is only created on {@code open()}
     */
    public static PrimaryKeyPartialLookupTable createRemoteTable(FileStoreTable table, int[] projection, List<String> joinKey) {
        QueryExecutorFactory remoteFactory = (ignoredPartitionFilter, ignoredRowFilter) -> new RemoteQueryExecutor(table, projection);
        return new PrimaryKeyPartialLookupTable(remoteFactory, table, joinKey);
    }

    /** {@link QueryExecutor} that forwards every lookup to a {@link RemoteTableQuery}. */
    static class RemoteQueryExecutor
    implements QueryExecutor {
        private final RemoteTableQuery tableQuery;

        /**
         * @param table table to query
         * @param projection value projection applied to looked-up rows
         */
        private RemoteQueryExecutor(FileStoreTable table, int[] projection) {
            tableQuery = new RemoteTableQuery(table).withValueProjection(projection);
        }

        /** Forwards the lookup unchanged to the remote table query. */
        @Override
        public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {
            InternalRow result = tableQuery.lookup(partition, bucket, key);
            return result;
        }

        /** Intentionally a no-op: this executor keeps no local state to refresh. */
        @Override
        public void refresh() {
        }

        /** Closes the remote table query. */
        @Override
        public void close() throws IOException {
            tableQuery.close();
        }
    }

    /**
     * {@link QueryExecutor} that serves lookups from a {@link LocalTableQuery} and keeps it up
     * to date by feeding it the file changes reported by a {@link StreamTableScan}.
     */
    static class LocalQueryExecutor
    implements QueryExecutor {
        private static final Logger LOG = LoggerFactory.getLogger(LocalQueryExecutor.class);
        /** Local query used for lookups and updated on every refresh. */
        private final LocalTableQuery tableQuery;
        /** Stream scan whose plans drive {@link #refresh()}. */
        private final StreamTableScan scan;
        /** Table name, kept only for log messages. */
        private final String tableName;

        /**
         * @param table table to query and scan
         * @param projection value projection applied to looked-up rows
         * @param tempPath directory used to create the IO manager of the local query
         * @param filter optional predicate applied to the scan
         * @param requireCachedBucketIds buckets to scan, or {@code null} for no bucket filter
         * @param cacheRowFilter optional row filter installed on the local query
         */
        private LocalQueryExecutor(FileStoreTable table, int[] projection, File tempPath, @Nullable Predicate filter, Set<Integer> requireCachedBucketIds, @Nullable Filter<InternalRow> cacheRowFilter) {
            this.tableQuery = table.newLocalTableQuery().withValueProjection(projection).withIOManager(new IOManagerImpl(tempPath.toString()));
            if (cacheRowFilter != null) {
                this.tableQuery.withCacheRowFilter(cacheRowFilter);
            }
            this.scan = table.newReadBuilder().dropStats().withFilter(filter).withBucketFilter(requireCachedBucketIds == null ? null : requireCachedBucketIds::contains).newStreamScan();
            this.tableName = table.name();
        }

        /** Forwards the lookup unchanged to the local table query. */
        @Override
        public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {
            return tableQuery.lookup(partition, bucket, key);
        }

        /**
         * Repeatedly plans the stream scan and applies each split's file changes to the local
         * query, stopping as soon as a plan yields no splits.
         *
         * @throws ClassCastException if a planned split is not a {@link DataSplit}
         */
        @Override
        public void refresh() {
            // An empty plan is the only exit: keep planning until the scan has nothing new.
            while (true) {
                List<Split> splits = scan.plan().splits();
                log(splits);
                if (splits.isEmpty()) {
                    return;
                }
                for (Split split : splits) {
                    DataSplit dataSplit = (DataSplit) split;
                    BinaryRow partition = dataSplit.partition();
                    int bucket = dataSplit.bucket();
                    List<DataFileMeta> before = dataSplit.beforeFiles();
                    List<DataFileMeta> after = dataSplit.dataFiles();
                    tableQuery.refreshFiles(partition, bucket, before, after);
                }
            }
        }

        /** Closes the local table query. */
        @Override
        public void close() throws IOException {
            tableQuery.close();
        }

        /** Logs whether the latest plan produced splits and, if so, the first split's snapshot id. */
        private void log(List<Split> splits) {
            if (splits.isEmpty()) {
                LOG.info("LocalQueryExecutor didn't get splits from {}.", tableName);
                return;
            }
            DataSplit firstSplit = (DataSplit) splits.get(0);
            LOG.info("LocalQueryExecutor get splits from {} with snapshotId {}.", tableName, firstSplit.snapshotId());
        }
    }

    /** Strategy that performs the actual key lookup for a given partition and bucket. */
    interface QueryExecutor
    extends Closeable {
        /**
         * Looks up a single row.
         *
         * @param partition partition to search
         * @param bucket bucket to search
         * @param key lookup key
         * @return the matching row; callers treat {@code null} as "not found"
         * @throws IOException if the lookup fails
         */
        InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException;

        /** Brings the executor's view of the table up to date. */
        void refresh();
    }

    /** Factory used to defer creation of the {@link QueryExecutor} until the table is opened. */
    interface QueryExecutorFactory {
        /**
         * @param partitionFilter partition predicate recorded on the lookup table
         * @param cacheRowFilter optional row filter recorded on the lookup table
         * @return a new query executor
         */
        QueryExecutor create(Predicate partitionFilter, @Nullable Filter<InternalRow> cacheRowFilter);
    }
}

