/*
 * Decompiled with CFR 0.152.
 */
package org.broadinstitute.hellbender.engine.spark.datasources;

import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.SAMSequenceDictionary;
import java.io.IOException;
import java.io.Serializable;
import java.util.Comparator;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.bdgenomics.adam.models.ReadGroupDictionary;
import org.bdgenomics.adam.models.SequenceDictionary;
import org.bdgenomics.formats.avro.AlignmentRecord;
import org.broadinstitute.hellbender.engine.GATKPath;
import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.GATKReadToBDGAlignmentRecordConverter;
import org.broadinstitute.hellbender.utils.read.HeaderlessSAMRecordCoordinateComparator;
import org.broadinstitute.hellbender.utils.read.ReadsWriteFormat;
import org.broadinstitute.hellbender.utils.spark.SparkUtils;
import org.disq_bio.disq.BaiWriteOption;
import org.disq_bio.disq.FileCardinalityWriteOption;
import org.disq_bio.disq.HtsjdkReadsRdd;
import org.disq_bio.disq.HtsjdkReadsRddStorage;
import org.disq_bio.disq.ReadsFormatWriteOption;
import org.disq_bio.disq.SbiWriteOption;
import org.disq_bio.disq.TempPartsDirectoryWriteOption;
import org.disq_bio.disq.WriteOption;
import scala.Tuple2;

/**
 * Static helpers for writing an RDD of {@link GATKRead}s from Spark.
 *
 * <p>Supported outputs:
 * <ul>
 *   <li>a single SAM/BAM/CRAM file (written through Disq);</li>
 *   <li>sharded BAM parts (written through Disq);</li>
 *   <li>ADAM Parquet (written through {@link AvroParquetOutputFormat}).</li>
 * </ul>
 */
public final class ReadsSparkSink {
    private static final Logger logger = LogManager.getLogger(ReadsSparkSink.class);

    // Splitting-index (.sbi) granularity used by the shortest writeReads overload.
    private static final long DEFAULT_SPLITTING_INDEX_GRANULARITY = 4096L;

    /**
     * Writes {@code reads} with default options.
     *
     * <p>The defaults are: {@code numReducers = 0}, the default parts directory, reads sorted to the header's
     * sort order, BAI and SBI indexes requested (these only take effect for {@link ReadsWriteFormat#SINGLE}),
     * and a splitting-index granularity of 4096.
     *
     * @param ctx                    the Spark context
     * @param outputFile             destination path; relative local paths are made absolute
     * @param referencePathSpecifier reference, passed to the CRAM reference check and to the writer; may be null
     * @param reads                  reads to write
     * @param header                 header to write and whose sort order is used for sorting
     * @param format                 output layout
     * @throws IOException if writing fails
     */
    public static void writeReads(final JavaSparkContext ctx, final String outputFile, final GATKPath referencePathSpecifier,
                                  final JavaRDD<GATKRead> reads, final SAMFileHeader header, final ReadsWriteFormat format) throws IOException {
        writeReads(ctx, outputFile, referencePathSpecifier, reads, header, format,
                0, null, true, DEFAULT_SPLITTING_INDEX_GRANULARITY);
    }

    /**
     * Writes {@code reads}, requesting both BAI and SBI indexes.
     *
     * <p>The index options only take effect for {@link ReadsWriteFormat#SINGLE}.
     * See the full overload for parameter details.
     *
     * @throws IOException if writing fails
     */
    public static void writeReads(final JavaSparkContext ctx, final String outputFile, final GATKPath referencePathSpecifier,
                                  final JavaRDD<GATKRead> reads, final SAMFileHeader header, final ReadsWriteFormat format,
                                  final int numReducers, final String outputPartsDir, final boolean sortReadsToHeader,
                                  final long splittingIndexGranularity) throws IOException {
        writeReads(ctx, outputFile, referencePathSpecifier, reads, header, format, numReducers, outputPartsDir,
                true, true, sortReadsToHeader, splittingIndexGranularity);
    }

