/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateNamespace;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.Bundle;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.AutoValue_WatermarkManager_PendingWatermarkUpdate;
import org.apache.beam.runners.direct.Clock;
import org.apache.beam.runners.direct.ExecutableGraph;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ComparisonChain;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.HashBasedTable;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Ordering;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.SortedMultiset;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Table;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.TreeMultiset;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

@Internal
public class WatermarkManager<ExecutableT, CollectionT> {
    // Batch-size constant; presumably the cap used by tryApplyPendingUpdates, which applies at
    // most 10 queued updates per call - TODO confirm against the original source.
    private static final int MAX_INCREMENTAL_UPDATES = 10;
    // A watermark that is permanently at BoundedWindow.TIMESTAMP_MAX_VALUE and never reports a
    // change. It is used as the sole input watermark of executables that have no per-element
    // inputs, and as the default timestamp when there is no hold.
    private static final Watermark THE_END_OF_TIME = new Watermark(){

        @Override
        public String getName() {
            return "THE_END_OF_TIME";
        }

        @Override
        public WatermarkUpdate refresh() {
            return WatermarkUpdate.NO_CHANGE;
        }

        @Override
        public Instant get() {
            return BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
    };
    // Natural (chronological) ordering of instants, used for min/max computations.
    private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
    // Source of "now" for processing-time computations.
    private final Clock clock;
    // Describes producers, consumers and per-element inputs of every executable.
    private final ExecutableGraph<ExecutableT, CollectionT> graph;
    // Maps an executable to the name used as the prefix of its watermark names.
    private final Function<ExecutableT, String> getName;
    // Watermarks of every known executable; populated by getTransformWatermark.
    private final Map<ExecutableT, TransformWatermarks> transformToWatermarks;
    // Updates offered by updateWatermarks that have not yet been applied by applyNUpdates.
    private final ConcurrentLinkedQueue<PendingWatermarkUpdate> pendingUpdates;
    // Lock held while applying pending updates, refreshing watermarks and extracting timers.
    private final Lock refreshLock;
    // Executables whose watermarks must be recomputed on the next refreshAll.
    @GuardedBy(value="refreshLock")
    private final Set<ExecutableT> pendingRefreshes;

    /**
     * Classifies the transition from {@code oldTime} to {@code currentTime} and emits a trace
     * message when the watermark moved forward.
     *
     * @param name name of the watermark, used only in the trace message
     * @param oldTime the previously observed watermark value
     * @param currentTime the newly computed watermark value
     * @return the result of {@code WatermarkUpdate.fromTimestamps(oldTime, currentTime)}
     */
    private static WatermarkUpdate updateAndTrace(String name, Instant oldTime, Instant currentTime) {
        WatermarkUpdate outcome = WatermarkUpdate.fromTimestamps(oldTime, currentTime);
        if (!outcome.isAdvanced()) {
            // Nothing moved forward, so there is nothing to trace.
            return outcome;
        }
        WindowTracing.debug("Watermark {} advanced from {} to {}", new Object[]{name, oldTime, currentTime});
        return outcome;
    }

    /**
     * Removes, for every key, the leading timers whose timestamp is strictly before
     * {@code latestTime} and returns them grouped by key.
     *
     * <p>Keys whose timer set is empty after extraction (including keys that were already
     * empty) are removed from {@code objectTimers}.
     *
     * @param latestTime timers strictly before this instant are considered fired
     * @param objectTimers per-key pending timers; mutated in place
     * @return the fired timers per key, in the iteration order of each key's set; keys with no
     *     fired timers are absent
     */
    private static Map<StructuralKey<?>, List<TimerInternals.TimerData>> extractFiredTimers(Instant latestTime, Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> objectTimers) {
        Map<StructuralKey<?>, List<TimerInternals.TimerData>> firedByKey = new HashMap<>();
        Set<StructuralKey<?>> drainedKeys = new HashSet<>();
        for (Map.Entry<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> entry : objectTimers.entrySet()) {
            StructuralKey<?> key = entry.getKey();
            NavigableSet<TimerInternals.TimerData> queued = entry.getValue();
            List<TimerInternals.TimerData> fired = null;
            while (!queued.isEmpty() && queued.first().getTimestamp().isBefore(latestTime)) {
                if (fired == null) {
                    // Register the key lazily, only once its first timer actually fires.
                    fired = new ArrayList<>();
                    firedByKey.put(key, fired);
                }
                fired.add(queued.pollFirst());
            }
            if (queued.isEmpty()) {
                drainedKeys.add(key);
            }
        }
        // Removal is deferred so the map is not modified while it is being iterated.
        objectTimers.keySet().removeAll(drainedKeys);
        return firedByKey;
    }

    /**
     * Creates a {@code WatermarkManager} for the executables of {@code graph}.
     *
     * @param clock clock stored on the manager and queried via {@code now()} for processing-time
     *     computations
     * @param graph the graph whose root transforms and executables receive watermarks
     * @param getName maps an executable to the name used as the prefix of its watermark names
     * @return a new manager with watermarks registered for every executable in {@code graph}
     */
    public static <ExecutableT, CollectionT> WatermarkManager<ExecutableT, ? super CollectionT> create(Clock clock, ExecutableGraph<ExecutableT, ? super CollectionT> graph, Function<ExecutableT, String> getName) {
        return new WatermarkManager<ExecutableT, CollectionT>(clock, graph, getName);
    }

    /**
     * Stores the collaborators, creates the empty bookkeeping structures and then registers
     * watermarks for every root transform and every executable of {@code graph}.
     *
     * @param clock source of the current processing time
     * @param graph the graph describing the executables and their collections
     * @param getName maps an executable to the name used for its watermarks
     */
    private WatermarkManager(Clock clock, ExecutableGraph<ExecutableT, CollectionT> graph, Function<ExecutableT, String> getName) {
        this.clock = clock;
        this.graph = graph;
        this.getName = getName;

        // Bookkeeping must exist before any watermark is registered below.
        this.transformToWatermarks = new HashMap<>();
        this.pendingRefreshes = new HashSet<>();
        this.refreshLock = new ReentrantLock();
        this.pendingUpdates = new ConcurrentLinkedQueue<>();

        for (ExecutableT root : graph.getRootTransforms()) {
            this.getTransformWatermark(root);
        }
        for (ExecutableT executable : graph.getExecutables()) {
            this.getTransformWatermark(executable);
        }
    }

    /**
     * Returns the watermarks of the executable that produces {@code value}, creating them if
     * they are not registered yet.
     *
     * @param value a collection of the graph
     * @return the watermarks of the producer of {@code value}
     */
    private TransformWatermarks getValueWatermark(CollectionT value) {
        ExecutableT producer = this.graph.getProducer(value);
        return this.getTransformWatermark(producer);
    }

    /**
     * Returns the watermarks registered for {@code executable}, building and registering them
     * on first use.
     *
     * <p>Building the watermarks resolves the watermarks of the producers of the executable's
     * per-element inputs, which may register those producers as a side effect.
     *
     * @param executable the executable to look up
     * @return the (possibly newly created) watermarks of {@code executable}
     */
    private TransformWatermarks getTransformWatermark(ExecutableT executable) {
        String name = this.getName.apply(executable);
        TransformWatermarks cached = this.transformToWatermarks.get(executable);
        if (cached != null) {
            return cached;
        }

        // Event-time watermarks: the input is derived from upstream outputs, the output from the input.
        List<Watermark> upstreamEventTime = this.getInputWatermarks(executable);
        AppliedPTransformInputWatermark eventTimeIn = new AppliedPTransformInputWatermark(name + ".in", upstreamEventTime);
        AppliedPTransformOutputWatermark eventTimeOut = new AppliedPTransformOutputWatermark(name + ".out", eventTimeIn);

        // Synchronized processing-time watermarks, wired the same way.
        Collection<Watermark> upstreamProcessing = this.getInputProcessingWatermarks(executable);
        SynchronizedProcessingTimeInputWatermark processingIn = new SynchronizedProcessingTimeInputWatermark(name + ".inProcessing", upstreamProcessing);
        SynchronizedProcessingTimeOutputWatermark processingOut = new SynchronizedProcessingTimeOutputWatermark(name + ".outProcessing", processingIn);

        TransformWatermarks created = new TransformWatermarks(executable, eventTimeIn, eventTimeOut, processingIn, processingOut);
        this.transformToWatermarks.put(executable, created);
        return created;
    }

    /**
     * Collects the synchronized processing-time output watermarks of the producers of every
     * per-element input of {@code executable}.
     *
     * @param executable the executable whose upstream watermarks are wanted
     * @return the upstream watermarks, or a single {@code THE_END_OF_TIME} entry when the
     *     executable has no per-element inputs
     */
    private Collection<Watermark> getInputProcessingWatermarks(ExecutableT executable) {
        Collection<CollectionT> inputs = this.graph.getPerElementInputs(executable);
        ImmutableList.Builder<Watermark> upstream = ImmutableList.builder();
        if (inputs.isEmpty()) {
            // No inputs: nothing upstream can ever hold this executable back.
            upstream.add(THE_END_OF_TIME);
        } else {
            for (CollectionT input : inputs) {
                upstream.add(this.getValueWatermark(input).synchronizedProcessingOutputWatermark);
            }
        }
        return upstream.build();
    }

    /**
     * Collects the event-time output watermarks of the producers of every per-element input of
     * {@code executable}.
     *
     * @param executable the executable whose upstream watermarks are wanted
     * @return the upstream watermarks, or a single {@code THE_END_OF_TIME} entry when the
     *     executable has no per-element inputs
     */
    private List<Watermark> getInputWatermarks(ExecutableT executable) {
        Collection<CollectionT> inputs = this.graph.getPerElementInputs(executable);
        ImmutableList.Builder<Watermark> upstream = ImmutableList.builder();
        if (inputs.isEmpty()) {
            // No inputs: nothing upstream can ever hold this executable back.
            upstream.add(THE_END_OF_TIME);
        } else {
            for (CollectionT input : inputs) {
                upstream.add(this.getValueWatermark(input).outputWatermark);
            }
        }
        return upstream.build();
    }

    /**
     * Looks up the watermarks registered for {@code executable}.
     *
     * @param executable the executable to look up
     * @return the registered watermarks, or {@code null} if none are registered for it
     */
    public TransformWatermarks getWatermarks(ExecutableT executable) {
        TransformWatermarks registered = this.transformToWatermarks.get(executable);
        return registered;
    }

    /**
     * Registers the initial bundles of each root executable as pending input and schedules
     * those executables for a watermark refresh. Runs entirely under {@code refreshLock}.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param initialBundles the bundles initially available to each root executable
     */
    public void initialize(Map<ExecutableT, ? extends Iterable<Bundle<?, CollectionT>>> initialBundles) {
        this.refreshLock.lock();
        try {
            for (Map.Entry<ExecutableT, ? extends Iterable<Bundle<?, CollectionT>>> entry : initialBundles.entrySet()) {
                ExecutableT root = entry.getKey();
                TransformWatermarks rootWatermarks = this.transformToWatermarks.get(root);
                for (Bundle<?, CollectionT> bundle : entry.getValue()) {
                    rootWatermarks.addPending(bundle);
                }
                this.pendingRefreshes.add(root);
            }
        }
        finally {
            this.refreshLock.unlock();
        }
    }

    /**
     * Queues the result of processing a bundle and then opportunistically applies queued
     * updates if the refresh lock is free.
     *
     * @param completed the input bundle that was consumed, or {@code null} if there was none
     * @param timerUpdate the timers completed, set and deleted while processing
     * @param executable the executable that did the processing
     * @param unprocessedInputs input elements that were not processed, or {@code null}
     * @param outputs the bundles produced by the processing
     * @param earliestHold the event-time hold to record for the key of {@code completed}
     */
    public void updateWatermarks(@Nullable Bundle<?, ? extends CollectionT> completed, TimerUpdate timerUpdate, ExecutableT executable, @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs, Iterable<? extends Bundle<?, ? extends CollectionT>> outputs, Instant earliestHold) {
        PendingWatermarkUpdate<ExecutableT, CollectionT> queued = PendingWatermarkUpdate.create(executable, completed, timerUpdate, unprocessedInputs, outputs, earliestHold);
        this.pendingUpdates.offer(queued);
        this.tryApplyPendingUpdates();
    }

    /**
     * Applies up to {@code MAX_INCREMENTAL_UPDATES} queued updates if the refresh lock can be
     * acquired immediately; otherwise returns without doing anything.
     */
    private void tryApplyPendingUpdates() {
        if (!this.refreshLock.tryLock()) {
            // Another thread holds the lock; the queued updates are left for a later pass.
            return;
        }
        try {
            this.applyNUpdates(MAX_INCREMENTAL_UPDATES);
        }
        finally {
            this.refreshLock.unlock();
        }
    }

    /**
     * Blocks until the refresh lock is held and then applies every queued update.
     */
    private void applyAllPendingUpdates() {
        // A non-positive count tells applyNUpdates to drain the whole queue.
        final int unlimited = -1;
        this.refreshLock.lock();
        try {
            this.applyNUpdates(unlimited);
        }
        finally {
            this.refreshLock.unlock();
        }
    }

    /**
     * Applies queued updates in FIFO order and marks each affected executable for refresh.
     *
     * @param numUpdates maximum number of updates to apply; zero or a negative value means
     *     "apply until the queue is empty"
     */
    @GuardedBy(value="refreshLock")
    private void applyNUpdates(int numUpdates) {
        int applied = 0;
        while (!this.pendingUpdates.isEmpty() && (numUpdates <= 0 || applied < numUpdates)) {
            PendingWatermarkUpdate<ExecutableT, CollectionT> pending = this.pendingUpdates.poll();
            this.applyPendingUpdate(pending);
            this.pendingRefreshes.add(pending.getExecutable());
            applied++;
        }
    }

    /**
     * Applies one queued update: adjusts pending bundles and timers, then records the
     * event-time hold for the key of the consumed bundle ({@code null} key when there was no
     * input bundle).
     *
     * @param pending the update to apply
     */
    private void applyPendingUpdate(PendingWatermarkUpdate<ExecutableT, CollectionT> pending) {
        ExecutableT executable = pending.getExecutable();
        Bundle<?, ? extends CollectionT> consumed = pending.getInputBundle();
        this.updatePending(consumed, pending.getTimerUpdate(), executable, pending.getUnprocessedInputs(), pending.getOutputs());

        Object holdKey = null;
        if (consumed != null) {
            holdKey = consumed.getKey();
        }
        TransformWatermarks watermarks = this.transformToWatermarks.get(executable);
        watermarks.setEventTimeHold(holdKey, pending.getEarliestHold());
    }

    /**
     * Updates the pending-bundle and timer bookkeeping after {@code executable} processed
     * {@code input}.
     *
     * <p>Outputs become pending on every per-element consumer of their collection. Unprocessed
     * inputs are re-added to the executable before the consumed input is removed.
     *
     * @param input the consumed bundle, or {@code null} if there was none
     * @param timerUpdate the timer changes to apply to the executable
     * @param executable the executable that did the processing
     * @param unprocessedInputs input elements that were not processed, or {@code null}
     * @param outputs the bundles produced by the processing
     */
    private void updatePending(Bundle<?, ? extends CollectionT> input, TimerUpdate timerUpdate, ExecutableT executable, @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs, Iterable<? extends Bundle<?, ? extends CollectionT>> outputs) {
        for (Bundle<?, ? extends CollectionT> produced : outputs) {
            for (ExecutableT consumer : this.graph.getPerElementConsumers(produced.getPCollection())) {
                this.transformToWatermarks.get(consumer).addPending(produced);
            }
        }

        TransformWatermarks completed = this.transformToWatermarks.get(executable);
        if (unprocessedInputs != null) {
            completed.addPending(unprocessedInputs);
        }
        completed.updateTimers(timerUpdate);
        if (input != null) {
            completed.removePending(input);
        }
    }

    /**
     * Applies every queued update and then refreshes watermarks until no executable reports
     * an advance.
     *
     * <p>The refresh starts from the executables recorded in {@code pendingRefreshes}. Each
     * round refreshes the current set and continues with the per-element consumers of every
     * executable whose watermark advanced. Once the fixpoint is reached the recorded set is
     * cleared, so that a later call only revisits executables touched since this one.
     */
    public synchronized void refreshAll() {
        this.refreshLock.lock();
        try {
            this.applyAllPendingUpdates();
            Set<ExecutableT> toRefresh = this.pendingRefreshes;
            while (!toRefresh.isEmpty()) {
                // refreshAllOf returns a fresh set; pendingRefreshes itself is not modified here.
                toRefresh = this.refreshAllOf(toRefresh);
            }
            // Everything recorded has been refreshed. Without this, every executable that ever
            // received an update would be refreshed again on each subsequent call.
            this.pendingRefreshes.clear();
        }
        finally {
            this.refreshLock.unlock();
        }
    }

    /**
     * Refreshes every executable in {@code toRefresh} and gathers the executables that need a
     * refresh as a consequence.
     *
     * @param toRefresh the executables to refresh in this round
     * @return a new set with the union of the follow-up refreshes reported for each executable
     */
    private Set<ExecutableT> refreshAllOf(Set<ExecutableT> toRefresh) {
        Set<ExecutableT> nextRound = new HashSet<>();
        toRefresh.forEach(executable -> nextRound.addAll(this.refreshWatermarks(executable)));
        return nextRound;
    }

    /**
     * Refreshes the watermarks of a single executable.
     *
     * @param toRefresh the executable to refresh
     * @return the per-element consumers of everything {@code toRefresh} produces when its
     *     watermark advanced; otherwise an empty set
     */
    private Set<ExecutableT> refreshWatermarks(ExecutableT toRefresh) {
        WatermarkUpdate outcome = this.transformToWatermarks.get(toRefresh).refresh();
        if (!outcome.isAdvanced()) {
            return Collections.emptySet();
        }
        // An advance may unblock anything that consumes this executable's outputs.
        Set<ExecutableT> downstream = new HashSet<>();
        for (CollectionT produced : this.graph.getProduced(toRefresh)) {
            downstream.addAll(this.graph.getPerElementConsumers(produced));
        }
        return downstream;
    }

    /**
     * Extracts the fired timers of every registered executable while holding
     * {@code refreshLock}.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @return all fired timers, one {@code FiredTimers} per executable and key
     */
    public Collection<FiredTimers<ExecutableT>> extractFiredTimers() {
        Collection<FiredTimers<ExecutableT>> fired = new ArrayList<>();
        this.refreshLock.lock();
        try {
            for (TransformWatermarks watermarks : this.transformToWatermarks.values()) {
                fired.addAll(watermarks.extractFiredTimers());
            }
        }
        finally {
            this.refreshLock.unlock();
        }
        return fired;
    }

    /**
     * Immutable record of one completed unit of work, queued by {@code updateWatermarks} and
     * applied later under the refresh lock.
     *
     * <p>NOTE(review): the order of the abstract accessors must stay in step with the argument
     * order of the generated {@code AutoValue_WatermarkManager_PendingWatermarkUpdate}
     * constructor used in {@link #create}.
     */
    @AutoValue
    static abstract class PendingWatermarkUpdate<ExecutableT, CollectionT> {
        PendingWatermarkUpdate() {
        }

        /** The executable that performed the work. */
        abstract ExecutableT getExecutable();

        /** The bundle that was consumed, or {@code null} if there was none. */
        @Nullable
        abstract Bundle<?, ? extends CollectionT> getInputBundle();

        /** The timer changes produced by the work. */
        abstract TimerUpdate getTimerUpdate();

        /** Input elements that were not processed, or {@code null} if there were none. */
        @Nullable
        abstract Bundle<?, ? extends CollectionT> getUnprocessedInputs();

        /** The bundles produced by the work. */
        abstract Iterable<? extends Bundle<?, ? extends CollectionT>> getOutputs();

        /** The event-time hold to record for the key of the input bundle. */
        abstract Instant getEarliestHold();

        /** Creates an update from its six components, in accessor order. */
        public static <ExecutableT, CollectionT> PendingWatermarkUpdate<ExecutableT, CollectionT> create(ExecutableT executable, @Nullable Bundle<?, ? extends CollectionT> inputBundle, TimerUpdate timerUpdate, @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs, Iterable<? extends Bundle<?, ? extends CollectionT>> outputs, Instant earliestHold) {
            return new AutoValue_WatermarkManager_PendingWatermarkUpdate<ExecutableT, CollectionT>(executable, inputBundle, timerUpdate, unprocessedInputs, outputs, earliestHold);
        }
    }

    private static class BundleByElementTimestampComparator
    extends Ordering<Bundle<?, ?>>
    implements Serializable {
        private BundleByElementTimestampComparator() {
        }

        public int compare(Bundle<?, ?> o1, Bundle<?, ?> o2) {
            return ComparisonChain.start().compare((Comparable)o1.getMinimumTimestamp(), (Comparable)o2.getMinimumTimestamp()).result();
        }
    }

    /**
     * The timers that fired for one key of one executable.
     */
    public static class FiredTimers<ExecutableT> {
        private final ExecutableT executable;
        private final StructuralKey<?> key;
        private final Collection<TimerInternals.TimerData> timers;

        private FiredTimers(ExecutableT executable, StructuralKey<?> key, Collection<TimerInternals.TimerData> timers) {
            this.executable = executable;
            this.key = key;
            this.timers = timers;
        }

        /** Returns the executable the timers belong to. */
        public ExecutableT getExecutable() {
            return executable;
        }

        /** Returns the key the timers were set for. */
        public StructuralKey<?> getKey() {
            return key;
        }

        /** Returns the fired timers; the stored collection is returned without copying. */
        public Collection<TimerInternals.TimerData> getTimers() {
            return timers;
        }

        /** Renders only the timers; the executable and key are not included. */
        @Override
        public String toString() {
            return MoreObjects.toStringHelper(FiredTimers.class)
                .add("timers", timers)
                .toString();
        }
    }

    /**
     * The timer changes produced while processing one key: timers that completed, timers that
     * were set and timers that were deleted.
     */
    public static class TimerUpdate {
        private final StructuralKey<?> key;
        private final Iterable<? extends TimerInternals.TimerData> completedTimers;
        private final Iterable<? extends TimerInternals.TimerData> setTimers;
        private final Iterable<? extends TimerInternals.TimerData> deletedTimers;

        private TimerUpdate(StructuralKey<?> key, Iterable<? extends TimerInternals.TimerData> completedTimers, Iterable<? extends TimerInternals.TimerData> setTimers, Iterable<? extends TimerInternals.TimerData> deletedTimers) {
            this.key = key;
            this.completedTimers = completedTimers;
            this.setTimers = setTimers;
            this.deletedTimers = deletedTimers;
        }

        /** Returns an update with a {@code null} key and no timers of any kind. */
        public static TimerUpdate empty() {
            return new TimerUpdate(null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
        }

        /** Returns a builder for an update to the timers of {@code key}. */
        public static TimerUpdateBuilder builder(StructuralKey<?> key) {
            return new TimerUpdateBuilder(key);
        }

        @VisibleForTesting
        StructuralKey<?> getKey() {
            return key;
        }

        @VisibleForTesting
        public Iterable<? extends TimerInternals.TimerData> getCompletedTimers() {
            return completedTimers;
        }

        @VisibleForTesting
        public Iterable<? extends TimerInternals.TimerData> getSetTimers() {
            return setTimers;
        }

        @VisibleForTesting
        public Iterable<? extends TimerInternals.TimerData> getDeletedTimers() {
            return deletedTimers;
        }

        /**
         * Returns a copy of this update whose completed timers are replaced by
         * {@code completedTimers}; the key, set timers and deleted timers are carried over.
         */
        public TimerUpdate withCompletedTimers(Iterable<TimerInternals.TimerData> completedTimers) {
            return new TimerUpdate(key, completedTimers, setTimers, deletedTimers);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, completedTimers, setTimers, deletedTimers);
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof TimerUpdate)) {
                // Also covers null, for which instanceof is false.
                return false;
            }
            TimerUpdate that = (TimerUpdate) other;
            if (!Objects.equals(key, that.key)) {
                return false;
            }
            if (!Objects.equals(completedTimers, that.completedTimers)) {
                return false;
            }
            if (!Objects.equals(setTimers, that.setTimers)) {
                return false;
            }
            return Objects.equals(deletedTimers, that.deletedTimers);
        }

