/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.Snapshot;
import org.apache.paimon.SnapshotTest;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.BeforeEach;

/**
 * Base class for catalog integration tests.
 *
 * <p>Before each test it builds a batch {@link TableEnvironment} ({@link #tEnv}) and a streaming
 * one ({@link #sEnv}), creates a catalog named {@code PAIMON} in the batch environment, registers
 * that same catalog instance in the streaming environment, and then runs the statements returned
 * by {@link #ddl()}. Subclasses customise the setup by overriding the protected hook methods.
 */
public abstract class CatalogITCaseBase
extends AbstractTestBase {
    /** Upper bound, in milliseconds, on how long {@link #sqlAssertWithRetry} keeps retrying. */
    private static final long ASSERT_RETRY_TIMEOUT_MS = 180000L;

    /** Batch-mode table environment; (re)created in {@link #before()}. */
    protected TableEnvironment tEnv;
    /** Streaming-mode table environment sharing the catalogs of {@link #tEnv}. */
    protected TableEnvironment sEnv;
    /** Temporary directory obtained from {@code getTempDirPath()} for the current test. */
    protected String path;

    /**
     * Creates both table environments and the {@code PAIMON} catalog, then applies the default
     * parallelism and runs {@link #prepareEnv()}.
     *
     * @throws IOException declared for the test-base helpers used during setup
     */
    @BeforeEach
    public void before() throws IOException {
        tEnv = tableEnvironmentBuilder().batchMode().build();
        String catalog = "PAIMON";
        path = getTempDirPath();

        // Parallelism inference is switched off explicitly unless the subclass opts in.
        String inferScan = inferScanParallelism()
                ? ""
                : ",\n'table-default.scan.infer-parallelism'='false'";

        Map<String, String> options = new HashMap<>(catalogOptions());
        options.put("type", "paimon");
        if (supportDefineWarehouse()) {
            options.put("warehouse", toWarehouse(path));
        }
        String optionList = options.entrySet().stream()
                .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
                .collect(Collectors.joining(","));
        tEnv.executeSql(
                String.format("CREATE CATALOG %s WITH (%s%s)", catalog, optionList, inferScan));
        tEnv.useCatalog(catalog);

        sEnv = tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).build();
        // Share the very same catalog instance between the batch and streaming environments.
        sEnv.registerCatalog(catalog, requireCatalog(catalog));
        sEnv.useCatalog(catalog);

        setParallelism(defaultParallelism());
        prepareEnv();
    }

    /**
     * Extra options added to the {@code CREATE CATALOG} statement.
     *
     * @return an empty map by default
     */
    protected Map<String, String> catalogOptions() {
        return Collections.emptyMap();
    }

    /**
     * Whether {@link #before()} should add a {@code warehouse} option to the catalog.
     *
     * @return {@code true} by default
     */
    protected boolean supportDefineWarehouse() {
        return true;
    }

    /**
     * Whether scan parallelism inference stays enabled. When {@code false}, {@link #before()}
     * adds {@code 'table-default.scan.infer-parallelism'='false'} to the catalog options.
     *
     * @return {@code false} by default
     */
    protected boolean inferScanParallelism() {
        return false;
    }

    /**
     * Executes every statement from {@link #ddl()} in the batch environment. When a statement
     * parses to exactly one {@link CreateCatalogOperation}, the catalog it created is also
     * registered in the streaming environment under the same name.
     *
     * @throws IllegalStateException if such a catalog cannot be found in the batch environment
     *     after its statement was executed
     */
    protected void prepareEnv() {
        Parser parser = ((TableEnvironmentImpl) tEnv).getParser();
        for (String statement : ddl()) {
            tEnv.executeSql(statement);
            List<Operation> operations = parser.parse(statement);
            if (operations.size() != 1) {
                continue;
            }
            Operation operation = operations.get(0);
            if (operation instanceof CreateCatalogOperation) {
                String name = ((CreateCatalogOperation) operation).getCatalogName();
                sEnv.registerCatalog(name, requireCatalog(name));
            }
        }
    }

    /**
     * Sets the default execution parallelism on both table environments.
     *
     * @param parallelism value for {@code TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM}
     */
    protected void setParallelism(int parallelism) {
        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, parallelism);
        sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, parallelism);
    }

    /**
     * Parallelism applied by {@link #before()}.
     *
     * @return {@code 2} by default
     */
    protected int defaultParallelism() {
        return 2;
    }

    /**
     * Statements executed by {@link #prepareEnv()} after the catalog has been created.
     *
     * @return an empty list by default
     */
    protected List<String> ddl() {
        return Collections.emptyList();
    }

    /**
     * Alias of {@link #sql(String, Object...)}.
     *
     * @param query format string passed to {@link String#format(String, Object...)}
     * @param args format arguments
     * @return all result rows
     */
    protected List<Row> batchSql(String query, Object ... args) {
        return sql(query, args);
    }

    /**
     * Formats and executes a statement in the batch environment and collects every result row.
     * The result iterator is closed before returning.
     *
     * @param query format string passed to {@link String#format(String, Object...)}
     * @param args format arguments
     * @return an immutable list of the result rows
     * @throws RuntimeException wrapping any exception raised while executing, collecting or
     *     closing
     */
    protected List<Row> sql(String query, Object ... args) {
        try (CloseableIterator<Row> iter = tEnv.executeSql(String.format(query, args)).collect()) {
            return ImmutableList.copyOf(iter);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Repeatedly executes a statement in the batch environment and hands the collected rows to
     * {@code checker} until the checker stops raising {@link AssertionError} or the retry budget
     * of three minutes (measured from the first attempt) is exhausted.
     *
     * @param query format string passed to {@link String#format(String, Object...)}
     * @param checker assertions to run against the collected rows
     * @param args format arguments
     * @throws RuntimeException wrapping the last {@link AssertionError} once the budget is
     *     exhausted, or wrapping any other exception raised while executing, checking or closing
     */
    protected void sqlAssertWithRetry(String query, Consumer<ListAssert<Row>> checker, Object ... args) {
        long start = System.currentTimeMillis();
        while (true) {
            // try-with-resources closes the iterator on every path; no control flow may live in
            // cleanup code, otherwise the pending return/throw would be discarded.
            try (CloseableIterator<Row> iter = tEnv.executeSql(String.format(query, args)).collect()) {
                List<Row> rows = ImmutableList.copyOf(iter);
                checker.accept(Assertions.assertThat(rows));
                return;
            }
            catch (AssertionError e) {
                if (System.currentTimeMillis() - start >= ASSERT_RETRY_TIMEOUT_MS) {
                    throw new RuntimeException(e);
                }
                // Otherwise fall through and retry with a fresh query execution.
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Formats and executes a statement in the streaming environment.
     *
     * @param query format string passed to {@link String#format(String, Object...)}
     * @param args format arguments
     * @return the result iterator; the caller is responsible for closing it
     */
    protected CloseableIterator<Row> streamSqlIter(String query, Object ... args) {
        return sEnv.executeSql(String.format(query, args)).collect();
    }

    /**
     * Same as {@link #streamSqlIter(String, Object...)} but wraps the result in a
     * {@link BlockingIterator}.
     *
     * @param query format string passed to {@link String#format(String, Object...)}
     * @param args format arguments
     * @return a blocking iterator over the streaming result
     */
    protected BlockingIterator<Row, Row> streamSqlBlockIter(String query, Object ... args) {
        return BlockingIterator.of(sEnv.executeSql(String.format(query, args)).collect());
    }

    /**
     * Looks up a table through the Flink catalog API. Note that this uses the catalog's
     * <em>default</em> database, whereas {@link #paimonTable(String)} uses the environment's
     * <em>current</em> database.
     *
     * @param tableName table name without database qualifier
     * @return the catalog table
     * @throws TableNotExistException if the catalog reports the table as missing
     */
    protected CatalogTable table(String tableName) throws TableNotExistException {
        FlinkCatalog catalog = flinkCatalog();
        CatalogBaseTable table = catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), tableName));
        return (CatalogTable) table;
    }

    /**
     * Looks up a table in the batch environment's current database through the Paimon catalog
     * wrapped by the current Flink catalog.
     *
     * @param tableName table name without database qualifier
     * @return the table, cast to {@link FileStoreTable}
     * @throws Catalog.TableNotExistException if the Paimon catalog reports the table as missing
     */
    protected FileStoreTable paimonTable(String tableName) throws Catalog.TableNotExistException {
        Catalog catalog = flinkCatalog().catalog();
        return (FileStoreTable) catalog.getTable(Identifier.create(tEnv.getCurrentDatabase(), tableName));
    }

    /**
     * Returns the batch environment's current catalog, cast to {@link FlinkCatalog}.
     *
     * @return the current catalog
     * @throws IllegalStateException if the current catalog cannot be resolved
     */
    protected FlinkCatalog flinkCatalog() {
        return (FlinkCatalog) requireCatalog(tEnv.getCurrentCatalog());
    }

    /**
     * Builds the path {@code <path>/<currentDatabase>.db/<tableName>} below the test's temporary
     * directory.
     *
     * @param tableName table name without database qualifier
     * @return the table directory as a Paimon {@link Path}
     */
    protected Path getTableDirectory(String tableName) {
        String relative = String.format("%s.db/%s", tEnv.getCurrentDatabase(), tableName);
        return new Path(new File(path, relative).toString());
    }

    /**
     * Reads the latest snapshot of a table from its directory on the local file system.
     *
     * @param tableName table name without database qualifier
     * @return the latest snapshot, or {@code null} if no latest snapshot id is reported
     */
    @Nullable
    protected Snapshot findLatestSnapshot(String tableName) {
        SnapshotManager snapshotManager = localSnapshotManager(tableName);
        Long latestId = snapshotManager.latestSnapshotId();
        return latestId == null ? null : snapshotManager.snapshot(latestId);
    }

    /**
     * Reads a specific snapshot of a table from its directory on the local file system.
     *
     * @param tableName table name without database qualifier
     * @param snapshotId id of the snapshot to read
     * @return the snapshot, or {@code null} if no latest snapshot id is reported or the latest id
     *     is smaller than {@code snapshotId}
     */
    @Nullable
    protected Snapshot findSnapshot(String tableName, long snapshotId) {
        SnapshotManager snapshotManager = localSnapshotManager(tableName);
        Long latestId = snapshotManager.latestSnapshotId();
        if (latestId == null || latestId < snapshotId) {
            return null;
        }
        return snapshotManager.snapshot(snapshotId);
    }

    /**
     * Converts the temporary directory into the value of the catalog's {@code warehouse} option.
     *
     * @param path the temporary directory
     * @return {@code path} unchanged by default
     */
    protected String toWarehouse(String path) {
        return path;
    }

    /**
     * Runs a query in the batch environment and sorts the rows ascending by their first field,
     * which is read as an {@link Integer}.
     *
     * @param sql statement to execute; it is still passed through {@link String#format}
     * @return a sorted list of the result rows
     */
    protected List<Row> queryAndSort(String sql) {
        return sql(sql).stream()
                .sorted(Comparator.comparingInt(row -> row.<Integer>getFieldAs(0)))
                .collect(Collectors.toList());
    }

    /** Resolves a catalog from the batch environment, failing with a descriptive message. */
    private org.apache.flink.table.catalog.Catalog requireCatalog(String name) {
        return tEnv.getCatalog(name)
                .orElseThrow(() -> new IllegalStateException(
                        "Catalog '" + name + "' is not registered in the batch table environment"));
    }

    /** Creates a snapshot manager over the table's directory on the local file system. */
    private SnapshotManager localSnapshotManager(String tableName) {
        return SnapshotTest.newSnapshotManager(LocalFileIO.create(), getTableDirectory(tableName));
    }
}