    /**
     * Writes {@code reads} to {@code outputFile} in the requested {@code format}.
     *
     * @param ctx                       the Spark context
     * @param outputFile                destination path; relative local paths are made absolute via
     *                                  {@link BucketUtils#makeFilePathAbsolute}
     * @param referencePathSpecifier    reference, used for the CRAM reference check and passed to the Disq writer;
     *                                  may be null
     * @param reads                     reads to write
     * @param header                    header to write and whose sort order is used when {@code sortReadsToHeader} is set
     * @param format                    SINGLE (one SAM/BAM/CRAM file), SHARDED (BAM parts) or ADAM (Parquet)
     * @param numReducers               forwarded to {@link SparkUtils#sortUsingElementsAsKeys}
     *                                  (NOTE(review): meaning of 0 is defined there — confirm)
     * @param outputPartsDir            temporary parts directory for SINGLE output; null selects
     *                                  {@link #getDefaultPartsDirectory}; must be null for SHARDED and ADAM
     * @param writeBai                  whether to request a BAI index (SINGLE only)
     * @param writeSbi                  whether to request an SBI index (SINGLE only)
     * @param sortReadsToHeader         whether to sort reads to match the header's sort order first
     * @param splittingIndexGranularity SBI granularity passed to the Disq writer (unused for ADAM)
     * @throws GATKException if {@code outputPartsDir} is set for a non-SINGLE format, or {@code format} is unsupported
     * @throws IOException   if writing fails
     */
    public static void writeReads(final JavaSparkContext ctx, final String outputFile, final GATKPath referencePathSpecifier,
                                  final JavaRDD<GATKRead> reads, final SAMFileHeader header, final ReadsWriteFormat format,
                                  final int numReducers, final String outputPartsDir, final boolean writeBai,
                                  final boolean writeSbi, final boolean sortReadsToHeader,
                                  final long splittingIndexGranularity) throws IOException {
        final String absoluteOutputFile = BucketUtils.makeFilePathAbsolute(outputFile);
        ReadsSparkSource.checkCramReference(ctx, new GATKPath(absoluteOutputFile), referencePathSpecifier);

        // Convert each read to a SAMRecord with a null header.
        // The writer helpers in this class re-attach the header on the executors.
        final JavaRDD<SAMRecord> samReads = reads.map(read -> read.convertToSAMRecord(null));
        final JavaRDD<SAMRecord> readsToOutput = sortReadsToHeader
                ? sortSamRecordsToMatchHeader(samReads, header, numReducers)
                : samReads;

        if (format == ReadsWriteFormat.SINGLE) {
            // NOTE(review): the default parts directory is derived from the caller's outputFile, not
            // absoluteOutputFile. For relative paths the two may resolve differently — confirm this is intended.
            final String outputPartsDirectory = outputPartsDir == null ? getDefaultPartsDirectory(outputFile) : outputPartsDir;
            final FileCardinalityWriteOption fileCardinalityWriteOption = FileCardinalityWriteOption.SINGLE;
            final TempPartsDirectoryWriteOption tempPartsDirectoryWriteOption = new TempPartsDirectoryWriteOption(outputPartsDirectory);
            final BaiWriteOption baiWriteOption = BaiWriteOption.fromBoolean(writeBai);
            final SbiWriteOption sbiWriteOption = SbiWriteOption.fromBoolean(writeSbi);
            if (absoluteOutputFile.endsWith(".bam") || absoluteOutputFile.endsWith(".cram") || absoluteOutputFile.endsWith(".sam")) {
                // No explicit format option: presumably Disq infers it from the extension.
                writeReads(ctx, absoluteOutputFile, referencePathSpecifier, readsToOutput, header, splittingIndexGranularity,
                        fileCardinalityWriteOption, tempPartsDirectoryWriteOption, baiWriteOption, sbiWriteOption);
            } else {
                // Unrecognized extension: force BAM.
                writeReads(ctx, absoluteOutputFile, referencePathSpecifier, readsToOutput, header, splittingIndexGranularity,
                        ReadsFormatWriteOption.BAM, fileCardinalityWriteOption, tempPartsDirectoryWriteOption,
                        baiWriteOption, sbiWriteOption);
            }
        } else if (format == ReadsWriteFormat.SHARDED) {
            if (outputPartsDir != null) {
                throw new GATKException(String.format("You specified the bam output parts directory %s, but requested a sharded output format which does not use this option", outputPartsDir));
            }
            writeReads(ctx, absoluteOutputFile, referencePathSpecifier, readsToOutput, header, splittingIndexGranularity,
                    ReadsFormatWriteOption.BAM, FileCardinalityWriteOption.MULTIPLE);
        } else if (format == ReadsWriteFormat.ADAM) {
            if (outputPartsDir != null) {
                throw new GATKException(String.format("You specified the bam output parts directory %s, but requested an ADAM output format which does not use this option", outputPartsDir));
            }
            writeReadsADAM(ctx, absoluteOutputFile, readsToOutput, header);
        } else {
            // Previously an unknown or null format silently wrote nothing.
            throw new GATKException("Unsupported reads write format: " + format);
        }
    }

