/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.List;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.mergetree.SortBufferWriteBuffer;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.PrimaryKeyTableUtils;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;

public class LocalMergeOperator
extends AbstractStreamOperator<InternalRow>
implements OneInputStreamOperator<InternalRow, InternalRow>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;
    TableSchema schema;
    private transient Projection keyProjection;
    private transient RecordComparator keyComparator;
    private transient long recordCount;
    private transient SequenceGenerator sequenceGenerator;
    private transient MergeFunction<KeyValue> mergeFunction;
    private transient SortBufferWriteBuffer buffer;
    private transient long currentWatermark;
    private transient boolean endOfInput;

    public LocalMergeOperator(TableSchema schema) {
        Preconditions.checkArgument(schema.primaryKeys().size() > 0, "LocalMergeOperator currently only support tables with primary keys");
        this.schema = schema;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    public void open() throws Exception {
        super.open();
        RowType keyType = PrimaryKeyTableUtils.addKeyNamePrefix(this.schema.logicalPrimaryKeysType());
        RowType valueType = this.schema.logicalRowType();
        CoreOptions options = new CoreOptions(this.schema.options());
        this.keyProjection = CodeGenUtils.newProjection(this.schema.logicalRowType(), this.schema.projection(this.schema.primaryKeys()));
        this.keyComparator = new KeyComparatorSupplier(keyType).get();
        this.recordCount = 0L;
        this.sequenceGenerator = SequenceGenerator.create(this.schema, options);
        this.mergeFunction = PrimaryKeyTableUtils.createMergeFunctionFactory(this.schema, new KeyValueFieldsExtractor(){
            private static final long serialVersionUID = 1L;

            @Override
            public List<DataField> keyFields(TableSchema schema) {
                return schema.primaryKeysFields();
            }

            @Override
            public List<DataField> valueFields(TableSchema schema) {
                return schema.fields();
            }
        }).create();
        this.buffer = new SortBufferWriteBuffer(keyType, valueType, new HeapMemorySegmentPool(options.localMergeBufferSize(), options.pageSize()), false, options.localSortMaxNumFileHandles(), null);
        this.currentWatermark = Long.MIN_VALUE;
        this.endOfInput = false;
    }

    public void processElement(StreamRecord<InternalRow> record) throws Exception {
        long sequenceNumber;
        ++this.recordCount;
        InternalRow row = (InternalRow)record.getValue();
        RowKind rowKind = row.getRowKind();
        row.setRowKind(RowKind.INSERT);
        BinaryRow key = this.keyProjection.apply(row);
        long l = sequenceNumber = this.sequenceGenerator == null ? this.recordCount : this.sequenceGenerator.generate(row);
        if (!this.buffer.put(sequenceNumber, rowKind, key, row)) {
            this.flushBuffer();
            if (!this.buffer.put(sequenceNumber, rowKind, key, row)) {
                row.setRowKind(rowKind);
                this.output.collect(record);
            }
        }
    }

    public void processWatermark(Watermark mark) {
        this.currentWatermark = mark.getTimestamp();
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        if (!this.endOfInput) {
            this.flushBuffer();
        }
    }

    public void endInput() throws Exception {
        this.endOfInput = true;
        this.flushBuffer();
    }

    public void close() throws Exception {
        if (this.buffer != null) {
            this.buffer.clear();
        }
        super.close();
    }

    private void flushBuffer() throws Exception {
        if (this.buffer.size() == 0) {
            return;
        }
        this.buffer.forEach(this.keyComparator, this.mergeFunction, null, kv -> {
            InternalRow row = kv.value();
            row.setRowKind(kv.valueKind());
            this.output.collect((Object)new StreamRecord((Object)row));
        });
        this.buffer.clear();
        if (this.currentWatermark != Long.MIN_VALUE) {
            super.processWatermark(new Watermark(this.currentWatermark));
            this.currentWatermark = Long.MIN_VALUE;
        }
    }
}

