/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.RowDataStoreWriteOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.flink.utils.InternalTypeSerializer;
import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public abstract class WriterOperatorTestBase {
    private static final RowType ROW_TYPE = RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.INT()}, (String[])new String[]{"a", "b"});
    @TempDir
    public java.nio.file.Path tempDir;
    protected Path tablePath;

    @BeforeEach
    public void before() {
        this.tablePath = new Path(this.tempDir.toString());
    }

    @Test
    public void testMetric() throws Exception {
        String tableName = this.tablePath.getName();
        FileStoreTable fileStoreTable = this.createFileStoreTable();
        RowDataStoreWriteOperator rowDataStoreWriteOperator = WriterOperatorTestBase.getRowDataStoreWriteOperator(fileStoreTable);
        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness = WriterOperatorTestBase.createWriteOperatorHarness(fileStoreTable, rowDataStoreWriteOperator);
        TypeSerializer serializer = new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        harness.setup(serializer);
        harness.open();
        int size = 10;
        for (int i = 0; i < size; ++i) {
            GenericRow row = GenericRow.of((Object[])new Object[]{1, 1});
            harness.processElement((Object)row, 1L);
        }
        harness.prepareSnapshotPreBarrier(1L);
        harness.snapshot(1L, 2L);
        harness.notifyOfCompletedCheckpoint(1L);
        OperatorMetricGroup metricGroup = rowDataStoreWriteOperator.getMetricGroup();
        MetricGroup writerMetricGroup = metricGroup.addGroup("paimon").addGroup("table", tableName).addGroup("partition", "_").addGroup("bucket", "0").addGroup("writer");
        Counter writeRecordCount = MetricUtils.getCounter(writerMetricGroup, "writeRecordCount");
        Assertions.assertThat((long)writeRecordCount.getCount()).isEqualTo((long)size);
        Histogram flushCostMS = MetricUtils.getHistogram(writerMetricGroup, "flushCostMillis");
        Assertions.assertThat((long)flushCostMS.getCount()).isGreaterThan(0L);
        Histogram prepareCommitCostMS = MetricUtils.getHistogram(writerMetricGroup, "prepareCommitCostMillis");
        Assertions.assertThat((long)prepareCommitCostMS.getCount()).isGreaterThan(0L);
        MetricGroup writerBufferMetricGroup = metricGroup.addGroup("paimon").addGroup("table", tableName).addGroup("writerBuffer");
        Gauge bufferPreemptCount = MetricUtils.getGauge(writerBufferMetricGroup, "bufferPreemptCount");
        Assertions.assertThat((Long)((Long)bufferPreemptCount.getValue())).isEqualTo(0L);
        Gauge totalWriteBufferSizeByte = MetricUtils.getGauge(writerBufferMetricGroup, "totalWriteBufferSizeByte");
        Assertions.assertThat((Long)((Long)totalWriteBufferSizeByte.getValue())).isEqualTo(256L);
        GenericRow row = GenericRow.of((Object[])new Object[]{1, 1});
        harness.processElement((Object)row, 1L);
        Gauge usedWriteBufferSizeByte = MetricUtils.getGauge(writerBufferMetricGroup, "usedWriteBufferSizeByte");
        Assertions.assertThat((Long)((Long)usedWriteBufferSizeByte.getValue())).isGreaterThan(0L);
    }

    @NotNull
    private static OneInputStreamOperatorTestHarness<InternalRow, Committable> createWriteOperatorHarness(FileStoreTable fileStoreTable, RowDataStoreWriteOperator operator) throws Exception {
        InternalTypeInfo internalRowInternalTypeInfo = new InternalTypeInfo((InternalTypeSerializer)new InternalRowTypeSerializer(ROW_TYPE));
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, internalRowInternalTypeInfo.createSerializer(new ExecutionConfig()));
        return harness;
    }

    @NotNull
    private static RowDataStoreWriteOperator getRowDataStoreWriteOperator(FileStoreTable fileStoreTable) {
        StoreSinkWrite.Provider & Serializable provider = (StoreSinkWrite.Provider & Serializable)(table, commitUser, state, ioManager, memoryPool, metricGroup) -> new StoreSinkWriteImpl(table, commitUser, state, ioManager, false, false, true, memoryPool, metricGroup);
        RowDataStoreWriteOperator operator = new RowDataStoreWriteOperator(fileStoreTable, null, (StoreSinkWrite.Provider)provider, "test");
        return operator;
    }

    abstract void setTableConfig(Options var1);

    protected FileStoreTable createFileStoreTable() throws Exception {
        Options conf = new Options();
        conf.set(CoreOptions.PATH, (Object)this.tablePath.toString());
        this.setTableConfig(conf);
        SchemaManager schemaManager = new SchemaManager((FileIO)LocalFileIO.create(), this.tablePath);
        List<String> primaryKeys = WriterOperatorTestBase.setKeys(conf, (ConfigOption<String>)CoreOptions.PRIMARY_KEY);
        List<String> paritionKeys = WriterOperatorTestBase.setKeys(conf, (ConfigOption<String>)CoreOptions.PARTITION);
        schemaManager.createTable(new Schema(ROW_TYPE.getFields(), paritionKeys, primaryKeys, conf.toMap(), ""));
        return FileStoreTableFactory.create((FileIO)LocalFileIO.create(), (Options)conf);
    }

    @NotNull
    private static List<String> setKeys(Options conf, ConfigOption<String> primaryKey) {
        List<String> primaryKeys = Optional.ofNullable(conf.get(CoreOptions.PRIMARY_KEY)).map(key -> Arrays.asList(key.split(","))).orElse(Collections.emptyList());
        conf.remove(primaryKey.key());
        return primaryKeys;
    }
}

