/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.lookup.FixedBucketFromPkExtractor;
import org.apache.paimon.flink.lookup.LookupTable;
import org.apache.paimon.flink.query.RemoteTableQuery;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.Projection;

/**
 * A {@link LookupTable} that answers point lookups by primary key without loading the whole
 * table: each key is mapped to its bucket through a {@link FixedBucketFromPkExtractor} and the
 * actual read is delegated to a {@link QueryExecutor} (local files or a remote query service).
 *
 * <p>Only unpartitioned tables in {@link BucketMode#FIXED} are accepted; any other table is
 * rejected with an {@link UnsupportedOperationException}.
 */
public class PrimaryKeyPartialLookupTable implements LookupTable {
    /** Backend that performs the lookup for a given partition, bucket and key. */
    private final QueryExecutor queryExecutor;
    /** Derives partition and bucket from a key laid out in primary-key order. */
    private final FixedBucketFromPkExtractor extractor;
    /**
     * Reorders an incoming join key into primary-key order; {@code null} when the join key list
     * already equals the primary key list.
     */
    @Nullable
    private final ProjectedRow keyRearrange;

    /**
     * Creates the lookup table on top of an already constructed executor.
     *
     * @param queryExecutor backend used for lookups; closed by {@link #close()}
     * @param table table being looked up
     * @param joinKey join key field names, in the order the caller supplies key rows
     * @throws UnsupportedOperationException if the table is partitioned or not in fixed bucket mode
     */
    private PrimaryKeyPartialLookupTable(QueryExecutor queryExecutor, FileStoreTable table, List<String> joinKey) {
        // The factories already validate before building the executor; repeating the cheap check
        // here keeps the constructor's invariant self-contained.
        PrimaryKeyPartialLookupTable.checkSupported(table);
        this.queryExecutor = queryExecutor;
        this.extractor = new FixedBucketFromPkExtractor(table.schema());
        List<String> primaryKeys = table.primaryKeys();
        if (primaryKeys.equals(joinKey)) {
            this.keyRearrange = null;
        } else {
            // For every primary key field, record its position inside the join key.
            this.keyRearrange = ProjectedRow.from(primaryKeys.stream().mapToInt(joinKey::indexOf).toArray());
        }
    }

    /**
     * Rejects tables that partial lookup cannot serve.
     *
     * @throws UnsupportedOperationException if the table has partition keys or its bucket mode is
     *     not {@link BucketMode#FIXED}
     */
    private static void checkSupported(FileStoreTable table) {
        if (table.partitionKeys().size() > 0) {
            throw new UnsupportedOperationException("The partitioned table are not supported in partial cache mode.");
        }
        if (table.bucketMode() != BucketMode.FIXED) {
            throw new UnsupportedOperationException("Unsupported mode for partial lookup: " + table.bucketMode());
        }
    }

    /** Exposes the underlying executor for tests. */
    @VisibleForTesting
    QueryExecutor queryExecutor() {
        return this.queryExecutor;
    }

    /** Opens the table by performing an initial {@link #refresh()}. */
    @Override
    public void open() throws Exception {
        this.refresh();
    }

    /**
     * Looks up the row for the given join key.
     *
     * @param key join key row, in join-key field order
     * @return an empty list when the executor finds nothing, otherwise a single-element list
     * @throws IOException if the underlying executor fails
     */
    @Override
    public List<InternalRow> get(InternalRow key) throws IOException {
        InternalRow primaryKey = this.keyRearrange == null ? key : this.keyRearrange.replaceRow(key);
        this.extractor.setRecord(primaryKey);
        int bucket = this.extractor.bucket();
        BinaryRow partition = this.extractor.partition();
        InternalRow value = this.queryExecutor.lookup(partition, bucket, primaryKey);
        return value == null ? Collections.emptyList() : Collections.singletonList(value);
    }

    /** Delegates to {@link QueryExecutor#refresh()}. */
    @Override
    public void refresh() {
        this.queryExecutor.refresh();
    }

    /** Closes the underlying executor. */
    @Override
    public void close() throws IOException {
        this.queryExecutor.close();
    }

    /**
     * Creates a lookup table that reads the table's files locally.
     *
     * @param table table to look up
     * @param projection value field indexes to return
     * @param tempPath directory handed to the IO manager used by the local query
     * @param joinKey join key field names
     * @throws UnsupportedOperationException if the table is partitioned or not in fixed bucket mode
     */
    public static PrimaryKeyPartialLookupTable createLocalTable(FileStoreTable table, int[] projection, File tempPath, List<String> joinKey) {
        // Validate first so that no closeable executor is created for a table we will reject.
        PrimaryKeyPartialLookupTable.checkSupported(table);
        LocalQueryExecutor queryExecutor = new LocalQueryExecutor(table, projection, tempPath);
        return new PrimaryKeyPartialLookupTable(queryExecutor, table, joinKey);
    }

