/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ComparisonChain;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Ordering;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

class WatermarkCallbackExecutor {
    private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>> callbacks = new ConcurrentHashMap();
    private final Executor executor;

    public static WatermarkCallbackExecutor create(Executor executor) {
        return new WatermarkCallbackExecutor(executor);
    }

    private WatermarkCallbackExecutor(Executor executor) {
        this.executor = executor;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void callOnGuaranteedFiring(AppliedPTransform<?, ?, ?> step, BoundedWindow window, WindowingStrategy<?, ?> windowingStrategy, Runnable runnable) {
        WatermarkCallback callback = WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable);
        PriorityQueue callbackQueue = (PriorityQueue)this.callbacks.get(step);
        if (callbackQueue == null && this.callbacks.putIfAbsent(step, callbackQueue = new PriorityQueue(11, new CallbackOrdering())) != null) {
            callbackQueue = (PriorityQueue)this.callbacks.get(step);
        }
        PriorityQueue priorityQueue = callbackQueue;
        synchronized (priorityQueue) {
            callbackQueue.offer(callback);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark) {
        PriorityQueue callbackQueue = (PriorityQueue)this.callbacks.get(step);
        if (callbackQueue == null) {
            return;
        }
        PriorityQueue priorityQueue = callbackQueue;
        synchronized (priorityQueue) {
            while (!callbackQueue.isEmpty() && ((WatermarkCallback)callbackQueue.peek()).shouldFire(watermark)) {
                this.executor.execute(((WatermarkCallback)callbackQueue.poll()).getCallback());
            }
        }
    }

    private static class CallbackOrdering
    extends Ordering<WatermarkCallback> {
        private CallbackOrdering() {
        }

        @Override
        public int compare(WatermarkCallback left, WatermarkCallback right) {
            return ComparisonChain.start().compare((Comparable<?>)left.fireAfter, (Comparable<?>)right.fireAfter).compare(left.callback, right.callback, Ordering.arbitrary()).result();
        }
    }

    private static class WatermarkCallback {
        private final Instant fireAfter;
        private final Runnable callback;

        public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
            Instant firingAfter = strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring(window);
            return new WatermarkCallback(firingAfter, callback);
        }

        private WatermarkCallback(Instant fireAfter, Runnable callback) {
            this.fireAfter = fireAfter;
            this.callback = callback;
        }

        public boolean shouldFire(Instant currentWatermark) {
            return currentWatermark.isAfter((ReadableInstant)this.fireAfter) || currentWatermark.equals((Object)BoundedWindow.TIMESTAMP_MAX_VALUE);
        }

        public Runnable getCallback() {
            return this.callback;
        }
    }
}