        /**
         * Accumulates timer changes for one key. Setting a timer cancels an earlier deletion
         * of the same timer, and deleting a timer cancels an earlier set of the same timer.
         */
        public static final class TimerUpdateBuilder {
            private final StructuralKey<?> key;
            private final Collection<TimerInternals.TimerData> completedTimers = new LinkedHashSet<>();
            private final Collection<TimerInternals.TimerData> setTimers = new LinkedHashSet<>();
            private final Collection<TimerInternals.TimerData> deletedTimers = new LinkedHashSet<>();

            private TimerUpdateBuilder(StructuralKey<?> key) {
                this.key = key;
            }

            /** Adds {@code completedTimers} to the timers recorded as completed. */
            public TimerUpdateBuilder withCompletedTimers(Iterable<TimerInternals.TimerData> completedTimers) {
                Iterables.addAll(this.completedTimers, completedTimers);
                return this;
            }

            /**
             * Records {@code setTimer} as set, dropping any recorded deletion of it.
             *
             * @throws IllegalArgumentException if the timer's timestamp is not strictly before
             *     {@code BoundedWindow.TIMESTAMP_MAX_VALUE}
             */
            public TimerUpdateBuilder setTimer(TimerInternals.TimerData setTimer) {
                boolean beforeEndOfTime = setTimer.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE);
                Preconditions.checkArgument(beforeEndOfTime, "Got a timer for after the end of time (%s), got %s", (Object) BoundedWindow.TIMESTAMP_MAX_VALUE, (Object) setTimer.getTimestamp());
                deletedTimers.remove(setTimer);
                setTimers.add(setTimer);
                return this;
            }

