/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.List;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.mergetree.SortBufferWriteBuffer;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.localmerge.HashMapLocalMerger;
import org.apache.paimon.mergetree.localmerge.LocalMerger;
import org.apache.paimon.mergetree.localmerge.SortBufferLocalMerger;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.PrimaryKeyTableUtils;
import org.apache.paimon.table.sink.RowKindGenerator;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.UserDefinedSeqComparator;

public class LocalMergeOperator
extends AbstractStreamOperator<InternalRow>
implements OneInputStreamOperator<InternalRow, InternalRow>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;
    private final TableSchema schema;
    private final boolean ignoreDelete;
    private transient Projection keyProjection;
    private transient RowKindGenerator rowKindGenerator;
    private transient LocalMerger merger;
    private transient long currentWatermark;
    private transient boolean endOfInput;

    private LocalMergeOperator(StreamOperatorParameters<InternalRow> parameters, TableSchema schema) {
        Preconditions.checkArgument(schema.primaryKeys().size() > 0, "LocalMergeOperator currently only support tables with primary keys");
        this.schema = schema;
        this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete();
        this.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
    }

    public void open() throws Exception {
        super.open();
        List<String> primaryKeys = this.schema.primaryKeys();
        RowType valueType = this.schema.logicalRowType();
        CoreOptions options = new CoreOptions(this.schema.options());
        this.keyProjection = CodeGenUtils.newProjection(valueType, this.schema.projection(primaryKeys));
        this.rowKindGenerator = RowKindGenerator.create(this.schema, options);
        MergeFunction<KeyValue> mergeFunction = PrimaryKeyTableUtils.createMergeFunctionFactory(this.schema, new KeyValueFieldsExtractor(){
            private static final long serialVersionUID = 1L;

            @Override
            public List<DataField> keyFields(TableSchema schema) {
                return PrimaryKeyTableUtils.addKeyNamePrefix(schema.primaryKeysFields());
            }

            @Override
            public List<DataField> valueFields(TableSchema schema) {
                return schema.fields();
            }
        }).create();
        boolean canHashMerger = true;
        for (DataField field : valueType.getFields()) {
            if (primaryKeys.contains(field.name()) || BinaryRow.isInFixedLengthPart(field.type())) continue;
            canHashMerger = false;
            break;
        }
        HeapMemorySegmentPool pool = new HeapMemorySegmentPool(options.localMergeBufferSize(), options.pageSize());
        UserDefinedSeqComparator udsComparator = UserDefinedSeqComparator.create(valueType, options);
        if (canHashMerger) {
            this.merger = new HashMapLocalMerger(valueType, primaryKeys, pool, mergeFunction, udsComparator);
        } else {
            RowType keyType = PrimaryKeyTableUtils.addKeyNamePrefix(this.schema.logicalPrimaryKeysType());
            SortBufferWriteBuffer sortBuffer = new SortBufferWriteBuffer(keyType, valueType, udsComparator, pool, false, MemorySize.MAX_VALUE, options.localSortMaxNumFileHandles(), options.spillCompressOptions(), null);
            this.merger = new SortBufferLocalMerger(sortBuffer, new KeyComparatorSupplier(keyType).get(), mergeFunction);
        }
        this.currentWatermark = Long.MIN_VALUE;
        this.endOfInput = false;
    }

    public void processElement(StreamRecord<InternalRow> record) throws Exception {
        InternalRow row = (InternalRow)record.getValue();
        RowKind rowKind = RowKindGenerator.getRowKind(this.rowKindGenerator, row);
        if (this.ignoreDelete && rowKind.isRetract()) {
            return;
        }
        row.setRowKind(RowKind.INSERT);
        BinaryRow key = this.keyProjection.apply(row);
        if (!this.merger.put(rowKind, key, row)) {
            this.flushBuffer();
            if (!this.merger.put(rowKind, key, row)) {
                row.setRowKind(rowKind);
                this.output.collect(record);
            }
        }
    }

    public void processWatermark(Watermark mark) {
        this.currentWatermark = mark.getTimestamp();
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        if (!this.endOfInput) {
            this.flushBuffer();
        }
    }

    public void endInput() throws Exception {
        this.endOfInput = true;
        this.flushBuffer();
    }

    public void close() throws Exception {
        if (this.merger != null) {
            this.merger.clear();
        }
        super.close();
    }

    private void flushBuffer() throws Exception {
        if (this.merger.size() == 0) {
            return;
        }
        this.merger.forEach(row -> this.output.collect((Object)new StreamRecord(row)));
        this.merger.clear();
        if (this.currentWatermark != Long.MIN_VALUE) {
            super.processWatermark(new Watermark(this.currentWatermark));
            this.currentWatermark = Long.MIN_VALUE;
        }
    }

    @VisibleForTesting
    LocalMerger merger() {
        return this.merger;
    }

    @VisibleForTesting
    void setOutput(Output<StreamRecord<InternalRow>> output) {
        this.output = output;
    }

    public static class Factory
    extends AbstractStreamOperatorFactory<InternalRow>
    implements OneInputStreamOperatorFactory<InternalRow, InternalRow> {
        private final TableSchema schema;

        public Factory(TableSchema schema) {
            this.chainingStrategy = ChainingStrategy.ALWAYS;
            this.schema = schema;
        }

        public <T extends StreamOperator<InternalRow>> T createStreamOperator(StreamOperatorParameters<InternalRow> parameters) {
            return (T)((Object)new LocalMergeOperator(parameters, this.schema));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return LocalMergeOperator.class;
        }
    }
}

