/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.AsyncLookupSinkWrite;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.RowDataStoreWriteOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.flink.utils.InternalTypeSerializer;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class WriterOperatorTest {
    @TempDir
    public java.nio.file.Path tempDir;
    private Path tablePath;
    private String commitUser;

    /** Prepares a fresh table location and commit user for every test. */
    @BeforeEach
    public void before() {
        // Each test gets its own random commit user.
        commitUser = UUID.randomUUID().toString();
        // The table is rooted directly at the injected temporary directory.
        tablePath = new Path(tempDir.toString());
    }

    /**
     * Runs the shared writer-buffer metrics scenario on a table whose primary key is column
     * {@code a}, with a single bucket and a 256-byte, non-spillable write buffer.
     */
    @Test
    public void testPrimaryKeyTableMetrics() throws Exception {
        DataType[] fieldTypes = {DataTypes.INT(), DataTypes.INT()};
        String[] fieldNames = {"a", "b"};
        RowType schema = RowType.of(fieldTypes, fieldNames);

        Options tableOptions = new Options();
        tableOptions.set("bucket", "1");
        tableOptions.set("write-buffer-size", "256 b");
        tableOptions.set("write-buffer-spillable", "false");
        tableOptions.set("page-size", "32 b");

        List<String> primaryKeys = Collections.singletonList("a");
        List<String> partitionKeys = Collections.emptyList();
        FileStoreTable table =
                createFileStoreTable(schema, primaryKeys, partitionKeys, tableOptions);

        testMetricsImpl(table);
    }

    /**
     * Runs the shared writer-buffer metrics scenario on a table without primary keys, with the
     * append write buffer enabled and sized to 256 bytes (non-spillable).
     */
    @Test
    public void testAppendOnlyTableMetrics() throws Exception {
        DataType[] fieldTypes = {DataTypes.INT(), DataTypes.INT()};
        String[] fieldNames = {"a", "b"};
        RowType schema = RowType.of(fieldTypes, fieldNames);

        Options tableOptions = new Options();
        tableOptions.set("write-buffer-for-append", "true");
        tableOptions.set("write-buffer-size", "256 b");
        tableOptions.set("page-size", "32 b");
        tableOptions.set("write-buffer-spillable", "false");

        // Neither primary keys nor partition keys are declared for this table.
        List<String> noKeys = Collections.emptyList();
        List<String> noPartitions = Collections.emptyList();
        FileStoreTable table = createFileStoreTable(schema, noKeys, noPartitions, tableOptions);

        testMetricsImpl(table);
    }

    /**
     * Shared scenario for the writer-buffer metric tests.
     *
     * <p>Writes ten identical rows, completes checkpoint 1, and then checks the gauges registered
     * under {@code paimon / table=<tableName> / writerBuffer}: {@code bufferPreemptCount} must be
     * 0 and {@code totalWriteBufferSizeByte} must be 256 (the {@code write-buffer-size} the
     * callers configure). After one more row is written, {@code usedWriteBufferSizeByte} must be
     * greater than 0.
     *
     * @param fileStoreTable the table the write operator under test writes to
     * @throws Exception if the harness or the operator fails
     */
    private void testMetricsImpl(FileStoreTable fileStoreTable) throws Exception {
        String tableName = this.tablePath.getName();
        RowDataStoreWriteOperator.Factory operatorFactory =
                this.getStoreSinkWriteOperatorFactory(fileStoreTable);
        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
                this.createHarness(operatorFactory);
        TypeSerializer<Committable> serializer =
                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        harness.setup(serializer);
        harness.open();

        int size = 10;
        for (int i = 0; i < size; i++) {
            // The harness is typed on InternalRow, so the row is passed without any Object cast.
            harness.processElement(GenericRow.of(1, 1), 1L);
        }
        harness.prepareSnapshotPreBarrier(1L);
        harness.snapshot(1L, 2L);
        harness.notifyOfCompletedCheckpoint(1L);

        OperatorMetricGroup metricGroup = harness.getOneInputOperator().getMetricGroup();
        MetricGroup writerBufferMetricGroup =
                metricGroup.addGroup("paimon").addGroup("table", tableName).addGroup("writerBuffer");

        Gauge<?> bufferPreemptCount =
                TestingMetricUtils.getGauge(writerBufferMetricGroup, "bufferPreemptCount");
        Assertions.assertThat((Long) bufferPreemptCount.getValue()).isEqualTo(0L);

        Gauge<?> totalWriteBufferSizeByte =
                TestingMetricUtils.getGauge(writerBufferMetricGroup, "totalWriteBufferSizeByte");
        Assertions.assertThat((Long) totalWriteBufferSizeByte.getValue()).isEqualTo(256L);

        // Write one more row after the checkpoint so the buffer holds un-flushed data again.
        harness.processElement(GenericRow.of(1, 1), 1L);
        Gauge<?> usedWriteBufferSizeByte =
                TestingMetricUtils.getGauge(writerBufferMetricGroup, "usedWriteBufferSizeByte");
        Assertions.assertThat((Long) usedWriteBufferSizeByte.getValue()).isGreaterThan(0L);

        harness.close();
    }

    /**
     * Exercises recovery of an {@link AsyncLookupSinkWrite} based operator.
     *
     * <p>Two checkpoints are written and committed with {@code waitCompaction = false}. The
     * operator is then closed and re-created from the checkpoint-2 state with
     * {@code waitCompaction = true}, and a third checkpoint is taken and committed. A stream scan
     * over the table must finally yield the latest value of each of the three keys.
     *
     * @throws Exception if the harness, the commit or the read fails
     */
    @Test
    public void testAsyncLookupWithFailure() throws Exception {
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT()},
                        new String[] {"pt", "k", "v"});
        Options options = new Options();
        options.set("bucket", "1");
        options.set("changelog-producer", "lookup");
        FileStoreTable fileStoreTable =
                this.createFileStoreTable(
                        rowType, Arrays.asList("pt", "k"), Collections.singletonList("k"), options);

        RowDataStoreWriteOperator.Factory operatorFactory =
                this.getAsyncLookupWriteOperatorFactory(fileStoreTable, false);
        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
                this.createHarness(operatorFactory);
        TableCommitImpl commit = fileStoreTable.newCommit(this.commitUser);
        TypeSerializer<Committable> serializer =
                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        harness.setup(serializer);
        harness.open();

        // Checkpoint 1: initial values for three keys.
        harness.processElement(GenericRow.of(1, 10, 100), 1L);
        harness.processElement(GenericRow.of(2, 20, 200), 2L);
        harness.processElement(GenericRow.of(3, 30, 300), 3L);
        harness.prepareSnapshotPreBarrier(1L);
        harness.snapshot(1L, 10L);
        harness.notifyOfCompletedCheckpoint(1L);
        this.commitAll(harness, commit, 1L);

        // Checkpoint 2: overwrite two of the keys and keep the operator state for recovery.
        harness.processElement(GenericRow.of(1, 10, 101), 11L);
        harness.processElement(GenericRow.of(3, 30, 301), 13L);
        harness.prepareSnapshotPreBarrier(2L);
        OperatorSubtaskState state = harness.snapshot(2L, 20L);
        harness.notifyOfCompletedCheckpoint(2L);
        this.commitAll(harness, commit, 2L);
        harness.close();

        // Re-create the operator from the saved state, this time with waitCompaction = true.
        operatorFactory = this.getAsyncLookupWriteOperatorFactory(fileStoreTable, true);
        harness = this.createHarness(operatorFactory);
        harness.setup(serializer);
        harness.initializeState(state);
        harness.open();

        // Checkpoint 3: no new records; only what the restored operator emits is committed.
        harness.prepareSnapshotPreBarrier(3L);
        harness.snapshot(3L, 30L);
        harness.notifyOfCompletedCheckpoint(3L);
        this.commitAll(harness, commit, 3L);
        harness.close();
        commit.close();

        // Read everything the stream scan plans and render each row as "<kind>[pt, k, v]".
        ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
        StreamTableScan scan = readBuilder.newStreamScan();
        TableRead read = readBuilder.newRead();
        RecordReader<InternalRow> reader = read.createReader(scan.plan().splits());
        List<String> actual = new ArrayList<>();
        reader.forEachRemaining(
                row ->
                        actual.add(
                                String.format(
                                        "%s[%d, %d, %d]",
                                        row.getRowKind().shortString(),
                                        row.getInt(0),
                                        row.getInt(1),
                                        row.getInt(2))));
        Assertions.assertThat(actual)
                .containsExactlyInAnyOrder("+I[1, 10, 101]", "+I[2, 20, 200]", "+I[3, 30, 301]");
    }

    /**
     * Exercises recovery of an {@link AsyncLookupSinkWrite} based operator when
     * {@link CoreOptions#LOOKUP_COMPACT} is {@code GENTLE} and
     * {@link CoreOptions#LOOKUP_COMPACT_MAX_INTERVAL} is 5.
     *
     * <p>Two checkpoints are written and committed with {@code waitCompaction = false}. The
     * operator is then restored from the checkpoint-2 state with {@code waitCompaction = true}:
     * after a single further checkpoint the stream scan is expected to return nothing, and after
     * restoring again and running checkpoints 4 to 7 it is expected to return the latest value of
     * each key.
     *
     * @throws Exception if the harness, the commit or the read fails
     */
    @Test
    public void testGentleLookupWithFailure() throws Exception {
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT()},
                        new String[] {"pt", "k", "v"});
        int lookupCompactMaxInterval = 5;
        Options options = new Options();
        options.set("bucket", "1");
        options.set("changelog-producer", "lookup");
        // Typed option setters: the value type must match the ConfigOption's type parameter.
        options.set(CoreOptions.LOOKUP_COMPACT, CoreOptions.LookupCompactMode.GENTLE);
        options.set(CoreOptions.LOOKUP_COMPACT_MAX_INTERVAL, lookupCompactMaxInterval);
        FileStoreTable fileStoreTable =
                this.createFileStoreTable(
                        rowType, Arrays.asList("pt", "k"), Collections.singletonList("k"), options);

        RowDataStoreWriteOperator.Factory operatorFactory =
                this.getAsyncLookupWriteOperatorFactory(fileStoreTable, false);
        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
                this.createHarness(operatorFactory);
        TableCommitImpl commit = fileStoreTable.newCommit(this.commitUser);
        TypeSerializer<Committable> serializer =
                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        harness.setup(serializer);
        harness.open();

        // Checkpoint 1: initial values for three keys.
        harness.processElement(GenericRow.of(1, 10, 100), 1L);
        harness.processElement(GenericRow.of(2, 20, 200), 2L);
        harness.processElement(GenericRow.of(3, 30, 300), 3L);
        harness.prepareSnapshotPreBarrier(1L);
        harness.snapshot(1L, 10L);
        harness.notifyOfCompletedCheckpoint(1L);
        this.commitAll(harness, commit, 1L);

        // Checkpoint 2: overwrite two of the keys and keep the operator state for recovery.
        harness.processElement(GenericRow.of(1, 10, 101), 11L);
        harness.processElement(GenericRow.of(3, 30, 301), 13L);
        harness.prepareSnapshotPreBarrier(2L);
        OperatorSubtaskState state = harness.snapshot(2L, 20L);
        harness.notifyOfCompletedCheckpoint(2L);
        this.commitAll(harness, commit, 2L);
        harness.close();

        // First recovery: restore with waitCompaction = true and take a single checkpoint.
        operatorFactory = this.getAsyncLookupWriteOperatorFactory(fileStoreTable, true);
        harness = this.createHarness(operatorFactory);
        harness.setup(serializer);
        harness.initializeState(state);
        harness.open();
        harness.prepareSnapshotPreBarrier(3L);
        harness.snapshot(3L, 30L);
        harness.notifyOfCompletedCheckpoint(3L);
        this.commitAll(harness, commit, 3L);
        harness.close();

        // At this point the test expects the stream scan to produce no rows at all.
        ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
        StreamTableScan scan = readBuilder.newStreamScan();
        TableRead read = readBuilder.newRead();
        RecordReader<InternalRow> reader = read.createReader(scan.plan().splits());
        List<String> actual = new ArrayList<>();
        reader.forEachRemaining(
                row ->
                        actual.add(
                                String.format(
                                        "%s[%d, %d, %d]",
                                        row.getRowKind().shortString(),
                                        row.getInt(0),
                                        row.getInt(1),
                                        row.getInt(2))));
        Assertions.assertThat(actual).isEmpty();

        // Second recovery from the same state: run checkpoints 4 .. (lookupCompactMaxInterval + 2).
        operatorFactory = this.getAsyncLookupWriteOperatorFactory(fileStoreTable, true);
        harness = this.createHarness(operatorFactory);
        harness.setup(serializer);
        harness.initializeState(state);
        harness.open();
        for (int i = 1; i < lookupCompactMaxInterval; i++) {
            long checkpointId = i + 3;
            harness.prepareSnapshotPreBarrier(checkpointId);
            harness.snapshot(checkpointId, (long) (i + 30));
            harness.notifyOfCompletedCheckpoint(checkpointId);
            this.commitAll(harness, commit, checkpointId);
        }
        harness.close();
        commit.close();

        // A fresh stream scan must now see the latest value of every key.
        ReadBuilder finalReadBuilder = fileStoreTable.newReadBuilder();
        StreamTableScan finalScan = finalReadBuilder.newStreamScan();
        TableRead finalRead = finalReadBuilder.newRead();
        RecordReader<InternalRow> finalReader =
                finalRead.createReader(finalScan.plan().splits());
        List<String> finalResult = new ArrayList<>();
        finalReader.forEachRemaining(
                row ->
                        finalResult.add(
                                String.format(
                                        "%s[%d, %d, %d]",
                                        row.getRowKind().shortString(),
                                        row.getInt(0),
                                        row.getInt(1),
                                        row.getInt(2))));
        Assertions.assertThat(finalResult)
                .containsExactlyInAnyOrder("+I[1, 10, 101]", "+I[2, 20, 200]", "+I[3, 30, 301]");
    }

    /**
     * Verifies that with {@code changelog-producer = input} the records handed to the writer are
     * read back unchanged, row kinds included, by a stream scan restored at snapshot 1.
     *
     * @throws Exception if the harness, the commit or the read fails
     */
    @Test
    public void testChangelog() throws Exception {
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT()},
                        new String[] {"pt", "k", "v"});
        Options options = new Options();
        options.set("bucket", "1");
        options.set("changelog-producer", "input");
        FileStoreTable fileStoreTable =
                this.createFileStoreTable(
                        rowType, Arrays.asList("pt", "k"), Collections.singletonList("k"), options);

        RowDataStoreWriteOperator.Factory operatorFactory =
                this.getStoreSinkWriteOperatorFactory(fileStoreTable);
        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
                this.createHarness(operatorFactory);
        TableCommitImpl commit = fileStoreTable.newCommit(this.commitUser);
        TypeSerializer<Committable> serializer =
                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        harness.setup(serializer);
        harness.open();

        // Insert, delete and re-insert the same key within a single checkpoint.
        harness.processElement(GenericRow.ofKind(RowKind.INSERT, 1, 10, 100), 1L);
        harness.processElement(GenericRow.ofKind(RowKind.DELETE, 1, 10, 200), 2L);
        harness.processElement(GenericRow.ofKind(RowKind.INSERT, 1, 10, 300), 3L);
        harness.prepareSnapshotPreBarrier(1L);
        harness.snapshot(1L, 10L);
        harness.notifyOfCompletedCheckpoint(1L);
        this.commitAll(harness, commit, 1L);
        harness.close();
        commit.close();

        // Position the stream scan at snapshot 1 and render each row as "<kind>[pt, k, v]".
        ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
        StreamTableScan scan = readBuilder.newStreamScan();
        scan.restore(1L);
        TableRead read = readBuilder.newRead();
        RecordReader<InternalRow> reader = read.createReader(scan.plan().splits());
        List<String> actual = new ArrayList<>();
        reader.forEachRemaining(
                row ->
                        actual.add(
                                String.format(
                                        "%s[%d, %d, %d]",
                                        row.getRowKind().shortString(),
                                        row.getInt(0),
                                        row.getInt(1),
                                        row.getInt(2))));
        Assertions.assertThat(actual)
                .containsExactlyInAnyOrder("+I[1, 10, 100]", "-D[1, 10, 200]", "+I[1, 10, 300]");
    }

    /**
     * Tracks the {@code numWriters} gauge of the {@code writerBuffer} metric group across three
     * checkpoints on a table partitioned by {@code pt}.
     *
     * <p>Rows for partitions 1, 2 and 3 are written first (gauge expected to be 3); only
     * partitions 1 and 3 receive rows afterwards. After checkpoint 3 the gauge is expected to
     * be 2.
     *
     * @throws Exception if the harness or the commit fails
     */
    @Test
    public void testNumWritersMetric() throws Exception {
        String tableName = this.tablePath.getName();
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT()},
                        new String[] {"pt", "k", "v"});
        Options options = new Options();
        options.set("bucket", "1");
        options.set("write-buffer-size", "256 b");
        options.set("write-buffer-spillable", "false");
        options.set("page-size", "32 b");
        FileStoreTable fileStoreTable =
                this.createFileStoreTable(
                        rowType, Arrays.asList("pt", "k"), Collections.singletonList("pt"), options);
        TableCommitImpl commit = fileStoreTable.newCommit(this.commitUser);

        RowDataStoreWriteOperator.Factory operatorFactory =
                this.getStoreSinkWriteOperatorFactory(fileStoreTable);
        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
                this.createHarness(operatorFactory);
        TypeSerializer<Committable> serializer =
                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        harness.setup(serializer);
        harness.open();

        OperatorMetricGroup metricGroup = harness.getOneInputOperator().getMetricGroup();
        MetricGroup writerBufferMetricGroup =
                metricGroup.addGroup("paimon").addGroup("table", tableName).addGroup("writerBuffer");
        Gauge<?> numWriters = TestingMetricUtils.getGauge(writerBufferMetricGroup, "numWriters");

        // One row per partition: pt = 1, 2, 3.
        harness.processElement(GenericRow.of(1, 10, 100), 1L);
        harness.processElement(GenericRow.of(2, 20, 200), 2L);
        harness.processElement(GenericRow.of(3, 30, 300), 3L);
        Assertions.assertThat((Integer) numWriters.getValue()).isEqualTo(3);

        harness.prepareSnapshotPreBarrier(1L);
        harness.snapshot(1L, 10L);
        harness.notifyOfCompletedCheckpoint(1L);
        commit.commit(
                1L,
                harness.extractOutputValues().stream()
                        .map(c -> (CommitMessage) c.wrappedCommittable())
                        .collect(Collectors.toList()));
        Assertions.assertThat((Integer) numWriters.getValue()).isEqualTo(3);

        // Only partitions 1 and 3 receive new rows; partition 2 stays idle from here on.
        harness.processElement(GenericRow.of(1, 11, 110), 11L);
        harness.processElement(GenericRow.of(3, 13, 130), 13L);
        Assertions.assertThat((Integer) numWriters.getValue()).isEqualTo(3);

        harness.prepareSnapshotPreBarrier(2L);
        harness.snapshot(2L, 20L);
        harness.notifyOfCompletedCheckpoint(2L);
        // NOTE(review): extractOutputValues() presumably does not drain the harness output, so
        // this list may also contain the messages already committed for checkpoint 1 - confirm.
        commit.commit(
                2L,
                harness.extractOutputValues().stream()
                        .map(c -> (CommitMessage) c.wrappedCommittable())
                        .collect(Collectors.toList()));

        harness.prepareSnapshotPreBarrier(3L);
        harness.snapshot(3L, 30L);
        harness.notifyOfCompletedCheckpoint(3L);
        // Presumably the writer of the idle partition 2 has been released by now - confirm.
        Assertions.assertThat((Integer) numWriters.getValue()).isEqualTo(2);

        harness.endInput();
        harness.close();
        commit.close();
    }

    /**
     * Builds a write-operator factory for {@code fileStoreTable} whose sink write is a
     * {@link StoreSinkWriteImpl}, using this test's commit user as the initial commit user.
     *
     * @param fileStoreTable the table to write to
     * @return the operator factory
     */
    private RowDataStoreWriteOperator.Factory getStoreSinkWriteOperatorFactory(FileStoreTable fileStoreTable) {
        // The provider only uses its own parameters, so it captures nothing from this test instance.
        StoreSinkWrite.Provider writeProvider =
                (StoreSinkWrite.Provider & Serializable)
                        (table, user, state, ioManager, memoryPool, metricGroup) ->
                                new StoreSinkWriteImpl(
                                        table,
                                        user,
                                        state,
                                        ioManager,
                                        false,
                                        false,
                                        true,
                                        memoryPool,
                                        metricGroup);
        return new RowDataStoreWriteOperator.Factory(
                fileStoreTable, null, writeProvider, this.commitUser);
    }

    /**
     * Builds a write-operator factory for {@code fileStoreTable} whose sink write is an
     * {@link AsyncLookupSinkWrite}, using this test's commit user as the initial commit user.
     *
     * @param fileStoreTable the table to write to
     * @param waitCompaction forwarded unchanged to the {@link AsyncLookupSinkWrite} constructor
     * @return the operator factory
     */
    private RowDataStoreWriteOperator.Factory getAsyncLookupWriteOperatorFactory(FileStoreTable fileStoreTable, boolean waitCompaction) {
        // Apart from its own parameters the provider captures only the waitCompaction flag.
        StoreSinkWrite.Provider writeProvider =
                (StoreSinkWrite.Provider & Serializable)
                        (table, user, state, ioManager, memoryPool, metricGroup) ->
                                new AsyncLookupSinkWrite(
                                        table,
                                        user,
                                        state,
                                        ioManager,
                                        false,
                                        waitCompaction,
                                        true,
                                        memoryPool,
                                        metricGroup);
        return new RowDataStoreWriteOperator.Factory(
                fileStoreTable, null, writeProvider, this.commitUser);
    }

    /**
     * Drains every record currently in the harness output, checks that each one is a
     * {@link Committable} of kind {@code FILE}, and commits the wrapped messages in one call.
     *
     * @param harness the harness whose output queue is emptied
     * @param commit the table commit that receives the collected messages
     * @param commitIdentifier the identifier passed to {@code commit.commit}
     */
    private void commitAll(OneInputStreamOperatorTestHarness<InternalRow, Committable> harness, TableCommitImpl commit, long commitIdentifier) {
        List<CommitMessage> pending = new ArrayList<>();
        // poll() yields null exactly when the output queue has run dry.
        for (Object element = harness.getOutput().poll();
                element != null;
                element = harness.getOutput().poll()) {
            Committable committable = (Committable) ((StreamRecord<?>) element).getValue();
            Assertions.assertThat(committable.kind()).isEqualTo(Committable.Kind.FILE);
            pending.add((CommitMessage) committable.wrappedCommittable());
        }
        commit.commit(commitIdentifier, pending);
    }

    /**
     * Creates the table schema under {@link #tablePath} on the local file system and opens the
     * resulting table.
     *
     * <p>{@code conf} is modified in place: {@link CoreOptions#PATH} is set to the table path
     * before the schema is created.
     *
     * @param rowType the row type whose fields become the table columns
     * @param primaryKeys primary key column names; may be empty
     * @param partitionKeys partition column names; may be empty
     * @param conf table options; receives the table path
     * @return the opened table
     * @throws Exception if the schema cannot be created
     */
    private FileStoreTable createFileStoreTable(RowType rowType, List<String> primaryKeys, List<String> partitionKeys, Options conf) throws Exception {
        // PATH is a typed option, so the String value is passed as-is (no Object cast).
        conf.set(CoreOptions.PATH, this.tablePath.toString());
        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), this.tablePath);
        // Partition keys are passed before primary keys here, the reverse of this method's
        // own parameter order.
        schemaManager.createTable(
                new Schema(rowType.getFields(), partitionKeys, primaryKeys, conf.toMap(), ""));
        return FileStoreTableFactory.create(LocalFileIO.create(), conf);
    }

    /**
     * Wraps the given operator factory in a one-input test harness.
     *
     * <p>The input serializer is derived from an {@link InternalRowTypeSerializer} built for an
     * empty {@link RowType}.
     *
     * @param operatorFactory the write-operator factory under test
     * @return a harness that has been neither set up nor opened yet
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<InternalRow, Committable> createHarness(RowDataStoreWriteOperator.Factory operatorFactory) throws Exception {
        InternalTypeInfo<InternalRow> internalRowInternalTypeInfo =
                new InternalTypeInfo<>(new InternalRowTypeSerializer(RowType.builder().build()));
        return new OneInputStreamOperatorTestHarness<>(
                operatorFactory,
                internalRowInternalTypeInfo.createSerializer(new ExecutionConfig()));
    }
}

