/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

/**
 * Replicates bulk loaded HFiles on the sink cluster.
 *
 * <p>The work is done in two phases: the HFiles named in the per-table map handed to the
 * constructor are first copied from the source cluster's file system into a freshly created
 * staging directory on the sink file system, and each staging directory is then bulk loaded into
 * the corresponding table through {@link LoadIncrementalHFiles}.
 *
 * <p>The copy thread pool is shut down at the end of the copy phase, so an instance supports a
 * single call to {@link #replicate()}.
 */
@InterfaceAudience.Private
public class HFileReplicator {
    /** Configuration key: maximum number of threads used to copy HFiles in parallel. */
    public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY = "hbase.replication.bulkload.copy.maxthreads";
    public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
    /** Configuration key: number of HFiles handed to a single copy task. */
    public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY = "hbase.replication.bulkload.copy.hfiles.perthread";
    public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;
    private static final Log LOG = LogFactory.getLog(HFileReplicator.class);
    private static final String UNDERSCORE = "_";
    private static final FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
    /** Bit width of the random suffix appended to staging directory names. */
    private static final int RANDOM_WIDTH = 320;
    /** Radix used to render the random suffix as text. */
    private static final int RANDOM_RADIX = 32;
    private final Configuration sourceClusterConf;
    private final String sourceBaseNamespaceDirPath;
    private final String sourceHFileArchiveDirPath;
    /** Table name -> list of (column family, HFile paths relative to the source directories). */
    private final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
    private final FileSystem sinkFs;
    private final FsDelegationToken fsDelegationToken;
    private final UserProvider userProvider;
    private final Configuration conf;
    private final Connection connection;
    private final String hbaseStagingDir;
    private final ThreadPoolExecutor exec;
    private final int maxCopyThreads;
    private final int copiesPerThread;

