/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.BatchCounter;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.JavaTestUtils;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.util.ManualClock;
import org.junit.Assert;
import org.junit.Test;
import org.spark_project.guava.collect.Sets;
import scala.Tuple2;

public class JavaMapWithStateSuite
extends LocalJavaStreamingContext
implements Serializable {
    public void testAPI() {
        JavaPairRDD initialRDD = null;
        Object wordsDstream = null;
        Function4 & Serializable mappingFunc = (Function4 & Serializable)(time, word, one, state) -> {
            state.exists();
            state.get();
            state.isTimingOut();
            state.remove();
            state.update((Object)true);
            return Optional.of((Object)2.0);
        };
        JavaMapWithStateDStream stateDstream = wordsDstream.mapWithState(StateSpec.function((Function4)mappingFunc).initialState(initialRDD).numPartitions(10).partitioner((Partitioner)new HashPartitioner(10)).timeout(Durations.seconds((long)10L)));
        stateDstream.stateSnapshots();
        Function3 & Serializable mappingFunc2 = (Function3 & Serializable)(key, one, state) -> {
            state.exists();
            state.get();
            state.isTimingOut();
            state.remove();
            state.update((Object)true);
            return 2.0;
        };
        JavaMapWithStateDStream stateDstream2 = wordsDstream.mapWithState(StateSpec.function((Function3)mappingFunc2).initialState(initialRDD).numPartitions(10).partitioner((Partitioner)new HashPartitioner(10)).timeout(Durations.seconds((long)10L)));
        stateDstream2.stateSnapshots();
    }

    @Test
    public void testBasicFunction() {
        List inputData = Arrays.asList(Collections.emptyList(), Arrays.asList("a"), Arrays.asList("a", "b"), Arrays.asList("a", "b", "c"), Arrays.asList("a", "b"), Arrays.asList("a"), Collections.emptyList());
        List outputData = Arrays.asList(Collections.emptySet(), Sets.newHashSet((Object[])new Integer[]{1}), Sets.newHashSet((Object[])new Integer[]{2, 1}), Sets.newHashSet((Object[])new Integer[]{3, 2, 1}), Sets.newHashSet((Object[])new Integer[]{4, 3}), Sets.newHashSet((Object[])new Integer[]{5}), Collections.emptySet());
        List stateData = Arrays.asList(Collections.emptySet(), Sets.newHashSet((Object[])new Tuple2[]{new Tuple2((Object)"a", (Object)1)}), Sets.newHashSet((Object[])new Tuple2[]{new Tuple2((Object)"a", (Object)2), new Tuple2((Object)"b", (Object)1)}), Sets.newHashSet((Object[])new Tuple2[]{new Tuple2((Object)"a", (Object)3), new Tuple2((Object)"b", (Object)2), new Tuple2((Object)"c", (Object)1)}), Sets.newHashSet((Object[])new Tuple2[]{new Tuple2((Object)"a", (Object)4), new Tuple2((Object)"b", (Object)3), new Tuple2((Object)"c", (Object)1)}), Sets.newHashSet((Object[])new Tuple2[]{new Tuple2((Object)"a", (Object)5), new Tuple2((Object)"b", (Object)3), new Tuple2((Object)"c", (Object)1)}), Sets.newHashSet((Object[])new Tuple2[]{new Tuple2((Object)"a", (Object)5), new Tuple2((Object)"b", (Object)3), new Tuple2((Object)"c", (Object)1)}));
        Function3 & Serializable mappingFunc = (Function3 & Serializable)(key, value, state) -> {
            int sum = (Integer)value.orElse((Object)0) + (state.exists() ? (Integer)state.get() : 0);
            state.update((Object)sum);
            return sum;
        };
        this.testOperation(inputData, StateSpec.function((Function3)mappingFunc), outputData, stateData);
    }

    private <K, S, T> void testOperation(List<List<K>> input, StateSpec<K, Integer, S, T> mapWithStateSpec, List<Set<T>> expectedOutputs, List<Set<Tuple2<K, S>>> expectedStateSnapshots) {
        int numBatches = expectedOutputs.size();
        JavaDStream inputStream = JavaTestUtils.attachTestInputStream(this.ssc, input, 2);
        JavaMapWithStateDStream mapWithStateDStream = JavaPairDStream.fromJavaDStream((JavaDStream)inputStream.map((Function & Serializable)x -> new Tuple2(x, (Object)1))).mapWithState(mapWithStateSpec);
        List collectedOutputs = Collections.synchronizedList(new ArrayList());
        mapWithStateDStream.foreachRDD((VoidFunction & Serializable)rdd -> collectedOutputs.add(Sets.newHashSet((Iterable)rdd.collect())));
        List collectedStateSnapshots = Collections.synchronizedList(new ArrayList());
        mapWithStateDStream.stateSnapshots().foreachRDD((VoidFunction & Serializable)rdd -> collectedStateSnapshots.add(Sets.newHashSet((Iterable)rdd.collect())));
        BatchCounter batchCounter = new BatchCounter(this.ssc.ssc());
        this.ssc.start();
        ((ManualClock)this.ssc.ssc().scheduler().clock()).advance(this.ssc.ssc().progressListener().batchDuration() * (long)numBatches + 1L);
        batchCounter.waitUntilBatchesCompleted(numBatches, 10000L);
        Assert.assertEquals(expectedOutputs, collectedOutputs);
        Assert.assertEquals(expectedStateSnapshots, collectedStateSnapshots);
    }
}

