/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil;
import org.apache.flink.table.gateway.workflow.scheduler.EmbeddedQuartzScheduler;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.refresh.ContinuousRefreshHandler;
import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer;
import org.apache.flink.table.shaded.org.quartz.JobKey;
import org.apache.flink.table.types.DataType;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.paimon.utils.CommonTestUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/**
 * Integration tests for materialized tables stored in a Paimon catalog.
 *
 * <p>Statements are executed through a {@link SqlGatewayServiceImpl} that is backed by a Flink
 * mini cluster. Every test method gets its own catalog name, its own warehouse directory and its
 * own gateway session (see {@link #before(RestClusterClient)}); materialized tables created by a
 * test are dropped again in {@link #after()}.
 */
public class MaterializedTableITCase {
    private static final String FILE_CATALOG_STORE = "file_store";
    private static final String TEST_CATALOG_PREFIX = "test_catalog";
    protected static final String TEST_DEFAULT_DATABASE = "default";

    /** Session option toggled by {@link ExecutionInBatchModeRunner}. */
    private static final String RUNTIME_MODE_KEY = "execution.runtime-mode";

    /** Upper bound used whenever a test polls for a condition or waits on a REST future. */
    private static final Duration WAIT_TIMEOUT = Duration.ofSeconds(20L);

    /** Pause between two polls of a condition. */
    private static final Duration WAIT_PAUSE = Duration.ofSeconds(2L);

    /** Source of the per-test suffix appended to catalog names and warehouse directories. */
    private static final AtomicLong COUNTER = new AtomicLong(0L);

