/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.flink.lookup.FullCacheLookupTable;
import org.apache.paimon.lookup.BulkLoader;
import org.apache.paimon.lookup.RocksDBListState;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.KeyProjectedRow;
import org.apache.paimon.utils.TypeUtils;

public class NoPrimaryKeyLookupTable
extends FullCacheLookupTable {
    private final RocksDBListState<InternalRow, InternalRow> state;
    private final KeyProjectedRow joinKeyRow;

    public NoPrimaryKeyLookupTable(FullCacheLookupTable.Context context, long lruCacheSize) throws IOException {
        super(context);
        List<String> fieldNames = this.projectedType.getFieldNames();
        int[] joinKeyMapping = context.joinKey.stream().mapToInt(fieldNames::indexOf).toArray();
        this.joinKeyRow = new KeyProjectedRow(joinKeyMapping);
        this.state = this.stateFactory.listState("join-key-index", InternalSerializers.create(TypeUtils.project(this.projectedType, joinKeyMapping)), InternalSerializers.create(this.projectedType), lruCacheSize);
    }

    @Override
    public List<InternalRow> innerGet(InternalRow key) throws IOException {
        return this.state.get(key);
    }

    @Override
    public void refresh(Iterator<InternalRow> incremental, boolean orderByLastField) throws IOException {
        if (orderByLastField) {
            throw new IllegalArgumentException("Append table does not support order by last field.");
        }
        while (incremental.hasNext()) {
            InternalRow row = incremental.next();
            this.joinKeyRow.replaceRow(row);
            if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
                if (!this.recordFilter().test(row)) continue;
                this.state.add(this.joinKeyRow, row);
                continue;
            }
            throw new RuntimeException(String.format("Received %s message. Only INSERT/UPDATE_AFTER values are expected here.", new Object[]{row.getRowKind()}));
        }
    }

    @Override
    public byte[] toKeyBytes(InternalRow row) throws IOException {
        this.joinKeyRow.replaceRow(row);
        return this.state.serializeKey(this.joinKeyRow);
    }

    @Override
    public byte[] toValueBytes(InternalRow row) throws IOException {
        return this.state.serializeValue(row);
    }

    @Override
    public FullCacheLookupTable.TableBulkLoader createBulkLoader() {
        final BulkLoader bulkLoader = this.state.createBulkLoader();
        return new FullCacheLookupTable.TableBulkLoader(){
            private final List<byte[]> values = new ArrayList<byte[]>();
            private byte[] currentKey;

            @Override
            public void write(byte[] key, byte[] value) throws IOException {
                if (this.currentKey != null && !Arrays.equals(key, this.currentKey)) {
                    this.flush();
                }
                this.currentKey = key;
                this.values.add(value);
            }

            @Override
            public void finish() throws IOException {
                this.flush();
                bulkLoader.finish();
            }

            private void flush() throws IOException {
                if (this.currentKey != null && this.values.size() > 0) {
                    try {
                        bulkLoader.write(this.currentKey, NoPrimaryKeyLookupTable.this.state.serializeList(this.values));
                    }
                    catch (BulkLoader.WriteException e) {
                        throw new RuntimeException(e);
                    }
                }
                this.currentKey = null;
                this.values.clear();
            }
        };
    }
}

