/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.flink.lookup.LookupTable;
import org.apache.paimon.lookup.RocksDBListState;
import org.apache.paimon.lookup.RocksDBStateFactory;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyProjectedRow;
import org.apache.paimon.utils.TypeUtils;

/**
 * A {@link LookupTable} that keeps rows in a {@link RocksDBListState} named
 * {@code "join-key-index"}, keyed by the projection of each row onto the join key fields.
 *
 * <p>Because the backing state is a list state, several rows may be stored under the same
 * join key; {@link #get(InternalRow)} returns whatever the state holds for that key.
 */
public class NoPrimaryKeyLookupTable implements LookupTable {
    /** List state mapping a projected join key to the full rows added under it. */
    private final RocksDBListState<InternalRow, InternalRow> state;

    /** Rows are only added to the state when this predicate accepts them. */
    private final Predicate<InternalRow> recordFilter;

    /** Reusable projection of the current row onto the join key fields; re-pointed per row. */
    private final KeyProjectedRow joinKeyRow;

    /**
     * Creates the lookup table and its backing list state.
     *
     * @param stateFactory factory used to create the {@code "join-key-index"} list state
     * @param rowType type of the rows that will be stored
     * @param joinKeys names of the fields of {@code rowType} that form the join key, in key order
     * @param recordFilter predicate a row must satisfy to be added during {@link #refresh}
     * @param lruCacheSize forwarded unchanged to the state factory (presumably the size of the
     *     state's LRU cache; confirm against {@code RocksDBStateFactory#listState})
     * @throws IllegalArgumentException if any join key is not a field of {@code rowType}
     * @throws IOException propagated from the state factory, if it raises one
     */
    public NoPrimaryKeyLookupTable(RocksDBStateFactory stateFactory, RowType rowType, List<String> joinKeys, Predicate<InternalRow> recordFilter, long lruCacheSize) throws IOException {
        List<String> fieldNames = rowType.getFieldNames();
        int[] joinKeyMapping = new int[joinKeys.size()];
        int position = 0;
        for (String joinKey : joinKeys) {
            int fieldIndex = fieldNames.indexOf(joinKey);
            // indexOf yields -1 for an unknown name; fail here with a clear message instead of
            // letting a negative index leak into the projection and the key serializer.
            if (fieldIndex < 0) {
                throw new IllegalArgumentException(String.format("Join key '%s' is not a field of the row type. Available fields: %s", joinKey, fieldNames));
            }
            joinKeyMapping[position++] = fieldIndex;
        }
        this.joinKeyRow = new KeyProjectedRow(joinKeyMapping);
        this.state = stateFactory.listState("join-key-index", InternalSerializers.create(TypeUtils.project(rowType, joinKeyMapping)), InternalSerializers.create(rowType), lruCacheSize);
        this.recordFilter = recordFilter;
    }

    /**
     * Looks up the rows stored under the given join key.
     *
     * @param key the join key to look up
     * @return the result of the underlying list state's lookup for {@code key}
     * @throws IOException propagated from the underlying state, if it raises one
     */
    @Override
    public List<InternalRow> get(InternalRow key) throws IOException {
        return this.state.get(key);
    }

    /**
     * Consumes the given rows and adds each one that passes the record filter to the state,
     * keyed by its join key projection.
     *
     * <p>Only {@link RowKind#INSERT} and {@link RowKind#UPDATE_AFTER} rows are accepted. Rows
     * are processed in iteration order, so rows preceding an offending row have already been
     * added when the exception is thrown.
     *
     * @param incremental the rows to load
     * @throws RuntimeException if a row has any other row kind
     * @throws IOException propagated from the underlying state, if it raises one
     */
    @Override
    public void refresh(Iterator<InternalRow> incremental) throws IOException {
        while (incremental.hasNext()) {
            InternalRow row = incremental.next();
            // Re-point the shared key projection at the current row before it is used as a key.
            this.joinKeyRow.replaceRow(row);
            RowKind rowKind = row.getRowKind();
            if (rowKind == RowKind.INSERT || rowKind == RowKind.UPDATE_AFTER) {
                if (this.recordFilter.test(row)) {
                    this.state.add(this.joinKeyRow, row);
                }
            } else {
                throw new RuntimeException(String.format("Received %s message. Only INSERT/UPDATE_AFTER values are expected here.", rowKind));
            }
        }
    }
}

