/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.util;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.spark.SparkEnv;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BlockResult;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaBatchInfo;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.Iterator;
import scala.reflect.ClassManifestFactory;
import scala.reflect.ClassTag;

public class GlobalWatermarkHolder {
    // Logger used by the static methods of this holder.
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(GlobalWatermarkHolder.class);
    // Pending watermark reports per source id: add() appends to the source's queue and
    // computeNewWatermarks() polls one report per non-empty queue on each advance.
    // This is a plain HashMap; only the per-source queues are concurrent collections.
    private static final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized Queue<@UnknownKeyFor @NonNull @Initialized SparkWatermarks>> sourceTimes = new HashMap<Integer, Queue<SparkWatermarks>>();
    // Id of the BlockManager block under which the watermark map is stored, removed and fetched.
    private static final @UnknownKeyFor @NonNull @Initialized BlockId WATERMARKS_BLOCK_ID = BlockId.apply((String)"broadcast_0WATERMARKS");
    // ClassTag for Map, passed to the BlockManager get/putSingle calls for the watermark block.
    private static final @UnknownKeyFor @NonNull @Initialized ClassTag<@UnknownKeyFor @NonNull @Initialized Map> WATERMARKS_TAG = ClassManifestFactory.fromClass(Map.class);
    // Local copy of the last watermark map written by advance(); clear() resets it to null.
    // While it is non-null, get() returns it directly instead of consulting the cache.
    // NOTE(review): annotated @NonNull, yet it starts as null and clear() assigns null.
    private static volatile @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized SparkWatermarks> driverNodeWatermarks = null;
    // Cache created lazily by get(); its single "SINGLETON" entry is loaded by WatermarksLoader
    // and expires after half of the interval given to createWatermarkCache().
    // NOTE(review): annotated @NonNull, yet it is null until the first get() that needs it.
    private static volatile @UnknownKeyFor @NonNull @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized SparkWatermarks>> watermarkCache = null;
    // Largest batch time (milliseconds) seen by WatermarkAdvancingStreamingListener#onBatchCompleted;
    // clear() resets it to 0.
    private static volatile @UnknownKeyFor @NonNull @Initialized long lastWatermarkedBatchTime = 0L;

    /**
     * Queues a watermark report for the given source so that a later advance can consume it.
     *
     * <p>The registry of per-source queues is a plain {@link HashMap}. It is iterated by
     * {@code advance(String)} and emptied by {@code clear()} while they hold the
     * {@code GlobalWatermarkHolder.class} monitor, so this method takes the same monitor
     * before reading or modifying the map.
     *
     * @param sourceId id of the source the report belongs to
     * @param sparkWatermarks the report to enqueue; the queue implementation rejects {@code null}
     */
    public static void add(@UnknownKeyFor @NonNull @Initialized int sourceId, @UnknownKeyFor @NonNull @Initialized SparkWatermarks sparkWatermarks) {
        synchronized (GlobalWatermarkHolder.class) {
            Queue<SparkWatermarks> timesQueue = sourceTimes.get(sourceId);
            if (timesQueue == null) {
                timesQueue = new ConcurrentLinkedQueue<SparkWatermarks>();
            }
            // Enqueue before registering, so a rejected (null) report never leaves a new
            // empty queue behind in the map.
            timesQueue.offer(sparkWatermarks);
            sourceTimes.put(sourceId, timesQueue);
        }
    }

    /**
     * Drains every queue of the given map into this holder, one report at a time, by
     * delegating to {@code add(int, SparkWatermarks)}.
     *
     * <p>The supplied queues are emptied as a side effect.
     *
     * @param sourceTimes queues of watermark reports keyed by source id
     */
    @VisibleForTesting
    public static void addAll(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized Queue<@UnknownKeyFor @NonNull @Initialized SparkWatermarks>> sourceTimes) {
        for (Map.Entry<Integer, Queue<SparkWatermarks>> perSource : sourceTimes.entrySet()) {
            final int id = perSource.getKey();
            final Queue<SparkWatermarks> reports = perSource.getValue();
            while (!reports.isEmpty()) {
                SparkWatermarks head = reports.poll();
                add(id, head);
            }
        }
    }

