/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.procedure;

import java.util.List;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.paimon.flink.procedure.ProcedureBase;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Procedure registered under the identifier {@code migrate_database}.
 *
 * <p>Every {@code call} overload obtains a list of {@link Migrator}s from
 * {@link TableMigrationUtils#getImporters} and runs them one after another via
 * {@link #handleMigrators(List)}. A failure of one migrator is logged and counted; it does not
 * stop the remaining migrators from running.
 */
public class MigrateDatabaseProcedure extends ProcedureBase {
    private static final Logger LOG = LoggerFactory.getLogger(MigrateDatabaseProcedure.class);

    /** Returns the name under which this procedure is exposed: {@code migrate_database}. */
    @Override
    public String identifier() {
        return "migrate_database";
    }

    /**
     * Runs the migration with an empty properties string and the default parallelism.
     *
     * @param procedureContext context supplied by the caller; not read by this class
     * @param connector connector name forwarded to {@code TableMigrationUtils.getImporters}
     * @param sourceDatabasePath source database forwarded to {@code getImporters}
     * @return a single-element array holding the summary built by {@link #handleMigrators(List)}
     * @throws Exception if the migrators cannot be created
     */
    public String[] call(ProcedureContext procedureContext, String connector, String sourceDatabasePath) throws Exception {
        return this.call(procedureContext, connector, sourceDatabasePath, "");
    }

    /**
     * Runs the migration with the given properties and the default parallelism (the number of
     * processors reported by the JVM).
     *
     * @param procedureContext context supplied by the caller; not read by this class
     * @param connector connector name forwarded to {@code TableMigrationUtils.getImporters}
     * @param sourceDatabasePath source database forwarded to {@code getImporters}
     * @param properties string handed to {@code ParameterUtils.parseCommaSeparatedKeyValues}
     * @return a single-element array holding the summary built by {@link #handleMigrators(List)}
     * @throws Exception if the migrators cannot be created
     */
    public String[] call(ProcedureContext procedureContext, String connector, String sourceDatabasePath, String properties) throws Exception {
        // A null parallelism makes the five-argument overload fall back to the processor count,
        // which is exactly what this overload used to compute itself.
        return this.call(procedureContext, connector, sourceDatabasePath, properties, (Integer) null);
    }

    /**
     * Runs the migration with the given properties and parallelism.
     *
     * @param procedureContext context supplied by the caller; not read by this class
     * @param connector connector name forwarded to {@code TableMigrationUtils.getImporters}
     * @param sourceDatabasePath source database forwarded to {@code getImporters}
     * @param properties string handed to {@code ParameterUtils.parseCommaSeparatedKeyValues}
     * @param parallelism value forwarded to {@code getImporters}; when {@code null} the number of
     *     processors reported by the JVM is used instead
     * @return a single-element array holding the summary built by {@link #handleMigrators(List)}
     * @throws Exception if the migrators cannot be created
     */
    public String[] call(ProcedureContext procedureContext, String connector, String sourceDatabasePath, String properties, Integer parallelism) throws Exception {
        int effectiveParallelism = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
        List<Migrator> migrators = TableMigrationUtils.getImporters(connector, this.catalog, sourceDatabasePath, effectiveParallelism, ParameterUtils.parseCommaSeparatedKeyValues(properties));
        return new String[]{this.handleMigrators(migrators)};
    }

    /**
     * Executes each migrator in order, calling {@code executeMigrate()} followed by
     * {@code renameTable(false)}, and tallies successes and failures.
     *
     * <p>Failures are best-effort: an exception from one migrator is logged (with its stack
     * trace) and counted, then processing continues with the next migrator.
     *
     * @param migrators migrators to run
     * @return a summary of the form
     *     {@code "migrate database is finished, success cnt: N , failed cnt: M"}
     */
    public String handleMigrators(List<Migrator> migrators) {
        int errorCount = 0;
        int successCount = 0;
        for (Migrator migrator : migrators) {
            try {
                migrator.executeMigrate();
                // NOTE(review): the meaning of the 'false' flag is defined by Migrator, not here.
                migrator.renameTable(false);
                ++successCount;
            } catch (Exception e) {
                ++errorCount;
                // Pass the throwable as the last argument so the stack trace and cause chain are
                // kept in the log instead of only the (possibly null) message.
                LOG.error("Call migrate_database error:{}", e.getMessage(), e);
            }
        }
        return String.format("migrate database is finished, success cnt: %s , failed cnt: %s", successCount, errorCount);
    }
}

