/*
 * Decompiled with CFR 0.152.
 */
package org.broadinstitute.hellbender.tools.spark.sv;

import com.google.common.annotations.VisibleForTesting;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMFlag;
import htsjdk.samtools.SAMReadGroupRecord;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.variant.variantcontext.VariantContext;
import htsjdk.variant.vcf.VCFHeaderLine;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.broadinstitute.barclay.argparser.Advanced;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.barclay.argparser.ArgumentCollection;
import org.broadinstitute.barclay.argparser.BetaFeature;
import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
import org.broadinstitute.barclay.help.DocumentedFeature;
import org.broadinstitute.hellbender.cmdline.programgroups.StructuralVariantDiscoveryProgramGroup;
import org.broadinstitute.hellbender.engine.spark.GATKSparkTool;
import org.broadinstitute.hellbender.engine.spark.datasources.ReferenceMultiSparkSource;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.tools.spark.sv.StructuralVariationDiscoveryArgumentCollection;
import org.broadinstitute.hellbender.tools.spark.sv.discovery.AnnotatedVariantProducer;
import org.broadinstitute.hellbender.tools.spark.sv.discovery.SvDiscoverFromLocalAssemblyContigAlignmentsSpark;
import org.broadinstitute.hellbender.tools.spark.sv.discovery.SvDiscoveryInputMetaData;
import org.broadinstitute.hellbender.tools.spark.sv.discovery.alignment.AlignedContig;
import org.broadinstitute.hellbender.tools.spark.sv.discovery.alignment.AlignedContigGenerator;
import org.broadinstitute.hellbender.tools.spark.sv.discovery.alignment.AlignmentInterval;
import org.broadinstitute.hellbender.tools.spark.sv.discovery.alignment.ContigAlignmentsModifier;
import org.broadinstitute.hellbender.tools.spark.sv.discovery.inference.ContigChimericAlignmentIterativeInterpreter;
import org.broadinstitute.hellbender.tools.spark.sv.discovery.inference.ImpreciseVariantDetector;
import org.broadinstitute.hellbender.tools.spark.sv.evidence.AlignedAssemblyOrExcuse;
import org.broadinstitute.hellbender.tools.spark.sv.evidence.EvidenceTargetLink;
import org.broadinstitute.hellbender.tools.spark.sv.evidence.FindBreakpointEvidenceSpark;
import org.broadinstitute.hellbender.tools.spark.sv.evidence.ReadMetadata;
import org.broadinstitute.hellbender.tools.spark.sv.utils.CNVInputReader;
import org.broadinstitute.hellbender.tools.spark.sv.utils.PairedStrandedIntervalTree;
import org.broadinstitute.hellbender.tools.spark.sv.utils.SVIntervalTree;
import org.broadinstitute.hellbender.tools.spark.sv.utils.SVUtils;
import org.broadinstitute.hellbender.tools.spark.sv.utils.SVVCFWriter;
import org.broadinstitute.hellbender.utils.SequenceDictionaryUtils;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.bwa.BwaMemAlignment;
import org.broadinstitute.hellbender.utils.bwa.BwaMemAlignmentUtils;
import org.broadinstitute.hellbender.utils.fermi.FermiLiteAssembly;
import org.broadinstitute.hellbender.utils.io.IOUtils;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter;

/**
 * Runs the structural variation discovery workflow on a single sample.
 * <p>
 * Stages, in order:
 * <ol>
 *   <li>evidence gathering, local assembly and contig alignment, delegated to
 *       {@link FindBreakpointEvidenceSpark#gatherEvidenceAndWriteContigSamFile} (which also writes the contig SAM);</li>
 *   <li>conversion of the in-memory contig alignments into {@link AlignedContig}s;</li>
 *   <li>variant discovery from chimeric contig alignments, optionally annotated with evidence target links and
 *       supplemented with imprecise deletion calls;</li>
 *   <li>VCF output, plus an experimental interpretation pass when {@code --exp-interpret} is set.</li>
 * </ol>
 */
