/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.nio.file.Path;
import java.util.Collection;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.paimon.flink.util.MiniClusterWithClientExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/**
 * Integration tests for the {@code numRecordsIn} metric exposed by the source operator when a
 * Paimon table is read through Flink SQL.
 *
 * <p>All tests share one mini cluster whose metrics are collected by an {@link InMemoryReporter},
 * so the counters can be inspected directly from the test JVM.
 */
public class SourceMetricsITCase {
    /** Number of slots offered by the single TaskManager of the shared mini cluster. */
    private static final int DEFAULT_PARALLELISM = 4;

    /** Maximum number of times the streaming test polls the reporter for the expected count. */
    private static final int METRIC_POLL_ATTEMPTS = 20;

    /** Pause between two polls of the reporter, in milliseconds. */
    private static final long METRIC_POLL_INTERVAL_MS = 1000L;

    /** Reporter registered in the mini cluster configuration; queried by the tests below. */
    private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();

    @RegisterExtension
    protected static final MiniClusterWithClientExtension MINI_CLUSTER_EXTENSION =
            new MiniClusterWithClientExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                            .setConfiguration(reporter.addToConfiguration(new Configuration()))
                            .build());

    /** Per-test temporary directory used as the Paimon warehouse location. */
    @TempDir
    Path tempPath;

    /**
     * Cancels every job that is still in a non-terminal state after a test, so that a job left
     * running by one test does not keep running during the following ones.
     *
     * @throws Exception if the REST client cannot be created or the job list cannot be fetched
     */
    @AfterEach
    public final void cleanupRunningJobs() throws Exception {
        // The client is created per invocation, so it is closed here to release its resources.
        try (RestClusterClient<MiniClusterClient.MiniClusterId> clusterClient =
                MINI_CLUSTER_EXTENSION.createRestClusterClient()) {
            for (JobStatusMessage job : clusterClient.listJobs().get()) {
                if (job.getJobState().isTerminalState()) {
                    continue;
                }
                try {
                    clusterClient.cancel(job.getJobId()).get();
                } catch (Exception ignored) {
                    // Best-effort cleanup: a failure to cancel a dangling job must not fail the
                    // test that has just run.
                }
            }
        }
    }

    /**
     * Writes three rows into a primary-key table, copies them into a blackhole sink in batch mode
     * and checks the {@code numRecordsIn} counter of the matching source operators.
     *
     * @throws Exception if any of the SQL statements or the job itself fails
     */
    @Test
    public void testNumRecordsIn() throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.executeSql("CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + this.tempPath + "' )");
        tEnv.executeSql("USE CATALOG mycat");
        tEnv.executeSql("CREATE TABLE T ( k INT, v INT, PRIMARY KEY (k) NOT ENFORCED )");
        tEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 30)").await();
        tEnv.executeSql("CREATE TEMPORARY TABLE B ( k INT, v INT ) WITH ( 'connector' = 'blackhole' )");
        TableResult tableResult = tEnv.executeSql("INSERT INTO B SELECT * FROM T/*+ OPTIONS('scan.parallelism'='1')*/");
        JobClient client =
                tableResult
                        .getJobClient()
                        .orElseThrow(() -> new IllegalStateException("INSERT job did not expose a JobClient"));
        JobID jobId = client.getJobID();
        // Wait for the bounded job to finish before reading its counters.
        tableResult.await();
        // NOTE(review): if no operator matches the pattern this loop performs no assertion and the
        // test passes vacuously - confirm the pattern against the actual operator name.
        for (OperatorMetricGroup group : reporter.findOperatorMetricGroups(jobId, "Source: T")) {
            Assertions.assertThat(group.getIOMetricGroup().getNumRecordsInCounter().getCount()).isEqualTo(3L);
        }
    }

    /**
     * Same scenario as {@link #testNumRecordsIn()} but in streaming mode with a consumer id set on
     * the scan; since the job is not awaited, the counter is polled until it reaches three and the
     * job is cancelled afterwards.
     *
     * @throws Exception if any of the SQL statements fails or the job cannot be cancelled
     */
    @Test
    public void testNumRecordsInWithConsumerId() throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql("CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + this.tempPath + "' )");
        tEnv.executeSql("USE CATALOG mycat");
        tEnv.executeSql("CREATE TABLE T ( k INT, v INT, PRIMARY KEY (k) NOT ENFORCED ) WITH ( 'changelog-producer' = 'lookup' )");
        tEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 30)").await();
        tEnv.executeSql("CREATE TEMPORARY TABLE B ( k INT, v INT ) WITH ( 'connector' = 'blackhole' )");
        TableResult tableResult = tEnv.executeSql("INSERT INTO B SELECT * FROM T /*+ OPTIONS('consumer-id' = 'test','consumer.expiration-time'='3h') */");
        JobClient client =
                tableResult
                        .getJobClient()
                        .orElseThrow(() -> new IllegalStateException("INSERT job did not expose a JobClient"));
        JobID jobId = client.getJobID();
        Assertions.assertThat(this.testNumRecordsInWithConsumerIdChecker(jobId)).isTrue();
        // If the assertion above fails, the job is cancelled by cleanupRunningJobs() instead.
        client.cancel().get();
    }

    /**
     * Polls the reporter until some operator of the given job whose name matches {@code T\[}
     * reports exactly three incoming records.
     *
     * @param jobId id of the job whose operator metrics are inspected
     * @return {@code true} as soon as a matching operator reports three records, {@code false} if
     *     that never happens within {@link #METRIC_POLL_ATTEMPTS} attempts
     * @throws Exception if the thread is interrupted while waiting between two attempts
     */
    private boolean testNumRecordsInWithConsumerIdChecker(JobID jobId) throws Exception {
        for (int tries = 1; tries <= METRIC_POLL_ATTEMPTS; ++tries) {
            for (OperatorMetricGroup group : reporter.findOperatorMetricGroups(jobId, "T\\[")) {
                try {
                    long numRecordsIn = group.getIOMetricGroup().getNumRecordsInCounter().getCount();
                    if (numRecordsIn == 3L) {
                        return true;
                    }
                } catch (Exception ignored) {
                    // Deliberately tolerated: presumably the metric may not be readable yet while
                    // the job is starting up; the next attempt retries.
                }
            }
            Thread.sleep(METRIC_POLL_INTERVAL_MS);
        }
        return false;
    }
}

