/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.flink.lookup.FullCacheLookupTable;
import org.apache.paimon.flink.lookup.PrimaryKeyLookupTable;
import org.apache.paimon.lookup.SetState;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.KeyProjectedRow;
import org.apache.paimon.utils.TypeUtils;

/**
 * Lookup table that is queried by a secondary key (the join key) instead of the primary key.
 *
 * <p>In addition to the inherited {@code tableState} (primary key -> row) this class keeps a
 * {@code "sec-index"} set state that maps each projected join key to the primary keys of the rows
 * carrying that join key. A lookup first resolves the primary keys through the index and then
 * fetches every row from the table state.
 */
public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {

    /** Reusable projection that exposes only the join-key fields of whatever row it wraps. */
    private final KeyProjectedRow secKeyRow;

    /** Join key -> set of primary keys; created in {@link #open()}. */
    private SetState<InternalRow, InternalRow> indexState;

    /**
     * Creates the table and resolves the position of every join-key field inside the projected
     * row type.
     *
     * @param context lookup context providing the table and the join key names
     * @param lruCacheSize requested cache size; only half of it is handed to the parent class
     */
    public SecondaryIndexLookupTable(FullCacheLookupTable.Context context, long lruCacheSize) {
        // NOTE(review): presumably the cache budget is halved because two states (table and
        // index) share it -- confirm against the parent class.
        super(context, lruCacheSize / 2L, context.table.primaryKeys());
        List<String> projectedFields = projectedType.getFieldNames();
        int[] joinKeyPositions =
                context.joinKey.stream().mapToInt(projectedFields::indexOf).toArray();
        secKeyRow = new KeyProjectedRow(joinKeyPositions);
    }

    /**
     * Initializes the table, creates the table state and the secondary index state, then
     * bootstraps the data.
     *
     * @throws Exception if any of the initialization steps fails
     */
    @Override
    public void open() throws Exception {
        init();
        createTableState();
        // Index key serializer: join-key projection; index value serializer: primary-key
        // projection.
        indexState =
                stateFactory.setState(
                        "sec-index",
                        InternalSerializers.create(
                                TypeUtils.project(projectedType, secKeyRow.indexMapping())),
                        InternalSerializers.create(
                                TypeUtils.project(projectedType, primaryKeyRow.indexMapping())),
                        lruCacheSize);
        bootstrap();
    }

    /**
     * Returns all rows whose join key equals {@code key}.
     *
     * @param key the join key to look up in the secondary index
     * @return the rows found in the table state for the indexed primary keys; primary keys
     *     without a row in the table state are skipped
     * @throws IOException if reading either state fails
     */
    @Override
    public List<InternalRow> innerGet(InternalRow key) throws IOException {
        List<InternalRow> primaryKeys = indexState.get(key);
        List<InternalRow> rows = new ArrayList<>(primaryKeys.size());
        for (InternalRow primaryKey : primaryKeys) {
            InternalRow stored = (InternalRow) tableState.get(primaryKey);
            if (stored != null) {
                rows.add(stored);
            }
        }
        return rows;
    }

    /**
     * Applies one changelog row to the table state and keeps the secondary index in sync.
     *
     * @param row the incoming row; its row kind decides between upsert and removal
     * @param predicate optional filter; when non-null, rows failing it are removed instead of
     *     stored
     * @throws IOException if reading or writing either state fails
     */
    @Override
    protected void refreshRow(InternalRow row, Predicate predicate) throws IOException {
        primaryKeyRow.replaceRow(row);

        boolean hasSeqComparator = userDefinedSeqComparator != null;
        InternalRow existing = null;
        if (hasSeqComparator) {
            existing = (InternalRow) tableState.get(primaryKeyRow);
            // The stored row compares greater than the incoming one: ignore the incoming row.
            if (existing != null && userDefinedSeqComparator.compare(existing, row) > 0) {
                return;
            }
        }

        RowKind kind = row.getRowKind();
        if (kind != RowKind.INSERT && kind != RowKind.UPDATE_AFTER) {
            // Any other row kind removes the row and its index entry.
            tableState.delete(primaryKeyRow);
            indexState.retract(secKeyRow.replaceRow(row), primaryKeyRow);
            return;
        }

        // Without a comparator the stored row has not been read yet.
        if (!hasSeqComparator) {
            existing = (InternalRow) tableState.get(primaryKeyRow);
        }
        // Drop the index entry of the stored row before writing the new one, since its join key
        // is taken from the stored row rather than the incoming one.
        if (existing != null) {
            indexState.retract(secKeyRow.replaceRow(existing), primaryKeyRow);
        }

        boolean filteredOut = predicate != null && !predicate.test(row);
        if (filteredOut) {
            tableState.delete(primaryKeyRow);
        } else {
            tableState.put(primaryKeyRow, row);
            indexState.add(secKeyRow.replaceRow(row), primaryKeyRow);
        }
    }

    /**
     * Adds the index entry for one bulk-loaded record.
     *
     * @param key serialized key of the record; not used here, both keys are derived from the
     *     deserialized value
     * @param value serialized row, deserialized through the table state
     * @throws IOException if deserialization or the index update fails
     */
    @Override
    public void bulkLoadWritePlus(byte[] key, byte[] value) throws IOException {
        InternalRow loaded = (InternalRow) tableState.deserializeValue(value);
        // Keep the original order: project the join key first, then the primary key.
        InternalRow joinKey = secKeyRow.replaceRow(loaded);
        InternalRow primaryKey = primaryKeyRow.replaceRow(loaded);
        indexState.add(joinKey, primaryKey);
    }
}

