/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.lookup.LookupTable;
import org.apache.paimon.flink.lookup.NoPrimaryKeyLookupTable;
import org.apache.paimon.flink.lookup.PrimaryKeyLookupTable;
import org.apache.paimon.flink.lookup.SecondaryIndexLookupTable;
import org.apache.paimon.flink.lookup.TableStreamingReader;
import org.apache.paimon.lookup.BulkLoader;
import org.apache.paimon.lookup.RocksDBState;
import org.apache.paimon.lookup.RocksDBStateFactory;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.PartialRow;
import org.apache.paimon.utils.TypeUtils;

/**
 * Base class for {@link LookupTable} implementations that read the projected rows of a
 * {@link FileStoreTable} through a {@link TableStreamingReader} and hand them to storage that is
 * managed by the concrete subclass.
 *
 * <p>{@link #open()} performs the initial load: rows are serialized by the subclass, sorted in an
 * external sort buffer and written through a subclass-provided {@link TableBulkLoader}.
 * {@link #refresh()} then applies later batches through {@link #refresh(Iterator, boolean)}.
 *
 * <p>When the table has primary keys and a sequence field is configured, an extra
 * {@code _SEQUENCE_NUMBER} column is appended to {@link #projectedType}; {@link #get(InternalRow)}
 * hides that trailing column from callers again.
 */
public abstract class FullCacheLookupTable implements LookupTable {
    /** Settings this table was created with. */
    protected final Context context;
    /** State factory rooted at {@code context.tempPath}; closed by {@link #close()}. */
    protected final RocksDBStateFactory stateFactory;
    /** Row type after projection, plus the trailing sequence column when it is enabled. */
    protected final RowType projectedType;
    private final TableStreamingReader reader;
    private final boolean sequenceFieldEnabled;

    /**
     * Creates the state factory and the streaming reader, and derives the projected row type.
     *
     * @param context table, projection, predicate, temp directory, record filter and join key
     * @throws IOException declared by the signature; presumably raised while creating the state
     *     factory — confirm against {@code RocksDBStateFactory}
     */
    public FullCacheLookupTable(Context context) throws IOException {
        this.context = context;
        this.stateFactory = new RocksDBStateFactory(
                context.tempPath.toString(), context.table.coreOptions().toConfiguration(), null);
        FileStoreTable table = context.table;
        this.reader = new TableStreamingReader(table, context.projection, context.predicate);
        // The sequence column is only tracked for primary-key tables that configure a sequence field.
        this.sequenceFieldEnabled = !table.primaryKeys().isEmpty()
                && new CoreOptions(table.options()).sequenceField().isPresent();
        RowType projected = TypeUtils.project(table.rowType(), context.projection);
        if (this.sequenceFieldEnabled) {
            projected = projected.appendDataField("_SEQUENCE_NUMBER", DataTypes.BIGINT());
        }
        this.projectedType = projected;
    }

    /**
     * Performs the initial load: reads the first batch, keeps the rows accepted by
     * {@link #recordFilter()}, sorts their serialized key/value pairs and writes them through the
     * bulk loader returned by {@link #createBulkLoader()}.
     *
     * @throws RuntimeException if the bulk loader rejects a write; the cause of the
     *     {@link BulkLoader.WriteException} is attached
     * @throws Exception if reading, sorting or writing fails for any other reason
     */
    @Override
    public void open() throws Exception {
        BinaryExternalSortBuffer bulkLoadSorter = RocksDBState.createBulkLoadSorter(
                IOManager.create(this.context.tempPath.toString()), this.context.table.coreOptions());
        try {
            Filter<InternalRow> filter = this.recordFilter();
            try (RecordReaderIterator<InternalRow> batch =
                    new RecordReaderIterator<>(this.reader.nextBatch(true, this.sequenceFieldEnabled))) {
                while (batch.hasNext()) {
                    InternalRow row = batch.next();
                    if (filter.test(row)) {
                        bulkLoadSorter.write(GenericRow.of(this.toKeyBytes(row), this.toValueBytes(row)));
                    }
                }
            }

            MutableObjectIterator<BinaryRow> sortedEntries = bulkLoadSorter.sortedIterator();
            // Two fields per entry: serialized key at index 0, serialized value at index 1.
            BinaryRow entry = new BinaryRow(2);
            TableBulkLoader bulkLoader = this.createBulkLoader();
            try {
                while ((entry = sortedEntries.next(entry)) != null) {
                    bulkLoader.write(entry.getBinary(0), entry.getBinary(1));
                }
            } catch (BulkLoader.WriteException e) {
                // The wrapper exception is dropped on purpose; only its cause is reported.
                throw new RuntimeException("Exception in bulkLoad, the most suspicious reason is that your data contains duplicates, please check your lookup table. ", e.getCause());
            }
            bulkLoader.finish();
        } finally {
            // Clear the sorter on failure as well, not only after a successful load.
            bulkLoadSorter.clear();
        }
    }

    /**
     * Applies incremental batches until the reader returns a batch without rows.
     *
     * <p>Each batch is closed before the next one is requested. An exception raised while
     * applying a batch propagates to the caller after that batch has been closed.
     *
     * @throws Exception if reading or applying a batch fails
     */
    @Override
    public void refresh() throws Exception {
        while (true) {
            try (RecordReaderIterator<InternalRow> batch =
                    new RecordReaderIterator<>(this.reader.nextBatch(false, this.sequenceFieldEnabled))) {
                if (!batch.hasNext()) {
                    return;
                }
                this.refresh(batch, this.sequenceFieldEnabled);
            }
        }
    }