    @RegisterExtension
    @Order(1)
    static final MiniClusterExtension MINI_CLUSTER =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(2)
                            .build());

    @RegisterExtension
    @Order(2)
    protected static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
            new SqlGatewayServiceExtension(() -> MINI_CLUSTER.getClientConfiguration());

    @RegisterExtension
    @Order(3)
    protected static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION =
            new TestExecutorExtension<>(
                    () ->
                            Executors.newCachedThreadPool(
                                    new ExecutorThreadFactory(
                                            "SqlGatewayService Test Pool",
                                            IgnoreExceptionHandler.INSTANCE)));

    @RegisterExtension
    @Order(4)
    protected static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
            new SqlGatewayRestEndpointExtension(() -> SQL_GATEWAY_SERVICE_EXTENSION.getService());

    protected static SqlGatewayServiceImpl service;
    private static SessionEnvironment defaultSessionEnvironment;
    private static Path baseCatalogPath;

    private String paimonWarehousePath;
    protected String paimonCatalogName;
    protected SessionHandle sessionHandle;
    protected RestClusterClient<?> restClusterClient;

    /**
     * Builds the session environment shared by all tests: a file based catalog store, the
     * embedded workflow scheduler pointing at the test REST endpoint, and two dummy options.
     *
     * @param temporaryFolder JUnit managed directory that hosts the catalog store and the
     *     per-test warehouses
     */
    @BeforeAll
    static void setUp(@TempDir Path temporaryFolder) throws Exception {
        service = (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService();

        Path fileCatalogStore = temporaryFolder.resolve(FILE_CATALOG_STORE);
        Files.createDirectory(fileCatalogStore);
        Map<String, String> catalogStoreOptions = new HashMap<>();
        catalogStoreOptions.put(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND.key(), "file");
        catalogStoreOptions.put("table.catalog-store.file.path", fileCatalogStore.toString());

        baseCatalogPath = temporaryFolder.resolve(TEST_CATALOG_PREFIX);
        Files.createDirectory(baseCatalogPath);

        Map<String, String> workflowSchedulerConfig = new HashMap<>();
        workflowSchedulerConfig.put(FactoryUtil.WORKFLOW_SCHEDULER_TYPE.key(), "embedded");
        workflowSchedulerConfig.put(
                "sql-gateway.endpoint.rest.address",
                SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress());
        workflowSchedulerConfig.put(
                "sql-gateway.endpoint.rest.port",
                String.valueOf(SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort()));

        Map<String, String> testConf = new HashMap<>();
        testConf.put("k1", "v1");
        testConf.put("k2", "v2");

        defaultSessionEnvironment =
                SessionEnvironment.newBuilder()
                        .addSessionConfig(catalogStoreOptions)
                        .addSessionConfig(workflowSchedulerConfig)
                        .addSessionConfig(testConf)
                        .setSessionEndpointVersion(new EndpointVersion() {})
                        .build();
    }

    /**
     * Creates a fresh warehouse directory, catalog name and gateway session for the test about
     * to run.
     *
     * @param injectClusterClient REST client of the mini cluster, kept for job inspection
     */
    @BeforeEach
    void before(@InjectClusterClient RestClusterClient<?> injectClusterClient) throws Exception {
        // A monotonically increasing counter keeps catalog names and directories unique.
        String suffix = String.valueOf(COUNTER.incrementAndGet());
        Path fileCatalogPath = baseCatalogPath.resolve(suffix);
        Files.createDirectory(fileCatalogPath);

        this.paimonWarehousePath = fileCatalogPath.toString();
        this.paimonCatalogName = TEST_CATALOG_PREFIX + suffix;
        this.sessionHandle = initializeSession();
        this.restClusterClient = injectClusterClient;
    }

    /** Drops every materialized table found in the default database of the test catalog. */
    @AfterEach
    void after() throws Exception {
        // Candidates are listed with TableKind.TABLE and then narrowed down by the kind of the
        // resolved table.
        Set<TableInfo> tableInfos =
                service.listTables(
                        this.sessionHandle,
                        this.paimonCatalogName,
                        TEST_DEFAULT_DATABASE,
                        Collections.singleton(CatalogBaseTable.TableKind.TABLE));

        for (TableInfo tableInfo : tableInfos) {
            ResolvedCatalogBaseTable<?> resolvedTable =
                    service.getTable(this.sessionHandle, tableInfo.getIdentifier());
            if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE != resolvedTable.getTableKind()) {
                continue;
            }
            executeAndAwait(
                    String.format(
                            "DROP MATERIALIZED TABLE %s",
                            tableInfo.getIdentifier().asSerializableString()));
        }
    }

    /**
     * Creates a materialized table with a 30 second freshness and checks the catalog metadata,
     * the state of the refresh job and the checkpoint interval of that job.
     */
    @Test
    void testCreateMaterializedTableInContinuousMode() throws Exception {
        String materializedTableDDL =
                "CREATE MATERIALIZED TABLE users_shops PARTITIONED BY (ds)\n"
                        + " WITH(\n"
                        + "   'format' = 'debezium-json'\n"
                        + " )\n"
                        + " FRESHNESS = INTERVAL '30' SECOND\n"
                        + " AS SELECT \n"
                        + "  user_id,\n"
                        + "  shop_id,\n"
                        + "  ds,\n"
                        + "  SUM (payment_amount_cents) AS payed_buy_fee_sum,\n"
                        + "  SUM (1) AS pv\n"
                        + " FROM (\n"
                        + "    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource ) AS tmp\n"
                        + " GROUP BY (user_id, shop_id, ds)";
        executeAndAwait(materializedTableDDL);

        ResolvedCatalogMaterializedTable actualMaterializedTable =
                (ResolvedCatalogMaterializedTable)
                        service.getTable(
                                this.sessionHandle,
                                ObjectIdentifier.of(
                                        this.paimonCatalogName,
                                        TEST_DEFAULT_DATABASE,
                                        "users_shops"));

        // Catalog metadata of the new table.
        ResolvedSchema expectedSchema =
                ResolvedSchema.of(
                        Arrays.asList(
                                Column.physical("user_id", DataTypes.BIGINT()),
                                Column.physical("shop_id", DataTypes.BIGINT()),
                                Column.physical("ds", DataTypes.STRING()),
                                Column.physical("payed_buy_fee_sum", DataTypes.BIGINT()),
                                Column.physical("pv", DataTypes.INT().notNull())));
        Assertions.assertThat(actualMaterializedTable.getResolvedSchema())
                .isEqualTo(expectedSchema);
        Assertions.assertThat(actualMaterializedTable.getFreshness())
                .isEqualTo(Duration.ofSeconds(30L));
        Assertions.assertThat(actualMaterializedTable.getLogicalRefreshMode())
                .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
        Assertions.assertThat(actualMaterializedTable.getRefreshMode())
                .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
        Assertions.assertThat(actualMaterializedTable.getRefreshStatus())
                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
        Assertions.assertThat(actualMaterializedTable.getRefreshHandlerDescription())
                .isNotEmpty();
        Assertions.assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty();

        // State of the refresh job referenced by the stored refresh handler.
        ContinuousRefreshHandler activeRefreshHandler =
                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
                        actualMaterializedTable.getSerializedRefreshHandler(),
                        getClass().getClassLoader());
        String refreshJobId = activeRefreshHandler.getJobId();
        TestUtils.waitUntilAllTasksAreRunning(
                this.restClusterClient, JobID.fromHexString(refreshJobId));

        OperationHandle describeJobHandle =
                executeAndAwait(String.format("DESCRIBE JOB '%s'", refreshJobId));
        List<RowData> jobResults =
                SqlGatewayServiceTestUtil.fetchAllResults(
                        service, this.sessionHandle, describeJobHandle);
        Assertions.assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");

        // The checkpoint interval is expected to equal the 30 second freshness, in milliseconds.
        long checkpointInterval = getCheckpointIntervalConfig(this.restClusterClient, refreshJobId);
        Assertions.assertThat(checkpointInterval).isEqualTo(30000L);
    }

    /**
     * Refreshes a single partition of a materialized table and checks that a finished batch job
     * was started for it and that the table content reflects the changed source data.
     */
    @Test
    void testAlterMaterializedTableRefresh() throws Exception {
        List<Row> data = new ArrayList<>();
        data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
        data.add(Row.of(2L, 2L, 2L, "2024-01-02"));
        data.add(Row.of(3L, 3L, 3L, "2024-01-02"));
        createAndVerifyCreateMaterializedTableWithData(
                "my_materialized_table",
                data,
                Collections.singletonMap("ds", "yyyy-MM-dd"),
                CatalogMaterializedTable.RefreshMode.CONTINUOUS);

        // NOTE(review): presumably the registered test data is held by reference, so removing a
        // row here changes what the source yields during the refresh - confirm against
        // TestValuesTableFactory.
        data.remove(2);

        long currentTime = System.currentTimeMillis();
        OperationHandle alterHandle =
                executeAndAwait(
                        "ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds = '2024-01-02')");
        List<RowData> result =
                SqlGatewayServiceTestUtil.fetchAllResults(service, this.sessionHandle, alterHandle);
        Assertions.assertThat(result.size()).isEqualTo(1);

        // The single result row carries the id of the refresh job in its first column.
        String jobId = result.get(0).getString(0).toString();
        verifyRefreshJobCreated(this.restClusterClient, jobId, currentTime);

        try (ExecutionInBatchModeRunner ignored = new ExecutionInBatchModeRunner()) {
            CommonTestUtils.waitUtil(
                    () ->
                            fetchTableData(
                                                    this.sessionHandle,
                                                    "SELECT * FROM my_materialized_table")
                                            .size()
                                    == data.size(),
                    WAIT_TIMEOUT,
                    WAIT_PAUSE,
                    "Failed to verify the data in materialized table.");
            Assertions.assertThat(
                            fetchTableData(
                                            this.sessionHandle,
                                            "SELECT * FROM my_materialized_table where ds = '2024-01-02'")
                                    .size())
                    .isEqualTo(1);
        }
    }

    /**
     * Checks that a materialized table cannot be removed with {@code DROP TABLE}, and that
     * {@code DROP MATERIALIZED TABLE} removes both the table and its scheduled Quartz job.
     */
    @Test
    void testDropMaterializedTable() throws Exception {
        createAndVerifyCreateMaterializedTableWithData(
                "users_shops",
                Collections.emptyList(),
                Collections.emptyMap(),
                CatalogMaterializedTable.RefreshMode.FULL);

        ObjectIdentifier tableId =
                ObjectIdentifier.of(this.paimonCatalogName, TEST_DEFAULT_DATABASE, "users_shops");
        JobKey jobKey =
                JobKey.jobKey("quartz_job_" + tableId.asSerializableString(), "default_group");
        EmbeddedQuartzScheduler embeddedWorkflowScheduler =
                SQL_GATEWAY_REST_ENDPOINT_EXTENSION
                        .getSqlGatewayRestEndpoint()
                        .getQuartzScheduler();
        Assertions.assertThat(embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey))
                .isTrue();

        // DROP TABLE must be rejected for a materialized table.
        OperationHandle dropTableUsingMaterializedTableHandle =
                service.executeStatement(
                        this.sessionHandle, "DROP TABLE users_shops", -1L, new Configuration());
        Assertions.assertThatThrownBy(
                        () ->
                                SqlGatewayServiceTestUtil.awaitOperationTermination(
                                        service,
                                        this.sessionHandle,
                                        dropTableUsingMaterializedTableHandle))
                .rootCause()
                .isInstanceOf(ValidationException.class)
                .hasMessage(
                        String.format(
                                "Table with identifier '%s' does not exist.",
                                tableId.asSummaryString()));

        // DROP MATERIALIZED TABLE removes the table together with its scheduled job.
        executeAndAwait("DROP MATERIALIZED TABLE IF EXISTS users_shops");
        Assertions.assertThatThrownBy(() -> service.getTable(this.sessionHandle, tableId))
                .isInstanceOf(SqlGatewayException.class)
                .hasMessageContaining("Failed to getTable.");
        Assertions.assertThat(embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey))
                .isFalse();
    }

    /**
     * Reads the {@code interval} field of the checkpoint configuration that the cluster reports
     * for the given job.
     *
     * @param restClusterClient client used to query the cluster
     * @param jobId hex string of the job id
     * @return value of the {@code interval} field of the serialized checkpoint configuration
     */
    private long getCheckpointIntervalConfig(RestClusterClient<?> restClusterClient, String jobId)
            throws Exception {
        CheckpointConfigInfo checkpointConfigInfo =
                sendJobRequest(
                        restClusterClient,
                        CheckpointConfigHeaders.getInstance(),
                        EmptyRequestBody.getInstance(),
                        jobId);
        // The response object is serialized back to JSON so that the field can be read by name.
        return RestMapperUtils.getStrictObjectMapper()
                .readTree(
                        RestMapperUtils.getStrictObjectMapper()
                                .writeValueAsString(checkpointConfigInfo))
                .get("interval")
                .asLong();
    }

    /**
     * Sends a job scoped REST request and waits up to five seconds for the response.
     *
     * @param restClusterClient client used to send the request
     * @param headers headers describing the REST call
     * @param requestBody body of the request
     * @param jobId hex string of the job id that is bound to the job path parameter
     * @return the response body
     */
    private static <M extends JobMessageParameters, R extends RequestBody, P extends ResponseBody>
            P sendJobRequest(
                    RestClusterClient<?> restClusterClient,
                    MessageHeaders<R, P, M> headers,
                    R requestBody,
                    String jobId)
                    throws Exception {
        M jobMessageParameters = headers.getUnresolvedMessageParameters();
        jobMessageParameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
        return restClusterClient
                .sendRequest(headers, jobMessageParameters, requestBody)
                .get(5L, TimeUnit.SECONDS);
    }

    /**
     * Opens a gateway session, registers and selects the per-test Paimon catalog and creates the
     * temporary {@code datagenSource} table.
     *
     * @return handle of the new session
     */
    private SessionHandle initializeSession() {
        SessionHandle newSession = service.openSession(defaultSessionEnvironment);

        String catalogDDL =
                String.format(
                        "CREATE CATALOG %s\n"
                                + "WITH (\n"
                                + "  'type' = 'paimon',\n"
                                + "  'warehouse' = '%s'"
                                + "  )",
                        this.paimonCatalogName,
                        this.paimonWarehousePath);
        service.configureSession(newSession, catalogDDL, -1L);
        service.configureSession(
                newSession, String.format("USE CATALOG %s", this.paimonCatalogName), -1L);

        String dataGenSource =
                "CREATE TEMPORARY TABLE datagenSource (\n"
                        + "  order_id BIGINT,\n"
                        + "  order_number VARCHAR(20),\n"
                        + "  user_id BIGINT,\n"
                        + "  shop_id BIGINT,\n"
                        + "  product_id BIGINT,\n"
                        + "  status BIGINT,\n"
                        + "  order_type BIGINT,\n"
                        + "  order_created_at TIMESTAMP,\n"
                        + "  payment_amount_cents BIGINT\n"
                        + ")\n"
                        + "WITH (\n"
                        + "  'connector' = 'datagen',\n"
                        + "  'rows-per-second' = '10'\n"
                        + ")";
        service.configureSession(newSession, dataGenSource, -1L);
        return newSession;
    }

    /**
     * Registers {@code data} behind the temporary table {@code my_source}, creates a materialized
     * table on top of it and waits until the table holds as many rows as {@code data}.
     *
     * @param materializedTableName name of the materialized table to create
     * @param data rows served by the bounded source table
     * @param partitionFormatter partition field name to date formatter; may be {@code null} or
     *     empty, in which case no formatter option is emitted
     * @param refreshMode refresh mode written into the DDL
     */
    public void createAndVerifyCreateMaterializedTableWithData(
            String materializedTableName,
            List<Row> data,
            Map<String, String> partitionFormatter,
            CatalogMaterializedTable.RefreshMode refreshMode)
            throws Exception {
        String dataId = TestValuesTableFactory.registerData(data);
        String sourceDdl =
                String.format(
                        "CREATE TEMPORARY TABLE IF NOT EXISTS my_source (\n"
                                + "  order_id BIGINT,\n"
                                + "  user_id BIGINT,\n"
                                + "  shop_id BIGINT,\n"
                                + "  order_created_at STRING\n"
                                + ")\n"
                                + "WITH (\n"
                                + "  'connector' = 'values',\n"
                                + "  'bounded' = 'true',\n"
                                + "  'data-id' = '%s'\n"
                                + ")",
                        dataId);
        executeAndAwait(sourceDdl);

        // Each formatter becomes one WITH option; the trailing separator lets the result be
        // placed directly in front of the 'format' option.
        String partitionFields;
        if (partitionFormatter == null || partitionFormatter.isEmpty()) {
            partitionFields = "\n";
        } else {
            partitionFields =
                    partitionFormatter.entrySet().stream()
                            .map(
                                    e ->
                                            String.format(
                                                    "'partition.fields.%s.date-formatter' = '%s'",
                                                    e.getKey(), e.getValue()))
                            .collect(Collectors.joining(",\n", "", ",\n"));
        }

        String materializedTableDDL =
                String.format(
                        "CREATE MATERIALIZED TABLE %s PARTITIONED BY (ds)\n"
                                + " WITH(\n"
                                + "    %s   'format' = 'debezium-json'\n"
                                + " )\n"
                                + " FRESHNESS = INTERVAL '30' SECOND\n"
                                + " REFRESH_MODE = %s\n"
                                + " AS SELECT \n"
                                + "  user_id,\n"
                                + "  shop_id,\n"
                                + "  ds,\n"
                                + "  COUNT(order_id) AS order_cnt\n"
                                + " FROM (\n"
                                + "    SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source ) AS tmp\n"
                                + " GROUP BY (user_id, shop_id, ds)",
                        materializedTableName,
                        partitionFields,
                        refreshMode);
        executeAndAwait(materializedTableDDL);

        try (ExecutionInBatchModeRunner ignored = new ExecutionInBatchModeRunner()) {
            org.apache.flink.core.testutils.CommonTestUtils.waitUtil(
                    () ->
                            fetchTableData(
                                                    this.sessionHandle,
                                                    String.format(
                                                            "SELECT * FROM %s",
                                                            materializedTableName))
                                            .size()
                                    == data.size(),
                    WAIT_TIMEOUT,
                    WAIT_PAUSE,
                    "Failed to verify the data in materialized table.");
        }
    }

    /**
     * Runs {@code query} in the given session and collects all result rows.
     *
     * @param sessionHandle session to run the query in
     * @param query statement to execute
     * @return all rows produced by the statement
     */
    public List<RowData> fetchTableData(SessionHandle sessionHandle, String query) {
        OperationHandle queryHandle =
                service.executeStatement(sessionHandle, query, -1L, new Configuration());
        return SqlGatewayServiceTestUtil.fetchAllResults(service, sessionHandle, queryHandle);
    }

    /**
     * Asserts that the job exists, started after {@code startTime}, is a batch job, and
     * eventually reaches {@link JobStatus#FINISHED}.
     *
     * @param restClusterClient client used to query the cluster
     * @param jobId hex string of the job id
     * @param startTime epoch milliseconds the job start time must exceed
     */
    public void verifyRefreshJobCreated(
            RestClusterClient<?> restClusterClient, String jobId, long startTime)
            throws Exception {
        long timeoutMillis = WAIT_TIMEOUT.toMillis();

        Collection<JobStatusMessage> jobs =
                restClusterClient.listJobs().get(timeoutMillis, TimeUnit.MILLISECONDS);
        Optional<JobStatusMessage> job =
                jobs.stream().filter(j -> j.getJobId().toString().equals(jobId)).findFirst();
        Assertions.assertThat(job).isPresent();
        Assertions.assertThat(job.get().getStartTime()).isGreaterThan(startTime);

        JobID id = JobID.fromHexString(jobId);
        JobDetailsInfo jobDetailsInfo =
                restClusterClient.getJobDetails(id).get(timeoutMillis, TimeUnit.MILLISECONDS);
        Assertions.assertThat(jobDetailsInfo.getJobType()).isEqualTo(JobType.BATCH);

        org.apache.flink.core.testutils.CommonTestUtils.waitUtil(
                () -> {
                    try {
                        return JobStatus.FINISHED.equals(
                                restClusterClient.getJobStatus(id).get(5L, TimeUnit.SECONDS));
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the polling loop instead of losing it.
                        Thread.currentThread().interrupt();
                        return false;
                    } catch (Exception ignored) {
                        // A failed status lookup counts as "not finished yet"; polling goes on.
                        return false;
                    }
                },
                WAIT_TIMEOUT,
                WAIT_PAUSE,
                "Failed to verify whether the job is finished.");
    }

    /**
     * Executes a statement in the test session and blocks until the operation has terminated.
     *
     * @param statement statement to execute
     * @return handle of the terminated operation, usable for fetching results
     */
    private OperationHandle executeAndAwait(String statement) throws Exception {
        OperationHandle handle =
                service.executeStatement(this.sessionHandle, statement, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination(service, this.sessionHandle, handle);
        return handle;
    }

    /**
     * Switches the test session to batch runtime mode for the duration of a try-with-resources
     * block.
     *
     * <p>On close the previously configured runtime mode is written back. If the session had no
     * explicit runtime mode before, the option is reset so that the batch override does not leak
     * into later statements of the same session.
     */
    protected class ExecutionInBatchModeRunner implements AutoCloseable {
        /** Runtime mode configured before the switch, or {@code null} if none was set. */
        private final String oldMode;

        ExecutionInBatchModeRunner() {
            this.oldMode = service.getSessionConfig(sessionHandle).get(RUNTIME_MODE_KEY);
            service.configureSession(
                    sessionHandle, String.format("SET '%s' = 'batch'", RUNTIME_MODE_KEY), -1L);
        }

        @Override
        public void close() throws Exception {
            String restoreStatement =
                    this.oldMode != null
                            ? String.format("SET '%s' = '%s'", RUNTIME_MODE_KEY, this.oldMode)
                            : String.format("RESET '%s'", RUNTIME_MODE_KEY);
            service.configureSession(sessionHandle, restoreStatement, -1L);
        }
    }
}