    /**
     * Attaches {@code header} to every read and writes the result through Disq.
     *
     * @param sbiIndexGranularity SBI granularity passed to the Disq storage
     * @param writeOptions        Disq write options (format, cardinality, parts directory, indexes)
     */
    private static void writeReads(final JavaSparkContext ctx, final String outputFile, final GATKPath referencePathSpecifier,
                                   final JavaRDD<SAMRecord> reads, final SAMFileHeader header, final long sbiIndexGranularity,
                                   final WriteOption... writeOptions) throws IOException {
        // Broadcast the header rather than capturing it directly in the task closure.
        final Broadcast<SAMFileHeader> headerBroadcast = ctx.broadcast(header);
        final JavaRDD<SAMRecord> readsWithHeader = reads.map(read -> {
            read.setHeaderStrict(headerBroadcast.getValue());
            return read;
        });
        final HtsjdkReadsRdd htsjdkReadsRdd = new HtsjdkReadsRdd(header, readsWithHeader);
        final String referenceSourcePath = referencePathSpecifier == null ? null : referencePathSpecifier.getRawInputString();
        HtsjdkReadsRddStorage.makeDefault(ctx)
                .referenceSourcePath(referenceSourcePath)
                .sbiIndexGranularity(sbiIndexGranularity)
                .write(htsjdkReadsRdd, outputFile, writeOptions);
    }

    /**
     * Converts reads to ADAM {@link AlignmentRecord}s and writes them as Avro-Parquet.
     *
     * <p>Any existing file or directory at {@code outputFile} is deleted first.
     */
    private static void writeReadsADAM(final JavaSparkContext ctx, final String outputFile, final JavaRDD<SAMRecord> reads,
                                       final SAMFileHeader header) throws IOException {
        final SequenceDictionary seqDict = SequenceDictionary.fromSAMSequenceDictionary(header.getSequenceDictionary());
        final ReadGroupDictionary readGroups = ReadGroupDictionary.fromSAMHeader(header);
        final JavaPairRDD<Void, AlignmentRecord> rddAlignmentRecords = reads.map(read -> {
            read.setHeaderStrict(header);
            final AlignmentRecord alignmentRecord = GATKReadToBDGAlignmentRecordConverter.convert(read, seqDict, readGroups);
            // Put the record back into its headerless state after conversion.
            read.setHeaderStrict(null);
            return alignmentRecord;
        }).mapToPair(alignmentRecord -> new Tuple2<>(null, alignmentRecord));

        // The Job is only used as a vehicle to store the Avro schema in a Hadoop Configuration.
        final Job job = Job.getInstance(ctx.hadoopConfiguration());
        AvroParquetOutputFormat.setSchema(job, AlignmentRecord.getClassSchema());
        deleteHadoopFile(outputFile, ctx.hadoopConfiguration());
        rddAlignmentRecords.saveAsNewAPIHadoopFile(outputFile, Void.class, AlignmentRecord.class,
                AvroParquetOutputFormat.class, job.getConfiguration());
    }

    /** Recursively deletes {@code fileToObliterate} on whatever Hadoop filesystem its path resolves to. */
    private static void deleteHadoopFile(final String fileToObliterate, final Configuration conf) throws IOException {
        final Path pathToDelete = new Path(fileToObliterate);
        pathToDelete.getFileSystem(conf).delete(pathToDelete, true);
    }

    /**
     * Returns the default temporary parts directory for {@code file}.
     *
     * @param file the output file path
     * @return {@code file} with {@code ".parts/"} appended
     */
    public static String getDefaultPartsDirectory(final String file) {
        return file + ".parts/";
    }

    /**
     * Sorts {@code reads} to match the sort order declared in {@code header}.
     *
     * @return the sorted RDD, or {@code reads} unchanged when no comparator applies
     */
    private static JavaRDD<SAMRecord> sortSamRecordsToMatchHeader(final JavaRDD<SAMRecord> reads, final SAMFileHeader header,
                                                                  final int numReducers) {
        final Comparator<SAMRecord> comparator = getSAMRecordComparator(header);
        if (comparator == null) {
            return reads;
        }
        return SparkUtils.sortUsingElementsAsKeys(reads, comparator, numReducers);
    }

    /**
     * Chooses a comparator for the header's sort order.
     *
     * <ul>
     *   <li>coordinate: a {@link HeaderlessSAMRecordCoordinateComparator};</li>
     *   <li>queryname and unsorted: the sort order's own comparator instance (possibly null — not confirmed here);</li>
     *   <li>any other sort order: null.</li>
     * </ul>
     *
     * @throws UserException.UnimplementedFeature for the "duplicate" sort order
     */
    private static Comparator<SAMRecord> getSAMRecordComparator(final SAMFileHeader header) {
        switch (header.getSortOrder()) {
            case coordinate:
                return new HeaderlessSAMRecordCoordinateComparator(header);
            case duplicate:
                throw new UserException.UnimplementedFeature("The sort order \"duplicate\" is not supported in Spark.");
            case queryname:
            case unsorted:
                return header.getSortOrder().getComparatorInstance();
            default:
                return null;
        }
    }
}

