/*
 * Decompiled with CFR 0.152.
 */
package org.broadinstitute.hellbender.tools.spark;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.barclay.argparser.BetaFeature;
import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
import org.broadinstitute.barclay.help.DocumentedFeature;
import org.broadinstitute.hellbender.engine.spark.GATKSparkTool;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
import org.broadinstitute.hellbender.utils.io.IOUtils;
import picard.cmdline.programgroups.OtherProgramGroup;
import scala.Tuple2;

/**
 * Spark tool that copies a single file, or the files directly inside a directory, from
 * Google Cloud Storage into a newly created HDFS directory.
 *
 * <p>Each input file is split into chunks of {@code dfs.blocksize} bytes (the last chunk may be
 * shorter). One Spark task copies one chunk into its own temporary HDFS file, and the driver then
 * stitches the chunk files of each input together with {@link FileSystem#concat}.
 *
 * <p>Directories found inside the input directory are skipped (the copy is not recursive).
 */
@DocumentedFeature
@CommandLineProgramProperties(oneLineSummary="Parallel copy a file or directory from Google Cloud Storage into the HDFS file system used by Spark", summary="This tool uses a Spark cluster to do a parallel copy of either a single file or a directory from Google Cloud Storage (GCS) into the HDFS file system used by Spark to support Resilient Distributed Datasets (RDDs). Files are divided into chunks of size equal to the HDFS block size (with the exception of the final chunk) and each Spark task is responsible for copying one chunk. To copy all of the files in a GCS directory, provide the GCS directory path, including the trailing slash. Directory copies are non-recursive so subdirectories will be skipped. Within directories each file is divided into chunks independently (so this will be inefficient if you have lots of files smaller than the block size). After all chunks are copied, the HDFS concat method is used to stitch together chunks into single files without re-copying them. This functionality is used by the structural variation workflow to copy reference data when a Spark cluster is created, and may also be used to copy sample data to a Spark cluster.", programGroup=OtherProgramGroup.class)
@BetaFeature
public class ParallelCopyGCSDirectoryIntoHDFSSpark
extends GATKSparkTool {
    private static final long serialVersionUID = 1L;

    /** Upper bound, in bytes, on the in-memory transfer buffer used while copying one chunk. */
    public static final int SIXTY_FOUR_MIB = 0x4000000;

    public static final String INPUT_GCS_PATH_LONG_NAME = "input-gcs-path";
    public static final String OUTPUT_HDFS_DIRECTORY_LONG_NAME = "output-hdfs-directory";
    public static final String INPUT_GLOB = "input-file-glob";

    /** Default glob: matches every entry of the input directory. */
    public static final String INPUT_GLOB_ALL_FILES = "*";

    /** Name of the Hadoop configuration property that supplies the chunk size. */
    private static final String HDFS_BLOCK_SIZE_PROPERTY = "dfs.blocksize";

    @Argument(doc="input GCS file path (add trailing slash when specifying a directory)", fullName=INPUT_GCS_PATH_LONG_NAME)
    private String inputGCSPath = null;

    @Argument(doc="optional wildcard glob to subset files in the input directory to copy", fullName=INPUT_GLOB)
    private String inputGlob = INPUT_GLOB_ALL_FILES;

    @Argument(doc="output directory on HDFS to into which to transfer the data (will be created by the tool)", fullName=OUTPUT_HDFS_DIRECTORY_LONG_NAME)
    private String outputHDFSDirectory;

    /**
     * Validates the arguments, creates the output directory, copies every chunk in parallel and
     * finally concatenates the chunk files of each input file.
     *
     * @param ctx Spark context used to distribute the chunk copies
     * @throws UserException if the input is not a GCS URI, the output is not an HDFS URI, the
     *         output directory already exists, or the input path cannot be found
     * @throws GATKException if any other I/O error occurs
     */
    @Override
    protected void runTool(final JavaSparkContext ctx) {
        if (!BucketUtils.isGcsUrl(this.inputGCSPath)) {
            throw new UserException("Input path " + this.inputGCSPath + " is not a GCS URI");
        }
        if (!BucketUtils.isHadoopUrl(this.outputHDFSDirectory)) {
            throw new UserException("Output directory " + this.outputHDFSDirectory + " is not an HDFS URI");
        }

        // Local copies so that the lambda below captures plain values rather than this tool instance.
        final String inputGCSPathFinal = this.inputGCSPath;
        final String outputDirectoryFinal = this.outputHDFSDirectory;
        final Path outputHdfsDirectoryPath = new Path(outputDirectoryFinal);

        try (FileSystem fs = outputHdfsDirectoryPath.getFileSystem(new Configuration())) {
            if (fs.exists(outputHdfsDirectoryPath)) {
                throw new UserException("Specified output directory " + outputHdfsDirectoryPath + " already exists. Please specify a new directory name.");
            }
            fs.mkdirs(outputHdfsDirectoryPath);

            final long chunkSize = getChunkSize(fs);
            final List<java.nio.file.Path> gcsNIOPaths = getGCSFilePathsToCopy(inputGCSPathFinal, this.inputGlob);
            final List<Tuple2<String, Integer>> chunkList = setupChunks(chunkSize, gcsNIOPaths);
            if (chunkList.isEmpty()) {
                this.logger.info("no files found to copy");
                return;
            }

            // One partition per chunk, so that every chunk is copied by its own task.
            final JavaPairRDD<String, Integer> chunkRDD = ctx.parallelizePairs(chunkList, chunkList.size());
            final JavaPairRDD<String, Tuple2<Integer, String>> chunkMappingRDD =
                    chunkRDD.mapToPair(p -> new Tuple2<>(p._1(), readChunkToHdfs(p._1(), chunkSize, p._2(), outputDirectoryFinal)));
            final Map<String, Iterable<Tuple2<Integer, String>>> chunksByFilePath = chunkMappingRDD.groupByKey().collectAsMap();

            concatenateChunks(outputDirectoryFinal, fs, gcsNIOPaths, chunksByFilePath);
        }
        catch (NoSuchFileException e) {
            throw new UserException("Could not locate input path " + e.getFile() + ". If you are trying to copy an entire directory, please include a trailing slash on your path.");
        }
        catch (IOException e) {
            throw new GATKException(e.getMessage(), e);
        }
    }

    /**
     * Creates the final output file for every non-directory input path and appends its chunk
     * files to it, in chunk-number order, using {@link FileSystem#concat}.
     *
     * @param outputDirectoryFinal HDFS directory that receives the output files
     * @param fs file system on which the output files are created and concatenated
     * @param gcsNIOPaths input paths that were considered for copying
     * @param chunksByFilePath for each input file URI, the (chunk number, chunk file path) pairs
     *        that were written; input files that produced no chunks have no entry
     * @throws IOException if creating or concatenating a file fails
     * @throws GATKException if the chunk numbers of a file are not exactly 0..n-1
     */
    private void concatenateChunks(final String outputDirectoryFinal, final FileSystem fs, final List<java.nio.file.Path> gcsNIOPaths, final Map<String, Iterable<Tuple2<Integer, String>>> chunksByFilePath) throws IOException {
        for (final java.nio.file.Path path : gcsNIOPaths) {
            if (Files.isDirectory(path)) {
                continue;
            }
            final String filePath = path.toUri().toString();
            final String basename = path.getName(path.getNameCount() - 1).toString();
            final Path outFilePath = new Path(outputDirectoryFinal + "/" + basename);
            fs.createNewFile(outFilePath);

            final Iterable<Tuple2<Integer, String>> chunkListForFile = chunksByFilePath.get(filePath);
            if (chunkListForFile == null) {
                // A zero-length input yields no chunks at all; the empty file created above is
                // already its complete copy, so there is nothing to concatenate.
                continue;
            }

            // Sort the chunks by chunk number; groupByKey gives no ordering guarantee.
            final TreeMap<Integer, String> chunkMap = new TreeMap<>();
            for (final Tuple2<Integer, String> entry : chunkListForFile) {
                chunkMap.put(entry._1(), entry._2());
            }
            if (chunkMap.isEmpty()) {
                continue;
            }

            final Path[] chunkPaths = new Path[chunkMap.size()];
            int expectedChunk = 0;
            for (final Map.Entry<Integer, String> entry : chunkMap.entrySet()) {
                // Fail loudly on a gap instead of handing concat a null or misplaced entry.
                if (entry.getKey() != expectedChunk) {
                    throw new GATKException("Missing chunk " + expectedChunk + " for file " + filePath);
                }
                chunkPaths[expectedChunk] = new Path(entry.getValue());
                expectedChunk++;
            }
            fs.concat(outFilePath, chunkPaths);
        }
    }

    /**
     * Builds the list of (input file URI, chunk number) pairs that have to be copied.
     * Directories are logged and skipped; a zero-length file contributes no pairs.
     *
     * @param chunkSize size of one chunk in bytes
     * @param gcsNIOPaths input paths to split into chunks
     * @return one pair per chunk, grouped by file in the order of {@code gcsNIOPaths}
     * @throws IOException if the size of a file cannot be determined
     */
    private List<Tuple2<String, Integer>> setupChunks(final long chunkSize, final List<java.nio.file.Path> gcsNIOPaths) throws IOException {
        final List<Tuple2<String, Integer>> chunkList = new ArrayList<>();
        for (final java.nio.file.Path path : gcsNIOPaths) {
            if (Files.isDirectory(path)) {
                this.logger.info("skipping directory " + path);
                continue;
            }
            final long fileSize = Files.size(path);
            // Ceiling division: a trailing partial chunk counts as a chunk of its own.
            final long chunks = fileSize / chunkSize + (fileSize % chunkSize == 0L ? 0L : 1L);
            this.logger.info("processing path " + path + ", size = " + fileSize + ", chunks = " + chunks);

            final String fileUri = path.toUri().toString();
            for (int i = 0; (long) i < chunks; i++) {
                chunkList.add(new Tuple2<>(fileUri, i));
            }
        }
        return chunkList;
    }

    /**
     * Resolves the input argument to the list of paths to copy: the glob-filtered entries of the
     * directory when the input is a directory, otherwise the single input path itself.
     *
     * @param inputGCSPathFinal input path as given on the command line
     * @param inputGlob glob applied to directory entries; ignored (with a warning when it is not
     *        the match-all glob) when the input is not a directory
     * @return the paths to copy
     * @throws IOException if the directory cannot be listed
     */
    private List<java.nio.file.Path> getGCSFilePathsToCopy(final String inputGCSPathFinal, final String inputGlob) throws IOException {
        final java.nio.file.Path inputGCSNIOPath = IOUtils.getPath(inputGCSPathFinal);
        if (Files.isDirectory(inputGCSNIOPath)) {
            this.logger.info("transferring input directory: " + inputGCSPathFinal);
            // The directory stream holds a resource; collect eagerly and close it before returning.
            try (java.nio.file.DirectoryStream<java.nio.file.Path> directoryStream = Files.newDirectoryStream(inputGCSNIOPath, inputGlob)) {
                return Utils.stream(directoryStream).collect(Collectors.toList());
            }
        }

        this.logger.info("transferring single file: " + inputGCSNIOPath);
        if (!INPUT_GLOB_ALL_FILES.equals(inputGlob)) {
            this.logger.warn("Input glob " + inputGlob + " specified, but input argument was not a directory. Ignoring glob.");
        }
        return Collections.singletonList(inputGCSNIOPath);
    }

    /**
     * Reads the chunk size from the {@code dfs.blocksize} property of the file system's
     * configuration. Both plain byte counts and suffixed sizes (for example {@code 128m}) are
     * accepted.
     *
     * @param fs file system whose configuration is consulted
     * @return the chunk size in bytes, always positive
     * @throws GATKException if the property is missing or not positive
     */
    static long getChunkSize(final FileSystem fs) {
        final Configuration conf = fs.getConf();
        if (conf.get(HDFS_BLOCK_SIZE_PROPERTY) == null) {
            throw new GATKException("Hadoop configuration property " + HDFS_BLOCK_SIZE_PROPERTY + " is not set; cannot determine the chunk size");
        }
        final long chunkSize = conf.getLongBytes(HDFS_BLOCK_SIZE_PROPERTY, -1L);
        if (chunkSize <= 0L) {
            throw new GATKException("Hadoop configuration property " + HDFS_BLOCK_SIZE_PROPERTY + " must be positive but was " + chunkSize);
        }
        return chunkSize;
    }

    /**
     * Copies one chunk of a GCS file into its own file in the output directory. The chunk file is
     * named {@code <basename>.chunk.<chunkNum>}.
     *
     * @param inputGCSPathFinal URI of the file to read from
     * @param chunkSize size of a full chunk in bytes
     * @param chunkNum zero-based index of the chunk to copy
     * @param outputDirectory directory in which the chunk file is created
     * @return the chunk number together with the path of the chunk file that was written
     * @throws GATKException if reading or writing fails
     */
    private static final Tuple2<Integer, String> readChunkToHdfs(final String inputGCSPathFinal, final long chunkSize, final Integer chunkNum, final String outputDirectory) {
        final java.nio.file.Path gcsPath = IOUtils.getPath(inputGCSPathFinal);
        final String basename = gcsPath.getName(gcsPath.getNameCount() - 1).toString();
        final Path outputPath = new Path(outputDirectory);
        final String chunkPath = outputPath + "/" + basename + ".chunk." + chunkNum;

        try (SeekableByteChannel channel = Files.newByteChannel(gcsPath);
             BufferedOutputStream outputStream = new BufferedOutputStream(BucketUtils.createFile(chunkPath))) {
            channel.position(chunkSize * chunkNum.longValue());

            // Heap buffer so that its backing array can be handed to the stream in a single call.
            final int bufferSize = (int) Math.min((long) SIXTY_FOUR_MIB, chunkSize);
            final ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);

            long bytesRemaining = chunkSize;
            while (bytesRemaining > 0L) {
                byteBuffer.clear();
                // Never request more than what is left of this chunk.
                byteBuffer.limit((int) Math.min((long) bufferSize, bytesRemaining));
                final int bytesRead = channel.read(byteBuffer);
                if (bytesRead <= 0) {
                    // End of file: the final chunk of a file is usually shorter than chunkSize.
                    break;
                }
                outputStream.write(byteBuffer.array(), byteBuffer.arrayOffset(), bytesRead);
                bytesRemaining -= bytesRead;
            }
        }
        catch (IOException e) {
            throw new GATKException(e.getMessage() + "; inputGCSPathFinal = " + inputGCSPathFinal, e);
        }
        return new Tuple2<>(chunkNum, chunkPath);
    }
}

