/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeCasts;

public abstract class ActionBase
implements Action {
    protected final Options catalogOptions;
    protected final org.apache.paimon.catalog.Catalog catalog;
    protected final FlinkCatalog flinkCatalog;
    protected final String catalogName = "paimon-" + UUID.randomUUID();
    protected StreamExecutionEnvironment env;
    protected StreamTableEnvironment batchTEnv;

    public ActionBase(String warehouse, Map<String, String> catalogConfig) {
        this.catalogOptions = Options.fromMap(catalogConfig);
        this.catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
        if (!this.catalogOptions.contains(CatalogOptions.CACHE_ENABLED)) {
            this.catalogOptions.set(CatalogOptions.CACHE_ENABLED, false);
        }
        this.catalog = this.initPaimonCatalog();
        this.flinkCatalog = this.initFlinkCatalog();
        this.initFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment());
    }

    public ActionBase withStreamExecutionEnvironment(StreamExecutionEnvironment env) {
        this.initFlinkEnv(env);
        return this;
    }

    protected org.apache.paimon.catalog.Catalog initPaimonCatalog() {
        return FlinkCatalogFactory.createPaimonCatalog(this.catalogOptions);
    }

    protected FlinkCatalog initFlinkCatalog() {
        return FlinkCatalogFactory.createCatalog(this.catalogName, this.catalog, this.catalogOptions);
    }

    protected void initFlinkEnv(StreamExecutionEnvironment env) {
        this.env = env;
        this.env.getConfig().enableObjectReuse();
        this.batchTEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env, (EnvironmentSettings)EnvironmentSettings.inBatchMode());
        this.batchTEnv.registerCatalog(this.flinkCatalog.getName(), (Catalog)this.flinkCatalog);
        this.batchTEnv.useCatalog(this.flinkCatalog.getName());
    }

    protected void execute(String defaultName) throws Exception {
        ReadableConfig conf = this.env.getConfiguration();
        String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultName);
        this.env.execute(name);
    }

    protected Catalog.Loader catalogLoader() {
        Options catalogOptions = this.catalogOptions;
        return () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
    }

    protected List<DataType> toPaimonTypes(List<org.apache.flink.table.types.DataType> flinkDataTypes) {
        return flinkDataTypes.stream().map(org.apache.flink.table.types.DataType::getLogicalType).map(LogicalTypeConversion::toDataType).collect(Collectors.toList());
    }

    protected boolean compatibleCheck(List<DataType> actualTypes, List<DataType> expectedTypes) {
        if (actualTypes.size() != expectedTypes.size()) {
            return false;
        }
        for (int i = 0; i < actualTypes.size(); ++i) {
            if (DataTypeCasts.supportsCompatibleCast(actualTypes.get(i), expectedTypes.get(i))) continue;
            return false;
        }
        return true;
    }

    @VisibleForTesting
    public Map<String, String> catalogConfig() {
        return this.catalogOptions.toMap();
    }
}

