/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.function.Predicate;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.cdc.CdcDynamicBucketWriteOperator;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator;
import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link CdcDynamicBucketWriteOperator}.
 *
 * <p>The table is created under a {@code traceable://} path so that {@link #after()} can verify,
 * via {@link TraceableFileIO}, that no input or output stream opened below the temporary
 * directory is still open once a test has finished.
 */
public class CdcDynamicBucketWriteOperatorTest {

    /** Names of the compaction gauges inspected by {@link #testCompactionMetrics()}. */
    private static final String FILES_COMPACTED_BEFORE = "lastTableFilesCompactedBefore";

    private static final String FILES_COMPACTED_AFTER = "lastTableFilesCompactedAfter";
    private static final String CHANGELOG_FILES_COMPACTED = "lastChangelogFilesCompacted";

    // Must not be private: JUnit injects the temporary directory into this field.
    @TempDir
    java.nio.file.Path tempDir;

    private Path tablePath;
    private String commitUser;

    /** Points the table at the per-test temporary directory and picks a fresh commit user. */
    @BeforeEach
    public void before() {
        this.tablePath = new Path("traceable://" + this.tempDir.toString());
        this.commitUser = UUID.randomUUID().toString();
    }

    /** Asserts that no stream opened under the temporary directory is still open. */
    @AfterEach
    public void after() {
        Predicate<Path> underTempDir = path -> path.toString().contains(this.tempDir.toString());
        Assertions.assertThat(TraceableFileIO.openInputStreams(underTempDir)).isEmpty();
        Assertions.assertThat(TraceableFileIO.openOutputStreams(underTempDir)).isEmpty();
    }

    /**
     * Writes two records with the same primary key, compacting and preparing a commit after each
     * one, and checks the values reported by the three compaction gauges after every round. Once
     * the harness is closed, the gauges must no longer be retrievable from the metric group.
     *
     * @throws Exception if the harness, the writer, or an assertion fails
     */
    @Test
    public void testCompactionMetrics() throws Exception {
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.INT()},
                        new String[] {"pk", "col1"});
        FileStoreTable table =
                this.createFileStoreTable(
                        rowType, Collections.emptyList(), Collections.singletonList("pk"));

        OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>, Committable> harness =
                this.createTestHarness(table);
        CdcDynamicBucketWriteOperator operator =
                (CdcDynamicBucketWriteOperator) harness.getOneInputOperator();
        harness.open();

        // Set right before the regular close() so the failure path never closes a second time.
        boolean closeAttempted = false;
        try {
            MetricGroup compactionMetricGroup =
                    operator.getMetricGroup()
                            .addGroup("paimon")
                            .addGroup("table", table.name())
                            .addGroup("partition", "_")
                            .addGroup("bucket", "0")
                            .addGroup("compaction");

            long timestamp = 0L;
            long cpId = 1L;

            // Round 1: a single record, then compact bucket 0 of the empty partition.
            harness.processElement(Tuple2.of(insertRecord("1", "2"), 0), timestamp++);
            operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
            operator.getWrite().prepareCommit(true, cpId++);
            assertCompactionGauges(compactionMetricGroup, 1L, 1L, 0L);

            // Round 2: same primary key with a new value, compacted again.
            harness.processElement(Tuple2.of(insertRecord("1", "3"), 0), timestamp);
            operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
            operator.getWrite().prepareCommit(true, cpId);
            assertCompactionGauges(compactionMetricGroup, 2L, 1L, 0L);

            closeAttempted = true;
            harness.close();

            assertGaugeAbsent(compactionMetricGroup, FILES_COMPACTED_BEFORE);
            assertGaugeAbsent(compactionMetricGroup, FILES_COMPACTED_AFTER);
            assertGaugeAbsent(compactionMetricGroup, CHANGELOG_FILES_COMPACTED);
        } catch (Throwable failure) {
            // Release the harness on failure so a broken assertion does not also leave it open;
            // a secondary close error is attached instead of replacing the real failure.
            if (!closeAttempted) {
                try {
                    harness.close();
                } catch (Exception closeFailure) {
                    failure.addSuppressed(closeFailure);
                }
            }
            throw failure;
        }
    }

    /**
     * Builds an {@link RowKind#INSERT} record carrying the two columns of the test table.
     *
     * @param pk string value for the {@code pk} field
     * @param col1 string value for the {@code col1} field
     * @return a record backed by its own field map
     */
    private static CdcRecord insertRecord(String pk, String col1) {
        HashMap<String, String> fields = new HashMap<>();
        fields.put("pk", pk);
        fields.put("col1", col1);
        return new CdcRecord(RowKind.INSERT, fields);
    }

    /**
     * Asserts the current values of the three compaction gauges.
     *
     * @param group metric group holding the gauges
     * @param filesBefore expected value of {@code lastTableFilesCompactedBefore}
     * @param filesAfter expected value of {@code lastTableFilesCompactedAfter}
     * @param changelogFiles expected value of {@code lastChangelogFilesCompacted}
     */
    private static void assertCompactionGauges(
            MetricGroup group, long filesBefore, long filesAfter, long changelogFiles) {
        Assertions.assertThat(gaugeValue(group, FILES_COMPACTED_BEFORE)).isEqualTo(filesBefore);
        Assertions.assertThat(gaugeValue(group, FILES_COMPACTED_AFTER)).isEqualTo(filesAfter);
        Assertions.assertThat(gaugeValue(group, CHANGELOG_FILES_COMPACTED))
                .isEqualTo(changelogFiles);
    }

    /** Reads the current value of the named gauge from {@code group}. */
    private static Object gaugeValue(MetricGroup group, String name) {
        return MetricUtils.getGauge(group, name).getValue();
    }

    /** Asserts that looking up the named gauge in {@code group} yields {@code null}. */
    private static void assertGaugeAbsent(MetricGroup group, String name) {
        Assertions.assertThat(MetricUtils.getGauge(group, name)).isNull();
    }

    /**
     * Wraps a new {@link CdcDynamicBucketWriteOperator} for {@code table} in a test harness and
     * sets the harness up with the {@link Committable} output serializer. The harness is returned
     * un-opened; the caller is responsible for {@code open()} and {@code close()}.
     *
     * @param table table the operator writes to
     * @return the configured harness
     * @throws Exception if the harness cannot be created
     */
    private OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>, Committable>
            createTestHarness(FileStoreTable table) throws Exception {
        // The intersection cast is kept from the original so the lambda stays serializable
        // regardless of how StoreSinkWrite.Provider itself is declared.
        StoreSinkWrite.Provider writeProvider =
                (StoreSinkWrite.Provider & Serializable)
                        (t, user, state, ioManager, memoryPool, metricGroup) ->
                                new StoreSinkWriteImpl(
                                        t,
                                        user,
                                        state,
                                        ioManager,
                                        false,
                                        false,
                                        true,
                                        memoryPool,
                                        metricGroup);
        CdcDynamicBucketWriteOperator operator =
                new CdcDynamicBucketWriteOperator(table, writeProvider, this.commitUser);

        TypeSerializer<Tuple2<CdcRecord, Integer>> inputSerializer = new JavaSerializer<>();
        TypeSerializer<Committable> outputSerializer =
                new CommittableTypeInfo().createSerializer(new ExecutionConfig());

        OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>, Committable> harness =
                new OneInputStreamOperatorTestHarness<>(operator, inputSerializer);
        harness.setup(outputSerializer);
        return harness;
    }

    /**
     * Force-commits a schema built from the arguments at {@link #tablePath} and opens the
     * resulting table on a local file IO. The retry sleep time of the CDC write operator is set
     * to 10 ms in the table options.
     *
     * @param rowType row type whose fields become the table columns
     * @param partitions partition key names
     * @param primaryKeys primary key names
     * @return the created table
     * @throws Exception if the schema cannot be committed
     */
    private FileStoreTable createFileStoreTable(
            RowType rowType, List<String> partitions, List<String> primaryKeys) throws Exception {
        Options conf = new Options();
        conf.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME, Duration.ofMillis(10L));

        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), this.tablePath);
        Schema schema = new Schema(rowType.getFields(), partitions, primaryKeys, conf.toMap(), "");
        TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema);

        return FileStoreTableFactory.create(LocalFileIO.create(), this.tablePath, tableSchema);
    }
}