            /** Records {@code deletedTimer} as deleted, dropping any recorded set of it. */
            public TimerUpdateBuilder deletedTimer(TimerInternals.TimerData deletedTimer) {
                deletedTimers.add(deletedTimer);
                setTimers.remove(deletedTimer);
                return this;
            }

            /** Builds an update holding immutable snapshots of the accumulated timers. */
            public TimerUpdate build() {
                ImmutableList<TimerInternals.TimerData> completed = ImmutableList.copyOf(completedTimers);
                ImmutableList<TimerInternals.TimerData> set = ImmutableList.copyOf(setTimers);
                ImmutableList<TimerInternals.TimerData> deleted = ImmutableList.copyOf(deletedTimers);
                return new TimerUpdate(key, completed, set, deleted);
            }
        }
    }

    /**
     * The four watermarks of one executable (event-time input and output, synchronized
     * processing-time input and output) together with the operations the manager performs on
     * them.
     */
    public class TransformWatermarks {
        private final ExecutableT executable;
        private final AppliedPTransformInputWatermark inputWatermark;
        private final AppliedPTransformOutputWatermark outputWatermark;
        private final SynchronizedProcessingTimeInputWatermark synchronizedProcessingInputWatermark;
        private final SynchronizedProcessingTimeOutputWatermark synchronizedProcessingOutputWatermark;
        private Instant latestSynchronizedInputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
        private Instant latestSynchronizedOutputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;

        private TransformWatermarks(ExecutableT executable, AppliedPTransformInputWatermark inputWatermark, AppliedPTransformOutputWatermark outputWatermark, SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark, SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) {
            this.executable = executable;
            this.inputWatermark = inputWatermark;
            this.outputWatermark = outputWatermark;
            this.synchronizedProcessingInputWatermark = inputSynchProcessingWatermark;
            this.synchronizedProcessingOutputWatermark = outputSynchProcessingWatermark;
        }

        /** Returns the current event-time input watermark; fails if it is {@code null}. */
        public Instant getInputWatermark() {
            Instant current = this.inputWatermark.get();
            return Preconditions.checkNotNull(current);
        }

        /** Returns the current event-time output watermark. */
        public Instant getOutputWatermark() {
            return this.outputWatermark.get();
        }

        /**
         * Returns the synchronized processing input time: the earlier of the clock's current
         * time and the synchronized processing input watermark, never less than any value
         * previously returned by this method.
         */
        public synchronized Instant getSynchronizedProcessingInputTime() {
            Instant now = WatermarkManager.this.clock.now();
            Instant upstream = this.synchronizedProcessingInputWatermark.get();
            Instant candidate = INSTANT_ORDERING.min(now, upstream);
            this.latestSynchronizedInputWm = INSTANT_ORDERING.max(this.latestSynchronizedInputWm, candidate);
            return this.latestSynchronizedInputWm;
        }

        /**
         * Returns the synchronized processing output time: the earlier of the clock's current
         * time and the synchronized processing output watermark, never less than any value
         * previously returned by this method.
         */
        public synchronized Instant getSynchronizedProcessingOutputTime() {
            Instant now = WatermarkManager.this.clock.now();
            Instant upstream = this.synchronizedProcessingOutputWatermark.get();
            Instant candidate = INSTANT_ORDERING.min(now, upstream);
            this.latestSynchronizedOutputWm = INSTANT_ORDERING.max(this.latestSynchronizedOutputWm, candidate);
            return this.latestSynchronizedOutputWm;
        }

        /**
         * Refreshes both input watermarks and then both output watermarks.
         *
         * @return the union of the event-time and processing-time output updates
         */
        private WatermarkUpdate refresh() {
            // Inputs first: each output watermark is constructed from its input watermark.
            this.inputWatermark.refresh();
            this.synchronizedProcessingInputWatermark.refresh();
            WatermarkUpdate eventTimeOutcome = this.outputWatermark.refresh();
            WatermarkUpdate processingTimeOutcome = this.synchronizedProcessingOutputWatermark.refresh();
            return eventTimeOutcome.union(processingTimeOutcome);
        }

        /** Records {@code newHold} as the event-time hold of {@code key} on the output watermark. */
        private void setEventTimeHold(Object key, Instant newHold) {
            this.outputWatermark.updateHold(key, newHold);
        }

        /** Removes {@code bundle} from the pending bundles of both input watermarks. */
        private void removePending(Bundle<?, ?> bundle) {
            this.inputWatermark.removePending(bundle);
            this.synchronizedProcessingInputWatermark.removePending(bundle);
        }

        /** Adds {@code bundle} to the pending bundles of both input watermarks. */
        private void addPending(Bundle<?, ?> bundle) {
            this.inputWatermark.addPending(bundle);
            this.synchronizedProcessingInputWatermark.addPending(bundle);
        }

        /**
         * Extracts the fired event-time, processing-time and synchronized processing-time
         * timers, in that order, and groups them per key.
         */
        private Collection<FiredTimers<ExecutableT>> extractFiredTimers() {
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> eventTimeTimers = this.inputWatermark.extractFiredEventTimeTimers();
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> processingTimers = this.synchronizedProcessingInputWatermark.extractFiredDomainTimers(TimeDomain.PROCESSING_TIME, WatermarkManager.this.clock.now());
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> synchronizedTimers = this.synchronizedProcessingInputWatermark.extractFiredDomainTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, this.getSynchronizedProcessingInputTime());

            Map<StructuralKey<?>, List<TimerInternals.TimerData>> timersPerKey = this.groupFiredTimers(eventTimeTimers, processingTimers, synchronizedTimers);
            List<FiredTimers<ExecutableT>> result = new ArrayList<>(timersPerKey.size());
            for (Map.Entry<StructuralKey<?>, List<TimerInternals.TimerData>> perKey : timersPerKey.entrySet()) {
                result.add(new FiredTimers<>(this.executable, perKey.getKey(), perKey.getValue()));
            }
            return result;
        }

        /** Merges several per-key timer maps into one, concatenating lists in argument order. */
        @SafeVarargs
        private final Map<StructuralKey<?>, List<TimerInternals.TimerData>> groupFiredTimers(Map<StructuralKey<?>, List<TimerInternals.TimerData>> ... timersToGroup) {
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> merged = new HashMap<>();
            for (Map<StructuralKey<?>, List<TimerInternals.TimerData>> group : timersToGroup) {
                for (Map.Entry<StructuralKey<?>, List<TimerInternals.TimerData>> perKey : group.entrySet()) {
                    merged.computeIfAbsent(perKey.getKey(), unused -> new ArrayList<>()).addAll(perKey.getValue());
                }
            }
            return merged;
        }

        /** Applies {@code update} to the timers of both input watermarks. */
        private void updateTimers(TimerUpdate update) {
            this.inputWatermark.updateTimers(update);
            this.synchronizedProcessingInputWatermark.updateTimers(update);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(TransformWatermarks.class)
                .add("inputWatermark", (Object) this.inputWatermark)
                .add("outputWatermark", (Object) this.outputWatermark)
                .add("inputProcessingTime", (Object) this.synchronizedProcessingInputWatermark)
                .add("outputProcessingTime", (Object) this.synchronizedProcessingOutputWatermark)
                .toString();
        }
    }

    /**
     * Tracks one hold per key and gives access to the minimum hold across all keys.
     */
    private static class PerKeyHolds {
        // The current hold of each key.
        private final Map<Object, KeyedHold> keyedHolds = new HashMap<>();
        // The same holds in sorted order, so the minimum is the first element.
        private final NavigableSet<KeyedHold> allHolds = new TreeSet<>();

        private PerKeyHolds() {
        }

        /** Returns the earliest hold, or the end-of-time timestamp when there are no holds. */
        public Instant getMinHold() {
            if (allHolds.isEmpty()) {
                return THE_END_OF_TIME.get();
            }
            return allHolds.first().getTimestamp();
        }

        /** Replaces the hold of {@code key} (if any) with {@code newHold}. */
        public void updateHold(@Nullable Object key, Instant newHold) {
            removeHold(key);
            KeyedHold replacement = KeyedHold.of(key, newHold);
            keyedHolds.put(key, replacement);
            allHolds.add(replacement);
        }

        /** Removes the hold of {@code key}; does nothing when the key has no hold. */
        public void removeHold(Object key) {
            KeyedHold previous = keyedHolds.remove(key);
            if (previous == null) {
                return;
            }
            allHolds.remove(previous);
        }
    }

    /**
     * A hold timestamp together with the key it belongs to. Holds sort by timestamp first and
     * then by key, using Guava's arbitrary ordering with {@code null} keys last.
     */
    private static final class KeyedHold
    implements Comparable<KeyedHold> {
        private static final Ordering<Object> KEY_ORDERING = Ordering.arbitrary().nullsLast();
        private final Object key;
        private final Instant timestamp;

        private KeyedHold(Object key, Instant timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        /** Creates a hold for {@code key}; a {@code null} timestamp becomes the end of time. */
        public static KeyedHold of(Object key, Instant timestamp) {
            Instant effective = MoreObjects.firstNonNull(timestamp, THE_END_OF_TIME.get());
            return new KeyedHold(key, effective);
        }

        /** Returns the timestamp of this hold. */
        public Instant getTimestamp() {
            return timestamp;
        }

        @Override
        public int compareTo(KeyedHold that) {
            return ComparisonChain.start()
                .compare(this.timestamp, that.timestamp)
                .compare(this.key, that.key, KEY_ORDERING)
                .result();
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key);
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof KeyedHold)) {
                // Also covers null, for which instanceof is false.
                return false;
            }
            KeyedHold that = (KeyedHold) other;
            if (!Objects.equals(this.timestamp, that.timestamp)) {
                return false;
            }
            return Objects.equals(this.key, that.key);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(KeyedHold.class)
                .add("key", key)
                .add("hold", (Object) timestamp)
                .toString();
        }
    }

    /**
     * The synchronized processing-time output watermark of one executable. Its value is
     * recomputed only by {@link #refresh()} and read without locking through {@link #get()}.
     */
    private static class SynchronizedProcessingTimeOutputWatermark
    implements Watermark {
        private final String name;
        private final SynchronizedProcessingTimeInputWatermark inputWm;
        private AtomicReference<Instant> latestRefresh;

        public SynchronizedProcessingTimeOutputWatermark(String name, SynchronizedProcessingTimeInputWatermark inputWm) {
            this.name = name;
            this.inputWm = inputWm;
            this.latestRefresh = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        @Override
        public String getName() {
            return name;
        }

        /** Returns the value computed by the most recent refresh. */
        @Override
        public Instant get() {
            return latestRefresh.get();
        }

        /**
         * Sets the watermark to the earlier of the input watermark and the input's earliest
         * timer timestamp.
         *
         * @return the update classification of the old value versus the new value
         */
        @Override
        public synchronized WatermarkUpdate refresh() {
            Instant inputTime = inputWm.get();
            Instant earliestTimer = inputWm.getEarliestTimerTimestamp();
            Instant updated = INSTANT_ORDERING.min(inputTime, earliestTimer);
            // The stored value is only written here, under this monitor.
            Instant previous = latestRefresh.getAndSet(updated);
            return WatermarkManager.updateAndTrace(getName(), previous, updated);
        }

        @Override
        public synchronized String toString() {
            return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class)
                .add("latestRefresh", latestRefresh)
                .toString();
        }
    }

    /**
     * Input-side watermark that also tracks timers in the {@link TimeDomain#PROCESSING_TIME} and
     * {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} domains.
     *
     * <p>The value returned by {@link #get()} is the hold computed by the most recent
     * {@link #refresh()}: the minimum over every upstream watermark and over the synchronized
     * processing output watermark of every bundle registered with {@link #addPending(Bundle)}.
     *
     * <p>Every method except {@link #getName()}, {@link #get()} and the private {@code timerMap}
     * helper is {@code synchronized} on this instance. Within this class {@code timerMap} is only
     * called from {@code updateTimers}, which holds the lock.
     */
    private static class SynchronizedProcessingTimeInputWatermark
    implements Watermark {
        private final String name;
        /** Upstream watermarks whose minimum bounds this watermark. */
        private final Collection<? extends Watermark> inputWms;
        /** Bundles added through {@link #addPending(Bundle)} and not yet removed. */
        private final Collection<Bundle<?, ?>> pendingBundles;
        /** Per-key sorted queues of processing-time timers. */
        private final Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> processingTimers;
        /** Per-key sorted queues of synchronized-processing-time timers. */
        private final Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> synchronizedProcessingTimers;
        /** Per-key table of the last timer recorded for each (namespace, timer id) pair. */
        private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerInternals.TimerData>> existingTimers;
        /** Timers handed out by {@code extractFiredDomainTimers} and not yet completed or deleted. */
        private final NavigableSet<TimerInternals.TimerData> pendingTimers;
        private AtomicReference<Instant> earliestHold;

        /**
         * Creates the watermark with empty timer and bundle state.
         *
         * <p>The initial hold is the minimum of {@link BoundedWindow#TIMESTAMP_MAX_VALUE} and the
         * current value of every watermark in {@code inputWms}.
         *
         * @param name the name reported by {@link #getName()}
         * @param inputWms the upstream watermarks; the collection is retained, not copied
         */
        public SynchronizedProcessingTimeInputWatermark(String name, Collection<? extends Watermark> inputWms) {
            this.name = name;
            this.inputWms = inputWms;
            this.pendingBundles = new HashSet<>();
            this.processingTimers = new HashMap<>();
            this.synchronizedProcessingTimers = new HashMap<>();
            this.existingTimers = new HashMap<>();
            this.pendingTimers = new TreeSet<>();
            Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
            for (Watermark upstream : inputWms) {
                initialHold = (Instant) INSTANT_ORDERING.min(initialHold, upstream.get());
            }
            this.earliestHold = new AtomicReference<>(initialHold);
        }

        /** Returns the name supplied to the constructor. */
        @Override
        public String getName() {
            return this.name;
        }

        /** Returns the hold stored by the constructor or by the most recent {@link #refresh()}. */
        @Override
        public Instant get() {
            return this.earliestHold.get();
        }

        /**
         * Recomputes the hold as the minimum of {@code THE_END_OF_TIME}, every upstream watermark
         * and the synchronized processing output watermark of every pending bundle, then stores it.
         *
         * <p>The new value replaces the old one unconditionally.
         *
         * @return whatever {@code WatermarkManager.updateAndTrace} reports for the old and new hold
         */
        @Override
        public synchronized WatermarkUpdate refresh() {
            Instant oldHold = this.earliestHold.get();
            Instant minTime = THE_END_OF_TIME.get();
            for (Watermark upstream : this.inputWms) {
                minTime = (Instant) INSTANT_ORDERING.min(minTime, upstream.get());
            }
            for (Bundle<?, ?> bundle : this.pendingBundles) {
                minTime = (Instant) INSTANT_ORDERING.min(minTime, bundle.getSynchronizedProcessingOutputWatermark());
            }
            this.earliestHold.set(minTime);
            return WatermarkManager.updateAndTrace(this.getName(), oldHold, minTime);
        }

        /** Registers {@code bundle} so that it is considered by subsequent refreshes. */
        public synchronized void addPending(Bundle<?, ?> bundle) {
            this.pendingBundles.add(bundle);
        }

        /** Unregisters {@code bundle}; a bundle that was never added is ignored. */
        public synchronized void removePending(Bundle<?, ?> bundle) {
            this.pendingBundles.remove(bundle);
        }

        /**
         * Returns the earliest timestamp among the first timer of every per-key processing-time
         * queue, the first timer of every per-key synchronized-processing-time queue, and the
         * first pending (fired but not completed) timer.
         *
         * @return that minimum, or {@code THE_END_OF_TIME.get()} when no timer is tracked
         */
        public synchronized Instant getEarliestTimerTimestamp() {
            Instant earliest = THE_END_OF_TIME.get();
            for (NavigableSet<TimerInternals.TimerData> timers : this.processingTimers.values()) {
                if (!timers.isEmpty()) {
                    earliest = (Instant) INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
                }
            }
            for (NavigableSet<TimerInternals.TimerData> timers : this.synchronizedProcessingTimers.values()) {
                if (!timers.isEmpty()) {
                    earliest = (Instant) INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
                }
            }
            if (!this.pendingTimers.isEmpty()) {
                earliest = (Instant) INSTANT_ORDERING.min(this.pendingTimers.first().getTimestamp(), earliest);
            }
            return earliest;
        }

        /**
         * Applies the set, deleted and completed timers of {@code update} to the state kept for
         * {@code update.key}.
         *
         * <p>Timers whose domain is neither processing time nor synchronized processing time are
         * skipped in the set and deleted passes, because {@code timerMap} has no queue for them.
         */
        private synchronized void updateTimers(TimerUpdate update) {
            Map<TimeDomain, NavigableSet<TimerInternals.TimerData>> timerMap = this.timerMap(update.key);
            Table<StateNamespace, String, TimerInternals.TimerData> existingTimersForKey =
                this.existingTimers.computeIfAbsent(update.key, k -> HashBasedTable.create());

            for (TimerInternals.TimerData addedTimer : update.setTimers) {
                NavigableSet<TimerInternals.TimerData> timerQueue = timerMap.get(addedTimer.getDomain());
                if (timerQueue == null) {
                    continue;
                }
                TimerInternals.TimerData existingTimer =
                    existingTimersForKey.get(addedTimer.getNamespace(), addedTimer.getTimerId());
                if (existingTimer == null) {
                    timerQueue.add(addedTimer);
                } else if (!existingTimer.equals(addedTimer)) {
                    // Same (namespace, id) but different data: replace the queued timer.
                    timerQueue.remove(existingTimer);
                    timerQueue.add(addedTimer);
                }
                // An identical existing timer needs no queue change; the table is rewritten regardless.
                existingTimersForKey.put(addedTimer.getNamespace(), addedTimer.getTimerId(), addedTimer);
            }

            for (TimerInternals.TimerData deletedTimer : update.deletedTimers) {
                NavigableSet<TimerInternals.TimerData> timerQueue = timerMap.get(deletedTimer.getDomain());
                if (timerQueue == null) {
                    continue;
                }
                TimerInternals.TimerData existingTimer =
                    existingTimersForKey.get(deletedTimer.getNamespace(), deletedTimer.getTimerId());
                if (existingTimer == null) {
                    continue;
                }
                // NOTE(review): the queues are purged of deletedTimer, while the table entry is
                // removed via existingTimer. AppliedPTransformInputWatermark removes existingTimer
                // from its queues instead. Behavior is intentionally left unchanged here - confirm
                // which is intended when the two timers are not equal.
                this.pendingTimers.remove(deletedTimer);
                timerQueue.remove(deletedTimer);
                existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId());
            }

            for (TimerInternals.TimerData completedTimer : update.completedTimers) {
                this.pendingTimers.remove(completedTimer);
            }
        }

        /**
         * Extracts the timers of {@code domain} that fire at {@code firingTime} and records them
         * as pending.
         *
         * <p>For {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} the effective firing time is the
         * earlier of {@code firingTime} and the current hold.
         *
         * @return the map produced by {@code WatermarkManager.extractFiredTimers}
         * @throws IllegalArgumentException if {@code domain} is not one of the two processing-time
         *     domains
         */
        private synchronized Map<StructuralKey<?>, List<TimerInternals.TimerData>> extractFiredDomainTimers(TimeDomain domain, Instant firingTime) {
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> firedTimers;
            switch (domain) {
                case PROCESSING_TIME: {
                    firedTimers = WatermarkManager.extractFiredTimers(firingTime, this.processingTimers);
                    break;
                }
                case SYNCHRONIZED_PROCESSING_TIME: {
                    Instant heldFiringTime = (Instant) INSTANT_ORDERING.min(firingTime, this.earliestHold.get());
                    firedTimers = WatermarkManager.extractFiredTimers(heldFiringTime, this.synchronizedProcessingTimers);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Called getFiredTimers on a Synchronized Processing Time watermark and gave a non-processing time domain " + domain);
                }
            }
            for (List<TimerInternals.TimerData> firedForKey : firedTimers.values()) {
                this.pendingTimers.addAll(firedForKey);
            }
            return firedTimers;
        }

        /**
         * Returns the timer queues for {@code key}, indexed by time domain, creating empty queues
         * on first use. Only the two processing-time domains are present in the result.
         */
        private Map<TimeDomain, NavigableSet<TimerInternals.TimerData>> timerMap(StructuralKey<?> key) {
            NavigableSet<TimerInternals.TimerData> processingQueue =
                this.processingTimers.computeIfAbsent(key, k -> new TreeSet<>());
            NavigableSet<TimerInternals.TimerData> synchronizedProcessingQueue =
                this.synchronizedProcessingTimers.computeIfAbsent(key, k -> new TreeSet<>());
            EnumMap<TimeDomain, NavigableSet<TimerInternals.TimerData>> result = new EnumMap<>(TimeDomain.class);
            result.put(TimeDomain.PROCESSING_TIME, processingQueue);
            result.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedProcessingQueue);
            return result;
        }

        /** Returns a debug representation containing the class name and the current hold. */
        @Override
        public synchronized String toString() {
            return MoreObjects.toStringHelper(SynchronizedProcessingTimeInputWatermark.class)
                .add("earliestHold", this.earliestHold)
                .toString();
        }
    }

    /**
     * Output watermark of a transform, derived from its input watermark and per-key holds.
     *
     * <p>On {@link #refresh()} the value becomes the minimum of the input watermark, the input
     * watermark's earliest timer timestamp and the minimum hold, but never less than the
     * previously stored value.
     */
    private static class AppliedPTransformOutputWatermark
    implements Watermark {
        private final String name;
        private final AppliedPTransformInputWatermark inputWatermark;
        private final PerKeyHolds holds;
        private AtomicReference<Instant> currentWatermark;

        /**
         * Creates an output watermark with no holds and an initial value of
         * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
         *
         * @param name the name reported by {@link #getName()}
         * @param inputWatermark the input watermark consulted on every refresh
         */
        public AppliedPTransformOutputWatermark(String name, AppliedPTransformInputWatermark inputWatermark) {
            this.name = name;
            this.inputWatermark = inputWatermark;
            this.holds = new PerKeyHolds();
            this.currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        /**
         * Sets or clears the hold for {@code key}.
         *
         * @param key the key whose hold changes
         * @param newHold the new hold, or {@code null} to remove the hold for {@code key}
         */
        public synchronized void updateHold(Object key, Instant newHold) {
            if (newHold == null) {
                this.holds.removeHold(key);
                return;
            }
            this.holds.updateHold(key, newHold);
        }

        /** Returns the name supplied to the constructor. */
        @Override
        public String getName() {
            return this.name;
        }

        /** Returns the stored watermark without recomputing it. */
        @Override
        public Instant get() {
            return this.currentWatermark.get();
        }

        /**
         * Recomputes and stores the watermark.
         *
         * @return whatever {@code WatermarkManager.updateAndTrace} reports for the old and new
         *     values
         */
        @Override
        public synchronized WatermarkUpdate refresh() {
            Instant oldWatermark = this.currentWatermark.get();
            Instant candidate = (Instant) INSTANT_ORDERING.min(
                this.inputWatermark.get(),
                this.inputWatermark.getEarliestTimerTimestamp(),
                this.holds.getMinHold());
            // Taking the max with the old value keeps the stored watermark from moving backwards.
            Instant newWatermark = (Instant) INSTANT_ORDERING.max(oldWatermark, candidate);
            this.currentWatermark.set(newWatermark);
            return WatermarkManager.updateAndTrace(this.getName(), oldWatermark, newWatermark);
        }

        /** Returns a debug representation containing the holds and the current watermark. */
        @Override
        public synchronized String toString() {
            return MoreObjects.toStringHelper(AppliedPTransformOutputWatermark.class)
                .add("holds", this.holds)
                .add("currentWatermark", this.currentWatermark)
                .toString();
        }
    }

    /**
     * Input watermark of a transform, together with its event-time timer bookkeeping.
     *
     * <p>On {@link #refresh()} the value becomes the minimum of every upstream watermark and the
     * minimum timestamp of the earliest pending bundle, but never less than the previously stored
     * value. Only timers in the {@link TimeDomain#EVENT_TIME} domain are tracked here.
     */
    @VisibleForTesting
    static class AppliedPTransformInputWatermark
    implements Watermark {
        private final String name;
        /** Upstream watermarks whose minimum bounds this watermark. */
        private final Collection<? extends Watermark> inputWatermarks;
        /** Bundles added and not yet removed, sorted by the comparator built in the constructor. */
        private final SortedMultiset<Bundle<?, ?>> pendingElements;
        /** Event-time timers across all keys, sorted so the earliest can be read directly. */
        private final SortedMultiset<TimerInternals.TimerData> pendingTimers;
        /** Per-key table of the last timer recorded for each (namespace, timer id) pair. */
        private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerInternals.TimerData>> existingTimers;
        /** Per-key sorted sets of event-time timers. */
        private final Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> objectTimers;
        private AtomicReference<Instant> currentWatermark;

        /**
         * Creates the watermark with no pending bundles or timers and an initial value of
         * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
         *
         * @param name the name reported by {@link #getName()}
         * @param inputWatermarks the upstream watermarks; the collection is retained, not copied
         */
        public AppliedPTransformInputWatermark(String name, Collection<? extends Watermark> inputWatermarks) {
            this.name = name;
            this.inputWatermarks = inputWatermarks;
            // The timestamp comparator is compounded with an arbitrary ordering as a tie-breaker
            // between bundles that compare equal under the first comparator.
            Ordering<Bundle<?, ?>> pendingBundleComparator =
                new BundleByElementTimestampComparator().compound(Ordering.arbitrary());
            this.pendingElements = TreeMultiset.create(pendingBundleComparator);
            this.pendingTimers = TreeMultiset.create();
            this.objectTimers = new HashMap<>();
            this.existingTimers = new HashMap<>();
            this.currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        /** Returns the name supplied to the constructor. */
        @Override
        public String getName() {
            return this.name;
        }

        /** Returns the stored watermark without recomputing it. */
        @Override
        public Instant get() {
            return this.currentWatermark.get();
        }

        /**
         * Recomputes and stores the watermark.
         *
         * @return whatever {@code WatermarkManager.updateAndTrace} reports for the old and new
         *     values
         */
        @Override
        public synchronized WatermarkUpdate refresh() {
            Instant oldWatermark = this.currentWatermark.get();
            Instant minInputWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
            for (Watermark upstream : this.inputWatermarks) {
                minInputWatermark = (Instant) INSTANT_ORDERING.min(minInputWatermark, upstream.get());
            }
            if (!this.pendingElements.isEmpty()) {
                Instant earliestPending = this.pendingElements.firstEntry().getElement().getMinimumTimestamp();
                minInputWatermark = (Instant) INSTANT_ORDERING.min(minInputWatermark, earliestPending);
            }
            // Taking the max with the old value keeps the stored watermark from moving backwards.
            Instant newWatermark = (Instant) INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
            this.currentWatermark.set(newWatermark);
            return WatermarkManager.updateAndTrace(this.getName(), oldWatermark, newWatermark);
        }

        /** Registers {@code newPending} so that it is considered by subsequent refreshes. */
        private synchronized void addPending(Bundle<?, ?> newPending) {
            this.pendingElements.add(newPending);
        }

        /** Removes one occurrence of {@code completed} from the pending bundles. */
        private synchronized void removePending(Bundle<?, ?> completed) {
            this.pendingElements.remove(completed);
        }

        /**
         * Returns the timestamp of the first pending event-time timer, or
         * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} when there is none.
         */
        @VisibleForTesting
        synchronized Instant getEarliestTimerTimestamp() {
            if (this.pendingTimers.isEmpty()) {
                return BoundedWindow.TIMESTAMP_MAX_VALUE;
            }
            return this.pendingTimers.firstEntry().getElement().getTimestamp();
        }

        /**
         * Applies the set, deleted and completed event-time timers of {@code update} to the state
         * kept for {@code update.key}. Timers of any other domain are ignored.
         */
        @VisibleForTesting
        synchronized void updateTimers(TimerUpdate update) {
            NavigableSet<TimerInternals.TimerData> keyTimers =
                this.objectTimers.computeIfAbsent(update.key, k -> new TreeSet<>());
            Table<StateNamespace, String, TimerInternals.TimerData> existingTimersForKey =
                this.existingTimers.computeIfAbsent(update.key, k -> HashBasedTable.create());

            for (TimerInternals.TimerData timer : update.getSetTimers()) {
                if (!TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
                    continue;
                }
                TimerInternals.TimerData existingTimer =
                    existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
                if (existingTimer == null) {
                    this.pendingTimers.add(timer);
                    keyTimers.add(timer);
                } else if (!existingTimer.equals(timer)) {
                    // Same (namespace, id) but different data: replace the tracked timer.
                    this.pendingTimers.remove(existingTimer);
                    keyTimers.remove(existingTimer);
                    this.pendingTimers.add(timer);
                    keyTimers.add(timer);
                }
                // An identical existing timer needs no change; the table is rewritten regardless.
                existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer);
            }

            for (TimerInternals.TimerData timer : update.getDeletedTimers()) {
                if (!TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
                    continue;
                }
                TimerInternals.TimerData existingTimer =
                    existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
                if (existingTimer == null) {
                    continue;
                }
                // Remove the timer recorded in the table, which may differ from the one passed in.
                this.pendingTimers.remove(existingTimer);
                keyTimers.remove(existingTimer);
                existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId());
            }

            for (TimerInternals.TimerData timer : update.getCompletedTimers()) {
                if (!TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
                    continue;
                }
                keyTimers.remove(timer);
                this.pendingTimers.remove(timer);
            }
        }

        /**
         * Returns the result of {@code WatermarkManager.extractFiredTimers} for the current
         * watermark value and the per-key event-time timers.
         */
        @VisibleForTesting
        synchronized Map<StructuralKey<?>, List<TimerInternals.TimerData>> extractFiredEventTimeTimers() {
            return WatermarkManager.extractFiredTimers(this.currentWatermark.get(), this.objectTimers);
        }

        /** Returns a debug representation containing the pending bundles and current watermark. */
        @Override
        public synchronized String toString() {
            return MoreObjects.toStringHelper(AppliedPTransformInputWatermark.class)
                .add("pendingElements", this.pendingElements)
                .add("currentWatermark", this.currentWatermark)
                .toString();
        }
    }

    private static enum WatermarkUpdate {
        ADVANCED(true),
        NO_CHANGE(false);

        private final boolean advanced;

        private WatermarkUpdate(boolean advanced) {
            this.advanced = advanced;
        }

        public boolean isAdvanced() {
            return this.advanced;
        }

        public WatermarkUpdate union(WatermarkUpdate that) {
            if (this.advanced) {
                return this;
            }
            return that;
        }

        public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTime) {
            if (currentTime.isAfter((ReadableInstant)oldTime)) {
                return ADVANCED;
            }
            return NO_CHANGE;
        }
    }

    /** A named timestamp value that can be read and refreshed. */
    @VisibleForTesting
    interface Watermark {
        /** Returns the name identifying this watermark. */
        String getName();

        /** Returns the current value of this watermark. */
        Instant get();

        /** Recomputes the value of this watermark and reports the outcome. */
        WatermarkUpdate refresh();
    }
}

