/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.threadlocal.InternalThreadLocal;
import org.apache.kylin.common.util.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for talking to HBase.
 *
 * <p>The class keeps per-{@link StorageURL} caches of HBase {@link Configuration}s and
 * {@link Connection}s, a per-thread "current" configuration, and a lazily created thread
 * pool for coprocessor calls. A JVM shutdown hook registered in the static initializer
 * shuts the pool down and closes every cached connection.
 */
public class HBaseConnection {
    public static final String HTABLE_UUID_TAG = "UUID";
    private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
    private static final Map<StorageURL, Configuration> configCache = new ConcurrentHashMap<StorageURL, Configuration>();
    private static final Map<StorageURL, Connection> connPool = new ConcurrentHashMap<StorageURL, Connection>();
    private static final InternalThreadLocal<Configuration> configThreadLocal = new InternalThreadLocal<Configuration>();
    // volatile: read without the class lock on the fast path of getCoprocessorPool().
    private static volatile ExecutorService coprocessorPool = null;
    public static final String JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE = "mapreduce.job.hdfs-servers.token-renewal.exclude";

    /**
     * Returns the shared coprocessor thread pool, creating it on first use.
     *
     * <p>Pool sizing and keep-alive come from {@link KylinConfig}; the work queue is bounded
     * to {@code maxThreads * 100} entries, worker threads are daemon threads named with the
     * {@code kylin-coproc-} prefix, and core threads are allowed to time out.
     *
     * @return the process-wide coprocessor executor, never {@code null}
     */
    public static ExecutorService getCoprocessorPool() {
        ExecutorService pool = coprocessorPool;
        if (pool != null) {
            return pool;
        }
        synchronized (HBaseConnection.class) {
            // Re-check under the lock: another thread may have created the pool meanwhile.
            pool = coprocessorPool;
            if (pool != null) {
                return pool;
            }
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            int maxThreads = config.getHBaseMaxConnectionThreads();
            int coreThreads = config.getHBaseCoreConnectionThreads();
            long keepAliveTime = config.getHBaseConnectionThreadPoolAliveSeconds();
            LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * 100);
            ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS,
                    workQueue, Threads.newDaemonThreadFactory("kylin-coproc-"));
            tpe.allowCoreThreadTimeOut(true);
            logger.info("Creating coprocessor thread pool with max of {}, core of {}", maxThreads, coreThreads);
            coprocessorPool = tpe;
            return tpe;
        }
    }

    /**
     * Shuts the coprocessor pool down if it was ever created: an orderly shutdown, up to
     * 10 seconds of waiting, then a forced shutdown if tasks are still running.
     */
    private static void closeCoprocessorPool() {
        ExecutorService pool = coprocessorPool;
        if (pool == null) {
            return;
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10L, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            // The interrupt flag is deliberately left cleared: the only caller is the shutdown
            // hook, which goes on to join the connection-closing thread, and a pending
            // interrupt would make that join fail immediately.
            pool.shutdownNow();
        }
    }

    /**
     * Empties the connection pool and closes the removed connections on a background thread.
     *
     * <p>Entries are removed one by one so that every connection taken out of the pool is
     * also the one that gets closed; a connection added concurrently is either picked up here
     * or stays in the pool, but is never dropped unclosed.
     *
     * @return the already started thread (named {@code close-hbase-conn}) doing the closing
     */
    private static Thread closeAndRestConnPool() {
        final Collection<Connection> toClose = new ArrayList<Connection>();
        for (StorageURL url : connPool.keySet()) {
            Connection removed = connPool.remove(url);
            if (removed != null) {
                toClose.add(removed);
            }
        }
        Thread closer = new Thread() {

            @Override
            public void run() {
                logger.info("Closing HBase connections...");
                for (Connection conn : toClose) {
                    try {
                        conn.close();
                    } catch (Exception e) {
                        // Keep going: one failing connection must not prevent closing the rest.
                        logger.error("error closing hbase connection " + conn, e);
                    }
                }
            }
        };
        closer.setName("close-hbase-conn");
        closer.start();
        return closer;
    }

    /**
     * Drops all pooled connections; they are closed asynchronously and this method does not
     * wait for that to finish.
     */
    public static void clearConnCache() {
        closeAndRestConnPool();
    }

    /**
     * Returns the HBase configuration bound to the calling thread, building it from the
     * storage URL of the current {@link KylinConfig} on the thread's first call.
     *
     * @return the calling thread's HBase configuration
     */
    public static Configuration getCurrentHBaseConfiguration() {
        Configuration conf = configThreadLocal.get();
        if (conf == null) {
            StorageURL storageUrl = KylinConfig.getInstanceFromEnv().getStorageUrl();
            conf = newHBaseConfiguration(storageUrl);
            configThreadLocal.set(conf);
        }
        return conf;
    }

    /**
     * Builds a fresh HBase configuration for the given storage URL.
     *
     * <p>Starts from the current Hadoop configuration, merges the optional HBase-cluster HA
     * settings, sets {@code fs.defaultFS} (the configured HBase cluster FS if any, otherwise
     * the working file system), defaults {@code hadoop.tmp.dir} and {@code hbase.fs.tmp.dir}
     * to {@code /tmp} when blank, and finally applies every parameter of the URL verbatim.
     *
     * @param url storage URL; its scheme must be {@code hbase}
     * @return the new configuration
     * @throws IllegalArgumentException if the URL scheme is not {@code hbase}
     */
    private static Configuration newHBaseConfiguration(StorageURL url) {
        if (!"hbase".equals(url.getScheme())) {
            throw new IllegalArgumentException("to use hbase storage, pls set 'kylin.storage.url=hbase' in kylin.properties");
        }
        Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
        addHBaseClusterNNHAConfiguration(conf);
        KylinConfig kylinConf = KylinConfig.getInstanceFromEnv();
        String hbaseClusterFs = kylinConf.getHBaseClusterFs();
        if (StringUtils.isNotEmpty(hbaseClusterFs)) {
            conf.set("fs.defaultFS", hbaseClusterFs);
        } else {
            try {
                FileSystem fs = HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration());
                String fsUri = fs.getUri().toString();
                conf.set("fs.defaultFS", fsUri);
                logger.debug("Using the working dir FS for HBase: {}", fsUri);
            } catch (IOException e) {
                // Best effort: fall through with whatever fs.defaultFS the base configuration has.
                logger.error("Fail to set working dir to HBase configuration", e);
            }
        }
        if (StringUtils.isBlank(conf.get("hadoop.tmp.dir"))) {
            conf.set("hadoop.tmp.dir", "/tmp");
        }
        if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) {
            conf.set("hbase.fs.tmp.dir", "/tmp");
        }
        // URL parameters are applied last so they override everything set above.
        for (Map.Entry<String, String> entry : url.getAllParameters().entrySet()) {
            conf.set(entry.getKey(), entry.getValue());
        }
        return conf;
    }

    /**
     * Merges the NameNode HA settings of the HBase cluster's HDFS into {@code conf}.
     *
     * <p>Reads the file named by {@link KylinConfig#getHBaseClusterHDFSConfigFile()} and, for
     * each nameservice listed under {@code dfs.nameservices} there, appends it to
     * {@code conf}'s {@code dfs.nameservices} and copies {@code dfs.ha.namenodes.<ns>},
     * {@code dfs.client.failover.proxy.provider.<ns>} and every
     * {@code dfs.namenode.rpc-address.<ns>.<nn>} (a missing value is copied as an empty
     * string). The file's nameservices are also written to
     * {@link #JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE}. Does nothing when no file is configured.
     *
     * @param conf configuration to update in place
     */
    public static void addHBaseClusterNNHAConfiguration(Configuration conf) {
        String hdfsConfigFile = KylinConfig.getInstanceFromEnv().getHBaseClusterHDFSConfigFile();
        if (hdfsConfigFile == null || hdfsConfigFile.isEmpty()) {
            return;
        }
        // loadDefaults=false: only the settings of the given file are visible in hdfsConf.
        Configuration hdfsConf = new Configuration(false);
        hdfsConf.addResource(hdfsConfigFile);
        Collection<String> nameServices = hdfsConf.getTrimmedStringCollection("dfs.nameservices");
        Collection<String> mainNameServices = conf.getTrimmedStringCollection("dfs.nameservices");
        for (String serviceId : nameServices) {
            mainNameServices.add(serviceId);
            String serviceConfKey = "dfs.ha.namenodes." + serviceId;
            String proxyConfKey = "dfs.client.failover.proxy.provider." + serviceId;
            conf.set(serviceConfKey, hdfsConf.get(serviceConfKey, ""));
            conf.set(proxyConfKey, hdfsConf.get(proxyConfKey, ""));
            Collection<String> nameNodes = hdfsConf.getTrimmedStringCollection(serviceConfKey);
            for (String nameNode : nameNodes) {
                String rpcConfKey = "dfs.namenode.rpc-address." + serviceId + "." + nameNode;
                conf.set(rpcConfKey, hdfsConf.get(rpcConfKey, ""));
            }
        }
        conf.setStrings("dfs.nameservices", mainNameServices.toArray(new String[0]));
        conf.setStrings(JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE, nameServices.toArray(new String[0]));
    }

    /**
     * Re-qualifies a path against the file system resolved from the current HBase
     * configuration; any scheme and authority already present on {@code inPath} are dropped.
     *
     * @param inPath path, with or without scheme/authority
     * @return the fully qualified path as a string
     */
    public static String makeQualifiedPathInHBaseCluster(String inPath) {
        Path path = Path.getPathWithoutSchemeAndAuthority(new Path(inPath));
        FileSystem fs = HadoopUtil.getFileSystem(path, getCurrentHBaseConfiguration());
        return fs.makeQualified(path).toString();
    }

    /**
     * Resolves the file system for a path using the current HBase configuration; any scheme
     * and authority already present on {@code inPath} are ignored.
     *
     * @param inPath path, with or without scheme/authority
     * @return the file system {@link HadoopUtil} resolves for the stripped path
     */
    public static FileSystem getFileSystemInHBaseCluster(String inPath) {
        Path path = Path.getPathWithoutSchemeAndAuthority(new Path(inPath));
        return HadoopUtil.getFileSystem(path, getCurrentHBaseConfiguration());
    }

    /**
     * Returns an open connection for the given storage URL, creating and pooling a new one
     * when none is pooled or the pooled one is closed.
     *
     * <p>If a freshly created connection reports closed, the call sleeps 10 seconds and
     * tries again, indefinitely.
     *
     * @param url storage URL identifying the HBase cluster
     * @return an open connection
     * @throws RuntimeException wrapping any failure while opening the connection; if the
     *         calling thread was interrupted its interrupt flag is set again before throwing
     */
    public static Connection get(StorageURL url) {
        Configuration conf = configCache.get(url);
        if (conf == null) {
            conf = newHBaseConfiguration(url);
            configCache.put(url, conf);
        }
        Connection connection = connPool.get(url);
        try {
            while (true) {
                if (connection == null || connection.isClosed()) {
                    logger.info("connection is null or closed, creating a new one");
                    connection = ConnectionFactory.createConnection(conf);
                    connPool.put(url, connection);
                }
                if (connection != null && !connection.isClosed()) {
                    break;
                }
                Thread.sleep(10000L);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers; the exception type they see is unchanged.
            Thread.currentThread().interrupt();
            logger.error("Error when open connection " + url, e);
            throw new RuntimeException("Error when open connection " + url, e);
        } catch (Throwable t) {
            logger.error("Error when open connection " + url, t);
            throw new RuntimeException("Error when open connection " + url, t);
        }
        return connection;
    }

    /**
     * Tells whether a table exists, using a short-lived {@link Admin} of the connection.
     *
     * @param conn open HBase connection
     * @param tableName table name
     * @return {@code true} if HBase reports the table as existing
     * @throws IOException if the admin call fails
     */
    public static boolean tableExists(Connection conn, String tableName) throws IOException {
        try (Admin hbase = conn.getAdmin()) {
            return hbase.tableExists(TableName.valueOf(tableName));
        }
    }

    /**
     * Same as {@link #tableExists(Connection, String)} on the pooled connection of the URL.
     *
     * @param hbaseUrl storage URL identifying the HBase cluster
     * @param tableName table name
     * @return {@code true} if HBase reports the table as existing
     * @throws IOException if the admin call fails
     */
    public static boolean tableExists(StorageURL hbaseUrl, String tableName) throws IOException {
        return tableExists(get(hbaseUrl), tableName);
    }

    /**
     * Same as {@link #createHTableIfNeeded(Connection, String, String...)} on the pooled
     * connection of the URL.
     *
     * @param hbaseUrl storage URL identifying the HBase cluster
     * @param tableName table name
     * @param families column families the table must have
     * @throws IOException if an admin call fails
     */
    public static void createHTableIfNeeded(StorageURL hbaseUrl, String tableName, String ... families) throws IOException {
        createHTableIfNeeded(get(hbaseUrl), tableName, families);
    }

    /**
     * Same as {@link #deleteTable(Connection, String)} on the pooled connection of the URL.
     *
     * @param hbaseUrl storage URL identifying the HBase cluster
     * @param tableName table name
     * @throws IOException if an admin call fails
     */
    public static void deleteTable(StorageURL hbaseUrl, String tableName) throws IOException {
        deleteTable(get(hbaseUrl), tableName);
    }

    /**
     * Makes sure a table with the given column families exists.
     *
     * <p>If the table already exists, any missing family is added (followed by a 10 second
     * pause when at least one was added). Otherwise the table is created while holding the
     * distributed lock at {@code /create_htable/<table>/lock}; existence is re-checked after
     * the lock is acquired. The admin handle is closed and the lock released in all cases.
     *
     * @param conn open HBase connection
     * @param table table name
     * @param families column families the table must have; {@code null} is treated as none
     * @throws IOException if an admin call fails
     * @throws RuntimeException if the distributed lock cannot be acquired
     */
    public static void createHTableIfNeeded(Connection conn, String table, String ... families) throws IOException {
        // Resolve everything that can fail on bad input before an Admin is opened.
        TableName tableName = TableName.valueOf(table);
        String lockPath = getLockPath(table);
        DistributedLock lock = null;
        Admin admin = conn.getAdmin();
        try {
            if (admin.tableExists(tableName)) {
                logger.debug("HTable '{}' already exists", table);
                addMissingFamilies(admin, tableName, table, families);
                return;
            }
            lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentProcess();
            if (!lock.lock(lockPath, Long.MAX_VALUE)) {
                throw new RuntimeException("Cannot acquire lock to create HTable " + table);
            }
            // Another process may have created the table while this one waited for the lock.
            if (admin.tableExists(tableName)) {
                logger.debug("HTable '{}' already exists", table);
                return;
            }
            logger.debug("Creating HTable '{}'", table);
            HTableDescriptor desc = new HTableDescriptor(tableName);
            if (families != null) {
                for (String family : families) {
                    desc.addFamily(newFamilyDescriptor(family));
                }
            }
            admin.createTable(desc);
            logger.debug("HTable '{}' created", table);
        } finally {
            try {
                admin.close();
            } finally {
                // Nested so that a failing close() cannot leave the lock held.
                if (lock != null && lock.isLockedByMe(lockPath)) {
                    lock.unlock(lockPath);
                }
            }
        }
    }

    /**
     * Adds to an existing table every requested family it does not have yet, then pauses
     * 10 seconds if at least one family was added.
     */
    private static void addMissingFamilies(Admin admin, TableName tableName, String table, String[] families) throws IOException {
        if (families == null || families.length == 0) {
            return;
        }
        Set<String> existingFamilies = getFamilyNames(admin.getTableDescriptor(tableName));
        boolean added = false;
        for (String family : families) {
            if (existingFamilies.contains(family)) {
                continue;
            }
            logger.debug("Adding family '{}' to HTable '{}'", family, table);
            admin.addColumn(tableName, newFamilyDescriptor(family));
            added = true;
        }
        if (!added) {
            return;
        }
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting after adding column families to HTable '" + table + "'", e);
        }
    }

    /** Returns the names of the descriptor's column families decoded as UTF-8 strings. */
    private static Set<String> getFamilyNames(HTableDescriptor desc) {
        Set<String> result = Sets.newHashSet();
        for (byte[] bytes : desc.getFamiliesKeys()) {
            result.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return result;
    }

    /** Creates a column family descriptor with the in-memory flag switched on. */
    private static HColumnDescriptor newFamilyDescriptor(String family) {
        HColumnDescriptor fd = new HColumnDescriptor(family);
        fd.setInMemory(true);
        return fd;
    }

    /**
     * Deletes a table if it exists, disabling it first when it is enabled.
     *
     * @param conn open HBase connection
     * @param tableName table name
     * @throws IOException if an admin call fails
     */
    public static void deleteTable(Connection conn, String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Admin hbase = conn.getAdmin()) {
            if (!hbase.tableExists(name)) {
                logger.debug("HTable '{}' does not exists", tableName);
                return;
            }
            logger.debug("delete HTable '{}'", tableName);
            if (hbase.isTableEnabled(name)) {
                hbase.disableTable(name);
            }
            hbase.deleteTable(name);
            logger.debug("HTable '{}' deleted", tableName);
        }
    }

    /** Returns the distributed-lock path guarding creation of the given table. */
    private static String getLockPath(String table) {
        return "/create_htable/" + table + "/lock";
    }

    static {
        // On JVM shutdown: stop the coprocessor pool, then wait for all connections to close.
        Runtime.getRuntime().addShutdownHook(new Thread() {

            @Override
            public void run() {
                try {
                    HBaseConnection.closeCoprocessorPool();
                    HBaseConnection.closeAndRestConnPool().join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.error("Interrupted while waiting for HBase connections to close", e);
                }
            }
        });
    }
}

