/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.InternalOperatorCoordinatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSplitEnumeratorMetricGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.util.AbstractID;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.source.ContinuousFileSplitEnumerator;
import org.apache.paimon.flink.source.ContinuousFileStoreSource;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.FlinkSource;
import org.apache.paimon.flink.source.LogHybridSourceFactory;
import org.apache.paimon.flink.source.StaticFileStoreSource;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class FileStoreSourceMetricsTest {
    private FileStoreTable table;
    private TestingSplitEnumeratorContextWithRegisteringGroup context;
    private MetricGroup scanMetricGroup;

    @BeforeEach
    public void before(@TempDir java.nio.file.Path path) throws Exception {
        LocalFileIO fileIO = LocalFileIO.create();
        Path tablePath = new Path(path.toString());
        SchemaManager schemaManager = new SchemaManager((FileIO)fileIO, tablePath);
        TableSchema tableSchema = schemaManager.createTable(Schema.newBuilder().column("a", (DataType)DataTypes.INT()).column("b", (DataType)DataTypes.BIGINT()).build());
        this.table = FileStoreTableFactory.create((FileIO)fileIO, (Path)tablePath, (TableSchema)tableSchema);
        this.context = new TestingSplitEnumeratorContextWithRegisteringGroup(1);
        this.scanMetricGroup = this.context.metricGroup().addGroup("paimon").addGroup("table", this.table.name()).addGroup("scan");
    }

    @Test
    public void staticFileStoreSourceScanMetricsTest() throws Exception {
        this.writeOnce();
        StaticFileStoreSource staticFileStoreSource = new StaticFileStoreSource(this.table.newReadBuilder(), null, 1, FlinkConnectorOptions.SplitAssignMode.FAIR);
        staticFileStoreSource.restoreEnumerator((SplitEnumeratorContext)this.context, null);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScannedManifests").getValue()).isEqualTo((Object)1L);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScanResultedTableFiles").getValue()).isEqualTo((Object)1L);
    }

    @Test
    public void continuousFileStoreSourceScanMetricsTest() throws Exception {
        this.writeOnce();
        ContinuousFileStoreSource continuousFileStoreSource = new ContinuousFileStoreSource(this.table.newReadBuilder(), this.table.options(), null);
        ContinuousFileSplitEnumerator enumerator = (ContinuousFileSplitEnumerator)continuousFileStoreSource.restoreEnumerator((SplitEnumeratorContext)this.context, null);
        enumerator.scanNextSnapshot();
        Assertions.assertThat((long)TestingMetricUtils.getHistogram(this.scanMetricGroup, "scanDuration").getCount()).isEqualTo(1L);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScannedManifests").getValue()).isEqualTo((Object)1L);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScanResultedTableFiles").getValue()).isEqualTo((Object)1L);
        this.writeAgain();
        enumerator.scanNextSnapshot();
        Assertions.assertThat((long)TestingMetricUtils.getHistogram(this.scanMetricGroup, "scanDuration").getCount()).isEqualTo(2L);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScannedManifests").getValue()).isEqualTo((Object)1L);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScanResultedTableFiles").getValue()).isEqualTo((Object)1L);
    }

    @Test
    public void logHybridFileStoreSourceScanMetricsTest() throws Exception {
        this.writeOnce();
        FlinkSource logHybridFileStoreSource = LogHybridSourceFactory.buildHybridFirstSource((Table)this.table, (int[][])null, null);
        logHybridFileStoreSource.restoreEnumerator((SplitEnumeratorContext)this.context, null);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScannedManifests").getValue()).isEqualTo((Object)1L);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScanResultedTableFiles").getValue()).isEqualTo((Object)1L);
    }

    private void writeOnce() throws Exception {
        TableWriteImpl writer = this.table.newWrite("test");
        TableCommitImpl commit = this.table.newCommit("test");
        writer.write((InternalRow)GenericRow.of((Object[])new Object[]{1, 2L}));
        writer.write((InternalRow)GenericRow.of((Object[])new Object[]{3, 4L}));
        writer.write((InternalRow)GenericRow.of((Object[])new Object[]{5, 6L}));
        writer.write((InternalRow)GenericRow.of((Object[])new Object[]{7, 8L}));
        writer.write((InternalRow)GenericRow.of((Object[])new Object[]{9, 10L}));
        commit.commit(writer.prepareCommit());
        commit.close();
        writer.close();
    }

    private void writeAgain() throws Exception {
        TableWriteImpl writer = this.table.newWrite("test");
        TableCommitImpl commit = this.table.newCommit("test");
        writer.write((InternalRow)GenericRow.of((Object[])new Object[]{10, 2L}));
        writer.write((InternalRow)GenericRow.of((Object[])new Object[]{13, 24L}));
        writer.write((InternalRow)GenericRow.of((Object[])new Object[]{15, 26L}));
        writer.write((InternalRow)GenericRow.of((Object[])new Object[]{17, 28L}));
        writer.write((InternalRow)GenericRow.of((Object[])new Object[]{19, 10L}));
        commit.commit(writer.prepareCommit());
        commit.close();
        writer.close();
    }

    private class TestingSplitEnumeratorContextWithRegisteringGroup
    extends TestingSplitEnumeratorContext<FileStoreSourceSplit> {
        private final SplitEnumeratorMetricGroup metricGroup;

        public TestingSplitEnumeratorContextWithRegisteringGroup(int parallelism) {
            super(parallelism);
            JobID jobId = new JobID();
            JobVertexID jobVertexId = new JobVertexID();
            OperatorID operatorId = new OperatorID();
            TestingMetricRegistry registry = TestingMetricRegistry.builder().build();
            JobManagerOperatorMetricGroup jmJobGroup = JobManagerMetricGroup.createJobManagerMetricGroup((MetricRegistry)registry, (String)"localhost").addJob(jobId, "myJobName").getOrAddOperator((AbstractID)jobVertexId, "taskName", operatorId, "opName");
            InternalOperatorCoordinatorMetricGroup operatorCoordinatorMetricGroup = new InternalOperatorCoordinatorMetricGroup((MetricGroup)jmJobGroup);
            InternalSplitEnumeratorMetricGroup splitEnumeratorMetricGroup = new InternalSplitEnumeratorMetricGroup((MetricGroup)operatorCoordinatorMetricGroup);
            this.metricGroup = splitEnumeratorMetricGroup;
        }

        public SplitEnumeratorMetricGroup metricGroup() {
            return this.metricGroup;
        }
    }
}