    /**
     * Creates a lookup table that delegates lookups to a {@link RemoteTableQuery}.
     *
     * @param table table to look up
     * @param projection value field indexes to return
     * @param joinKey join key field names
     * @throws UnsupportedOperationException if the table is partitioned or not in fixed bucket mode
     */
    public static PrimaryKeyPartialLookupTable createRemoteTable(FileStoreTable table, int[] projection, List<String> joinKey) {
        // Validate first so that no closeable executor is created for a table we will reject.
        PrimaryKeyPartialLookupTable.checkSupported(table);
        RemoveQueryExecutor queryExecutor = new RemoveQueryExecutor(table, projection);
        return new PrimaryKeyPartialLookupTable(queryExecutor, table, joinKey);
    }

    /**
     * {@link QueryExecutor} backed by a {@link RemoteTableQuery}. The class name is kept as-is so
     * that existing references continue to compile.
     */
    static class RemoveQueryExecutor implements QueryExecutor {
        private final RemoteTableQuery tableQuery;

        private RemoveQueryExecutor(FileStoreTable table, int[] projection) {
            this.tableQuery = new RemoteTableQuery(table).withValueProjection(projection);
        }

        @Override
        public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {
            return this.tableQuery.lookup(partition, bucket, key);
        }

        /** Intentionally a no-op: this executor keeps no local file state to update. */
        @Override
        public void refresh() {
        }

        @Override
        public void close() throws IOException {
            this.tableQuery.close();
        }
    }

    /**
     * {@link QueryExecutor} backed by a {@link LocalTableQuery}, kept up to date by feeding it
     * the file changes reported by a {@link StreamTableScan}.
     */
    static class LocalQueryExecutor implements QueryExecutor {
        private final LocalTableQuery tableQuery;
        private final StreamTableScan scan;

        private LocalQueryExecutor(FileStoreTable table, int[] projection, File tempPath) {
            this.tableQuery = table.newLocalTableQuery()
                    .withValueProjection(Projection.of(projection).toNestedIndexes())
                    .withIOManager(new IOManagerImpl(tempPath.toString()));
            HashMap<String, String> dynamicOptions = new HashMap<>();
            dynamicOptions.put(CoreOptions.STREAM_SCAN_MODE.key(), CoreOptions.StreamScanMode.FILE_MONITOR.getValue());
            // NOTE(review): mapping the key to null presumably clears any configured bounded
            // watermark for this scan - confirm against FileStoreTable#copy semantics.
            dynamicOptions.put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), null);
            this.scan = table.copy(dynamicOptions).newReadBuilder().newStreamScan();
        }

        @Override
        public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {
            return this.tableQuery.lookup(partition, bucket, key);
        }

        /**
         * Plans the scan repeatedly until a plan yields no splits, passing each split's
         * before/after file lists to the local query.
         *
         * @throws IllegalArgumentException if a planned split is not a {@link DataSplit}
         */
        @Override
        public void refresh() {
            while (true) {
                List<Split> splits = this.scan.plan().splits();
                if (splits.isEmpty()) {
                    return;
                }
                for (Split split : splits) {
                    if (!(split instanceof DataSplit)) {
                        throw new IllegalArgumentException("Unsupported split: " + split.getClass());
                    }
                    DataSplit dataSplit = (DataSplit) split;
                    BinaryRow partition = dataSplit.partition();
                    int bucket = dataSplit.bucket();
                    List<DataFileMeta> before = dataSplit.beforeFiles();
                    List<DataFileMeta> after = dataSplit.dataFiles();
                    this.tableQuery.refreshFiles(partition, bucket, before, after);
                }
            }
        }

        @Override
        public void close() throws IOException {
            this.tableQuery.close();
        }
    }

    /** Strategy used by the lookup table to read a single key and to pick up table changes. */
    interface QueryExecutor extends Closeable {
        /**
         * Looks up the row stored under {@code key} in the given partition and bucket.
         *
         * @return the row, or {@code null} when nothing is found (callers treat {@code null} as
         *     "no match")
         */
        InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException;

        /** Brings the executor's view of the table up to date. */
        void refresh();
    }
}

