/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.scheduler;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.internal.LogEntry;
import org.apache.spark.internal.LogEntry$;
import org.apache.spark.internal.LogKey;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.MDC;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.scheduler.AllocatedBlocks;
import org.apache.spark.streaming.scheduler.BatchAllocationEvent;
import org.apache.spark.streaming.scheduler.BatchCleanupEvent;
import org.apache.spark.streaming.scheduler.BlockAdditionEvent;
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo;
import org.apache.spark.streaming.scheduler.ReceivedBlockTracker$;
import org.apache.spark.streaming.scheduler.ReceivedBlockTrackerLogEvent;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogUtils$;
import org.apache.spark.util.Clock;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction1;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0005\u0005\u001dg!B\u0012%\u0001\u0019r\u0003\u0002C\u001e\u0001\u0005\u0003\u0005\u000b\u0011B\u001f\t\u0011\u0005\u0003!\u0011!Q\u0001\n\tC\u0001\"\u0013\u0001\u0003\u0002\u0003\u0006IA\u0013\u0005\t3\u0002\u0011\t\u0011)A\u00055\"A\u0001\r\u0001B\u0001B\u0003%\u0011\r\u0003\u0005e\u0001\t\u0005\t\u0015!\u0003f\u0011\u0015\u0001\b\u0001\"\u0001r\u000b\u0011Q\b\u0001B>\t\u0013\u00055\u0001A1A\u0005\n\u0005=\u0001\u0002CA\u000e\u0001\u0001\u0006I!!\u0005\t\u0013\u0005u\u0001A1A\u0005\n\u0005}\u0001\u0002CA\u0019\u0001\u0001\u0006I!!\t\t\u0013\u0005M\u0002A1A\u0005\n\u0005U\u0002\u0002CA\"\u0001\u0001\u0006I!a\u000e\t\u0013\u0005\u0015\u0003\u00011A\u0005\n\u0005\u001d\u0003\"CA%\u0001\u0001\u0007I\u0011BA&\u0011!\t9\u0006\u0001Q!\n\u0005\r\u0002bBA-\u0001\u0011\u0005\u00111\f\u0005\b\u0003C\u0002A\u0011AA2\u0011\u001d\tI\u0007\u0001C\u0001\u0003WBq!a\u001e\u0001\t\u0003\tI\bC\u0004\u0002\u0002\u0002!\t!a!\t\u000f\u0005\u0015\u0005\u0001\"\u0001\u0002\b\"9\u00111\u0012\u0001\u0005\u0002\u00055\u0005bBAL\u0001\u0011\u0005\u0011\u0011\u0014\u0005\b\u00037\u0003A\u0011BAM\u0011!\ti\n\u0001C\u0001M\u0005}\u0005bBAV\u0001\u0011%\u0011Q\u0016\u0005\b\u0003c\u0003A\u0011BAZ\u0011!\t)\f\u0001C\u0001M\u0005\ru\u0001CA\\I!\u0005a%!/\u0007\u000f\r\"\u0003\u0012\u0001\u0014\u0002<\"1\u0001\u000f\tC\u0001\u0003{Cq!a0!\t\u0003\t\tM\u0001\u000bSK\u000e,\u0017N^3e\u00052|7m\u001b+sC\u000e\\WM\u001d\u0006\u0003K\u0019\n\u0011b]2iK\u0012,H.\u001a:\u000b\u0005\u001dB\u0013!C:ue\u0016\fW.\u001b8h\u0015\tI#&A\u0003ta\u0006\u00148N\u0003\u0002,Y\u00051\u0011\r]1dQ\u0016T\u0011!L\u0001\u0004_J<7c\u0001\u00010kA\u0011\u0001gM\u0007\u0002c)\t!'A\u0003tG\u0006d\u0017-\u0003\u00025c\t1\u0011I\\=SK\u001a\u0004\"AN\u001d\u000e\u0003]R!\u0001\u000f\u0015\u0002\u0011%tG/\u001a:oC2L!AO\u001c\u0003\u000f1{wmZ5oO\u0006!1m\u001c8g\u0007\u0001\u0001\"AP \u000e\u0003!J!\u0001\u0011\u0015\u0003\u0013M\u0003\u0018M]6D_:4\u0017A\u00035bI>|\u0007oQ8oMB\u00111iR\u0007\u0002\t*\u00111(\u0012\u0006\u0003\r*\na\u0001[1e_>\u0004\u0018B\u0001%E\u00055\u0019uN\u001c4jOV\u0014\u0018\r^5p]\u0006I1\u000f\u001e:fC6LEm\u001d\t\u0004\u0017N3fB\u0001'R\u001d\ti\u0005+D\u0001O\u0015\tyE(\u0001\u0004=e>|GOP\u0005\u0002e%\u0011!+M\u0001\ba\u0006\u001c7.Y4f\u0013\t!VKA\u0002TKFT!AU\u0019\u0011\u0005A:\u0016B\u0001-2\u0005\rIe\u000e^\u0001\u0006G2|7m\u001b\t\u00037zk\u0011\u0001\u0018\u0006\u0003;\"\nA!\u001e;jY&\u0011q\f\u0018\u0002\u0006\u00072|7m[\u0001\u0019e\u0016\u001cwN^3s\rJ|Wn\u0016:ji\u0016\f\u0005.Z1e\u0019><\u0007C\u0001\u0019c\u0013\t\u0019\u0017GA\u0004C_>dW-\u00198\u0002'\rDWmY6q_&tG\u000fR5s\u001fB$\u0018n\u001c8\u0011\u0007A2\u0007.\u0003\u0002hc\t1q\n\u001d;j_:\u0004\"![7\u000f\u0005)\\\u0007CA'2\u0013\ta\u0017'\u0001\u0004Qe\u0016$WMZ\u0005\u0003]>\u0014aa\u0015;sS:<'B\u000172\u0003\u0019a\u0014N\\5u}Q9!\u000f^;wobL\bCA:\u0001\u001b\u0005!\u0003\"B\u001e\b\u0001\u0004i\u0004\"B!\b\u0001\u0004\u0011\u0005\"B%\b\u0001\u0004Q\u0005\"B-\b\u0001\u0004Q\u0006\"\u00021\b\u0001\u0004\t\u0007\"\u00023\b\u0001\u0004)'A\u0005*fG\u0016Lg/\u001a3CY>\u001c7.U;fk\u0016\u0004R\u0001`A\u0002\u0003\u000fi\u0011! \u0006\u0003}~\fq!\\;uC\ndWMC\u0002\u0002\u0002E\n!bY8mY\u0016\u001cG/[8o\u0013\r\t)! \u0002\u0006#V,W/\u001a\t\u0004g\u0006%\u0011bAA\u0006I\t\t\"+Z2fSZ,GM\u00117pG.LeNZ8\u0002AM$(/Z1n\u0013\u0012$v.\u00168bY2|7-\u0019;fI\ncwnY6Rk\u0016,Xm]\u000b\u0003\u0003#\u0001b\u0001`A\n-\u0006]\u0011bAA\u000b{\n9\u0001*Y:i\u001b\u0006\u0004\bcAA\r\u00115\t\u0001!A\u0011tiJ,\u0017-\\%e)>,f.\u00197m_\u000e\fG/\u001a3CY>\u001c7.U;fk\u0016\u001c\b%A\u000buS6,Gk\\!mY>\u001c\u0017\r^3e\u00052|7m[:\u0016\u0005\u0005\u0005\u0002c\u0002?\u0002\u0014\u0005\r\u00121\u0006\t\u0005\u0003K\t9#D\u0001'\u0013\r\tIC\n\u0002\u0005)&lW\rE\u0002t\u0003[I1!a\f%\u0005=\tE\u000e\\8dCR,GM\u00117pG.\u001c\u0018A\u0006;j[\u0016$v.\u00117m_\u000e\fG/\u001a3CY>\u001c7n\u001d\u0011\u0002']\u0014\u0018\u000e^3BQ\u0016\fG\rT8h\u001fB$\u0018n\u001c8\u0016\u0005\u0005]\u0002\u0003\u0002\u0019g\u0003s\u0001B!a\u000f\u0002@5\u0011\u0011Q\b\u0006\u0003;\u001aJA!!\u0011\u0002>\tiqK]5uK\u0006CW-\u00193M_\u001e\fAc\u001e:ji\u0016\f\u0005.Z1e\u0019><w\n\u001d;j_:\u0004\u0013A\u00067bgR\fE\u000e\\8dCR,GMQ1uG\"$\u0016.\\3\u0016\u0005\u0005\r\u0012A\u00077bgR\fE\u000e\\8dCR,GMQ1uG\"$\u0016.\\3`I\u0015\fH\u0003BA'\u0003'\u00022\u0001MA(\u0013\r\t\t&\r\u0002\u0005+:LG\u000fC\u0005\u0002VA\t\t\u00111\u0001\u0002$\u0005\u0019\u0001\u0010J\u0019\u0002/1\f7\u000f^!mY>\u001c\u0017\r^3e\u0005\u0006$8\r\u001b+j[\u0016\u0004\u0013\u0001C1eI\ncwnY6\u0015\u0007\u0005\fi\u0006C\u0004\u0002`I\u0001\r!a\u0002\u0002#I,7-Z5wK\u0012\u0014En\\2l\u0013:4w.A\u000bbY2|7-\u0019;f\u00052|7m[:U_\n\u000bGo\u00195\u0015\t\u00055\u0013Q\r\u0005\b\u0003O\u001a\u0002\u0019AA\u0012\u0003%\u0011\u0017\r^2i)&lW-\u0001\thKR\u0014En\\2lg>3')\u0019;dQR!\u0011QNA;!\u0019I\u0017q\u000e,\u0002t%\u0019\u0011\u0011O8\u0003\u00075\u000b\u0007\u000f\u0005\u0003L'\u0006\u001d\u0001bBA4)\u0001\u0007\u00111E\u0001\u001aO\u0016$(\t\\8dWN|eMQ1uG\"\fe\u000eZ*ue\u0016\fW\u000e\u0006\u0004\u0002t\u0005m\u0014Q\u0010\u0005\b\u0003O*\u0002\u0019AA\u0012\u0011\u0019\ty(\u0006a\u0001-\u0006A1\u000f\u001e:fC6LE-\u0001\u000fiCN,f.\u00197m_\u000e\fG/\u001a3SK\u000e,\u0017N^3e\u00052|7m[:\u0016\u0003\u0005\fAcZ3u+:\fG\u000e\\8dCR,GM\u00117pG.\u001cH\u0003BA:\u0003\u0013Ca!a \u0018\u0001\u00041\u0016!E2mK\u0006tW\u000f](mI\n\u000bGo\u00195fgR1\u0011QJAH\u0003'Cq!!%\u0019\u0001\u0004\t\u0019#A\tdY\u0016\fg.\u001e9UQJ,7\u000f\u001b+j[\u0016Da!!&\u0019\u0001\u0004\t\u0017!E<bSR4uN]\"p[BdW\r^5p]\u0006!1\u000f^8q)\t\ti%A\tsK\u000e|g/\u001a:QCN$XI^3oiN\f!b\u001e:ji\u0016$v\u000eT8h)\r\t\u0017\u0011\u0015\u0005\b\u0003G[\u0002\u0019AAS\u0003\u0019\u0011XmY8sIB\u00191/a*\n\u0007\u0005%FE\u0001\u000fSK\u000e,\u0017N^3e\u00052|7m\u001b+sC\u000e\\WM\u001d'pO\u00163XM\u001c;\u0002+\u001d,GOU3dK&4X\r\u001a\"m_\u000e\\\u0017+^3vKR!\u0011qCAX\u0011\u0019\ty\b\ba\u0001-\u0006\u00192M]3bi\u0016<&/\u001b;f\u0003\",\u0017\r\u001a'pOR\u0011\u0011qG\u0001\u0017SN<&/\u001b;f\u0003\",\u0017\r\u001a'pO\u0016s\u0017M\u00197fI\u0006!\"+Z2fSZ,GM\u00117pG.$&/Y2lKJ\u0004\"a\u001d\u0011\u0014\u0005\u0001zCCAA]\u0003U\u0019\u0007.Z2la>Lg\u000e\u001e#jeR{Gj\\4ESJ$2\u0001[Ab\u0011\u0019\t)M\ta\u0001Q\u0006i1\r[3dWB|\u0017N\u001c;ESJ\u0004")
public class ReceivedBlockTracker
implements Logging {
    private final SparkConf conf;
    private final Configuration hadoopConf;
    private final Seq<Object> streamIds;
    private final Clock clock;
    private final Option<String> checkpointDirOption;
    private final scala.collection.mutable.HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues;
    private final scala.collection.mutable.HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks;
    private final Option<WriteAheadLog> writeAheadLogOption;
    private Time lastAllocatedBatchTime;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public static String checkpointDirToLogDir(String checkpointDir) {
        return ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir(checkpointDir);
    }

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public Logging.LogStringContext LogStringContext(StringContext sc) {
        return Logging.LogStringContext$((Logging)this, (StringContext)sc);
    }

    public void withLogContext(HashMap<String, String> context, Function0<BoxedUnit> body) {
        Logging.withLogContext$((Logging)this, context, body);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logInfo(LogEntry entry) {
        Logging.logInfo$((Logging)this, (LogEntry)entry);
    }

    public void logInfo(LogEntry entry, Throwable throwable) {
        Logging.logInfo$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logDebug(LogEntry entry) {
        Logging.logDebug$((Logging)this, (LogEntry)entry);
    }

    public void logDebug(LogEntry entry, Throwable throwable) {
        Logging.logDebug$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logTrace(LogEntry entry) {
        Logging.logTrace$((Logging)this, (LogEntry)entry);
    }

    public void logTrace(LogEntry entry, Throwable throwable) {
        Logging.logTrace$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logWarning(LogEntry entry) {
        Logging.logWarning$((Logging)this, (LogEntry)entry);
    }

    public void logWarning(LogEntry entry, Throwable throwable) {
        Logging.logWarning$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logError(LogEntry entry) {
        Logging.logError$((Logging)this, (LogEntry)entry);
    }

    public void logError(LogEntry entry, Throwable throwable) {
        Logging.logError$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private scala.collection.mutable.HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues() {
        return this.streamIdToUnallocatedBlockQueues;
    }

    private scala.collection.mutable.HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks() {
        return this.timeToAllocatedBlocks;
    }

    private Option<WriteAheadLog> writeAheadLogOption() {
        return this.writeAheadLogOption;
    }

    private Time lastAllocatedBatchTime() {
        return this.lastAllocatedBatchTime;
    }

    private void lastAllocatedBatchTime_$eq(Time x$1) {
        this.lastAllocatedBatchTime = x$1;
    }

    /*
     * WARNING - void declaration
     */
    public boolean addBlock(ReceivedBlockInfo receivedBlockInfo) {
        boolean bl;
        try {
            void var3_2;
            boolean writeResult = this.writeToLog(new BlockAdditionEvent(receivedBlockInfo));
            if (writeResult) {
                ReceivedBlockTracker receivedBlockTracker = this;
                synchronized (receivedBlockTracker) {
                    Queue cfr_ignored_0 = (Queue)this.getReceivedBlockQueue(receivedBlockInfo.streamId()).$plus$eq((Object)receivedBlockInfo);
                }
                this.logDebug((Function0<String>)(Function0 & Serializable)() -> "Stream " + receivedBlockInfo.streamId() + " received block " + receivedBlockInfo.blockStoreResult().blockId());
            } else {
                this.logDebug((Function0<String>)(Function0 & Serializable)() -> "Failed to acknowledge stream " + receivedBlockInfo.streamId() + " receiving block " + receivedBlockInfo.blockStoreResult().blockId() + " in the Write Ahead Log.");
            }
            bl = var3_2;
        }
        catch (Throwable throwable) {
            Throwable throwable2 = throwable;
            if (NonFatal$.MODULE$.apply(throwable2)) {
                this.logError(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Error adding block ", ""}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.RECEIVED_BLOCK_INFO$.MODULE$, (Object)receivedBlockInfo)}))), throwable2);
                bl = false;
            }
            throw throwable;
        }
        return bl;
    }

    public synchronized void allocateBlocksToBatch(Time batchTime) {
        if (this.lastAllocatedBatchTime() == null || batchTime.$greater(this.lastAllocatedBatchTime())) {
            Map streamIdToBlocks = ((IterableOnceOps)this.streamIds.map((Function1 & Serializable)streamId -> ReceivedBlockTracker.$anonfun$allocateBlocksToBatch$1(this, BoxesRunTime.unboxToInt((Object)streamId)))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
            AllocatedBlocks allocatedBlocks = new AllocatedBlocks((Map<Object, Seq<ReceivedBlockInfo>>)streamIdToBlocks);
            if (this.writeToLog(new BatchAllocationEvent(batchTime, allocatedBlocks))) {
                this.streamIds.foreach((Function1)(JFunction1.mcVI.sp & Serializable)x$1 -> this.getReceivedBlockQueue(x$1).clear());
                this.timeToAllocatedBlocks().put((Object)batchTime, (Object)allocatedBlocks);
                this.lastAllocatedBatchTime_$eq(batchTime);
                return;
            }
            this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Possibly processed batch ", " needs to be "}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.BATCH_TIMESTAMP$.MODULE$, (Object)batchTime)})).$plus(this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"processed again in WAL recovery"}))).log((Seq)Nil$.MODULE$))));
            return;
        }
        this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Possibly processed batch ", " needs to be processed "}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.BATCH_TIMESTAMP$.MODULE$, (Object)batchTime)})).$plus(this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"again in WAL recovery"}))).log((Seq)Nil$.MODULE$))));
    }

    public synchronized Map<Object, Seq<ReceivedBlockInfo>> getBlocksOfBatch(Time batchTime) {
        return (Map)this.timeToAllocatedBlocks().get((Object)batchTime).map((Function1 & Serializable)x$2 -> x$2.streamIdToAllocatedBlocks()).getOrElse((Function0 & Serializable)() -> Predef$.MODULE$.Map().empty());
    }

    public synchronized Seq<ReceivedBlockInfo> getBlocksOfBatchAndStream(Time batchTime, int streamId) {
        return (Seq)this.timeToAllocatedBlocks().get((Object)batchTime).map((Function1 & Serializable)x$3 -> x$3.getBlocksOfStream(streamId)).getOrElse((Function0 & Serializable)() -> (Seq)package$.MODULE$.Seq().empty());
    }

    public synchronized boolean hasUnallocatedReceivedBlocks() {
        return !this.streamIdToUnallocatedBlockQueues().values().forall((Function1 & Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)x$4.isEmpty()));
    }

    public synchronized Seq<ReceivedBlockInfo> getUnallocatedBlocks(int streamId) {
        return this.getReceivedBlockQueue(streamId).toSeq();
    }

    public synchronized void cleanupOldBatches(Time cleanupThreshTime, boolean waitForCompletion) {
        Predef$.MODULE$.require(cleanupThreshTime.milliseconds() < this.clock.getTimeMillis());
        Seq timesToCleanup = ((IterableOnceOps)this.timeToAllocatedBlocks().keys().filter((Function1 & Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)x$5.$less(cleanupThreshTime)))).toSeq();
        this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Deleting batches: ", ""}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.DURATION$.MODULE$, (Object)timesToCleanup.mkString(" "))}))));
        if (this.writeToLog(new BatchCleanupEvent((Seq<Time>)timesToCleanup))) {
            this.timeToAllocatedBlocks().$minus$minus$eq((IterableOnce)timesToCleanup);
            this.writeAheadLogOption().foreach((Function1 & Serializable)x$6 -> {
                x$6.clean(cleanupThreshTime.milliseconds(), waitForCompletion);
                return BoxedUnit.UNIT;
            });
            return;
        }
        this.logWarning((Function0<String>)(Function0 & Serializable)() -> "Failed to acknowledge batch clean up in the Write Ahead Log.");
    }

    public void stop() {
        this.writeAheadLogOption().foreach((Function1 & Serializable)x$7 -> {
            x$7.close();
            return BoxedUnit.UNIT;
        });
    }

    private synchronized void recoverPastEvents() {
        this.writeAheadLogOption().foreach((Function1 & Serializable)writeAheadLog -> {
            ReceivedBlockTracker.$anonfun$recoverPastEvents$5(this, writeAheadLog);
            return BoxedUnit.UNIT;
        });
    }

    public boolean writeToLog(ReceivedBlockTrackerLogEvent record) {
        if (this.isWriteAheadLogEnabled()) {
            boolean bl;
            this.logTrace((Function0<String>)(Function0 & Serializable)() -> "Writing record: " + record);
            try {
                ((WriteAheadLog)this.writeAheadLogOption().get()).write(ByteBuffer.wrap(Utils$.MODULE$.serialize((Object)record)), this.clock.getTimeMillis());
                bl = true;
            }
            catch (Throwable throwable) {
                Throwable throwable2 = throwable;
                if (NonFatal$.MODULE$.apply(throwable2)) {
                    this.logWarning(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Exception thrown while writing record: "}))).log((Seq)Nil$.MODULE$).$plus(this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"", " to the WriteAheadLog."}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.RECEIVED_BLOCK_TRACKER_LOG_EVENT$.MODULE$, (Object)record)})))), throwable2);
                    bl = false;
                }
                throw throwable;
            }
            return bl;
        }
        return true;
    }

    private Queue<ReceivedBlockInfo> getReceivedBlockQueue(int streamId) {
        return (Queue)this.streamIdToUnallocatedBlockQueues().getOrElseUpdate((Object)BoxesRunTime.boxToInteger((int)streamId), (Function0 & Serializable)() -> new Queue(Queue$.MODULE$.$lessinit$greater$default$1()));
    }

    private Option<WriteAheadLog> createWriteAheadLog() {
        return this.checkpointDirOption.map((Function1 & Serializable)checkpointDir -> {
            String logDir = ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir((String)$this.checkpointDirOption.get());
            return WriteAheadLogUtils$.MODULE$.createLogForDriver($this.conf, logDir, $this.hadoopConf);
        });
    }

    public boolean isWriteAheadLogEnabled() {
        return this.writeAheadLogOption().nonEmpty();
    }

    public static final /* synthetic */ Tuple2 $anonfun$allocateBlocksToBatch$1(ReceivedBlockTracker $this, int streamId) {
        ArrayBuffer blocks = (ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Nil$.MODULE$);
        blocks.$plus$plus$eq((IterableOnce)$this.getReceivedBlockQueue(streamId).clone());
        return new Tuple2((Object)BoxesRunTime.boxToInteger((int)streamId), (Object)blocks.toSeq());
    }

    private final void insertAddedBlock$1(ReceivedBlockInfo receivedBlockInfo) {
        this.logTrace((Function0<String>)(Function0 & Serializable)() -> "Recovery: Inserting added block " + receivedBlockInfo);
        receivedBlockInfo.setBlockIdInvalid();
        this.getReceivedBlockQueue(receivedBlockInfo.streamId()).$plus$eq((Object)receivedBlockInfo);
    }

    private final void insertAllocatedBatch$1(Time batchTime, AllocatedBlocks allocatedBlocks) {
        this.logTrace((Function0<String>)(Function0 & Serializable)() -> "Recovery: Inserting allocated batch for time " + batchTime + " to " + allocatedBlocks.streamIdToAllocatedBlocks());
        allocatedBlocks.streamIdToAllocatedBlocks().foreach((Function1 & Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 != null) {
                int streamId = tuple2._1$mcI$sp();
                Seq allocatedBlocksInStream = (Seq)tuple2._2();
                return this.getReceivedBlockQueue(streamId).dequeueAll((Function1)allocatedBlocksInStream.toSet());
            }
            throw new MatchError((Object)tuple2);
        });
        this.timeToAllocatedBlocks().put((Object)batchTime, (Object)allocatedBlocks);
        this.lastAllocatedBatchTime_$eq(batchTime);
    }

    private final void cleanupBatches$1(Seq batchTimes) {
        this.logTrace((Function0<String>)(Function0 & Serializable)() -> "Recovery: Cleaning up batches " + batchTimes);
        this.timeToAllocatedBlocks().$minus$minus$eq((IterableOnce)batchTimes);
    }

    public static final /* synthetic */ void $anonfun$recoverPastEvents$7(ReceivedBlockTracker $this, ByteBuffer byteBuffer) {
        $this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> $this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Recovering record ", ""}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.BYTE_BUFFER$.MODULE$, (Object)byteBuffer)}))));
        ReceivedBlockTrackerLogEvent receivedBlockTrackerLogEvent = (ReceivedBlockTrackerLogEvent)Utils$.MODULE$.deserialize(JavaUtils.bufferToArray((ByteBuffer)byteBuffer), Thread.currentThread().getContextClassLoader());
        if (receivedBlockTrackerLogEvent instanceof BlockAdditionEvent) {
            BlockAdditionEvent blockAdditionEvent = (BlockAdditionEvent)receivedBlockTrackerLogEvent;
            ReceivedBlockInfo receivedBlockInfo = blockAdditionEvent.receivedBlockInfo();
            $this.insertAddedBlock$1(receivedBlockInfo);
            return;
        }
        if (receivedBlockTrackerLogEvent instanceof BatchAllocationEvent) {
            BatchAllocationEvent batchAllocationEvent = (BatchAllocationEvent)receivedBlockTrackerLogEvent;
            Time time = batchAllocationEvent.time();
            AllocatedBlocks allocatedBlocks = batchAllocationEvent.allocatedBlocks();
            $this.insertAllocatedBatch$1(time, allocatedBlocks);
            return;
        }
        if (receivedBlockTrackerLogEvent instanceof BatchCleanupEvent) {
            BatchCleanupEvent batchCleanupEvent = (BatchCleanupEvent)receivedBlockTrackerLogEvent;
            Seq<Time> batchTimes = batchCleanupEvent.times();
            $this.cleanupBatches$1(batchTimes);
            return;
        }
        throw new MatchError((Object)receivedBlockTrackerLogEvent);
    }

    public static final /* synthetic */ void $anonfun$recoverPastEvents$5(ReceivedBlockTracker $this, WriteAheadLog writeAheadLog) {
        $this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> $this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Recovering from write ahead logs in "}))).log((Seq)Nil$.MODULE$).$plus($this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"", ""}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.PATH$.MODULE$, $this.checkpointDirOption.get())})))));
        CollectionConverters$.MODULE$.IteratorHasAsScala(writeAheadLog.readAll()).asScala().foreach((Function1 & Serializable)byteBuffer -> {
            ReceivedBlockTracker.$anonfun$recoverPastEvents$7($this, byteBuffer);
            return BoxedUnit.UNIT;
        });
    }

    public ReceivedBlockTracker(SparkConf conf, Configuration hadoopConf, Seq<Object> streamIds, Clock clock, boolean recoverFromWriteAheadLog, Option<String> checkpointDirOption) {
        block0: {
            this.conf = conf;
            this.hadoopConf = hadoopConf;
            this.streamIds = streamIds;
            this.clock = clock;
            this.checkpointDirOption = checkpointDirOption;
            Logging.$init$((Logging)this);
            this.streamIdToUnallocatedBlockQueues = new scala.collection.mutable.HashMap();
            this.timeToAllocatedBlocks = new scala.collection.mutable.HashMap();
            this.writeAheadLogOption = this.createWriteAheadLog();
            this.lastAllocatedBatchTime = null;
            if (!recoverFromWriteAheadLog) break block0;
            this.recoverPastEvents();
        }
    }
}