    /**
     * Returns the current value of the {@code lastWatermarkedBatchTime} field.
     *
     * @return the stored batch time in milliseconds; {@code 0} initially and after {@code clear()}
     */
    public static @UnknownKeyFor @NonNull @Initialized long getLastWatermarkedBatchTime() {
        return GlobalWatermarkHolder.lastWatermarkedBatchTime;
    }

    /**
     * Returns the current watermarks keyed by source id.
     *
     * <p>When a local copy has been published by an advance, that copy is returned directly.
     * Otherwise the watermarks are read through a lazily created cache whose single entry is
     * loaded from the block manager.
     *
     * <p>The local copy is read exactly once, so a concurrent {@code clear()} cannot turn a
     * successful check into a {@code null} return. The cache is created under the class
     * monitor with a second check, so concurrent first callers share one cache instance.
     *
     * @param cacheInterval interval in milliseconds used to size the cache expiry; only
     *     consulted when the cache has to be created
     * @return the watermark map (possibly empty)
     * @throws RuntimeException wrapping the {@link ExecutionException} raised by the cache loader
     */
    public static @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized SparkWatermarks> get(@UnknownKeyFor @NonNull @Initialized Long cacheInterval) {
        // Single volatile read: check and return use the same snapshot.
        Map<Integer, SparkWatermarks> localCopy = driverNodeWatermarks;
        if (localCopy != null) {
            return localCopy;
        }
        LoadingCache<String, Map<Integer, SparkWatermarks>> cache = watermarkCache;
        if (cache == null) {
            synchronized (GlobalWatermarkHolder.class) {
                // Re-check under the lock; another thread may have created it meanwhile.
                cache = watermarkCache;
                if (cache == null) {
                    cache = createWatermarkCache(cacheInterval);
                    watermarkCache = cache;
                }
            }
        }
        try {
            return cache.get("SINGLETON");
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Tells whether a local watermark copy is available.
     *
     * @return {@code true} when {@code driverNodeWatermarks} is non-null
     */
    private static @UnknownKeyFor @NonNull @Initialized boolean canBypassRemoteWatermarkFetching() {
        final boolean hasLocalCopy = null != driverNodeWatermarks;
        return hasLocalCopy;
    }

    /**
     * Builds a loading cache backed by {@code WatermarksLoader} whose entries expire, after
     * being written, once half of the given duration has elapsed.
     *
     * @param batchDuration duration in milliseconds; half of it becomes the expiry time
     * @return a new cache instance
     */
    private static synchronized @UnknownKeyFor @NonNull @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized SparkWatermarks>> createWatermarkCache(@UnknownKeyFor @NonNull @Initialized Long batchDuration) {
        final long expiryMillis = batchDuration / 2L;
        final WatermarksLoader loader = new WatermarksLoader();
        return CacheBuilder.newBuilder()
                .expireAfterWrite(expiryMillis, TimeUnit.MILLISECONDS)
                .build(loader);
    }

    /**
     * Computes the next watermarks and, if there are any, publishes them both to the block
     * manager and to the local copy. The whole step runs under the
     * {@code GlobalWatermarkHolder.class} monitor.
     *
     * @param batchId identifier of the completed batch, used only in the log message emitted
     *     when no new watermarks could be computed
     */
    private static void advance(@UnknownKeyFor @NonNull @Initialized String batchId) {
        synchronized (GlobalWatermarkHolder.class) {
            final BlockManager manager = SparkEnv.get().blockManager();
            final Map<Integer, SparkWatermarks> computed = computeNewWatermarks(manager);
            if (computed.isEmpty()) {
                LOG.info("No new watermarks could be computed upon completion of batch: {}", batchId);
                return;
            }
            // Remote block first, then the local copy, as two separate writes.
            writeRemoteWatermarkBlock(computed, manager);
            writeLocalWatermarkCopy(computed);
        }
    }

    /**
     * Replaces the local watermark copy with the given map.
     *
     * @param newWatermarks the map to store; {@code clear()} passes {@code null} to reset it
     */
    private static void writeLocalWatermarkCopy(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized SparkWatermarks> newWatermarks) {
        GlobalWatermarkHolder.driverNodeWatermarks = newWatermarks;
    }

    /**
     * Returns the local watermark copy.
     *
     * @return the value of {@code driverNodeWatermarks}, which is {@code null} until an
     *     advance has published watermarks and again after {@code clear()}
     */
    private static @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized SparkWatermarks> getLocalWatermarkCopy() {
        return GlobalWatermarkHolder.driverNodeWatermarks;
    }

    /**
     * Advances the watermarks without a real batch id; the placeholder {@code "N/A"} is
     * passed to {@code advance(String)} instead.
     */
    public static void advance() {
        final String placeholderBatchId = "N/A";
        advance(placeholderBatchId);
    }

    /**
     * Consumes one pending report from every source that has any and derives that source's
     * next watermarks from the report and the currently stored watermarks.
     *
     * <p>Low and high watermarks never move backwards: the later of the stored and reported
     * value is kept. The synchronized processing time is taken from the report as is.
     * Sources without a pending report do not appear in the result.
     *
     * @param blockManager block manager used to read (and, if absent, initialise) the stored
     *     watermarks
     * @return the new watermarks per source id; empty when nothing was pending
     * @throws IllegalStateException if the resulting low watermark is after the high
     *     watermark, or if the reported synchronized processing time is not after the stored one
     */
    private static @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized SparkWatermarks> computeNewWatermarks(@UnknownKeyFor @NonNull @Initialized BlockManager blockManager) {
        final HashMap<Integer, SparkWatermarks> advanced = new HashMap<Integer, SparkWatermarks>();
        if (sourceTimes.isEmpty()) {
            return advanced;
        }
        for (Map.Entry<Integer, Queue<SparkWatermarks>> pending : sourceTimes.entrySet()) {
            final Queue<SparkWatermarks> reports = pending.getValue();
            if (reports.isEmpty()) {
                continue;
            }
            final Integer sourceId = pending.getKey();

            // Baseline: the minimum timestamp unless watermarks are already stored for this source.
            Instant storedLow = BoundedWindow.TIMESTAMP_MIN_VALUE;
            Instant storedHigh = BoundedWindow.TIMESTAMP_MIN_VALUE;
            Instant storedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
            final Map<Integer, SparkWatermarks> stored = initWatermarks(blockManager);
            if (stored.containsKey(sourceId)) {
                final SparkWatermarks previous = stored.get(sourceId);
                storedLow = previous.getLowWatermark();
                storedHigh = previous.getHighWatermark();
                storedProcessingTime = previous.getSynchronizedProcessingTime();
            }

            final SparkWatermarks next = reports.poll();

            Instant nextLow = storedLow;
            if (next.getLowWatermark().isAfter(storedLow)) {
                nextLow = next.getLowWatermark();
            }
            Instant nextHigh = storedHigh;
            if (next.getHighWatermark().isAfter(storedHigh)) {
                nextHigh = next.getHighWatermark();
            }
            final Instant nextProcessingTime = next.getSynchronizedProcessingTime();

            final boolean lowNotAfterHigh = !nextLow.isAfter(nextHigh);
            Preconditions.checkState(lowNotAfterHigh, (Object)String.format("Low watermark %s cannot be later then high watermark %s", nextLow, nextHigh));
            Preconditions.checkState(nextProcessingTime.isAfter(storedProcessingTime), (Object)"Synchronized processing time must advance.");

            advanced.put(sourceId, new SparkWatermarks(nextLow, nextHigh, nextProcessingTime));
        }
        return advanced;
    }

    /**
     * Replaces the watermark block held by the block manager: the existing block is removed
     * and the given map is stored under the same id with {@code MEMORY_ONLY} storage.
     *
     * @param newWatermarks the watermark map to store
     * @param blockManager the block manager to write to
     */
    private static void writeRemoteWatermarkBlock(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized SparkWatermarks> newWatermarks, @UnknownKeyFor @NonNull @Initialized BlockManager blockManager) {
        final boolean tellMaster = true;
        blockManager.removeBlock(WATERMARKS_BLOCK_ID, tellMaster);
        blockManager.putSingle(WATERMARKS_BLOCK_ID, newWatermarks, StorageLevel.MEMORY_ONLY(), tellMaster, WATERMARKS_TAG);
        LOG.info("Put new watermark block: {}", newWatermarks);
    }

    /**
     * Returns the watermark map stored in the block manager. When no map could be fetched,
     * a new empty map is stored under the watermark block id and returned.
     *
     * @param blockManager the block manager to read from and, if needed, write to
     * @return the stored map, or the newly stored empty map
     */
    private static @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized SparkWatermarks> initWatermarks(@UnknownKeyFor @NonNull @Initialized BlockManager blockManager) {
        final Map<Integer, SparkWatermarks> existing = fetchSparkWatermarks(blockManager);
        if (existing != null) {
            return existing;
        }
        final HashMap<Integer, SparkWatermarks> fresh = new HashMap<Integer, SparkWatermarks>();
        blockManager.putSingle(WATERMARKS_BLOCK_ID, fresh, StorageLevel.MEMORY_ONLY(), true, WATERMARKS_TAG);
        return fresh;
    }

    /**
     * Reads the watermark map stored under the watermark block id.
     *
     * <p>The first element of the block's data is the map. The iterator is then walked to its
     * end, consuming any further element, so the walk always terminates. The previous
     * empty-bodied {@code while (data.hasNext())} loop never advanced the iterator and would
     * spin forever if the block ever held more than one element.
     *
     * @param blockManager the block manager to read from
     * @return the stored watermark map, or {@code null} when the block is not defined
     */
    private static @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized SparkWatermarks> fetchSparkWatermarks(@UnknownKeyFor @NonNull @Initialized BlockManager blockManager) {
        Option<BlockResult> blockResultOption = blockManager.get(WATERMARKS_BLOCK_ID, WATERMARKS_TAG);
        if (!blockResultOption.isDefined()) {
            return null;
        }
        Iterator<Object> data = blockResultOption.get().data();
        @SuppressWarnings("unchecked")
        Map<Integer, SparkWatermarks> watermarks = (Map<Integer, SparkWatermarks>)data.next();
        // NOTE(review): presumably the block's iterator only signals completion once
        // hasNext() has reported false, which is why it is polled to the end - confirm
        // against Spark's BlockResult/CompletionIterator.
        while (data.hasNext()) {
            data.next();
        }
        return watermarks;
    }

    /**
     * Resets the holder's state: drops all pending reports, zeroes the last watermarked batch
     * time, discards the local watermark copy and, when a Spark environment is available,
     * removes the watermark block from its block manager.
     */
    @VisibleForTesting
    public static synchronized void clear() {
        sourceTimes.clear();
        lastWatermarkedBatchTime = 0L;
        writeLocalWatermarkCopy(null);

        final SparkEnv env = SparkEnv.get();
        if (env == null) {
            // No Spark environment, hence no block to remove.
            return;
        }
        env.blockManager().removeBlock(WATERMARKS_BLOCK_ID, true);
    }

    /**
     * Streaming listener that advances the watermarks each time a batch completes and records
     * the latest batch time seen so far.
     */
    public static class WatermarkAdvancingStreamingListener
    extends JavaStreamingListener {
        private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(WatermarkAdvancingStreamingListener.class);

        /** Returns the batch time of the given batch in milliseconds. */
        private @UnknownKeyFor @NonNull @Initialized long timeOf(@UnknownKeyFor @NonNull @Initialized JavaBatchInfo info) {
            final long batchMillis = info.batchTime().milliseconds();
            return batchMillis;
        }

        /** Returns the larger of the two timestamps. */
        private @UnknownKeyFor @NonNull @Initialized long laterOf(@UnknownKeyFor @NonNull @Initialized long t1, @UnknownKeyFor @NonNull @Initialized long t2) {
            return t1 >= t2 ? t1 : t2;
        }

        /**
         * Advances the watermarks using the completed batch's time as batch id, then moves
         * the last watermarked batch time forward if this batch is later.
         *
         * @param batchCompleted the completion event carrying the batch info
         */
        public void onBatchCompleted(@UnknownKeyFor @NonNull @Initialized JavaStreamingListenerBatchCompleted batchCompleted) {
            final long completedBatchTime = timeOf(batchCompleted.batchInfo());
            GlobalWatermarkHolder.advance(String.valueOf(completedBatchTime));
            final long latest = laterOf(lastWatermarkedBatchTime, completedBatchTime);
            lastWatermarkedBatchTime = latest;
            LOG.info("Batch with timestamp: {} has completed, watermarks have been updated.", (Object)lastWatermarkedBatchTime);
        }
    }

    /**
     * Immutable, serializable value holding a low watermark, a high watermark and a
     * synchronized processing time.
     */
    public static class SparkWatermarks
    implements Serializable {
        private final @UnknownKeyFor @NonNull @Initialized Instant lowWatermark;
        private final @UnknownKeyFor @NonNull @Initialized Instant highWatermark;
        private final @UnknownKeyFor @NonNull @Initialized Instant synchronizedProcessingTime;

        /**
         * Creates a new value from its three components; the arguments are stored as given.
         *
         * @param lowWatermark the low watermark
         * @param highWatermark the high watermark
         * @param synchronizedProcessingTime the synchronized processing time
         */
        @VisibleForTesting
        public SparkWatermarks(@UnknownKeyFor @NonNull @Initialized Instant lowWatermark, @UnknownKeyFor @NonNull @Initialized Instant highWatermark, @UnknownKeyFor @NonNull @Initialized Instant synchronizedProcessingTime) {
            this.synchronizedProcessingTime = synchronizedProcessingTime;
            this.highWatermark = highWatermark;
            this.lowWatermark = lowWatermark;
        }

        /** Returns the low watermark. */
        public @UnknownKeyFor @NonNull @Initialized Instant getLowWatermark() {
            return lowWatermark;
        }

        /** Returns the high watermark. */
        public @UnknownKeyFor @NonNull @Initialized Instant getHighWatermark() {
            return highWatermark;
        }

        /** Returns the synchronized processing time. */
        public @UnknownKeyFor @NonNull @Initialized Instant getSynchronizedProcessingTime() {
            return synchronizedProcessingTime;
        }

        /** Renders all three components in a {@code SparkWatermarks{...}} form. */
        @Override
        @SideEffectFree
        public @UnknownKeyFor @NonNull @Initialized String toString() {
            StringBuilder text = new StringBuilder();
            text.append("SparkWatermarks{lowWatermark=").append(lowWatermark);
            text.append(", highWatermark=").append(highWatermark);
            text.append(", synchronizedProcessingTime=").append(synchronizedProcessingTime);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Cache loader that reads the watermark map from the current Spark environment's block
     * manager, substituting a new empty map when nothing could be fetched.
     */
    private static class WatermarksLoader
    extends CacheLoader<String, Map<Integer, SparkWatermarks>> {
        private WatermarksLoader() {
        }

        /**
         * Loads the watermark map; the key is not used to select what is loaded.
         *
         * @param key the cache key (ignored)
         * @return the fetched watermark map, or a new empty map when none was fetched
         */
        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized SparkWatermarks> load(@Nonnull @UnknownKeyFor @NonNull @Initialized String key) throws @UnknownKeyFor @NonNull @Initialized Exception {
            final BlockManager manager = SparkEnv.get().blockManager();
            final Map<Integer, SparkWatermarks> fetched = GlobalWatermarkHolder.fetchSparkWatermarks(manager);
            if (fetched == null) {
                return new HashMap<Integer, SparkWatermarks>();
            }
            return fetched;
        }
    }
}

