/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sorter;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.shuffle.RangeShuffle;
import org.apache.paimon.flink.sorter.SortOperator;
import org.apache.paimon.shade.guava30.com.google.common.primitives.UnsignedBytes;
import org.apache.paimon.sort.zorder.ZIndexer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyProjectedRow;
import org.apache.paimon.utils.Pair;

public class ZorderSorterUtils {
    private static final RowType KEY_TYPE = new RowType(Collections.singletonList(new DataField(0, "Z_INDEX", DataTypes.BYTES())));

    public static DataStream<RowData> sortStreamByZorder(DataStream<RowData> inputStream, final ZIndexer zIndexer, FileStoreTable table) {
        RowType valueRowType = table.rowType();
        final int fieldCount = valueRowType.getFieldCount();
        int parallelism = inputStream.getParallelism();
        int sampleSize = parallelism * 1000;
        int rangeNum = parallelism * 10;
        long maxSortMemory = table.coreOptions().writeBufferSize();
        int pageSize = table.coreOptions().pageSize();
        SingleOutputStreamOperator inputWithKey = inputStream.map((MapFunction)new RichMapFunction<RowData, Pair<byte[], RowData>>(){

            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                zIndexer.open();
            }

            public Pair<byte[], RowData> map(RowData value) {
                byte[] zorder = zIndexer.index(new FlinkRowWrapper(value));
                return Pair.of(Arrays.copyOf(zorder, zorder.length), value);
            }
        }).setParallelism(parallelism);
        return RangeShuffle.rangeShuffleByKey(inputWithKey, (Comparator & Serializable)(b1, b2) -> {
            assert (((byte[])b1).length == ((byte[])b2).length);
            for (int i = 0; i < ((byte[])b1).length; ++i) {
                int ret = UnsignedBytes.compare(b1[i], b2[i]);
                if (ret == 0) continue;
                return ret;
            }
            return 0;
        }, byte[].class, sampleSize, rangeNum).map((MapFunction & Serializable)a -> new JoinedRow(GenericRow.of(a.getLeft()), new FlinkRowWrapper((RowData)a.getRight())), TypeInformation.of(InternalRow.class)).setParallelism(parallelism).transform("LOCAL SORT", TypeInformation.of(InternalRow.class), (OneInputStreamOperator)new SortOperator(KEY_TYPE, valueRowType, maxSortMemory, pageSize)).setParallelism(parallelism).map((MapFunction)new RichMapFunction<InternalRow, InternalRow>(){
            private transient KeyProjectedRow keyProjectedRow;

            public void open(Configuration parameters) {
                int[] map = new int[fieldCount];
                for (int i = 0; i < map.length; ++i) {
                    map[i] = i + 1;
                }
                this.keyProjectedRow = new KeyProjectedRow(map);
            }

            public InternalRow map(InternalRow value) {
                return this.keyProjectedRow.replaceRow(value);
            }
        }).setParallelism(parallelism).map(FlinkRowData::new, inputStream.getType()).setParallelism(parallelism);
    }
}