    /**
     * Looks up the rows matching {@code key}.
     *
     * <p>When the sequence column is enabled, every row returned by {@link #innerGet(InternalRow)}
     * is wrapped so that its last field is not exposed.
     *
     * @param key lookup key
     * @return matching rows, without the trailing sequence column
     * @throws IOException if the underlying lookup fails
     */
    @Override
    public final List<InternalRow> get(InternalRow key) throws IOException {
        List<InternalRow> values = this.innerGet(key);
        if (!this.sequenceFieldEnabled) {
            return values;
        }
        List<InternalRow> withoutSequence = new ArrayList<>(values.size());
        for (InternalRow matchedRow : values) {
            withoutSequence.add(new PartialRow(matchedRow.getFieldCount() - 1, matchedRow));
        }
        return withoutSequence;
    }

    /**
     * Subclass lookup used by {@link #get(InternalRow)}.
     *
     * @param key lookup key
     * @return matching rows; they still carry the sequence column when it is enabled
     * @throws IOException if the lookup fails
     */
    public abstract List<InternalRow> innerGet(InternalRow key) throws IOException;

    /**
     * Applies one incremental batch of rows.
     *
     * @param input rows of the batch
     * @param orderByLastField receives this table's sequence-field flag when called from
     *     {@link #refresh()}; presumably tells the subclass to order by the trailing sequence
     *     column — confirm against the implementations
     * @throws IOException if applying the batch fails
     */
    public abstract void refresh(Iterator<InternalRow> input, boolean orderByLastField) throws IOException;

    /**
     * Returns the filter from the {@link Context}; {@link #open()} only loads rows it accepts.
     *
     * @return the record filter
     */
    public Filter<InternalRow> recordFilter() {
        return this.context.recordFilter;
    }

    /**
     * Serializes the key part of {@code row} for the initial bulk load.
     *
     * @param row row read from the table
     * @return serialized key
     * @throws IOException if serialization fails
     */
    public abstract byte[] toKeyBytes(InternalRow row) throws IOException;

    /**
     * Serializes the value part of {@code row} for the initial bulk load.
     *
     * @param row row read from the table
     * @return serialized value
     * @throws IOException if serialization fails
     */
    public abstract byte[] toValueBytes(InternalRow row) throws IOException;

    /**
     * Creates the loader that {@link #open()} writes the sorted key/value pairs to.
     *
     * @return a new bulk loader
     */
    public abstract TableBulkLoader createBulkLoader();

    /**
     * Closes the state factory.
     *
     * @throws IOException if closing the state factory fails
     */
    @Override
    public void close() throws IOException {
        this.stateFactory.close();
    }

    /**
     * Chooses the implementation from the relation between primary keys and join key: no primary
     * keys gives {@link NoPrimaryKeyLookupTable}; a join key that contains exactly the primary
     * key columns (in any order) gives {@link PrimaryKeyLookupTable}; anything else gives
     * {@link SecondaryIndexLookupTable}.
     *
     * @param context table settings
     * @param lruCacheSize cache size passed through to the chosen implementation
     * @return the lookup table implementation for this table and join key
     * @throws IOException if the implementation cannot be constructed
     */
    static FullCacheLookupTable create(Context context, long lruCacheSize) throws IOException {
        List<String> primaryKeys = context.table.primaryKeys();
        if (primaryKeys.isEmpty()) {
            return new NoPrimaryKeyLookupTable(context, lruCacheSize);
        }
        // Compared as sets so that the order of the join key columns does not matter.
        if (new HashSet<>(primaryKeys).equals(new HashSet<>(context.joinKey))) {
            return new PrimaryKeyLookupTable(context, lruCacheSize, context.joinKey);
        }
        return new SecondaryIndexLookupTable(context, lruCacheSize);
    }

    /** Immutable bundle of the settings a {@link FullCacheLookupTable} is created with. */
    public static class Context {
        /** Table to read from. */
        public final FileStoreTable table;
        /** Projection applied to the table's row type. */
        public final int[] projection;
        /** Optional predicate handed to the streaming reader; may be {@code null}. */
        @Nullable
        public final Predicate predicate;
        /** Directory used for the state factory and for the sort buffer's IO manager. */
        public final File tempPath;
        /** Filter a row must pass to be included in the initial load. */
        public final Filter<InternalRow> recordFilter;
        /** Names of the join key columns. */
        public final List<String> joinKey;

        /**
         * Stores the given settings as-is; no defensive copies are made.
         *
         * @param table table to read from
         * @param projection projection applied to the table's row type
         * @param predicate optional predicate for the reader, may be {@code null}
         * @param tempPath directory for temporary state
         * @param recordFilter filter applied to rows during the initial load
         * @param joinKey names of the join key columns
         */
        public Context(FileStoreTable table, int[] projection, @Nullable Predicate predicate, File tempPath, Filter<InternalRow> recordFilter, List<String> joinKey) {
            this.table = table;
            this.projection = projection;
            this.predicate = predicate;
            this.tempPath = tempPath;
            this.recordFilter = recordFilter;
            this.joinKey = joinKey;
        }
    }

    /** Sink for the sorted key/value pairs produced by {@link FullCacheLookupTable#open()}. */
    public interface TableBulkLoader {
        /**
         * Writes one serialized entry.
         *
         * @param key serialized key
         * @param value serialized value
         * @throws BulkLoader.WriteException if the entry is rejected
         * @throws IOException if writing fails
         */
        void write(byte[] key, byte[] value) throws BulkLoader.WriteException, IOException;

        /**
         * Called once by {@code open()} after the last entry has been written.
         *
         * @throws IOException if completing the load fails
         */
        void finish() throws IOException;
    }
}

