/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sorter;

import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.sorter.SortUtils;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.shade.guava30.com.google.common.primitives.UnsignedBytes;
import org.apache.paimon.sort.hilbert.HilbertIndexer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

/**
 * A {@link TableSorter} that orders rows by a byte-array key computed by a
 * {@link HilbertIndexer} over the table's row type and the configured sort columns.
 */
public class HilbertSorter extends TableSorter {

    /** Schema of the sort key: a single {@code BYTES} field named {@code H_INDEX}. */
    private static final RowType KEY_TYPE =
            new RowType(
                    Collections.singletonList(new DataField(0, "H_INDEX", DataTypes.BYTES())));

    /** Sort settings handed through unchanged to {@link SortUtils#sortStreamByKey}. */
    private final TableSortInfo sortInfo;

    /**
     * Creates a sorter whose order columns are taken from {@code tableSortInfo}.
     *
     * @param batchTEnv execution environment forwarded to the parent sorter
     * @param origin stream of rows to be sorted
     * @param table table whose row type is used to build the key indexer
     * @param tableSortInfo supplies the sort columns and is passed on to the sort utility
     */
    public HilbertSorter(
            StreamExecutionEnvironment batchTEnv,
            DataStream<RowData> origin,
            FileStoreTable table,
            TableSortInfo tableSortInfo) {
        super(batchTEnv, origin, table, tableSortInfo.getSortColumns());
        this.sortInfo = tableSortInfo;
    }

    /**
     * Sorts the origin stream by the indexer-generated key.
     *
     * @return the stream returned by {@link SortUtils#sortStreamByKey}
     */
    @Override
    public DataStream<RowData> sort() {
        // NOTE(review): cast retained as-is; the declared type of `origin` lives in TableSorter.
        DataStream<RowData> input = (DataStream<RowData>) this.origin;
        return sortStreamByHilbert(input, this.table);
    }

    /**
     * Wires the key type, key comparator, key extractor and key-to-row converter into
     * {@link SortUtils#sortStreamByKey}.
     *
     * @param input rows to sort
     * @param targetTable table providing the row type for the indexer
     * @return the stream produced by the sort utility
     */
    private DataStream<RowData> sortStreamByHilbert(
            DataStream<RowData> input, FileStoreTable targetTable) {
        final HilbertIndexer indexer =
                new HilbertIndexer(targetTable.rowType(), this.orderColNames);

        SortUtils.KeyAbstract<byte[]> keyExtractor =
                new SortUtils.KeyAbstract<byte[]>() {

                    @Override
                    public void open() {
                        indexer.open();
                    }

                    @Override
                    public byte[] apply(RowData value) {
                        byte[] indexBytes = indexer.index(new FlinkRowWrapper(value));
                        // Hand back a fresh array rather than the indexer's own result;
                        // presumably the indexer reuses its buffer between calls — confirm.
                        return Arrays.copyOf(indexBytes, indexBytes.length);
                    }
                };

        return SortUtils.sortStreamByKey(
                input,
                targetTable,
                KEY_TYPE,
                TypeInformation.of(byte[].class),
                () -> HilbertSorter::compareUnsigned,
                keyExtractor,
                GenericRow::of,
                this.sortInfo);
    }

    /**
     * Compares two keys byte by byte, treating each byte as unsigned, and returns the result of
     * the first position that differs (or {@code 0} if none does).
     *
     * <p>Both keys are expected to have the same length; this is only checked via {@code assert}.
     */
    private static int compareUnsigned(byte[] left, byte[] right) {
        assert left.length == right.length;
        int pos = 0;
        while (pos < left.length) {
            int order = UnsignedBytes.compare(left[pos], right[pos]);
            if (order != 0) {
                return order;
            }
            pos++;
        }
        return 0;
    }
}