@DocumentedFeature
@BetaFeature
@CommandLineProgramProperties(oneLineSummary="Runs the structural variation discovery workflow on a single sample", summary="This tool packages the algorithms described in FindBreakpointEvidenceSpark and DiscoverVariantsFromContigAlignmentsSAMSpark as an integrated workflow.  Please consult the descriptions of those tools for more details about the algorithms employed.  In brief, input reads are examined for evidence of structural variation in a genomic region, regions so identified are locally assembled, and the local assemblies are called for structural variation.", programGroup=StructuralVariantDiscoveryProgramGroup.class)
public class StructuralVariationDiscoveryPipelineSpark
extends GATKSparkTool {
    private static final long serialVersionUID = 1L;

    /** Read-group ID attached to every SAM record generated for an assembled contig. */
    private static final String CONTIG_ALIGNMENTS_READ_GROUP_ID = "GATKSVContigAlignments";

    /** Gap-size value handed to the gapped-alignment splitting calls in {@link InMemoryAlignmentParser}. */
    private static final int GAPPED_ALIGNMENT_MIN_GAP_SIZE = 50;

    private final Logger localLogger = LogManager.getLogger(StructuralVariationDiscoveryPipelineSpark.class);

    /** Arguments for the evidence-gathering and local-assembly stage. */
    @ArgumentCollection
    private final StructuralVariationDiscoveryArgumentCollection.FindBreakpointEvidenceSparkArgumentCollection evidenceAndAssemblyArgs = new StructuralVariationDiscoveryArgumentCollection.FindBreakpointEvidenceSparkArgumentCollection();

    /** Arguments for the variant-discovery stage. */
    @ArgumentCollection
    private final StructuralVariationDiscoveryArgumentCollection.DiscoverVariantsFromContigAlignmentsArgumentCollection discoverStageArgs = new StructuralVariationDiscoveryArgumentCollection.DiscoverVariantsFromContigAlignmentsArgumentCollection();

    @Argument(doc="sam file for aligned contigs", fullName="contig-sam-file")
    private String outputAssemblyAlignments;

    @Argument(doc="directory for VCF output, including those from experimental interpretation tool if so requested, will be created if not present; sample name will be appended after the provided argument", shortName="O", fullName="output")
    private String variantsOutDir;

    @Advanced
    @Argument(doc="flag to signal that user wants to run experimental interpretation tool as well", fullName="exp-interpret", optional=true)
    private Boolean expInterpret = false;

    @Override
    public boolean requiresReads() {
        return true;
    }

    @Override
    public boolean requiresReference() {
        return true;
    }

    /**
     * Drives the whole pipeline.
     * <p>
     * Returns early, writing no VCF, in two cases: when the assembly stage yields an empty
     * aligned-assembly list, or when no contig has a mapped, non-secondary alignment.
     *
     * @param ctx the Spark context
     */
    @Override
    protected void runTool(final JavaSparkContext ctx) {
        validateParams();

        final JavaRDD<GATKRead> unfilteredReads = getUnfilteredReads();
        final SAMFileHeader headerForReads = getHeaderForReads();

        // Stage 1: gather evidence, assemble, align contigs, write the contig SAM file.
        final FindBreakpointEvidenceSpark.AssembledEvidenceResults assembledEvidenceResults =
                FindBreakpointEvidenceSpark.gatherEvidenceAndWriteContigSamFile(ctx, evidenceAndAssemblyArgs,
                        headerForReads, unfilteredReads, outputAssemblyAlignments, localLogger);
        if (assembledEvidenceResults.getAlignedAssemblyOrExcuseList().isEmpty()) {
            return;
        }

        // Stage 2: turn the in-memory alignments into AlignedContigs.
        final JavaRDD<AlignedContig> parsedAlignments = new InMemoryAlignmentParser(ctx,
                assembledEvidenceResults.getAlignedAssemblyOrExcuseList(), headerForReads).getAlignedContigs();
        if (parsedAlignments.isEmpty()) {
            return;
        }

        // Stage 3: discover and annotate variants, then write them out.
        final SvDiscoveryInputMetaData svDiscoveryInputMetaData =
                getSvDiscoveryInputData(ctx, headerForReads, assembledEvidenceResults);
        final List<VariantContext> assemblyBasedVariants =
                ContigChimericAlignmentIterativeInterpreter.discoverVariantsFromChimeras(svDiscoveryInputMetaData, parsedAlignments);
        final List<VariantContext> annotatedVariants =
                processEvidenceTargetLinks(assemblyBasedVariants, svDiscoveryInputMetaData);

        final String outputPath = svDiscoveryInputMetaData.getOutputPath();
        final SAMSequenceDictionary refSeqDictionary =
                svDiscoveryInputMetaData.getReferenceData().getReferenceSequenceDictionaryBroadcast().getValue();
        final Set<VCFHeaderLine> defaultToolVCFHeaderLines = svDiscoveryInputMetaData.getDefaultToolVCFHeaderLines();
        final Logger toolLogger = svDiscoveryInputMetaData.getToolLogger();
        SVVCFWriter.writeVCF(annotatedVariants, outputPath + "inv_del_ins.vcf", refSeqDictionary,
                defaultToolVCFHeaderLines, toolLogger);

        // expInterpret is a boxed Boolean that defaults to false, so a null check would always pass;
        // only run the experimental pass when the flag is explicitly true.
        if (Boolean.TRUE.equals(expInterpret)) {
            experimentalInterpretation(ctx, assembledEvidenceResults, svDiscoveryInputMetaData);
        }
    }

    /**
     * Validates both argument collections and rejects specifying both an external evidence file and a CNV calls
     * file.
     * <p>
     * When a CNV calls file is given, it is also installed as the evidence stage's external evidence file.
     */
    private void validateParams() {
        evidenceAndAssemblyArgs.validate();
        discoverStageArgs.validate();
        Utils.validate(evidenceAndAssemblyArgs.externalEvidenceFile == null || discoverStageArgs.cnvCallsFile == null,
                "Please only specify one of externalEvidenceFile or cnvCallsFile");
        if (discoverStageArgs.cnvCallsFile != null) {
            evidenceAndAssemblyArgs.externalEvidenceFile = discoverStageArgs.cnvCallsFile;
        }
    }

    /**
     * Builds the metadata bundle consumed by the discovery stage.
     * <p>
     * Creates the output directory if it does not exist yet. The returned output prefix has the form
     * {@code <variantsOutDir>/<sampleId>_}.
     *
     * @param ctx                      the Spark context
     * @param headerForReads           header of the input reads; also used as the sample header in the metadata
     * @param assembledEvidenceResults results of the evidence/assembly stage
     * @return the discovery-stage metadata
     * @throws GATKException if the output directory cannot be created
     */
    private SvDiscoveryInputMetaData getSvDiscoveryInputData(final JavaSparkContext ctx,
                                                             final SAMFileHeader headerForReads,
                                                             final FindBreakpointEvidenceSpark.AssembledEvidenceResults assembledEvidenceResults) {
        final Broadcast<SVIntervalTree<VariantContext>> cnvCallsBroadcast =
                broadcastCNVCalls(ctx, headerForReads, discoverStageArgs.cnvCallsFile);
        try {
            if (!Files.exists(Paths.get(variantsOutDir))) {
                IOUtils.createDirectory(variantsOutDir);
            }
        } catch (final IOException ioex) {
            throw new GATKException("Failed to create output directory " + variantsOutDir + " though it does not yet exist", ioex);
        }
        final String outputPrefixWithSampleName = variantsOutDir + (variantsOutDir.endsWith("/") ? "" : "/")
                + SVUtils.getSampleId(headerForReads) + "_";
        return new SvDiscoveryInputMetaData(ctx, discoverStageArgs, evidenceAndAssemblyArgs.crossContigsToIgnoreFile,
                outputPrefixWithSampleName, assembledEvidenceResults.getReadMetadata(),
                assembledEvidenceResults.getAssembledIntervals(),
                makeEvidenceLinkTree(assembledEvidenceResults.getEvidenceTargetLinks()),
                cnvCallsBroadcast, headerForReads, getReference(), getDefaultToolVCFHeaderLines(), localLogger);
    }

    /**
     * Loads CNV calls from {@code cnvCallsFile} and broadcasts them.
     *
     * @param ctx          the Spark context
     * @param header       header passed to the CNV reader
     * @param cnvCallsFile path to the CNV calls, may be {@code null}
     * @return the broadcast CNV interval tree, or {@code null} in either of two cases: {@code cnvCallsFile} is
     *         {@code null}, or the reader returns {@code null}
     */
    public static Broadcast<SVIntervalTree<VariantContext>> broadcastCNVCalls(final JavaSparkContext ctx,
                                                                            final SAMFileHeader header,
                                                                            final String cnvCallsFile) {
        if (cnvCallsFile == null) {
            return null;
        }
        final SVIntervalTree<VariantContext> cnvCalls = CNVInputReader.loadCNVCalls(cnvCallsFile, header);
        return cnvCalls == null ? null : ctx.broadcast(cnvCalls);
    }

    /**
     * Indexes evidence target links by their paired stranded intervals.
     *
     * @param evidenceTargetLinks the links to index, may be {@code null}
     * @return a tree containing every link, or {@code null} when {@code evidenceTargetLinks} is {@code null}
     */
    private static PairedStrandedIntervalTree<EvidenceTargetLink> makeEvidenceLinkTree(final List<EvidenceTargetLink> evidenceTargetLinks) {
        if (evidenceTargetLinks == null) {
            return null;
        }
        final PairedStrandedIntervalTree<EvidenceTargetLink> evidenceLinkTree = new PairedStrandedIntervalTree<>();
        for (final EvidenceTargetLink link : evidenceTargetLinks) {
            evidenceLinkTree.put(link.getPairedStrandedIntervals(), link);
        }
        return evidenceLinkTree;
    }

    /**
     * Runs the experimental interpretation tool on the raw contig alignments.
     * <p>
     * The filtered results are written to {@code <output prefix>experimentalInterpretation_merged_simple.vcf}.
     * Side effect: the output path stored in {@code svDiscoveryInputMetaData} is updated to include the
     * {@code experimentalInterpretation_} suffix.
     */
    private static void experimentalInterpretation(final JavaSparkContext ctx,
                                                   final FindBreakpointEvidenceSpark.AssembledEvidenceResults assembledEvidenceResults,
                                                   final SvDiscoveryInputMetaData svDiscoveryInputMetaData) {
        final JavaRDD<GATKRead> assemblyRawAlignments =
                getContigRawAlignments(ctx, assembledEvidenceResults, svDiscoveryInputMetaData);

        final String updatedOutputPath = svDiscoveryInputMetaData.getOutputPath() + "experimentalInterpretation_";
        svDiscoveryInputMetaData.updateOutputPath(updatedOutputPath);

        final SvDiscoverFromLocalAssemblyContigAlignmentsSpark.AssemblyContigsClassifiedByAlignmentSignatures contigsByPossibleRawTypes =
                SvDiscoverFromLocalAssemblyContigAlignmentsSpark.preprocess(svDiscoveryInputMetaData, assemblyRawAlignments);
        final List<VariantContext> variants;
        try {
            variants = SvDiscoverFromLocalAssemblyContigAlignmentsSpark.dispatchJobs(ctx, contigsByPossibleRawTypes,
                    svDiscoveryInputMetaData, assemblyRawAlignments, true);
        } finally {
            // Release whatever preprocess() persisted, even if dispatching fails.
            contigsByPossibleRawTypes.unpersist();
        }

        final List<VariantContext> filteredVariants =
                AnnotatedVariantProducer.filterMergedVariantList(variants, svDiscoveryInputMetaData.getDiscoverStageArgs());
        final String out = updatedOutputPath + "merged_simple.vcf";
        SVVCFWriter.writeVCF(filteredVariants, out,
                svDiscoveryInputMetaData.getReferenceData().getReferenceSequenceDictionaryBroadcast().getValue(),
                svDiscoveryInputMetaData.getDefaultToolVCFHeaderLines(), svDiscoveryInputMetaData.getToolLogger());
    }

    /**
     * Converts every non-failed assembly's contig alignments into {@link GATKRead}s.
     * <p>
     * The conversion runs on the driver, and the result is then parallelized into an RDD.
     */
    private static JavaRDD<GATKRead> getContigRawAlignments(final JavaSparkContext ctx,
                                                            final FindBreakpointEvidenceSpark.AssembledEvidenceResults assembledEvidenceResults,
                                                            final SvDiscoveryInputMetaData svDiscoveryInputMetaData) {
        final Broadcast<SAMSequenceDictionary> referenceSequenceDictionaryBroadcast =
                svDiscoveryInputMetaData.getReferenceData().getReferenceSequenceDictionaryBroadcast();
        final Broadcast<SAMFileHeader> headerBroadcast = svDiscoveryInputMetaData.getSampleSpecificData().getHeaderBroadcast();
        final SAMFileHeader headerForReads = headerBroadcast.getValue();
        final SAMReadGroupRecord contigAlignmentsReadGroup = new SAMReadGroupRecord(CONTIG_ALIGNMENTS_READ_GROUP_ID);
        final List<String> refNames = SequenceDictionaryUtils.getContigNamesList(referenceSequenceDictionaryBroadcast.getValue());

        final List<GATKRead> contigAlignments = assembledEvidenceResults.getAlignedAssemblyOrExcuseList().stream()
                .filter(AlignedAssemblyOrExcuse::isNotFailure)
                .flatMap(aa -> aa.toSAMStreamForAlignmentsOfThisAssembly(headerForReads, refNames, contigAlignmentsReadGroup))
                .<GATKRead>map(SAMRecordToGATKReadAdapter::new)
                .collect(Collectors.toList());
        return ctx.parallelize(contigAlignments);
    }

    /**
     * Annotates assembly-based calls with evidence target links, when links are available.
     * <p>
     * In that case, imprecise deletion calls derived from the links are appended to the result. Otherwise the
     * input list is returned unchanged.
     *
     * @param assemblyBasedVariants     calls made from contig alignments
     * @param svDiscoveryInputMetaData  discovery-stage metadata supplying links, read metadata, reference and args
     * @return the annotated (and possibly extended) list of calls
     */
    private static List<VariantContext> processEvidenceTargetLinks(final List<VariantContext> assemblyBasedVariants,
                                                                   final SvDiscoveryInputMetaData svDiscoveryInputMetaData) {
        final PairedStrandedIntervalTree<EvidenceTargetLink> evidenceTargetLinks =
                svDiscoveryInputMetaData.getSampleSpecificData().getEvidenceTargetLinks();
        if (evidenceTargetLinks == null) {
            return assemblyBasedVariants;
        }
        final ReadMetadata readMetadata = svDiscoveryInputMetaData.getSampleSpecificData().getReadMetadata();
        final SAMSequenceDictionary refDict =
                svDiscoveryInputMetaData.getReferenceData().getReferenceSequenceDictionaryBroadcast().getValue();
        final ReferenceMultiSparkSource reference =
                (ReferenceMultiSparkSource) svDiscoveryInputMetaData.getReferenceData().getReferenceBroadcast().getValue();
        final StructuralVariationDiscoveryArgumentCollection.DiscoverVariantsFromContigAlignmentsArgumentCollection discoverStageArgs =
                svDiscoveryInputMetaData.getDiscoverStageArgs();
        final Logger toolLogger = svDiscoveryInputMetaData.getToolLogger();

        // Copy into a list we own: the annotator's return value is not guaranteed to be mutable.
        final List<VariantContext> annotatedVariants = new ArrayList<>(
                AnnotatedVariantProducer.annotateBreakpointBasedCallsWithImpreciseEvidenceLinks(assemblyBasedVariants,
                        evidenceTargetLinks, readMetadata, refDict, discoverStageArgs, toolLogger));
        final List<VariantContext> impreciseVariants = ImpreciseVariantDetector.callImpreciseDeletionFromEvidenceLinks(
                evidenceTargetLinks, readMetadata, reference, discoverStageArgs.impreciseVariantEvidenceThreshold,
                discoverStageArgs.maxCallableImpreciseVariantDeletionSize, toolLogger);
        annotatedVariants.addAll(impreciseVariants);
        return annotatedVariants;
    }

    /**
     * Converts the in-memory results of local assembly into {@link AlignedContig}s.
     * <p>
     * Failed assemblies are skipped, as are unmapped and secondary alignments.
     */
    public static final class InMemoryAlignmentParser
    extends AlignedContigGenerator
    implements scala.Serializable {
        private static final long serialVersionUID = 1L;
        private final JavaSparkContext ctx;
        private final List<AlignedAssemblyOrExcuse> alignedAssemblyOrExcuseList;
        private final SAMFileHeader header;

        InMemoryAlignmentParser(final JavaSparkContext ctx,
                                final List<AlignedAssemblyOrExcuse> alignedAssemblyOrExcuseList,
                                final SAMFileHeader header) {
            this.ctx = ctx;
            this.alignedAssemblyOrExcuseList = alignedAssemblyOrExcuseList;
            this.header = header;
        }

        /**
         * Converts assemblies to contigs by way of intermediate SAM records.
         * <p>
         * Conversion is done on Spark. Contigs left with no mapped, non-secondary record are dropped.
         *
         * @param alignedAssemblyOrExcuseList assemblies (or failure excuses) to convert
         * @param header                      header whose sequence dictionary supplies reference names
         * @param ctx                         the Spark context
         * @return an RDD of aligned contigs
         */
        @VisibleForTesting
        public static JavaRDD<AlignedContig> filterAndConvertToAlignedContigViaSAM(final List<AlignedAssemblyOrExcuse> alignedAssemblyOrExcuseList,
                                                                                   final SAMFileHeader header,
                                                                                   final JavaSparkContext ctx) {
            // A header carrying only the sequence dictionary is used to format the contig SAM records.
            final SAMFileHeader cleanHeader = new SAMFileHeader(header.getSequenceDictionary());
            final List<String> refNames = SequenceDictionaryUtils.getContigNamesList(header.getSequenceDictionary());

            // Spark's Java function interfaces already extend java.io.Serializable, so no intersection casts are
            // needed; raw casts here would erase the lambda parameter types.
            return ctx.parallelize(alignedAssemblyOrExcuseList)
                    .filter(AlignedAssemblyOrExcuse::isNotFailure)
                    .flatMap(alignedAssemblyNoExcuse -> {
                        final FermiLiteAssembly assembly = alignedAssemblyNoExcuse.getAssembly();
                        final int assemblyId = alignedAssemblyNoExcuse.getAssemblyId();
                        final List<List<BwaMemAlignment>> allAlignmentsOfThisAssembly = alignedAssemblyNoExcuse.getContigAlignments();
                        final int nContigs = assembly.getNContigs();
                        return IntStream.range(0, nContigs)
                                .mapToObj(contigIdx -> BwaMemAlignmentUtils.toSAMStreamForRead(
                                        AlignedAssemblyOrExcuse.formatContigName(assemblyId, contigIdx),
                                        assembly.getContig(contigIdx).getSequence(),
                                        allAlignmentsOfThisAssembly.get(contigIdx),
                                        cleanHeader, refNames,
                                        new SAMReadGroupRecord(CONTIG_ALIGNMENTS_READ_GROUP_ID)))
                                .iterator();
                    })
                    .map(forOneContig -> forOneContig
                            .filter(sam -> !sam.getReadUnmappedFlag() && !sam.isSecondaryAlignment())
                            .collect(Collectors.toList()))
                    .filter(list -> !list.isEmpty())
                    .map(forOneContig -> AlignedContig.parseReadsAndOptionallySplitGappedAlignments(
                            forOneContig, GAPPED_ALIGNMENT_MIN_GAP_SIZE, true));
        }

        /**
         * Converts assemblies to contigs directly from the BWA alignments, without going through SAM records.
         * <p>
         * Contigs with no remaining alignments are dropped.
         */
        @VisibleForTesting
        public static List<AlignedContig> filterAndConvertToAlignedContigDirect(final Iterable<AlignedAssemblyOrExcuse> alignedAssemblyOrExcuseIterable,
                                                                                final List<String> refNames,
                                                                                final SAMFileHeader header) {
            return Utils.stream(alignedAssemblyOrExcuseIterable)
                    .filter(AlignedAssemblyOrExcuse::isNotFailure)
                    .map(alignedAssembly -> getAlignedContigsInOneAssembly(alignedAssembly, refNames, header))
                    .flatMap(Utils::stream)
                    .filter(contig -> !contig.getAlignments().isEmpty())
                    .collect(Collectors.toList());
        }

        /**
         * Builds one {@link AlignedContig} per contig of {@code alignedAssembly}.
         * <p>
         * The result includes contigs whose alignment list ends up empty.
         */
        @VisibleForTesting
        public static Iterable<AlignedContig> getAlignedContigsInOneAssembly(final AlignedAssemblyOrExcuse alignedAssembly,
                                                                            final List<String> refNames,
                                                                            final SAMFileHeader header) {
            final FermiLiteAssembly assembly = alignedAssembly.getAssembly();
            final List<List<BwaMemAlignment>> allAlignments = alignedAssembly.getContigAlignments();
            return IntStream.range(0, assembly.getNContigs())
                    .mapToObj(contigIdx -> {
                        final byte[] contigSequence = assembly.getContig(contigIdx).getSequence();
                        final String contigName = AlignedAssemblyOrExcuse.formatContigName(alignedAssembly.getAssemblyId(), contigIdx);
                        final List<AlignmentInterval> alignmentsForOneContig =
                                getAlignmentsForOneContig(contigName, contigSequence, allAlignments.get(contigIdx), refNames, header);
                        return new AlignedContig(contigName, contigSequence, alignmentsForOneContig);
                    })
                    .collect(Collectors.toList());
        }

        /**
         * Converts one contig's BWA alignments into {@link AlignmentInterval}s.
         * <p>
         * Alignments with a negative reference ID (i.e. unmapped) and secondary alignments are dropped, and each
         * remaining alignment is passed through gapped-alignment splitting.
         */
        @VisibleForTesting
        private static List<AlignmentInterval> getAlignmentsForOneContig(final String contigName,
                                                                         final byte[] contigSequence,
                                                                         final List<BwaMemAlignment> contigAlignments,
                                                                         final List<String> refNames,
                                                                         final SAMFileHeader header) {
            return contigAlignments.stream()
                    .filter(bwaMemAlignment -> bwaMemAlignment.getRefId() >= 0
                            && SAMFlag.SECONDARY_ALIGNMENT.isUnset(bwaMemAlignment.getSamFlag()))
                    .map(bwaMemAlignment -> BwaMemAlignmentUtils.applyAlignment(contigName, contigSequence, null, null,
                            bwaMemAlignment, refNames, header, false, false))
                    .map(AlignmentInterval::new)
                    .map(ar -> ContigAlignmentsModifier.splitGappedAlignment(ar, GAPPED_ALIGNMENT_MIN_GAP_SIZE, contigSequence.length))
                    .flatMap(Utils::stream)
                    .collect(Collectors.toList());
        }

        @Override
        public JavaRDD<AlignedContig> getAlignedContigs() {
            return filterAndConvertToAlignedContigViaSAM(alignedAssemblyOrExcuseList, header, ctx);
        }
    }
}

