/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.migrator.framework;

import com.oceanbase.tools.migrator.dao.HistoryJobDao;
import com.oceanbase.tools.migrator.dao.HistoryWorkerDao;
import com.oceanbase.tools.migrator.datasource.DataSourceAdapter;
import com.oceanbase.tools.migrator.datasource.DataSourceManager;
import com.oceanbase.tools.migrator.framework.CommandConsumer;
import com.oceanbase.tools.migrator.framework.NativeFSLock;
import com.oceanbase.tools.migrator.framework.VersionInfo;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Process entry point and holder of the worker's process-wide static state.
 *
 * <p>{@link #main(String[])} parses the command line and either prints the version or calls
 * {@link #initMainFramework(String, int, String)}, which configures log4j, opens the
 * {@code taskdb} and {@code metadb} data sources, resets this host's running jobs, takes a
 * {@link NativeFSLock} on the working directory and starts the heartbeat and command-consumer
 * threads. A JVM shutdown hook stops both threads and releases the lock.
 *
 * <p>All state is kept in plain (non-volatile, unsynchronised) static fields.
 */
public class MainFramework {
    private static final Logger log = LoggerFactory.getLogger(MainFramework.class);

    /** Classpath location of the log4j configuration loaded during initialisation. */
    private static final String LOG4J_RESOURCE = "conf/log4j.properties";
    /** Pause between two heartbeat reports. */
    private static final long HEARTBEAT_INTERVAL_SECONDS = 5L;
    /** Worker count used by {@link #main(String[])} when option {@code -w} is absent. */
    private static final int DEFAULT_WORKER_COUNT = 16;

    // Placeholder until initMainFramework() resolves the local host address.
    private static String clientIp = "0.0.0.0";
    private static DataSourceAdapter taskDataSource = null;
    private static DataSourceAdapter metaDataSource = null;
    private static int jobLimit;
    private static String workerGroup = "DEFAULT";
    // Groups whose configuration lives under conf/<lower-cased group>/ instead of conf/<env>/.
    private static List<String> tldcGroup = Arrays.asList("INS", "SIM", "ACF", "CF");
    private static int jobCount = 0;
    private static int updateConfigInterval = 10;
    private static int writableCheckInterval = 15;
    private static int deletableCheckInterval = 5;
    private static DataSourceManager dataSourceManager = new DataSourceManager();
    private static ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    // Assigned by initMainFramework() or runGetVersion(); null before either has run.
    private static VersionInfo versionMessage = null;
    private static ExecutorService workerThreadPool;

    /**
     * Returns the client version string.
     *
     * @throws NullPointerException if neither {@link #initMainFramework} nor
     *     {@link #runGetVersion()} has populated the version information yet
     */
    public static String getClientVersion() {
        return versionMessage.getVersion();
    }

    /**
     * Returns the client build time.
     *
     * @throws NullPointerException if the version information has not been loaded yet
     */
    public static String getClientBuildTime() {
        return versionMessage.getBuildTime();
    }

    /**
     * Returns the abbreviated commit id of the client build.
     *
     * @throws NullPointerException if the version information has not been loaded yet
     */
    public static String getClientCommitId() {
        return versionMessage.getCommitIdAbbrev();
    }

    /** Returns the writable-check interval (initialised to 15, no setter in this class). */
    public static int getWritableCheckInterval() {
        return writableCheckInterval;
    }

    /** Returns the deletable-check interval (initialised to 5, no setter in this class). */
    public static int getDeletableCheckInterval() {
        return deletableCheckInterval;
    }

    /** Returns the update-config interval (initialised to 10). */
    public static int getUpdateConfigInterval() {
        return updateConfigInterval;
    }

    /** Replaces the update-config interval. */
    public static void setUpdateConfigInterval(int updateConfigInterval) {
        MainFramework.updateConfigInterval = updateConfigInterval;
    }

    /** Returns the process-wide {@link DataSourceManager}. */
    public static DataSourceManager getDataSourceManager() {
        return dataSourceManager;
    }

    /** Returns the task data source, or {@code null} before initialisation. */
    public static DataSourceAdapter getTaskDataSource() {
        return taskDataSource;
    }

    /** Replaces the task data source. */
    public static void setTaskDataSource(DataSourceAdapter taskDataSource) {
        MainFramework.taskDataSource = taskDataSource;
    }

    /** Returns the meta data source, or {@code null} before initialisation. */
    public static DataSourceAdapter getMetaDataSource() {
        return metaDataSource;
    }

    /** Replaces the meta data source. */
    public static void setMetaDataSource(DataSourceAdapter metaDataSource) {
        MainFramework.metaDataSource = metaDataSource;
    }

    /** Returns the worker count passed to {@link #initMainFramework} (boxed). */
    static Integer getJobLimit() {
        return jobLimit;
    }

    /** Returns the local host address, or {@code "0.0.0.0"} before initialisation. */
    static String getClientIp() {
        return clientIp;
    }

    /**
     * Initialises the worker process and starts its background threads.
     *
     * <p>Steps, in order: resolve the local address, record {@code workerCount} and
     * {@code group}, configure log4j, open the {@code taskdb}/{@code metadb} data sources, load
     * the version information, create the fixed-size worker pool, reset this host's running
     * jobs, lock the working directory, start the heartbeat and consumer threads and register
     * the shutdown hook.
     *
     * @param env environment name; selects {@code conf/<env>/} unless {@code group} is one of
     *     the special groups
     * @param workerCount size of the worker thread pool, also reported as the job limit
     * @param group worker group; for the special groups the configuration is read from
     *     {@code conf/<lower-cased group>/}
     * @throws Exception if any initialisation step fails
     */
    public static void initMainFramework(String env, int workerCount, String group) throws Exception {
        clientIp = InetAddress.getLocalHost().getHostAddress();
        jobLimit = workerCount;
        workerGroup = group;
        MainFramework.configureLogging();

        String confDir = tldcGroup.contains(group) ? group.toLowerCase(Locale.ROOT) : env;
        String taskDb = String.format("conf/%s/taskdb.json", confDir);
        String metaDb = String.format("conf/%s/metadb.json", confDir);
        taskDataSource = dataSourceManager.initMetaDb("taskdb", taskDb);
        metaDataSource = dataSourceManager.initMetaDb("metadb", metaDb);

        versionMessage = VersionInfo.versionInfo();
        workerThreadPool = Executors.newFixedThreadPool(workerCount);
        MainFramework.handleRunningJob();

        final NativeFSLock lock = new NativeFSLock(new File("./"));
        NativeFSLock.lock(lock);
        final Thread heartbeatThread = MainFramework.launchHeartbeatThread();
        final Thread consumerThread = MainFramework.launchCommandConsumerThreads();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

            @Override
            public void run() {
                boolean interrupted = false;
                try {
                    // Heartbeat first, then consumer. Each thread is stopped on its own so an
                    // interrupted join on the first cannot leave the second un-signalled.
                    interrupted |= MainFramework.interruptAndJoin(heartbeatThread);
                    interrupted |= MainFramework.interruptAndJoin(consumerThread);
                } finally {
                    log.info("Worker Main Process Exit");
                    NativeFSLock.unlock(lock);
                    if (interrupted) {
                        // Restored only after the unlock so the release runs uninterrupted.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }));
    }

    /**
     * Loads {@value #LOG4J_RESOURCE} from the classpath and applies it to log4j.
     *
     * @throws IllegalStateException if the resource is not on the classpath
     * @throws IOException if the resource cannot be read
     */
    private static void configureLogging() throws IOException {
        Properties logProperties = new Properties();
        try (InputStream in = MainFramework.class.getClassLoader().getResourceAsStream(LOG4J_RESOURCE)) {
            if (in == null) {
                // getResourceAsStream signals "not found" with null rather than an exception.
                throw new IllegalStateException("classpath resource not found: " + LOG4J_RESOURCE);
            }
            logProperties.load(in);
        }
        PropertyConfigurator.configure(logProperties);
    }

    /**
     * Interrupts {@code thread} and waits for it to terminate.
     *
     * @return {@code true} if the calling thread was itself interrupted while waiting
     */
    private static boolean interruptAndJoin(Thread thread) {
        thread.interrupt();
        try {
            thread.join();
            return false;
        } catch (InterruptedException e) {
            log.error("interrupted while waiting for thread " + thread.getName() + " to stop", e);
            return true;
        }
    }

    /** Starts a thread running a {@link CommandConsumer} over the worker pool and returns it. */
    private static Thread launchCommandConsumerThreads() {
        Thread consumerThread = new Thread(new CommandConsumer(workerThreadPool));
        consumerThread.start();
        return consumerThread;
    }

    /** Reports one heartbeat for this worker through a fresh meta-db connection. */
    private static void reportHeartbeat(int currentJobCount) throws SQLException {
        try (Connection connection = metaDataSource.getConnection()) {
            HistoryWorkerDao.reportHeartbeat(connection, MainFramework.getClientIp(), MainFramework.getClientVersion(), MainFramework.getJobLimit(), MainFramework.getWorkerGroup(), currentJobCount);
        }
    }

    /**
     * Reports an initial heartbeat with a job count of 0, then starts a thread that reports the
     * current job count every {@value #HEARTBEAT_INTERVAL_SECONDS} seconds until interrupted.
     * On exit the thread deletes this host's heartbeat record.
     *
     * @return the started heartbeat thread
     * @throws Exception if the initial heartbeat cannot be reported
     */
    private static Thread launchHeartbeatThread() throws Exception {
        MainFramework.reportHeartbeat(0);
        Thread heartbeatThread = new Thread(new Runnable() {

            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        TimeUnit.SECONDS.sleep(HEARTBEAT_INTERVAL_SECONDS);
                    } catch (InterruptedException e) {
                        // The interrupt flag is left cleared here, as in the original code, so
                        // the cleanup below runs exactly as before.
                        log.warn(e.toString());
                        break;
                    }
                    try {
                        // Read once so the logged value is exactly the value reported.
                        int currentJobCount = MainFramework.getJobCount();
                        MainFramework.reportHeartbeat(currentJobCount);
                        log.info("Report Heartbeat. current Job Count: {}", (Object) Integer.valueOf(currentJobCount));
                    } catch (SQLException e) {
                        // A failed report is not fatal; the next cycle tries again.
                        log.warn("fail to report heartbeat", e);
                    }
                }
                log.info("Heartbeat Thread Is Interrupted");
                try (Connection connection = metaDataSource.getConnection()) {
                    HistoryWorkerDao.deleteHeartbeat(connection, MainFramework.getClientIp());
                } catch (SQLException e) {
                    log.warn("fail to delete heartbeat", e);
                }
            }
        });
        heartbeatThread.start();
        return heartbeatThread;
    }

    /**
     * Parses the command line. Recognised options: {@code -v/--version},
     * {@code -e/--env <arg>}, {@code -w/--worker_count <arg>}, {@code -g/--worker_group <arg>}.
     *
     * <p>On a parse error the problem is printed and the JVM exits with status -1.
     *
     * @param args raw command-line arguments
     * @return the parsed command line
     */
    public static CommandLine parseCommandArguments(String[] args) {
        Options options = new Options();
        options.addOption("v", "version", false, "print client version");
        options.addOption("e", "env", true, "run as new platform");
        options.addOption("w", "worker_count", true, "worker count");
        options.addOption("g", "worker_group", true, "worker group");

        CommandLine line = null;
        try {
            line = new DefaultParser().parse(options, args);
        } catch (ParseException e) {
            // When called from main(), log4j is not configured yet (that happens in
            // initMainFramework), so the console is used directly.
            System.out.println("invalid command arguments");
            e.printStackTrace();
            System.exit(-1);
        }
        return line;
    }

    /** Loads the version information and prints the product name and version to stdout. */
    public static void runGetVersion() {
        versionMessage = VersionInfo.versionInfo();
        System.out.println("obhistory-worker");
        System.out.println("Version: " + versionMessage.getVersion());
    }

    /**
     * Runs {@link #initMainFramework(String, int, String)}; on any failure logs it and exits
     * the JVM with status 1.
     */
    public static void runNewMain(String env, int workerCount, String group) {
        try {
            MainFramework.initMainFramework(env, workerCount, group);
        } catch (Exception e) {
            // log4j is configured inside initMainFramework, so a failure before or during that
            // step may never reach a log appender; echo it on stderr as well.
            System.err.println("fail to init mainFramework: " + e);
            log.warn("fail to init mainFramework: ", (Throwable) e);
            System.exit(1);
        }
    }

    /**
     * Command-line entry point. With {@code -v} prints the version; otherwise requires
     * {@code -e} and starts the worker with the optional {@code -w} worker count (default
     * {@value #DEFAULT_WORKER_COUNT}) and {@code -g} group (default: the current worker group).
     *
     * @throws NumberFormatException if the {@code -w} value is not an integer
     * @throws RuntimeException if the {@code -e} value is missing or empty
     */
    public static void main(String[] args) {
        CommandLine commandLine = MainFramework.parseCommandArguments(args);
        if (commandLine.hasOption("v")) {
            MainFramework.runGetVersion();
            return;
        }

        String env = commandLine.getOptionValue("e");
        int workerCount = DEFAULT_WORKER_COUNT;
        if (commandLine.hasOption("w")) {
            workerCount = Integer.parseInt(commandLine.getOptionValue("w"));
        }
        if (StringUtils.isEmpty((String) env)) {
            throw new RuntimeException("Option env is empty");
        }
        String group = commandLine.hasOption("g") ? commandLine.getOptionValue("g") : workerGroup;
        MainFramework.runNewMain(env, workerCount, group);
    }

    /** Returns the shared cached thread pool created when this class is loaded. */
    public static ExecutorService getCachedThreadPool() {
        return cachedThreadPool;
    }

    /**
     * Calls {@code HistoryJobDao.updateRunningJobToRunnable} for this host's address and logs
     * the number of affected jobs when it is positive.
     *
     * @throws SQLException if the connection or the update fails
     */
    private static void handleRunningJob() throws SQLException {
        try (Connection connection = metaDataSource.getConnection()) {
            int updatedCount = HistoryJobDao.updateRunningJobToRunnable(connection, MainFramework.getClientIp());
            if (updatedCount > 0) {
                log.info("handle {} running job", (Object) Integer.valueOf(updatedCount));
            }
        }
    }

    /** Returns the worker group ({@code "DEFAULT"} until initialisation sets another). */
    public static String getWorkerGroup() {
        return workerGroup;
    }

    /** Returns the job count last stored through {@link #setJobCount(int)}. */
    public static int getJobCount() {
        return jobCount;
    }

    /** Stores the job count that the heartbeat thread reports. */
    public static void setJobCount(int jobCount) {
        MainFramework.jobCount = jobCount;
    }
}

