/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.DynamicBucketRowWriteOperator;
import org.apache.paimon.flink.sink.FileStoreSink;
import org.apache.paimon.flink.sink.RowDataStoreWriteOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.operation.KeyValueFileStoreWrite;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class FlinkSinkTest {
    /** Temporary directory injected by JUnit; its path is used as the table location. */
    @TempDir
    java.nio.file.Path tempPath;

    /** Row layout used by the tests: two INT columns named {@code pk} and {@code pt0}. */
    protected static final RowType ROW_TYPE =
            RowType.of(
                    new DataType[] {DataTypes.INT(), DataTypes.INT()},
                    new String[] {"pk", "pt0"});

    /**
     * Checks the value reported by {@code testSpillable} for the same table under both runtime
     * modes: it must be {@code true} in BATCH mode and {@code false} in STREAMING mode.
     */
    @Test
    public void testOptimizeKeyValueWriterForBatch() throws Exception {
        FileStoreTable table = createFileStoreTable();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Batch mode: the write buffer is expected to be spillable.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        boolean spillableInBatch = testSpillable(env, table);
        Assertions.assertThat(spillableInBatch).isTrue();

        // Streaming mode on the same environment: the write buffer must not be spillable.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        boolean spillableInStreaming = testSpillable(env, table);
        Assertions.assertThat(spillableInStreaming).isFalse();
    }

    /**
     * Verifies the compaction gauges exposed by the row write operator.
     *
     * <p>The gauges are looked up in the metric group
     * {@code paimon / table=<table name> / partition=_ / bucket=0 / compaction}. The test
     * writes three rows, compacts and prepares a commit, then writes a fourth row and repeats.
     * After each round it checks {@code lastTableFilesCompactedBefore},
     * {@code lastTableFilesCompactedAfter} and {@code lastChangelogFilesCompacted}. Once the
     * harness is closed, none of the three gauges may be found in the group any more.
     */
    @Test
    public void testCompactionMetrics() throws Exception {
        FileStoreTable table = createFileStoreTable();
        RowDataStoreWriteOperator operator = createWriteOperator(table);
        OneInputStreamOperatorTestHarness<InternalRow, Committable> testHarness =
                createTestHarness(operator);

        MetricGroup compactionMetricGroup =
                operator.getMetricGroup()
                        .addGroup("paimon")
                        .addGroup("table", table.name())
                        .addGroup("partition", "_")
                        .addGroup("bucket", "0")
                        .addGroup("compaction");
        testHarness.open();

        GenericRow row1 = GenericRow.of(1, 2);
        GenericRow row2 = GenericRow.of(2, 3);
        GenericRow row3 = GenericRow.of(3, 4);
        GenericRow row4 = GenericRow.of(4, 5);

        // Fully parameterized records so the list matches processElements(Collection<StreamRecord<IN>>).
        ArrayList<StreamRecord<InternalRow>> streamRecords = new ArrayList<>();
        streamRecords.add(new StreamRecord<InternalRow>(row1));
        streamRecords.add(new StreamRecord<InternalRow>(row2));
        streamRecords.add(new StreamRecord<InternalRow>(row3));

        long cpId = 1L;

        // First round: three rows, one compaction, one prepared commit.
        testHarness.processElements(streamRecords);
        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
        operator.write.prepareCommit(true, cpId++);
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
                                .getValue())
                .isEqualTo(1L);
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
                                .getValue())
                .isEqualTo(1L);
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
                                .getValue())
                .isEqualTo(0L);

        // Second round: one more row, so two files go into the compaction and one comes out.
        testHarness.processElement(row4, 0L);
        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
        operator.write.prepareCommit(true, cpId);
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
                                .getValue())
                .isEqualTo(2L);
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
                                .getValue())
                .isEqualTo(1L);
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
                                .getValue())
                .isEqualTo(0L);

        // After closing, the gauges must no longer be present in the group.
        testHarness.close();
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore"))
                .isNull();
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter"))
                .isNull();
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted"))
                .isNull();
    }

    /**
     * Verifies the compaction gauges exposed by the dynamic-bucket row write operator.
     *
     * <p>Input records are {@code Tuple2<InternalRow, Integer>}; the integer is presumably the
     * target bucket (TODO confirm against {@code DynamicBucketRowWriteOperator}). The gauges are
     * looked up in the metric group
     * {@code paimon / table=<table name> / partition=_ / bucket=0 / compaction}. The test runs
     * two write/compact/prepare-commit rounds and checks the three gauges after each. Once the
     * harness is closed, none of the gauges may be found in the group any more.
     */
    @Test
    public void testDynamicBucketCompactionMetrics() throws Exception {
        FileStoreTable table = createFileStoreTable();
        DynamicBucketRowWriteOperator operator = createDynamicBucketWriteOperator(table);
        OneInputStreamOperatorTestHarness<Tuple2<InternalRow, Integer>, Committable> testHarness =
                createDynamicBucketTestHarness(operator);

        MetricGroup compactionMetricGroup =
                operator.getMetricGroup()
                        .addGroup("paimon")
                        .addGroup("table", table.name())
                        .addGroup("partition", "_")
                        .addGroup("bucket", "0")
                        .addGroup("compaction");
        testHarness.open();

        // Declared as InternalRow so Tuple2.of(...) infers Tuple2<InternalRow, Integer>.
        InternalRow row1 = GenericRow.of(1, 2);
        InternalRow row2 = GenericRow.of(2, 3);
        InternalRow row3 = GenericRow.of(3, 4);
        InternalRow row4 = GenericRow.of(4, 5);

        ArrayList<StreamRecord<Tuple2<InternalRow, Integer>>> streamRecords = new ArrayList<>();
        streamRecords.add(new StreamRecord<Tuple2<InternalRow, Integer>>(Tuple2.of(row1, 0)));
        streamRecords.add(new StreamRecord<Tuple2<InternalRow, Integer>>(Tuple2.of(row2, 1)));
        streamRecords.add(new StreamRecord<Tuple2<InternalRow, Integer>>(Tuple2.of(row3, 2)));

        long cpId = 1L;

        // First round: three records, one compaction, one prepared commit.
        testHarness.processElements(streamRecords);
        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
        operator.write.prepareCommit(true, cpId++);
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
                                .getValue())
                .isEqualTo(1L);
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
                                .getValue())
                .isEqualTo(1L);
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
                                .getValue())
                .isEqualTo(0L);

        // Second round: one more record tagged 0, then compact and prepare the commit again.
        testHarness.processElement(Tuple2.of(row4, 0), 0L);
        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
        operator.write.prepareCommit(true, cpId);
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
                                .getValue())
                .isEqualTo(2L);
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
                                .getValue())
                .isEqualTo(1L);
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
                                .getValue())
                .isEqualTo(0L);

        // After closing, the gauges must no longer be present in the group.
        testHarness.close();
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore"))
                .isNull();
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter"))
                .isNull();
        Assertions.assertThat(
                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted"))
                .isNull();
    }

    /**
     * Builds the sink's write operator for a single-row source, initializes its state and
     * writer, and reports whether the underlying {@link KeyValueFileStoreWrite} buffer is
     * spillable.
     *
     * @param streamExecutionEnvironment environment whose configuration (including the runtime
     *     mode set by the caller) is used to build the write topology
     * @param fileStoreTable table the sink writes to
     * @return the value of {@code bufferSpillable()} on the operator's file store write
     * @throws Exception if building or initializing the operator, or closing the I/O manager,
     *     fails
     */
    private boolean testSpillable(
            StreamExecutionEnvironment streamExecutionEnvironment, FileStoreTable fileStoreTable)
            throws Exception {
        DataStreamSource<InternalRow> source =
                streamExecutionEnvironment.fromCollection(
                        Collections.<InternalRow>singletonList(GenericRow.of(1, 1)));
        FileStoreSink flinkSink = new FileStoreSink(fileStoreTable, null, null);
        SingleOutputStreamOperator<?> written = flinkSink.doWrite(source, "123", 1);

        // Dig the write operator out of the transformation produced by doWrite.
        OneInputTransformation<?, ?> transformation =
                (OneInputTransformation<?, ?>) written.getTransformation();
        SimpleOperatorFactory<?> operatorFactory =
                (SimpleOperatorFactory<?>) transformation.getOperatorFactory();
        RowDataStoreWriteOperator operator =
                (RowDataStoreWriteOperator) operatorFactory.getOperator();

        // The mock store is extended so that union list state is served by the regular
        // list-state implementation.
        OperatorStateStore stateStore =
                new MockOperatorStateStore() {
                    @Override
                    public <S> ListState<S> getUnionListState(
                            ListStateDescriptor<S> stateDescriptor) throws Exception {
                        return getListState(stateDescriptor);
                    }
                };
        StateInitializationContext context =
                new StateInitializationContextImpl(null, stateStore, null, null, null);

        // IOManagerAsync owns background I/O resources; close it once the flag has been read
        // instead of leaking it on every invocation.
        try (IOManager ioManager = new IOManagerAsync()) {
            operator.initStateAndWriter(context, (a, b, c) -> true, ioManager, "123");
            StoreSinkWriteImpl sinkWrite = (StoreSinkWriteImpl) operator.write;
            KeyValueFileStoreWrite fileStoreWrite =
                    (KeyValueFileStoreWrite) sinkWrite.write.getWrite();
            return fileStoreWrite.bufferSpillable();
        }
    }

    /**
     * Creates a table rooted at {@link #tempPath} with the fields of {@link #ROW_TYPE}, no
     * partition keys and {@code pk} as the only primary key.
     *
     * @return a {@link FileStoreTable} backed by the freshly committed schema
     * @throws Exception if committing the schema or creating the table fails
     */
    private FileStoreTable createFileStoreTable() throws Exception {
        Path tablePath = new Path(tempPath.toString());

        Options conf = new Options();
        conf.set(CoreOptions.PATH, tablePath.toString());

        // Commit the schema through a local-file-system schema manager.
        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
        Schema schema =
                new Schema(
                        ROW_TYPE.getFields(),
                        Collections.emptyList(),
                        Arrays.asList("pk"),
                        conf.toMap(),
                        "");
        TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema);

        FileIO fileIO = FileIOFinder.find(tablePath);
        CatalogEnvironment catalogEnvironment =
                new CatalogEnvironment(Lock.emptyFactory(), null, null);
        return FileStoreTableFactory.create(
                fileIO, tablePath, tableSchema, conf, catalogEnvironment);
    }

    /**
     * Wraps {@code operator} in a test harness and sets the harness up with a serializer
     * created from {@link CommittableTypeInfo}.
     *
     * @param operator operator under test
     * @return the harness after {@code setup}; the caller still has to {@code open()} it
     * @throws Exception if the harness cannot be created or set up
     */
    private OneInputStreamOperatorTestHarness<InternalRow, Committable> createTestHarness(
            OneInputStreamOperator<InternalRow, Committable> operator) throws Exception {
        TypeSerializer<Committable> serializer =
                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
                new OneInputStreamOperatorTestHarness<>(operator);
        harness.setup(serializer);
        return harness;
    }

    /**
     * Wraps a {@code Tuple2<InternalRow, Integer>}-consuming {@code operator} in a test harness
     * and sets the harness up with a serializer created from {@link CommittableTypeInfo}.
     *
     * @param operator operator under test
     * @return the harness after {@code setup}; the caller still has to {@code open()} it
     * @throws Exception if the harness cannot be created or set up
     */
    private OneInputStreamOperatorTestHarness<Tuple2<InternalRow, Integer>, Committable>
            createDynamicBucketTestHarness(
                    OneInputStreamOperator<Tuple2<InternalRow, Integer>, Committable> operator)
                    throws Exception {
        TypeSerializer<Committable> serializer =
                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness<Tuple2<InternalRow, Integer>, Committable> harness =
                new OneInputStreamOperatorTestHarness<>(operator);
        harness.setup(serializer);
        return harness;
    }

    /**
     * Creates the row write operator under test for {@code table}, using the initial commit
     * user {@code "test"} and a serializable provider that builds a {@link StoreSinkWriteImpl}.
     *
     * @param table table the operator writes to
     * @return a new, not yet opened write operator
     */
    protected RowDataStoreWriteOperator createWriteOperator(FileStoreTable table) {
        // NOTE(review): the meaning of the three boolean flags (false, false, true) is defined
        // by the StoreSinkWriteImpl constructor, which is not visible here - confirm there.
        StoreSinkWrite.Provider writeProvider =
                (StoreSinkWrite.Provider & Serializable)
                        (storeTable, user, writeState, io, pool, metrics) ->
                                new StoreSinkWriteImpl(
                                        storeTable,
                                        user,
                                        writeState,
                                        io,
                                        false,
                                        false,
                                        true,
                                        pool,
                                        metrics);
        return new RowDataStoreWriteOperator(table, null, writeProvider, "test");
    }

    /**
     * Creates the dynamic-bucket write operator under test for {@code table}, using the initial
     * commit user {@code "test"} and a serializable provider that builds a
     * {@link StoreSinkWriteImpl}.
     *
     * @param table table the operator writes to
     * @return a new, not yet opened write operator
     */
    protected DynamicBucketRowWriteOperator createDynamicBucketWriteOperator(FileStoreTable table) {
        // NOTE(review): the meaning of the three boolean flags (false, false, true) is defined
        // by the StoreSinkWriteImpl constructor, which is not visible here - confirm there.
        StoreSinkWrite.Provider writeProvider =
                (StoreSinkWrite.Provider & Serializable)
                        (storeTable, user, writeState, io, pool, metrics) ->
                                new StoreSinkWriteImpl(
                                        storeTable,
                                        user,
                                        writeState,
                                        io,
                                        false,
                                        false,
                                        true,
                                        pool,
                                        metrics);
        return new DynamicBucketRowWriteOperator(table, writeProvider, "test");
    }
}

