/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kinesis;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.Logging;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.kinesis.KinesisCheckpointer$;
import org.apache.spark.streaming.kinesis.KinesisReceiver;
import org.apache.spark.streaming.kinesis.KinesisRecordProcessor$;
import org.apache.spark.streaming.util.RecurringTimer;
import org.apache.spark.util.Clock;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001\u0005Uc!B\u0001\u0003\u0001\ta!aE&j]\u0016\u001c\u0018n]\"iK\u000e\\\u0007o\\5oi\u0016\u0014(BA\u0002\u0005\u0003\u001dY\u0017N\\3tSNT!!\u0002\u0004\u0002\u0013M$(/Z1nS:<'BA\u0004\t\u0003\u0015\u0019\b/\u0019:l\u0015\tI!\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0017\u0005\u0019qN]4\u0014\u0007\u0001i1\u0003\u0005\u0002\u000f#5\tqBC\u0001\u0011\u0003\u0015\u00198-\u00197b\u0013\t\u0011rB\u0001\u0004B]f\u0014VM\u001a\t\u0003)Ui\u0011AB\u0005\u0003-\u0019\u0011q\u0001T8hO&tw\r\u0003\u0005\u0019\u0001\t\u0005\t\u0015!\u0003\u001b\u0003!\u0011XmY3jm\u0016\u00148\u0001\u0001\u0019\u00037\u0005\u00022\u0001H\u000f \u001b\u0005\u0011\u0011B\u0001\u0010\u0003\u0005=Y\u0015N\\3tSN\u0014VmY3jm\u0016\u0014\bC\u0001\u0011\"\u0019\u0001!\u0011BI\f\u0002\u0002\u0003\u0005)\u0011A\u0012\u0003\u0007}#\u0013'\u0005\u0002%OA\u0011a\"J\u0005\u0003M=\u0011qAT8uQ&tw\r\u0005\u0002\u000fQ%\u0011\u0011f\u0004\u0002\u0004\u0003:L\b\u0002C\u0016\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0017\u0002%\rDWmY6q_&tG/\u00138uKJ4\u0018\r\u001c\t\u0003[9j\u0011\u0001B\u0005\u0003_\u0011\u0011\u0001\u0002R;sCRLwN\u001c\u0005\tc\u0001\u0011\t\u0011)A\u0005e\u0005Aqo\u001c:lKJLE\r\u0005\u00024m9\u0011a\u0002N\u0005\u0003k=\ta\u0001\u0015:fI\u00164\u0017BA\u001c9\u0005\u0019\u0019FO]5oO*\u0011Qg\u0004\u0005\tu\u0001\u0011\t\u0011)A\u0005w\u0005)1\r\\8dWB\u0011AhP\u0007\u0002{)\u0011aHB\u0001\u0005kRLG.\u0003\u0002A{\t)1\t\\8dW\")!\t\u0001C\u0001\u0007\u00061A(\u001b8jiz\"R\u0001R#K\u00172\u0003\"\u0001\b\u0001\t\u000ba\t\u0005\u0019\u0001$1\u0005\u001dK\u0005c\u0001\u000f\u001e\u0011B\u0011\u0001%\u0013\u0003\nE\u0015\u000b\t\u0011!A\u0003\u0002\rBQaK!A\u00021BQ!M!A\u0002IBqAO!\u0011\u0002\u0003\u00071\bC\u0004O\u0001\t\u0007I\u0011B(\u0002\u001b\rDWmY6q_&tG/\u001a:t+\u0005\u0001\u0006\u0003B)Xeek\u0011A\u0015\u0006\u0003'R\u000b!bY8oGV\u0014(/\u001a8u\u0015\tqTKC\u0001W\u0003\u0011Q\u0017M^1\n\u0005a\u0013&!E\"p]\u000e,(O]3oi\"\u000b7\u000f['baB\u0011!LZ\
u0007\u00027*\u0011A,X\u0001\u000bS:$XM\u001d4bG\u0016\u001c(B\u00010`\u00035\u0019G.[3oi2L'M]1ss*\u00111\u0001\u0019\u0006\u0003C\n\f\u0001b]3sm&\u001cWm\u001d\u0006\u0003G\u0012\f\u0011\"Y7bu>t\u0017m^:\u000b\u0003\u0015\f1aY8n\u0013\t97L\u0001\u000fJ%\u0016\u001cwN\u001d3Qe>\u001cWm]:pe\u000eCWmY6q_&tG/\u001a:\t\r%\u0004\u0001\u0015!\u0003Q\u00039\u0019\u0007.Z2la>Lg\u000e^3sg\u0002Bqa\u001b\u0001C\u0002\u0013%A.A\fmCN$8\t[3dWB|\u0017N\u001c;fIN+\u0017OT;ngV\tQ\u000e\u0005\u0003R/J\u0012\u0004BB8\u0001A\u0003%Q.\u0001\rmCN$8\t[3dWB|\u0017N\u001c;fIN+\u0017OT;ng\u0002Bq!\u001d\u0001C\u0002\u0013%!/\u0001\ndQ\u0016\u001c7\u000e]8j]R,'\u000f\u00165sK\u0006$W#A:\u0011\u0005Q4X\"A;\u000b\u0005y\"\u0011BA<v\u00059\u0011VmY;se&tw\rV5nKJDa!\u001f\u0001!\u0002\u0013\u0019\u0018aE2iK\u000e\\\u0007o\\5oi\u0016\u0014H\u000b\u001b:fC\u0012\u0004\u0003\"B>\u0001\t\u0003a\u0018aD:fi\u000eCWmY6q_&tG/\u001a:\u0015\u000bu\f\t!!\u0002\u0011\u00059q\u0018BA@\u0010\u0005\u0011)f.\u001b;\t\r\u0005\r!\u00101\u00013\u0003\u001d\u0019\b.\u0019:e\u0013\u0012Da!a\u0002{\u0001\u0004I\u0016\u0001D2iK\u000e\\\u0007o\\5oi\u0016\u0014\bbBA\u0006\u0001\u0011\u0005\u0011QB\u0001\u0013e\u0016lwN^3DQ\u0016\u001c7\u000e]8j]R,'\u000fF\u0003~\u0003\u001f\t\t\u0002C\u0004\u0002\u0004\u0005%\u0001\u0019\u0001\u001a\t\u000f\u0005\u001d\u0011\u0011\u0002a\u00013\"9\u0011Q\u0003\u0001\u0005\n\u0005]\u0011AC2iK\u000e\\\u0007o\\5oiR)Q0!\u0007\u0002\u001c!9\u00111AA\n\u0001\u0004\u0011\u0004bBA\u0004\u0003'\u0001\r!\u0017\u0005\b\u0003?\u0001A\u0011BA\u0011\u00035\u0019\u0007.Z2la>Lg\u000e^!mYR\tQ\u0010C\u0004\u0002&\u0001!I!a\n\u0002/M$\u0018M\u001d;DQ\u0016\u001c7\u000e]8j]R,'\u000f\u00165sK\u0006$G#A:\t\u000f\u0005-\u0002\u0001\"\u0001\u0002\"\u0005A1\u000f[;uI><hn\u0002\u0006\u00020\t\t\t\u0011#\u0001\u0003\u0003c\t1cS5oKNL7o\u00115fG.\u0004x.\u001b8uKJ\u00042\u0001HA\u001a\r%\t!!!A\t\u0002\t\t)dE\u0002\u000245AqAQA\u001a\t\u0003\tI\u0004\u0006\u0002\u00022!Q\u0011QHA\u001a#\u0003%\t!a\u0010\u00027\u0011b
Wm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00135+\t\t\tEK\u0002<\u0003\u0007Z#!!\u0012\u0011\t\u0005\u001d\u0013\u0011K\u0007\u0003\u0003\u0013RA!a\u0013\u0002N\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003\u001fz\u0011AC1o]>$\u0018\r^5p]&!\u00111KA%\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a")
public class KinesisCheckpointer
implements Logging {
    /** Receiver that is asked for the latest sequence number to checkpoint for a shard. */
    private final KinesisReceiver<?> receiver;
    /** Period of the recurring checkpoint timer; read in milliseconds when the timer is created. */
    private final Duration checkpointInterval;
    /**
     * Worker id used in the timer thread name and in debug log messages.
     * NOTE(review): public with a mangled name, presumably so generated closure classes can read it.
     */
    public final String org$apache$spark$streaming$kinesis$KinesisCheckpointer$$workerId;
    /** Clock handed to the {@link RecurringTimer} that drives periodic checkpoints. */
    private final Clock clock;
    /** Map from shardId to the checkpointer registered for it via {@code setCheckpointer}. */
    private final ConcurrentHashMap<String, IRecordProcessorCheckpointer> checkpointers;
    /** Map from shardId to the sequence number recorded after the last checkpoint call that returned normally. */
    private final ConcurrentHashMap<String, String> org$apache$spark$streaming$kinesis$KinesisCheckpointer$$lastCheckpointedSeqNums;
    /** Timer created and started by {@code startCheckpointerThread()} during construction. */
    private final RecurringTimer checkpointerThread;
    /** Backing field for the Logging trait's logger; read and written through the accessor pair that follows. */
    private transient Logger org$apache$spark$Logging$$log_;

    /**
     * Static forwarder to the method of the same name on the companion module
     * {@code KinesisCheckpointer$}.
     * NOTE(review): the mangled name decodes to {@code <init>$default$4}, so this is presumably the
     * default value of the fourth constructor argument (the clock) -- confirm against the companion.
     *
     * @return the {@link Clock} produced by the companion module
     */
    public static Clock $lessinit$greater$default$4() {
        return KinesisCheckpointer$.MODULE$.$lessinit$greater$default$4();
    }

    /**
     * Getter for the Logging trait's backing logger field.
     *
     * @return the current value of the field, returned as-is
     */
    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    /**
     * Setter for the Logging trait's backing logger field.
     *
     * @param x$1 the logger to store; stored without any check
     */
    public void org$apache$spark$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$Logging$$log_ = x$1;
    }

    /**
     * Forwards to the Logging trait's {@code logName} implementation for this instance.
     *
     * @return whatever the trait implementation returns
     */
    public String logName() {
        return Logging.class.logName((Logging)this);
    }

    /**
     * Forwards to the Logging trait's {@code log} implementation for this instance.
     *
     * @return the logger supplied by the trait implementation
     */
    public Logger log() {
        return Logging.class.log((Logging)this);
    }

    /**
     * Forwards an info-level message to the Logging trait implementation.
     *
     * @param msg function producing the message text; passed through unchanged
     */
    public void logInfo(Function0<String> msg) {
        Logging.class.logInfo((Logging)this, msg);
    }

    /**
     * Forwards a debug-level message to the Logging trait implementation.
     *
     * @param msg function producing the message text; passed through unchanged
     */
    public void logDebug(Function0<String> msg) {
        Logging.class.logDebug((Logging)this, msg);
    }

    /**
     * Forwards a trace-level message to the Logging trait implementation.
     *
     * @param msg function producing the message text; passed through unchanged
     */
    public void logTrace(Function0<String> msg) {
        Logging.class.logTrace((Logging)this, msg);
    }

    /**
     * Forwards a warning-level message to the Logging trait implementation.
     *
     * @param msg function producing the message text; passed through unchanged
     */
    public void logWarning(Function0<String> msg) {
        Logging.class.logWarning((Logging)this, msg);
    }

    /**
     * Forwards an error-level message to the Logging trait implementation.
     *
     * @param msg function producing the message text; passed through unchanged
     */
    public void logError(Function0<String> msg) {
        Logging.class.logError((Logging)this, msg);
    }

    /**
     * Forwards an info-level message together with a throwable to the Logging trait implementation.
     *
     * @param msg       function producing the message text; passed through unchanged
     * @param throwable the throwable to attach to the log entry
     */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.class.logInfo((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards a debug-level message together with a throwable to the Logging trait implementation.
     *
     * @param msg       function producing the message text; passed through unchanged
     * @param throwable the throwable to attach to the log entry
     */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.class.logDebug((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards a trace-level message together with a throwable to the Logging trait implementation.
     *
     * @param msg       function producing the message text; passed through unchanged
     * @param throwable the throwable to attach to the log entry
     */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.class.logTrace((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards a warning-level message together with a throwable to the Logging trait implementation.
     *
     * @param msg       function producing the message text; passed through unchanged
     * @param throwable the throwable to attach to the log entry
     */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.class.logWarning((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards an error-level message together with a throwable to the Logging trait implementation.
     *
     * @param msg       function producing the message text; passed through unchanged
     * @param throwable the throwable to attach to the log entry
     */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.class.logError((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards to the Logging trait's {@code isTraceEnabled} implementation for this instance.
     *
     * @return whatever the trait implementation reports
     */
    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled((Logging)this);
    }

    /**
     * Accessor for the shardId-to-checkpointer map.
     *
     * @return the live map instance (not a copy)
     */
    private ConcurrentHashMap<String, IRecordProcessorCheckpointer> checkpointers() {
        return this.checkpointers;
    }

    /**
     * Accessor for the map from shardId to the last sequence number recorded as checkpointed.
     * Public with a mangled name; within this file it is called from the closure inside
     * {@code checkpoint} and from {@code shutdown}.
     *
     * @return the live map instance (not a copy)
     */
    public ConcurrentHashMap<String, String> org$apache$spark$streaming$kinesis$KinesisCheckpointer$$lastCheckpointedSeqNums() {
        return this.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$lastCheckpointedSeqNums;
    }

    /**
     * Accessor for the recurring timer that was created during construction.
     *
     * @return the timer instance
     */
    private RecurringTimer checkpointerThread() {
        return this.checkpointerThread;
    }

    /**
     * Registers {@code checkpointer} as the current checkpointer for {@code shardId}, replacing any
     * entry already stored under that key.
     *
     * @param shardId      key under which the checkpointer is stored
     * @param checkpointer checkpointer to store for the shard
     */
    public void setCheckpointer(String shardId, IRecordProcessorCheckpointer checkpointer) {
        ConcurrentHashMap<String, IRecordProcessorCheckpointer> registry = this.checkpointers();
        registry.put(shardId, checkpointer);
    }

    /**
     * Stops tracking {@code shardId} and then attempts one final checkpoint for it using the
     * supplied checkpointer. When {@code checkpointer} is {@code null} the checkpoint step only
     * logs that it was skipped.
     *
     * Holds this instance's monitor, as does the periodic checkpoint-all pass.
     *
     * @param shardId      shard to stop tracking
     * @param checkpointer checkpointer to use for the final checkpoint, or {@code null} to skip it
     */
    public synchronized void removeCheckpointer(String shardId, IRecordProcessorCheckpointer checkpointer) {
        ConcurrentHashMap<String, IRecordProcessorCheckpointer> registry = this.checkpointers();
        // Unregister first so the entry is gone before the final checkpoint is attempted.
        registry.remove(shardId);
        this.checkpoint(shardId, checkpointer);
    }

    /**
     * Checkpoints the latest sequence number the receiver reports for {@code shardId}, provided
     * it is newer than the one last recorded for that shard.
     *
     * <p>Behaviour:
     * <ul>
     *   <li>{@code checkpointer == null}: nothing is checkpointed; a debug message is logged.</li>
     *   <li>The receiver has no sequence number for the shard: nothing happens.</li>
     *   <li>Otherwise, if no sequence number has been recorded yet or the latest one is newer, the
     *       checkpointer is invoked through {@code KinesisRecordProcessor$.retryRandom} and, once
     *       that returns, the sequence number is recorded as the last checkpointed one.</li>
     * </ul>
     *
     * <p>Non-fatal throwables (as classified by {@code NonFatal$}) are logged as a warning and
     * swallowed; anything else is rethrown.
     *
     * @param shardId      shard whose progress should be checkpointed
     * @param checkpointer checkpointer to use, or {@code null} to skip checkpointing
     */
    private void checkpoint(String shardId, IRecordProcessorCheckpointer checkpointer) {
        try {
            if (checkpointer == null) {
                this.logDebug((Function0<String>)new Serializable(this, shardId){
                    public static final long serialVersionUID = 0L;
                    private final String shardId$1;

                    public final String apply() {
                        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Checkpointing skipped for shardId ", ". Checkpointer not set."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.shardId$1}));
                    }
                    {
                        this.shardId$1 = shardId$1;
                    }
                });
            } else {
                this.receiver.getLatestSeqNumToCheckpoint(shardId).foreach((Function1)new Serializable(this, shardId, checkpointer){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ KinesisCheckpointer $outer;
                    public final String shardId$1;
                    public final IRecordProcessorCheckpointer checkpointer$1;

                    public final Object apply(String latestSeqNum) {
                        Object object;
                        String lastSeqNum = this.$outer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$lastCheckpointedSeqNums().get(this.shardId$1);
                        // Compare numerically rather than lexicographically: sequence numbers are
                        // decimal strings whose length may differ, and "10" must rank above "9".
                        if (lastSeqNum == null || KinesisCheckpointer.isNewerSeqNum(latestSeqNum, lastSeqNum)) {
                            // NOTE(review): 4 and 100 are presumably the retry count and the maximum
                            // back-off in milliseconds -- confirm against retryRandom.
                            KinesisRecordProcessor$.MODULE$.retryRandom(new Serializable(this, latestSeqNum){
                                public static final long serialVersionUID = 0L;
                                private final /* synthetic */ $anonfun$checkpoint$1 $outer;
                                private final String latestSeqNum$1;

                                public final void apply() {
                                    this.apply$mcV$sp();
                                }

                                public void apply$mcV$sp() {
                                    this.$outer.checkpointer$1.checkpoint(this.latestSeqNum$1);
                                }
                                {
                                    if ($outer == null) {
                                        throw null;
                                    }
                                    this.$outer = $outer;
                                    this.latestSeqNum$1 = latestSeqNum$1;
                                }
                            }, 4, 100);
                            this.$outer.logDebug((Function0<String>)new Serializable(this, latestSeqNum){
                                public static final long serialVersionUID = 0L;
                                private final /* synthetic */ $anonfun$checkpoint$1 $outer;
                                private final String latestSeqNum$1;

                                public final String apply() {
                                    return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Checkpoint:  WorkerId ", " completed checkpoint at sequence number"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$anonfun$$$outer().org$apache$spark$streaming$kinesis$KinesisCheckpointer$$workerId}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{" ", " for shardId ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.latestSeqNum$1, this.$outer.shardId$1}))).toString();
                                }
                                {
                                    if ($outer == null) {
                                        throw null;
                                    }
                                    this.$outer = $outer;
                                    this.latestSeqNum$1 = latestSeqNum$1;
                                }
                            });
                            // Record progress only after the checkpoint call has returned normally.
                            object = this.$outer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$lastCheckpointedSeqNums().put(this.shardId$1, latestSeqNum);
                        } else {
                            object = BoxedUnit.UNIT;
                        }
                        return object;
                    }

                    public /* synthetic */ KinesisCheckpointer org$apache$spark$streaming$kinesis$KinesisCheckpointer$$anonfun$$$outer() {
                        return this.$outer;
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.shardId$1 = shardId$1;
                        this.checkpointer$1 = checkpointer$1;
                    }
                });
            }
        }
        catch (Throwable throwable) {
            // Only non-fatal failures are swallowed; everything else propagates to the caller.
            Option option = NonFatal$.MODULE$.unapply(throwable);
            if (option.isEmpty()) {
                throw throwable;
            }
            Throwable e = (Throwable)option.get();
            this.logWarning((Function0<String>)new Serializable(this, shardId){
                public static final long serialVersionUID = 0L;
                private final String shardId$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Failed to checkpoint shardId ", " to DynamoDB."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.shardId$1}));
                }
                {
                    this.shardId$1 = shardId$1;
                }
            }, e);
        }
    }

    /**
     * Tells whether {@code candidate} is a strictly larger sequence number than {@code previous}.
     *
     * <p>Assumes both are canonical non-negative decimal strings without leading zeros. Under that
     * assumption the longer string is always the larger number; for equal lengths the
     * lexicographic order equals the numeric order, which is exactly what the earlier plain
     * string comparison produced.
     *
     * @param candidate sequence number being considered for checkpointing; must not be null
     * @param previous  sequence number recorded by the last checkpoint; must not be null
     * @return {@code true} when {@code candidate} ranks strictly above {@code previous}
     */
    private static boolean isNewerSeqNum(String candidate, String previous) {
        int lengthDelta = candidate.length() - previous.length();
        if (lengthDelta != 0) {
            return lengthDelta > 0;
        }
        return candidate.compareTo(previous) > 0;
    }

    /**
     * Runs a checkpoint attempt for every shardId currently present in the checkpointer map,
     * using whatever checkpointer is stored for that shard at lookup time. Invoked by the
     * recurring timer's callback.
     *
     * <p>Holds this instance's monitor for the whole pass. Non-fatal throwables (as classified by
     * {@code NonFatal$}) are logged as a warning and swallowed; anything else is rethrown.
     */
    public synchronized void org$apache$spark$streaming$kinesis$KinesisCheckpointer$$checkpointAll() {
        try {
            for (Enumeration<String> ids = this.checkpointers().keys(); ids.hasMoreElements(); ) {
                String currentShard = ids.nextElement();
                // Look the checkpointer up again per shard; a null result is handled by checkpoint().
                IRecordProcessorCheckpointer registered = this.checkpointers().get(currentShard);
                this.checkpoint(currentShard, registered);
            }
        }
        catch (Throwable caught) {
            Option nonFatal = NonFatal$.MODULE$.unapply(caught);
            if (nonFatal.isEmpty()) {
                throw caught;
            }
            Throwable cause = (Throwable)nonFatal.get();
            Function0<String> warning = (Function0<String>)new Serializable(this){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Failed to checkpoint to DynamoDB.";
                }
            };
            this.logWarning(warning, cause);
        }
    }

    /**
     * Creates the recurring timer that triggers a checkpoint-all pass on every tick, starts it,
     * logs a debug message with the thread name, and returns it.
     *
     * <p>The timer uses this instance's clock, a period equal to the checkpoint interval in
     * milliseconds, and a name derived from the worker id.
     *
     * @return the timer, already started
     */
    private RecurringTimer startCheckpointerThread() {
        long intervalMillis = this.checkpointInterval.milliseconds();
        String timerName = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Kinesis Checkpointer - Worker ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$workerId}));

        // Tick callback: ignores the timestamp argument and checkpoints every tracked shard.
        Function1 onTick = (Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ KinesisCheckpointer $outer;

            public final void apply(long x$1) {
                this.apply$mcVJ$sp(x$1);
            }

            public void apply$mcVJ$sp(long x$1) {
                this.$outer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$checkpointAll();
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        };

        RecurringTimer recurring = new RecurringTimer(this.clock, intervalMillis, onTick, timerName);
        recurring.start();

        Function0<String> startedMessage = (Function0<String>)new Serializable(this, timerName){
            public static final long serialVersionUID = 0L;
            private final String threadName$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Started checkpointer thread: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.threadName$1}));
            }
            {
                this.threadName$1 = threadName$1;
            }
        };
        this.logDebug(startedMessage);
        return recurring;
    }

    /**
     * Stops the recurring checkpoint timer, then empties both the checkpointer map and the
     * last-checkpointed-sequence-number map, and finally logs an info message.
     *
     * NOTE(review): the {@code false} passed to {@code stop} is presumably "do not interrupt the
     * timer thread" -- confirm against RecurringTimer.
     */
    public void shutdown() {
        RecurringTimer timer = this.checkpointerThread();
        timer.stop(false);

        // Clear state only after the timer has been asked to stop.
        this.checkpointers().clear();
        ConcurrentHashMap<String, String> recorded = this.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$lastCheckpointedSeqNums();
        recorded.clear();

        Function0<String> doneMessage = (Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Successfully shutdown Kinesis Checkpointer.";
            }
        };
        this.logInfo(doneMessage);
    }

    /**
     * Creates a checkpointer and immediately starts its recurring checkpoint timer.
     *
     * @param receiver           receiver queried for the latest sequence number to checkpoint per shard
     * @param checkpointInterval period between checkpoint-all passes
     * @param workerId           worker id used in the timer thread name and in log messages
     * @param clock              clock handed to the recurring timer
     */
    public KinesisCheckpointer(KinesisReceiver<?> receiver, Duration checkpointInterval, String workerId, Clock clock) {
        this.receiver = receiver;
        this.checkpointInterval = checkpointInterval;
        this.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$workerId = workerId;
        this.clock = clock;
        Logging.class.$init$((Logging)this);
        // Parameterized instantiations (previously raw types) so they match the declared field types.
        this.checkpointers = new ConcurrentHashMap<String, IRecordProcessorCheckpointer>();
        this.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$lastCheckpointedSeqNums = new ConcurrentHashMap<String, String>();
        // Must stay last: the timer reads clock, checkpointInterval and workerId when it is built,
        // and its callback uses the two maps once it starts ticking.
        this.checkpointerThread = this.startCheckpointerThread();
    }
}