    /**
     * Creates a replicator for one batch of bulk loaded HFiles.
     *
     * @param sourceClusterConf configuration used to open the source cluster's file system
     * @param sourceBaseNamespaceDirPath base directory on the source file system against which the
     *        HFile paths are resolved first
     * @param sourceHFileArchiveDirPath archive directory on the source file system, used as a
     *        fallback when an HFile is not found under the base directory
     * @param tableQueueMap table name to list of (family, HFile paths) pairs to replicate
     * @param conf sink cluster configuration
     * @param connection connection to the sink cluster
     * @throws IOException if the sink file system cannot be obtained
     */
    public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath, Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf, Connection connection) throws IOException {
        this.sourceClusterConf = sourceClusterConf;
        this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
        this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
        this.bulkLoadHFileMap = tableQueueMap;
        this.conf = conf;
        this.connection = connection;
        this.userProvider = UserProvider.instantiate(conf);
        this.fsDelegationToken = new FsDelegationToken(this.userProvider, "renewer");
        this.hbaseStagingDir = conf.get("hbase.bulkload.staging.dir");
        this.maxCopyThreads = HFileReplicator.positiveIntOrDefault(conf, REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY, REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
        this.copiesPerThread = HFileReplicator.positiveIntOrDefault(conf, REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY, REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);
        // Obtain the file system before building the pool so a failure here leaves no executor behind.
        this.sinkFs = FileSystem.get(conf);
        ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
        builder.setNameFormat("HFileReplicationCallable-%1$d");
        // With an unbounded work queue a ThreadPoolExecutor never grows beyond its core size, so the
        // core size must equal the configured maximum for the copies to actually run in parallel.
        // Idle threads still go away because core threads are allowed to time out.
        this.exec = new ThreadPoolExecutor(this.maxCopyThreads, this.maxCopyThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), builder.build());
        this.exec.allowCoreThreadTimeOut(true);
    }

    /**
     * Reads an int setting that must be at least 1, falling back to the default otherwise.
     *
     * @param conf configuration to read from
     * @param key configuration key
     * @param defaultValue value used when the key is unset or configured below 1
     * @return the configured value, or {@code defaultValue} if the configured value is below 1
     */
    private static int positiveIntOrDefault(Configuration conf, String key, int defaultValue) {
        int value = conf.getInt(key, defaultValue);
        if (value < 1) {
            LOG.warn("Ignoring non-positive value " + value + " configured for " + key + ", using default " + defaultValue);
            return defaultValue;
        }
        return value;
    }

    /**
     * Copies the HFiles to the sink staging directories and bulk loads them table by table.
     *
     * <p>For every table the staging directory is deleted, the table closed and the delegation
     * token released once the table has been processed, whether or not the load succeeded. A table
     * whose staging directory yields no loadable files is skipped with a warning.
     *
     * @return always {@code null}
     * @throws IOException if copying or bulk loading fails
     */
    public Void replicate() throws IOException {
        Map<String, Path> tableStagingDirsMap = this.copyHFilesToStagingDir();
        int maxRetries = this.conf.getInt("hbase.bulkload.retries.number", 10);
        for (Map.Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
            String tableNameString = tableStagingDir.getKey();
            Path stagingDir = tableStagingDir.getValue();
            LoadIncrementalHFiles loadHFiles;
            try {
                loadHFiles = new LoadIncrementalHFiles(this.conf);
            }
            catch (Exception e) {
                LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded data.", e);
                throw new IOException(e);
            }
            Configuration newConf = HBaseConfiguration.create(this.conf);
            newConf.set("create.table", "no");
            loadHFiles.setConf(newConf);
            TableName tableName = TableName.valueOf(tableNameString);
            Table table = null;
            try {
                table = this.connection.getTable(tableName);
                Deque<LoadIncrementalHFiles.LoadQueueItem> queue = new LinkedList<LoadIncrementalHFiles.LoadQueueItem>();
                loadHFiles.prepareHFileQueue(stagingDir, table, queue, false);
                if (queue.isEmpty()) {
                    LOG.warn("Replication process did not find any files to replicate in directory " + stagingDir.toUri());
                    // Nothing to load for this table; the finally block still cleans up and the
                    // remaining tables are processed.
                    continue;
                }
                try (RegionLocator locator = this.connection.getRegionLocator(tableName)) {
                    this.fsDelegationToken.acquireDelegationToken(this.sinkFs);
                    loadHFiles.setBulkToken(stagingDir.toString());
                    this.doBulkLoad(loadHFiles, table, queue, locator, maxRetries);
                }
            }
            finally {
                this.cleanup(stagingDir.toString(), table);
            }
        }
        return null;
    }

    /**
     * Repeatedly asks {@code loadHFiles} to load the queue until the queue is empty.
     *
     * <p>The region start/end keys are re-read from the locator before every attempt.
     *
     * @param loadHFiles loader that performs the actual bulk load
     * @param table target table
     * @param queue HFiles still waiting to be loaded
     * @param locator source of the table's current region boundaries
     * @param maxRetries maximum number of attempts; {@code 0} disables the limit
     * @throws IOException if the queue is still not empty after {@code maxRetries} attempts, or if
     *         the loader fails
     */
    private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table, Deque<LoadIncrementalHFiles.LoadQueueItem> queue, RegionLocator locator, int maxRetries) throws IOException {
        int count = 0;
        while (!queue.isEmpty()) {
            Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys();
            if (count != 0) {
                LOG.warn("Error occurred while replicating HFiles, retry attempt " + count + " with " + queue.size() + " files still remaining to replicate.");
            }
            if (maxRetries != 0 && count >= maxRetries) {
                throw new IOException("Retry attempted " + count + " times without completing, bailing out.");
            }
            ++count;
            loadHFiles.loadHFileQueue(table, this.connection, queue, startEndKeys);
        }
    }

    /**
     * Releases the delegation token, deletes the staging directory and closes the table.
     *
     * <p>Failures to delete or close are logged and otherwise ignored so that cleanup never masks
     * the outcome of the load itself.
     *
     * @param stagingDir staging directory to delete recursively, or {@code null} to skip
     * @param table table to close, or {@code null} to skip
     */
    private void cleanup(String stagingDir, Table table) {
        this.fsDelegationToken.releaseDelegationToken();
        if (stagingDir != null) {
            try {
                this.sinkFs.delete(new Path(stagingDir), true);
            }
            catch (IOException e) {
                LOG.warn("Failed to delete the staging directory " + stagingDir, e);
            }
        }
        if (table != null) {
            try {
                table.close();
            }
            catch (IOException e) {
                LOG.warn("Failed to close the table.", e);
            }
        }
    }

    /**
     * Copies every HFile listed in the bulk load map into a per-table staging directory on the
     * sink file system.
     *
     * <p>Each table gets its own staging directory with one sub-directory per column family. The
     * HFiles of a family are split into chunks of at most {@code copiesPerThread} files, each chunk
     * is copied by one pool task, and all tasks of a family are awaited before the next family is
     * started. The source file system is opened with its cache disabled and closed when the method
     * returns; the copy pool is shut down as well.
     *
     * @return table name to the staging directory holding that table's copied HFiles
     * @throws IOException if a copy task fails or the wait for it is interrupted
     */
    private Map<String, Path> copyHFilesToStagingDir() throws IOException {
        Map<String, Path> mapOfCopiedHFiles = new HashMap<String, Path>();
        FileSystem sourceFs = null;
        try {
            Path sourceClusterPath = new Path(this.sourceBaseNamespaceDirPath);
            String sourceScheme = sourceClusterPath.toUri().getScheme();
            String disableCacheName = String.format("fs.%s.impl.disable.cache", sourceScheme);
            // The instance is closed below, so it must not be one shared through the FileSystem cache.
            this.sourceClusterConf.setBoolean(disableCacheName, true);
            sourceFs = sourceClusterPath.getFileSystem(this.sourceClusterConf);
            User user = this.userProvider.getCurrent();
            for (Map.Entry<String, List<Pair<byte[], List<String>>>> tableEntry : this.bulkLoadHFileMap.entrySet()) {
                String tableName = tableEntry.getKey();
                Path stagingDir = this.createStagingDir(new Path(this.hbaseStagingDir), user, TableName.valueOf(tableName));
                for (Pair<byte[], List<String>> familyHFilePathsPair : tableEntry.getValue()) {
                    byte[] family = familyHFilePathsPair.getFirst();
                    List<String> hfilePaths = familyHFilePathsPair.getSecond();
                    Path familyStagingDir = new Path(stagingDir, Bytes.toString(family));
                    int totalNoOfHFiles = hfilePaths.size();
                    List<Future<Void>> futures = new ArrayList<Future<Void>>();
                    int from = 0;
                    while (from < totalNoOfHFiles) {
                        // copiesPerThread is at least 1, so every iteration makes progress.
                        int to = from + Math.min(this.copiesPerThread, totalNoOfHFiles - from);
                        futures.add(this.exec.submit(new Copier(sourceFs, familyStagingDir, hfilePaths.subList(from, to))));
                        from = to;
                    }
                    HFileReplicator.awaitCopies(futures);
                }
                mapOfCopiedHFiles.put(tableName, stagingDir);
            }
            return mapOfCopiedHFiles;
        }
        finally {
            if (sourceFs != null) {
                sourceFs.close();
            }
            if (this.exec != null) {
                this.exec.shutdown();
            }
        }
    }

    /**
     * Blocks until every copy task has finished, converting failures into {@link IOException}s.
     *
     * @param futures copy tasks to wait for
     * @throws InterruptedIOException if the waiting thread is interrupted
     * @throws IOException if any copy task failed
     */
    private static void awaitCopies(List<Future<Void>> futures) throws IOException {
        for (Future<Void> future : futures) {
            try {
                future.get();
            }
            catch (InterruptedException e) {
                InterruptedIOException iioe = new InterruptedIOException("Failed to copy HFiles to local file system. This will be retried again by the source cluster.");
                iioe.initCause(e);
                throw iioe;
            }
            catch (ExecutionException e) {
                throw new IOException("Failed to copy HFiles to local file system. This will be retried again by the source cluster.", e);
            }
        }
    }

    /**
     * Creates a uniquely named staging directory for a table.
     *
     * <p>The directory name is {@code <user short name>__<table name>__<random suffix>}, with the
     * {@code :} of a namespace-qualified table name replaced by an underscore.
     *
     * @param baseDir parent directory for staging directories
     * @param user user whose short name prefixes the directory name
     * @param tableName table the directory is created for
     * @return the created directory
     * @throws IOException if the directory cannot be created
     */
    private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
        String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
        String doubleUnderScore = UNDERSCORE + UNDERSCORE;
        String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore + new BigInteger(RANDOM_WIDTH, new SecureRandom()).toString(RANDOM_RADIX);
        return this.createStagingDir(baseDir, user, randomDir);
    }

    /**
     * Creates {@code baseDir/randomDir} on the sink file system with all-access permissions.
     *
     * @param baseDir parent directory
     * @param user not used by this overload
     * @param randomDir name of the directory to create
     * @return the created directory
     * @throws IOException if the directory cannot be created or its permission cannot be set
     */
    private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
        Path p = new Path(baseDir, randomDir);
        this.sinkFs.mkdirs(p, PERM_ALL_ACCESS);
        // Set the permission explicitly as well; NOTE(review): presumably because mkdirs applies
        // the process umask to the requested permission - confirm.
        this.sinkFs.setPermission(p, PERM_ALL_ACCESS);
        return p;
    }

    /**
     * Copy task: copies one chunk of HFiles from the source file system into a family staging
     * directory on the sink file system.
     */
    private class Copier
    implements Callable<Void> {
        private final FileSystem sourceFs;
        private final Path stagingDir;
        private final List<String> hfiles;

        /**
         * @param sourceFs file system of the source cluster
         * @param stagingDir family staging directory the files are copied into
         * @param hfiles HFile paths relative to the source base or archive directory
         */
        public Copier(FileSystem sourceFs, Path stagingDir, List<String> hfiles) {
            this.sourceFs = sourceFs;
            this.stagingDir = stagingDir;
            this.hfiles = hfiles;
        }

        /**
         * Copies every HFile of this chunk and makes each copy accessible to all users.
         *
         * <p>A file missing under the source base directory is looked up in the archive directory;
         * a file missing in both places is logged and left out of the replication.
         *
         * @return always {@code null}
         * @throws IOException if a copy fails for a reason other than the file not being found
         */
        @Override
        public Void call() throws IOException {
            for (String hfile : this.hfiles) {
                Path sourceHFilePath = new Path(HFileReplicator.this.sourceBaseNamespaceDirPath, hfile);
                Path localHFilePath = new Path(this.stagingDir, sourceHFilePath.getName());
                try {
                    FileUtil.copy(this.sourceFs, sourceHFilePath, HFileReplicator.this.sinkFs, localHFilePath, false, HFileReplicator.this.conf);
                }
                catch (FileNotFoundException e) {
                    LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath + ". Trying to copy from hfile archive directory.", e);
                    sourceHFilePath = new Path(HFileReplicator.this.sourceHFileArchiveDirPath, hfile);
                    try {
                        FileUtil.copy(this.sourceFs, sourceHFilePath, HFileReplicator.this.sinkFs, localHFilePath, false, HFileReplicator.this.conf);
                    }
                    catch (FileNotFoundException e1) {
                        LOG.error("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath + ". Hence ignoring this hfile from replication..", e1);
                        continue;
                    }
                }
                HFileReplicator.this.sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
            }
            return null;
        }
    }
}

