/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sorter;

import java.util.stream.IntStream;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.MutableObjectIterator;

/**
 * Flink operator that buffers every incoming row in a {@link BinaryExternalSortBuffer} and emits
 * the rows in sorted order once the bounded input has ended.
 *
 * <p>Nothing is emitted from {@link #processElement(StreamRecord)}; all output is produced by
 * {@link #endInput()}.
 */
public class SortOperator extends TableStreamOperator<InternalRow>
        implements OneInputStreamOperator<InternalRow, InternalRow>, BoundedOneInput {

    /** Type of the sort key; only its field count is used, to derive the key field indices. */
    private final RowType keyType;

    /** Type of the rows written to and read back from the sort buffer. */
    private final RowType rowType;

    // The following settings are handed unchanged to BinaryExternalSortBuffer.create(...).
    private final long maxMemory;
    private final int pageSize;

    /** Field count of {@link #rowType}; used to size the reusable output row. */
    private final int arity;

    private final int spillSortMaxNumFiles;
    private final CompressOptions spillCompression;

    /** Parallelism this operator was configured for; verified against the runtime in open(). */
    private final int sinkParallelism;

    private final MemorySize maxDiskSize;

    // NOTE(review): exact semantics are defined by BinaryExternalSortBuffer - confirm there.
    private final boolean sequenceOrder;

    private transient BinaryExternalSortBuffer buffer;
    private transient IOManager ioManager;

    /**
     * Creates the operator.
     *
     * @param keyType type of the sort key; fields {@code 0 .. keyType.getFieldCount() - 1} of each
     *     row are used as the sort key fields
     * @param rowType type of the rows to sort
     * @param maxMemory forwarded to the sort buffer (presumably its memory budget in bytes -
     *     confirm against {@code BinaryExternalSortBuffer.create})
     * @param pageSize forwarded to the sort buffer
     * @param spillSortMaxNumFiles forwarded to the sort buffer
     * @param spillCompression forwarded to the sort buffer
     * @param sinkParallelism parallelism that the runtime must match when the operator is opened
     * @param maxDiskSize forwarded to the sort buffer
     * @param sequenceOrder forwarded to the sort buffer
     */
    public SortOperator(RowType keyType, RowType rowType, long maxMemory, int pageSize, int spillSortMaxNumFiles, CompressOptions spillCompression, int sinkParallelism, MemorySize maxDiskSize, boolean sequenceOrder) {
        this.keyType = keyType;
        this.rowType = rowType;
        this.maxMemory = maxMemory;
        this.pageSize = pageSize;
        this.arity = rowType.getFieldCount();
        this.spillSortMaxNumFiles = spillSortMaxNumFiles;
        this.spillCompression = spillCompression;
        this.sinkParallelism = sinkParallelism;
        this.maxDiskSize = maxDiskSize;
        this.sequenceOrder = sequenceOrder;
    }

    /**
     * Opens the operator: verifies the runtime parallelism and then allocates the sort buffer.
     *
     * @throws IllegalArgumentException if the number of parallel subtasks differs from the
     *     configured sink parallelism
     */
    public void open() throws Exception {
        super.open();

        // Validate first so that a misconfigured job fails before any I/O manager or sort
        // buffer has been allocated.
        RuntimeContext runtimeContext = getRuntimeContext();
        int runtimeParallelism = RuntimeContextUtils.getNumberOfParallelSubtasks(runtimeContext);
        if (sinkParallelism != runtimeParallelism) {
            throw new IllegalArgumentException("Please ensure that the runtime parallelism of the sink matches the initial configuration to avoid potential issues with skewed range partitioning.");
        }

        initBuffer();
    }

    /**
     * Creates the {@link IOManager} from the containing task's spilling directories and the
     * external sort buffer on top of it.
     */
    @VisibleForTesting
    void initBuffer() {
        String[] spillDirectories =
                getContainingTask().getEnvironment().getIOManager().getSpillingDirectoriesPaths();
        ioManager = IOManager.create(spillDirectories);

        // The key occupies the leading fields of the row: indices 0 .. keyFieldCount - 1.
        int[] keyFieldIndices = IntStream.range(0, keyType.getFieldCount()).toArray();
        buffer =
                BinaryExternalSortBuffer.create(
                        ioManager,
                        rowType,
                        keyFieldIndices,
                        maxMemory,
                        pageSize,
                        spillSortMaxNumFiles,
                        spillCompression,
                        maxDiskSize,
                        sequenceOrder);
    }

    /** Emits all buffered rows downstream in the order produced by the buffer's sorted iterator. */
    public void endInput() throws Exception {
        if (buffer.size() > 0) {
            MutableObjectIterator<BinaryRow> sortedRows = buffer.sortedIterator();
            // A single row instance is offered to the iterator for reuse on every call.
            BinaryRow row = new BinaryRow(arity);
            while ((row = sortedRows.next(row)) != null) {
                output.collect(new StreamRecord<>(row));
            }
        }
    }

    /**
     * Closes the operator and releases the sort buffer and the I/O manager.
     *
     * <p>Each release step runs even if an earlier step throws, so a failure in
     * {@code super.close()} or in clearing the buffer cannot leak the I/O manager. If several
     * steps fail, the exception of the last failing step is the one propagated.
     */
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            try {
                if (buffer != null) {
                    buffer.clear();
                }
            } finally {
                if (ioManager != null) {
                    ioManager.close();
                }
            }
        }
    }

    /** Writes the incoming row into the sort buffer; nothing is emitted here. */
    public void processElement(StreamRecord<InternalRow> element) throws Exception {
        buffer.write(element.getValue());
    }

    /** Returns the current sort buffer, or {@code null} if it has not been initialized yet. */
    @VisibleForTesting
    BinaryExternalSortBuffer getBuffer() {
        return buffer;
    }
}

