/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.cli;

import java.io.Serializable;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.cli.SparkHelper;
import org.apache.hudi.cli.SparkHelpers$;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Iterable$;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashSet;
import scala.collection.mutable.HashSet$;
import scala.collection.mutable.Set;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;

@ScalaSignature(bytes="\u0006\u0001\u0005ma\u0001B\b\u0011\u0001eA\u0001\u0002\t\u0001\u0003\u0002\u0003\u0006I!\t\u0005\tY\u0001\u0011\t\u0011)A\u0005C!AQ\u0006\u0001B\u0001B\u0003%\u0011\u0005\u0003\u0005/\u0001\t\u0005\t\u0015!\u00030\u0011!9\u0004A!A!\u0002\u0013A\u0004\"B \u0001\t\u0003\u0001\u0005b\u0002%\u0001\u0005\u0004%\t!\u0013\u0005\u0007\u001b\u0002\u0001\u000b\u0011\u0002&\t\u000f9\u0003!\u0019!C\u0001\u001f\"1a\u000b\u0001Q\u0001\nACQa\u0016\u0001\u0005\u0002aCQA\u001b\u0001\u0005\n-DQa\u001e\u0001\u0005\u0002aD\u0011\"a\u0001\u0001#\u0003%\t!!\u0002\u0003\u001d\u0011+G-\u001e9f'B\f'o\u001b&pE*\u0011\u0011CE\u0001\u0004G2L'BA\n\u0015\u0003\u0011AW\u000fZ5\u000b\u0005U1\u0012AB1qC\u000eDWMC\u0001\u0018\u0003\ry'oZ\u0002\u0001'\t\u0001!\u0004\u0005\u0002\u001c=5\tADC\u0001\u001e\u0003\u0015\u00198-\u00197b\u0013\tyBD\u0001\u0004B]f\u0014VMZ\u0001\tE\u0006\u001cX\rU1uQB\u0011!%\u000b\b\u0003G\u001d\u0002\"\u0001\n\u000f\u000e\u0003\u0015R!A\n\r\u0002\rq\u0012xn\u001c;?\u0013\tAC$\u0001\u0004Qe\u0016$WMZ\u0005\u0003U-\u0012aa\u0015;sS:<'B\u0001\u0015\u001d\u0003]!W\u000f\u001d7jG\u0006$X\r\u001a)beRLG/[8o!\u0006$\b.\u0001\tsKB\f\u0017N](viB,H\u000fU1uQ\u0006Q1/\u001d7D_:$X\r\u001f;\u0011\u0005A*T\"A\u0019\u000b\u0005I\u001a\u0014aA:rY*\u0011A\u0007F\u0001\u0006gB\f'o[\u0005\u0003mE\u0012!bU)M\u0007>tG/\u001a=u\u0003\t17\u000f\u0005\u0002:{5\t!H\u0003\u00028w)\u0011A\bF\u0001\u0007Q\u0006$wn\u001c9\n\u0005yR$A\u0003$jY\u0016\u001c\u0016p\u001d;f[\u00061A(\u001b8jiz\"b!Q\"E\u000b\u001a;\u0005C\u0001\"\u0001\u001b\u0005\u0001\u0002\"\u0002\u0011\u0007\u0001\u0004\t\u0003\"\u0002\u0017\u0007\u0001\u0004\t\u0003\"B\u0017\u0007\u0001\u0004\t\u0003\"\u0002\u0018\u0007\u0001\u0004y\u0003\"B\u001c\u0007\u0001\u0004A\u0014aC:qCJ\\\u0007*\u001a7qKJ,\u0012A\u0013\t\u0003\u0005.K!\u0001\u0014\t\u0003\u0017M\u0003\u0018M]6IK2\u0004XM]\u0001\rgB\f'o\u001b%fYB,'\u000fI\u0001\u0004\u0019>;U#\u0001)\u0011\u0005E#V\"\u0001*\u000b\u0005M#\u0012!\u00027pORR\u0017BA+S\u0005\u0
019aunZ4fe\u0006!AjT$!\u000319W\r\u001e#va\u0016\\U-\u001f#G)\tI\u0006\u000e\u0005\u0002[K:\u00111l\u0019\b\u00039\nt!!X1\u000f\u0005y\u0003gB\u0001\u0013`\u0013\u00059\u0012BA\u000b\u0017\u0013\t!D#\u0003\u00023g%\u0011A-M\u0001\ba\u0006\u001c7.Y4f\u0013\t1wMA\u0005ECR\fgI]1nK*\u0011A-\r\u0005\u0006S.\u0001\r!I\u0001\bi\ndg*Y7f\u0003A\u0001H.\u00198EkBd\u0017nY1uK\u001aK\u0007\u0010F\u0001m!\u0011i'/\t;\u000e\u00039T!a\u001c9\u0002\u000f5,H/\u00192mK*\u0011\u0011\u000fH\u0001\u000bG>dG.Z2uS>t\u0017BA:o\u0005\u001dA\u0015m\u001d5NCB\u00042!\\;\"\u0013\t1hNA\u0004ICND7+\u001a;\u0002\u001b\u0019L\u0007\u0010R;qY&\u001c\u0017\r^3t)\tIH\u0010\u0005\u0002\u001cu&\u00111\u0010\b\u0002\u0005+:LG\u000fC\u0004~\u001bA\u0005\t\u0019\u0001@\u0002\r\u0011\u0014\u0018PU;o!\tYr0C\u0002\u0002\u0002q\u0011qAQ8pY\u0016\fg.A\fgSb$U\u000f\u001d7jG\u0006$Xm\u001d\u0013eK\u001a\fW\u000f\u001c;%cU\u0011\u0011q\u0001\u0016\u0004}\u0006%1FAA\u0006!\u0011\ti!a\u0006\u000e\u0005\u0005=!\u0002BA\t\u0003'\t\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005UA$\u0001\u0006b]:|G/\u0019;j_:LA!!\u0007\u0002\u0010\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3")
/**
 * Repairs a single partition of a Hudi table in which the same record key
 * appears more than once across the latest base files.
 *
 * <p>The job works in a scratch directory ({@code repairOutputPath}): it copies
 * the partition's latest base files there, rewrites the files that hold stale
 * duplicates, verifies the result with Spark SQL, and only then copies the
 * repaired files back over the partition (unless running as a dry run).
 *
 * <p>This class is decompiler output for a Scala class; the
 * {@code $anonfun$...} and {@code ...$default$...} members are compiler-generated
 * and are kept with their original signatures.
 */
public class DedupeSparkJob {
    private final String basePath;
    private final String duplicatedPartitionPath;
    private final String repairOutputPath;
    private final SQLContext sqlContext;
    private final FileSystem fs;
    private final SparkHelper sparkHelper;
    private final Logger LOG;

    /**
     * @param basePath                base path of the table
     * @param duplicatedPartitionPath partition path, relative to {@code basePath}, to repair
     * @param repairOutputPath        scratch directory that receives copied and rewritten files
     * @param sqlContext              Spark SQL context used to read parquet files and run queries
     * @param fs                      file system used for listing, copying and deleting files
     */
    public DedupeSparkJob(String basePath, String duplicatedPartitionPath, String repairOutputPath, SQLContext sqlContext, FileSystem fs) {
        this.basePath = basePath;
        this.duplicatedPartitionPath = duplicatedPartitionPath;
        this.repairOutputPath = repairOutputPath;
        this.sqlContext = sqlContext;
        this.fs = fs;
        this.sparkHelper = new SparkHelper(sqlContext, fs);
        this.LOG = Logger.getLogger(this.getClass());
    }

    /** @return the {@link SparkHelper} created from this job's SQL context and file system */
    public SparkHelper sparkHelper() {
        return this.sparkHelper;
    }

    /** @return the logger for this job's runtime class */
    public Logger LOG() {
        return this.LOG;
    }

    /**
     * Builds a data frame of record keys that occur more than once in a table.
     *
     * @param tblName name of a registered (temp) table containing the record-key metadata column
     * @return rows of ({@code dupe_key}, {@code dupe_cnt}) for every key with {@code dupe_cnt > 1}
     */
    public Dataset<Row> getDupeKeyDF(String tblName) {
        // NOTE: tblName is spliced into the SQL text as-is; callers in this class only pass
        // internally generated table names.
        String dupeSql = new StringBuilder(120).append("\n      select  `").append(HoodieRecord.RECORD_KEY_METADATA_FIELD).append("` as dupe_key,\n      count(*) as dupe_cnt\n      from ").append(tblName).append("\n      group by `").append(HoodieRecord.RECORD_KEY_METADATA_FIELD).append("`\n      having dupe_cnt > 1\n      ").toString();
        return this.sqlContext.sql(dupeSql);
    }

    /**
     * Lists the latest base files of the duplicated partition, considering only
     * completed instants of the active commit timeline.
     *
     * @return the latest base files found under {@code basePath/duplicatedPartitionPath}
     */
    private java.util.List latestBaseFilesInPartition() {
        HoodieTableMetaClient metadata = new HoodieTableMetaClient(this.fs.getConf(), this.basePath);
        FileStatus[] allFiles = this.fs.listStatus(new Path(this.basePath + "/" + this.duplicatedPartitionPath));
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), allFiles);
        return fsView.getLatestBaseFiles().collect(Collectors.toList());
    }

    /**
     * Works out which record keys have to be dropped from which files.
     *
     * <p>All latest base files of the partition are loaded into a temp table and
     * joined against the duplicate-key table; the joined rows are collected to the
     * driver and grouped by record key.
     *
     * @return a map from file id (the part of {@code _hoodie_file_name} before the
     *         first {@code '_'}) to the set of record keys to remove from that file
     */
    private HashMap<String, HashSet<String>> planDuplicateFix() {
        String tmpTableName = "htbl_" + System.currentTimeMillis();
        String dedupeTblName = tmpTableName + "_dupeKeys";
        java.util.List latestFiles = this.latestBaseFilesInPartition();
        // NOTE(review): the decompiler dropped the element type of `f`; it is whatever
        // getLatestBaseFiles() streams.
        Buffer filteredStatuses = (Buffer)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(latestFiles).map((Function1 & Serializable & scala.Serializable)f -> f.getPath(), Buffer$.MODULE$.canBuildFrom());
        // Previously this message interpolated the unit value and printed "()" where the
        // partition was meant to appear.
        this.LOG().info((Object)new StringBuilder(37).append(" List of files under partition: ").append(this.duplicatedPartitionPath).append(" =>  ").append(filteredStatuses.mkString(" ")).toString());
        Dataset df = this.sqlContext.parquetFile((Seq)filteredStatuses);
        df.registerTempTable(tmpTableName);
        Dataset<Row> dupeKeyDF = this.getDupeKeyDF(tmpTableName);
        dupeKeyDF.registerTempTable(dedupeTblName);
        String dupeDataSql = new StringBuilder(208).append("\n        SELECT `_hoodie_record_key`, `_hoodie_partition_path`, `_hoodie_file_name`, `_hoodie_commit_time`\n        FROM ").append(tmpTableName).append(" h\n        JOIN ").append(dedupeTblName).append(" d\n        ON h.`_hoodie_record_key` = d.dupe_key\n                      ").toString();
        // Column 0 of the projection is the record key.
        Map dupeMap = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(this.sqlContext.sql(dupeDataSql).collectAsList()).groupBy((Function1 & Serializable & scala.Serializable)r -> ((Row)r).getString(0));
        HashMap fileToDeleteKeyMap = new HashMap();
        dupeMap.foreach((Function1 & Serializable & scala.Serializable)rt -> {
            DedupeSparkJob.$anonfun$planDuplicateFix$3(fileToDeleteKeyMap, (Tuple2)rt);
            return BoxedUnit.UNIT;
        });
        return fileToDeleteKeyMap;
    }

    /**
     * Runs the repair.
     *
     * <ol>
     *   <li>Copies every latest base file of the partition into {@code repairOutputPath};
     *       files that appear in the fix plan get a {@code .bad} suffix.</li>
     *   <li>For every file in the plan, writes a new file from the {@code .bad} copy
     *       via {@code SparkHelpers.skipKeysAndWriteNewFile} and deletes the {@code .bad} copy.</li>
     *   <li>Re-reads {@code repairOutputPath/*.parquet} and fails if any duplicate key remains.</li>
     *   <li>Fails if any key from the source files is absent from the repaired files.</li>
     *   <li>Copies each repaired file back over the partition, or only logs the
     *       intended copy when {@code dryRun} is set.</li>
     * </ol>
     *
     * <p>Steps 1-4 write to {@code repairOutputPath} regardless of {@code dryRun};
     * the flag only guards the final copy-back.
     *
     * @param dryRun when {@code true}, the repaired files are not copied back into the partition
     * @throws HoodieException if duplicates remain or source records are missing after the rewrite
     */
    public void fixDuplicates(boolean dryRun) {
        java.util.List latestFiles = this.latestBaseFilesInPartition();
        // NOTE(review): the decompiler dropped the element type of `f` (see planDuplicateFix).
        Map fileNameToPathMap = ((TraversableOnce)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(latestFiles).map((Function1 & Serializable & scala.Serializable)f -> new Tuple2((Object)f.getFileId(), (Object)new Path(f.getPath())), Buffer$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        HashMap<String, HashSet<String>> dupeFixPlan = this.planDuplicateFix();

        // Step 1: stage every file in the scratch directory.
        fileNameToPathMap.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> BoxesRunTime.boxToBoolean((boolean)DedupeSparkJob.$anonfun$fixDuplicates$2(this, dupeFixPlan, (Tuple2)x0$1)));
        // Step 2: rewrite the staged files that contain keys to drop.
        dupeFixPlan.foreach((Function1 & Serializable & scala.Serializable)x0$2 -> BoxesRunTime.boxToBoolean((boolean)DedupeSparkJob.$anonfun$fixDuplicates$3(this, fileNameToPathMap, dupeFixPlan, (Tuple2)x0$2)));

        // Step 3: the staged output must be free of duplicate keys.
        Dataset df = this.sqlContext.read().parquet(this.repairOutputPath + "/*.parquet");
        df.registerTempTable("fixedTbl");
        Dataset<Row> dupeKeyDF = this.getDupeKeyDF("fixedTbl");
        long dupeCnt = dupeKeyDF.count();
        if (dupeCnt != 0L) {
            dupeKeyDF.show();
            throw new HoodieException("Still found some duplicates!!.. Inspect output");
        }

        // Step 4: no key present in the source files may be missing from the staged files.
        // presumably getDistinctKeyDF returns the distinct record keys of the given files -- confirm in SparkHelper
        Dataset<Row> sourceDF = this.sparkHelper().getDistinctKeyDF((List<String>)((TraversableOnce)fileNameToPathMap.map((Function1 & Serializable & scala.Serializable)t -> ((Path)((Tuple2)t)._2()).toString(), Iterable$.MODULE$.canBuildFrom())).toList());
        Dataset<Row> fixedDF = this.sparkHelper().getDistinctKeyDF((List<String>)((TraversableOnce)fileNameToPathMap.map((Function1 & Serializable & scala.Serializable)t -> this.repairOutputPath + "/" + ((Path)((Tuple2)t)._2()).getName(), Iterable$.MODULE$.canBuildFrom())).toList());
        Dataset missedRecordKeysDF = sourceDF.except(fixedDF);
        long missedCnt = missedRecordKeysDF.count();
        if (missedCnt != 0L) {
            missedRecordKeysDF.show();
            throw new HoodieException("Some records in source are not found in fixed files. Inspect output!!");
        }
        Predef$.MODULE$.println((Object)"No duplicates found & counts are in check!!!! ");

        // Step 5: copy the staged files back over the partition (log-only on a dry run).
        fileNameToPathMap.foreach((Function1 & Serializable & scala.Serializable)x0$3 -> {
            Tuple2 tuple2 = (Tuple2)x0$3;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            Path filePath = (Path)tuple2._2();
            Path srcPath = new Path(this.repairOutputPath + "/" + filePath.getName());
            Path dstPath = new Path(this.basePath + "/" + this.duplicatedPartitionPath + "/" + filePath.getName());
            if (dryRun) {
                this.LOG().info((Object)new StringBuilder(35).append("[JUST KIDDING!!!] Copying from ").append(srcPath).append(" to ").append(dstPath).toString());
                return BoxedUnit.UNIT;
            }
            this.LOG().info((Object)new StringBuilder(31).append("[FOR REAL!!!] Copying from ").append(srcPath).append(" to ").append(dstPath).toString());
            // deleteSource = false, overwrite = true
            return BoxesRunTime.boxToBoolean((boolean)FileUtil.copy((FileSystem)this.fs, (Path)srcPath, (FileSystem)this.fs, (Path)dstPath, (boolean)false, (boolean)true, (Configuration)this.fs.getConf()));
        });
    }

    /**
     * Compiler-generated accessor for the default value of the first parameter of
     * {@link #fixDuplicates(boolean)}.
     *
     * @return {@code true}, i.e. a dry run
     */
    public boolean fixDuplicates$default$1() {
        return true;
    }

    /**
     * Raises {@code maxCommit$1} to the commit time of {@code r} if that is larger.
     *
     * @param maxCommit$1 running maximum, updated in place
     * @param r           joined row whose column 3 holds the commit time as a numeric string
     */
    public static final /* synthetic */ void $anonfun$planDuplicateFix$4(LongRef maxCommit$1, Row r) {
        long c = new StringOps(Predef$.MODULE$.augmentString((String)r.apply(3))).toLong();
        if (c > maxCommit$1.elem) {
            maxCommit$1.elem = c;
        }
    }

    /**
     * Processes all rows of one duplicated record key: every row whose commit time
     * is not the maximum for that key is scheduled for removal from its file.
     *
     * <p>Rows that tie on the maximum commit time are all kept, so such a key is
     * still duplicated afterwards.
     *
     * @param fileToDeleteKeyMap$1 plan being built: file id to record keys to drop; mutated
     * @param rt                   pair of (record key, rows carrying that key)
     */
    public static final /* synthetic */ void $anonfun$planDuplicateFix$3(HashMap fileToDeleteKeyMap$1, Tuple2 rt) {
        if (rt == null) {
            throw new MatchError((Object)rt);
        }
        String key = (String)rt._1();
        Buffer rows = (Buffer)rt._2();

        // Pass 1: find the newest commit time among the rows of this key.
        LongRef maxCommit = LongRef.create((long)-1L);
        rows.foreach((Function1 & Serializable & scala.Serializable)r -> {
            DedupeSparkJob.$anonfun$planDuplicateFix$4(maxCommit, (Row)r);
            return BoxedUnit.UNIT;
        });

        // Pass 2: every older row marks its key for deletion in the file it lives in.
        rows.foreach((Function1 & Serializable & scala.Serializable)r -> {
            Row row = (Row)r;
            long c = new StringOps(Predef$.MODULE$.augmentString((String)row.apply(3))).toLong();
            if (c == maxCommit.elem) {
                return BoxedUnit.UNIT;
            }
            // Column 2 is the file name; its first '_'-separated token is used as the file id.
            String f = ((String)row.apply(2)).split("_")[0];
            if (!fileToDeleteKeyMap$1.contains((Object)f)) {
                fileToDeleteKeyMap$1.update((Object)f, (Object)HashSet$.MODULE$.apply((Seq)Nil$.MODULE$));
            }
            return BoxesRunTime.boxToBoolean((boolean)((HashSet)fileToDeleteKeyMap$1.apply((Object)f)).add((Object)key));
        });
    }

    /**
     * Copies one source file into the scratch directory, appending {@code .bad} to
     * the name when the file is part of the fix plan.
     *
     * @param $this         the job instance
     * @param dupeFixPlan$1 file id to record keys to drop
     * @param x0$1          pair of (file id, source path)
     * @return the result of {@code FileUtil.copy}
     */
    public static final /* synthetic */ boolean $anonfun$fixDuplicates$2(DedupeSparkJob $this, HashMap dupeFixPlan$1, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError((Object)x0$1);
        }
        String fileName = (String)x0$1._1();
        Path filePath = (Path)x0$1._2();
        String badSuffix = dupeFixPlan$1.contains((Object)fileName) ? ".bad" : "";
        Path dstPath = new Path($this.repairOutputPath + "/" + filePath.getName() + badSuffix);
        $this.LOG().info((Object)new StringBuilder(17).append("Copying from ").append(filePath).append(" to ").append(dstPath).toString());
        // deleteSource = false, overwrite = true
        return FileUtil.copy((FileSystem)$this.fs, (Path)filePath, (FileSystem)$this.fs, (Path)dstPath, (boolean)false, (boolean)true, (Configuration)$this.fs.getConf());
    }

    /**
     * Rewrites one staged {@code .bad} file into its final name in the scratch
     * directory and then deletes the {@code .bad} copy.
     *
     * @param $this               the job instance
     * @param fileNameToPathMap$1 file id to source path; must contain the plan entry's file id
     * @param dupeFixPlan$1       file id to record keys to drop
     * @param x0$2                plan entry; only its key (the file id) is read
     * @return the result of deleting the {@code .bad} file
     */
    public static final /* synthetic */ boolean $anonfun$fixDuplicates$3(DedupeSparkJob $this, Map fileNameToPathMap$1, HashMap dupeFixPlan$1, Tuple2 x0$2) {
        if (x0$2 == null) {
            throw new MatchError((Object)x0$2);
        }
        String fileName = (String)x0$2._1();
        // Looked up once; the original repeated this lookup for every use.
        String baseFileName = ((Path)fileNameToPathMap$1.apply((Object)fileName)).getName();
        String commitTime = FSUtils.getCommitTime((String)baseFileName);
        Path badFilePath = new Path($this.repairOutputPath + "/" + baseFileName + ".bad");
        Path newFilePath = new Path($this.repairOutputPath + "/" + baseFileName);
        $this.LOG().info((Object)new StringBuilder(37).append(" Skipping and writing new file for : ").append(fileName).toString());
        // presumably copies badFilePath to newFilePath while omitting the listed keys -- confirm in SparkHelpers
        SparkHelpers$.MODULE$.skipKeysAndWriteNewFile(commitTime, $this.fs, badFilePath, newFilePath, (Set<String>)((Set)dupeFixPlan$1.apply((Object)fileName)));
        return $this.fs.delete(badFilePath, false);
    }
}

