/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.MoreObjects;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ComparisonChain;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.FluentIterable;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableCollection;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableSet;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Ordering;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.SortedMultiset;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.TreeMultiset;
import com.google.cloud.dataflow.sdk.runners.inprocess.AutoValue_InMemoryWatermarkManager_PendingWatermarkUpdate;
import com.google.cloud.dataflow.sdk.runners.inprocess.Clock;
import com.google.cloud.dataflow.sdk.runners.inprocess.CommittedResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.inprocess.StructuralKey;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.TimeDomain;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PValue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

class InMemoryWatermarkManager {
    // Sentinel watermark that never moves: refresh() always reports NO_CHANGE and get() always
    // returns BoundedWindow.TIMESTAMP_MAX_VALUE. It is installed as the only input watermark of
    // transforms whose input expands to nothing, and supplies the default value for holds.
    private static final Watermark THE_END_OF_TIME = new Watermark(){

        @Override
        public WatermarkUpdate refresh() {
            return WatermarkUpdate.NO_CHANGE;
        }

        @Override
        public Instant get() {
            return BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
    };
    // Natural (chronological) ordering of instants; used for the min/max computations below.
    private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
    // Maps a windowed value to the result of its explodeWindows() call; used when turning a
    // bundle into the elements tracked by the input watermark.
    private static final Function<WindowedValue<?>, ? extends Iterable<? extends WindowedValue<?>>> EXPLODE_WINDOWS_FN = new Function<WindowedValue<?>, Iterable<? extends WindowedValue<?>>>(){

        @Override
        public Iterable<? extends WindowedValue<?>> apply(WindowedValue<?> input) {
            return input.explodeWindows();
        }
    };
    // Source of "now" for processing-time and synchronized-processing-time computations.
    private final Clock clock;
    // For each PValue, the transforms that consume it; supplied by the caller at creation.
    private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers;
    // Watermark state per transform; populated in the constructor via getTransformWatermark.
    private final Map<AppliedPTransform<?, ?, ?>, TransformWatermarks> transformToWatermarks;
    // Updates queued by updateWatermarks and drained by applyPendingUpdates.
    private final ConcurrentLinkedQueue<PendingWatermarkUpdate> pendingUpdates;
    // Transforms whose watermarks still need to be refreshed; drained by refreshAll.
    private final ConcurrentLinkedQueue<AppliedPTransform<?, ?, ?>> pendingRefreshes;

    /**
     * Removes and returns, per key, every timer whose timestamp is strictly before
     * {@code latestTime}.
     *
     * <p>{@code objectTimers} is mutated: fired timers are removed from their per-key sets, and
     * any key whose set is empty after the scan (including sets that were already empty) is
     * removed from the map.
     *
     * @param latestTime exclusive upper bound for the timestamps of timers that fire
     * @param objectTimers per-key sorted timers to drain
     * @return the fired timers grouped by key, in the iteration order of each key's set; keys
     *     with no fired timers are absent
     */
    private static Map<StructuralKey<?>, List<TimerInternals.TimerData>> extractFiredTimers(Instant latestTime, Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> objectTimers) {
        Map<StructuralKey<?>, List<TimerInternals.TimerData>> firedByKey = new HashMap<>();
        Set<StructuralKey<?>> drainedKeys = new HashSet<>();
        for (Map.Entry<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> entry : objectTimers.entrySet()) {
            StructuralKey<?> key = entry.getKey();
            NavigableSet<TimerInternals.TimerData> queue = entry.getValue();
            // The result list is created lazily so keys without fired timers never appear.
            List<TimerInternals.TimerData> firedForKey = null;
            while (!queue.isEmpty() && queue.first().getTimestamp().isBefore(latestTime)) {
                if (firedForKey == null) {
                    firedForKey = new ArrayList<>();
                    firedByKey.put(key, firedForKey);
                }
                firedForKey.add(queue.pollFirst());
            }
            if (queue.isEmpty()) {
                drainedKeys.add(key);
            }
        }
        // Removal is deferred so the map is not modified while it is being iterated.
        objectTimers.keySet().removeAll(drainedKeys);
        return firedByKey;
    }

    /**
     * Static factory for an {@link InMemoryWatermarkManager}.
     *
     * @param clock clock handed to the new manager
     * @param rootTransforms root transforms of the pipeline
     * @param consumers for each value, the transforms that consume it
     * @return a newly constructed manager
     */
    public static InMemoryWatermarkManager create(Clock clock, Collection<AppliedPTransform<?, ?, ?>> rootTransforms, Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
        InMemoryWatermarkManager manager = new InMemoryWatermarkManager(clock, rootTransforms, consumers);
        return manager;
    }

    /**
     * Stores the collaborators, creates the empty update/refresh queues, and eagerly builds the
     * watermark state for every root transform and every consumer listed in {@code consumers}.
     */
    private InMemoryWatermarkManager(Clock clock, Collection<AppliedPTransform<?, ?, ?>> rootTransforms, Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
        this.clock = clock;
        this.consumers = consumers;
        this.pendingUpdates = new ConcurrentLinkedQueue<>();
        this.pendingRefreshes = new ConcurrentLinkedQueue<>();
        this.transformToWatermarks = new HashMap<>();

        // getTransformWatermark registers the transform as a side effect; the result is not needed.
        for (AppliedPTransform<?, ?, ?> root : rootTransforms) {
            getTransformWatermark(root);
        }
        for (Collection<AppliedPTransform<?, ?, ?>> consumersOfValue : consumers.values()) {
            for (AppliedPTransform<?, ?, ?> consumer : consumersOfValue) {
                getTransformWatermark(consumer);
            }
        }
    }

    /**
     * Returns the watermark state for {@code transform}, creating and registering it on first
     * request. Creation looks up (and therefore may recursively create) the state of the
     * transforms that produce this transform's inputs.
     */
    private TransformWatermarks getTransformWatermark(AppliedPTransform<?, ?, ?> transform) {
        TransformWatermarks existing = this.transformToWatermarks.get(transform);
        if (existing != null) {
            return existing;
        }

        // Event-time side: output watermark is derived from the input watermark.
        List<Watermark> upstreamEventTime = getInputWatermarks(transform);
        AppliedPTransformInputWatermark eventTimeIn = new AppliedPTransformInputWatermark(upstreamEventTime);
        AppliedPTransformOutputWatermark eventTimeOut = new AppliedPTransformOutputWatermark(eventTimeIn);

        // Synchronized-processing-time side, built the same way.
        Collection<Watermark> upstreamProcessing = getInputProcessingWatermarks(transform);
        SynchronizedProcessingTimeInputWatermark processingIn = new SynchronizedProcessingTimeInputWatermark(upstreamProcessing);
        SynchronizedProcessingTimeOutputWatermark processingOut = new SynchronizedProcessingTimeOutputWatermark(processingIn);

        TransformWatermarks created = new TransformWatermarks(eventTimeIn, eventTimeOut, processingIn, processingOut);
        this.transformToWatermarks.put(transform, created);
        return created;
    }

    /**
     * Collects the synchronized-processing-time output watermark of every transform producing an
     * input of {@code transform}. When the input expands to no values, the result contains only
     * {@code THE_END_OF_TIME}.
     */
    private Collection<Watermark> getInputProcessingWatermarks(AppliedPTransform<?, ?, ?> transform) {
        ImmutableList.Builder<Watermark> upstream = ImmutableList.builder();
        Collection<? extends PValue> expandedInputs = transform.getInput().expand();
        if (expandedInputs.isEmpty()) {
            upstream.add(THE_END_OF_TIME);
        }
        for (PValue inputValue : expandedInputs) {
            TransformWatermarks producer = getTransformWatermark(inputValue.getProducingTransformInternal());
            upstream.add(producer.synchronizedProcessingOutputWatermark);
        }
        return upstream.build();
    }

    /**
     * Collects the event-time output watermark of every transform producing an input of
     * {@code transform}. When the input expands to no values, the result contains only
     * {@code THE_END_OF_TIME}.
     */
    private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) {
        ImmutableList.Builder<Watermark> upstream = ImmutableList.builder();
        Collection<? extends PValue> expandedInputs = transform.getInput().expand();
        if (expandedInputs.isEmpty()) {
            upstream.add(THE_END_OF_TIME);
        }
        for (PValue inputValue : expandedInputs) {
            TransformWatermarks producer = getTransformWatermark(inputValue.getProducingTransformInternal());
            upstream.add(producer.outputWatermark);
        }
        return upstream.build();
    }

    /**
     * Looks up the watermark state registered for {@code transform}.
     *
     * @return the registered state, or {@code null} if the transform is unknown to this manager
     */
    public TransformWatermarks getWatermarks(AppliedPTransform<?, ?, ?> transform) {
        TransformWatermarks registered = this.transformToWatermarks.get(transform);
        return registered;
    }

    /**
     * Queues a watermark update describing a completed unit of work. Nothing is applied here;
     * queued updates are consumed by {@code applyPendingUpdates}.
     *
     * @param completed the input bundle that was processed, or {@code null} if there was none
     * @param timerUpdate timer changes produced by the work
     * @param result the committed result of the work
     * @param earliestHold the event-time hold to record for the bundle's key
     */
    public void updateWatermarks(@Nullable InProcessPipelineRunner.CommittedBundle<?> completed, TimerUpdate timerUpdate, CommittedResult result, Instant earliestHold) {
        PendingWatermarkUpdate queued = PendingWatermarkUpdate.create(completed, timerUpdate, result, earliestHold);
        this.pendingUpdates.offer(queued);
    }

    /**
     * Drains the queue of pending updates, applying each one, then schedules every transform
     * that received an update for a watermark refresh (each transform at most once per call).
     */
    private void applyPendingUpdates() {
        Set<AppliedPTransform<?, ?, ?>> touched = new HashSet<>();
        PendingWatermarkUpdate next;
        while ((next = this.pendingUpdates.poll()) != null) {
            applyPendingUpdate(next);
            touched.add(next.getTransform());
        }
        this.pendingRefreshes.addAll(touched);
    }

    /**
     * Applies a single queued update: first adjusts pending elements and timers, then records
     * the event-time hold for the input bundle's key (a {@code null} key when there is no input
     * bundle) on the transform named by the update's result.
     */
    private void applyPendingUpdate(PendingWatermarkUpdate pending) {
        CommittedResult committed = pending.getResult();
        AppliedPTransform<?, ?, ?> producer = committed.getTransform();
        InProcessPipelineRunner.CommittedBundle<?> consumedBundle = pending.getInputBundle();

        updatePending(consumedBundle, pending.getTimerUpdate(), committed);

        Object holdKey = null;
        if (consumedBundle != null) {
            holdKey = consumedBundle.getKey();
        }
        this.transformToWatermarks.get(producer).setEventTimeHold(holdKey, pending.getEarliestHold());
    }

    /**
     * Updates pending-element and timer bookkeeping for a committed result.
     *
     * <p>Steps, in order: every output bundle becomes pending on each consumer of its
     * PCollection; if there was an input bundle, the result's unprocessed inputs become pending
     * on the completed transform; the timer update is applied; finally, if there was an input
     * bundle, it is removed from the completed transform's pending set.
     */
    private void updatePending(InProcessPipelineRunner.CommittedBundle<?> input, TimerUpdate timerUpdate, CommittedResult result) {
        for (InProcessPipelineRunner.CommittedBundle<?> produced : result.getOutputs()) {
            Collection<AppliedPTransform<?, ?, ?>> downstream = this.consumers.get(produced.getPCollection());
            for (AppliedPTransform<?, ?, ?> consumer : downstream) {
                this.transformToWatermarks.get(consumer).addPending(produced);
            }
        }

        boolean hadInput = input != null;
        TransformWatermarks finished = this.transformToWatermarks.get(result.getTransform());
        if (hadInput) {
            finished.addPending(result.getUnprocessedInputs());
        }
        finished.updateTimers(timerUpdate);
        if (hadInput) {
            finished.removePending(input);
        }
    }

    /**
     * Applies all queued updates and then refreshes watermarks until no transform remains
     * scheduled for refresh. Refreshing a transform may schedule further transforms, which are
     * processed in the same call.
     */
    synchronized void refreshAll() {
        applyPendingUpdates();
        AppliedPTransform<?, ?, ?> scheduled;
        while ((scheduled = this.pendingRefreshes.poll()) != null) {
            refreshWatermarks(scheduled);
        }
    }

    /**
     * Refreshes the watermarks of {@code toRefresh}. If the refresh reports an advance, every
     * consumer of every output of the transform is scheduled for refresh as well.
     */
    private void refreshWatermarks(AppliedPTransform<?, ?, ?> toRefresh) {
        WatermarkUpdate outcome = this.transformToWatermarks.get(toRefresh).refresh();
        if (!outcome.isAdvanced()) {
            // Nothing moved, so downstream transforms cannot be affected.
            return;
        }
        // Collected into a set first so a consumer of several outputs is queued only once.
        Set<AppliedPTransform<?, ?, ?>> downstream = new HashSet<>();
        for (PValue output : toRefresh.getOutput().expand()) {
            downstream.addAll(this.consumers.get(output));
        }
        this.pendingRefreshes.addAll(downstream);
    }

    /**
     * Extracts the fired timers of every registered transform.
     *
     * @return fired timers keyed by transform and then by key; transforms with no fired timers
     *     are omitted
     */
    public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() {
        Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firedByTransform = new HashMap<>();
        for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> entry : this.transformToWatermarks.entrySet()) {
            Map<StructuralKey<?>, FiredTimers> firedForTransform = entry.getValue().extractFiredTimers();
            if (!firedForTransform.isEmpty()) {
                firedByTransform.put(entry.getKey(), firedForTransform);
            }
        }
        return firedByTransform;
    }

    /**
     * Returns the transforms whose output watermark equals the value of
     * {@code THE_END_OF_TIME}.
     */
    public Set<AppliedPTransform<?, ?, ?>> getCompletedTransforms() {
        Instant endOfTime = THE_END_OF_TIME.get();
        Set<AppliedPTransform<?, ?, ?>> completed = new HashSet<>();
        for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> entry : this.transformToWatermarks.entrySet()) {
            if (entry.getValue().getOutputWatermark().equals(endOfTime)) {
                completed.add(entry.getKey());
            }
        }
        return completed;
    }

    /**
     * Value type describing one queued watermark update. Instances are created through
     * {@link #create}, which delegates to the generated
     * {@code AutoValue_InMemoryWatermarkManager_PendingWatermarkUpdate} implementation.
     */
    static abstract class PendingWatermarkUpdate {
        /** Builds an update from its four components. */
        public static PendingWatermarkUpdate create(@Nullable InProcessPipelineRunner.CommittedBundle<?> inputBundle, TimerUpdate timerUpdate, CommittedResult result, Instant earliestHold) {
            return new AutoValue_InMemoryWatermarkManager_PendingWatermarkUpdate(inputBundle, timerUpdate, result, earliestHold);
        }

        /** The bundle that was consumed, or {@code null} when there was none. */
        @Nullable
        public abstract InProcessPipelineRunner.CommittedBundle<?> getInputBundle();

        /** Timer changes that accompany this update. */
        public abstract TimerUpdate getTimerUpdate();

        /** The committed result this update was created for. */
        public abstract CommittedResult getResult();

        /** The event-time hold carried by this update. */
        public abstract Instant getEarliestHold();

        /** Convenience accessor for the transform named by {@link #getResult()}. */
        public AppliedPTransform<?, ?, ?> getTransform() {
            CommittedResult committed = getResult();
            return committed.getTransform();
        }
    }

    /** Orders windowed values by their timestamp only; no other property is compared. */
    private static class WindowedValueByTimestampComparator
    extends Ordering<WindowedValue<?>> {
        private WindowedValueByTimestampComparator() {
        }

        @Override
        public int compare(WindowedValue<?> o1, WindowedValue<?> o2) {
            Instant left = o1.getTimestamp();
            Instant right = o2.getTimestamp();
            return ComparisonChain.start()
                .compare(left, right)
                .result();
        }
    }

    /** Read-only view of a set of fired timers, grouped by time domain. */
    public static class FiredTimers {
        private final Map<TimeDomain, ? extends Collection<TimerInternals.TimerData>> timers;

        private FiredTimers(Map<TimeDomain, ? extends Collection<TimerInternals.TimerData>> timers) {
            this.timers = timers;
        }

        /**
         * Returns the fired timers recorded for {@code domain}.
         *
         * @return the timers for the domain, or an empty list when none were recorded
         */
        public Collection<TimerInternals.TimerData> getTimers(TimeDomain domain) {
            Collection<TimerInternals.TimerData> forDomain = this.timers.get(domain);
            if (forDomain != null) {
                return forDomain;
            }
            return Collections.emptyList();
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(FiredTimers.class)
                .add("timers", this.timers)
                .toString();
        }
    }

    /**
     * A batch of timer changes for a single key: timers that completed, timers that were set,
     * and timers that were deleted. Instances are obtained from {@link #empty()} or from a
     * {@link TimerUpdateBuilder}.
     */
    public static class TimerUpdate {
        private final StructuralKey<?> key;
        private final Iterable<? extends TimerInternals.TimerData> completedTimers;
        private final Iterable<? extends TimerInternals.TimerData> setTimers;
        private final Iterable<? extends TimerInternals.TimerData> deletedTimers;

        private TimerUpdate(StructuralKey<?> key, Iterable<? extends TimerInternals.TimerData> completedTimers, Iterable<? extends TimerInternals.TimerData> setTimers, Iterable<? extends TimerInternals.TimerData> deletedTimers) {
            this.key = key;
            this.completedTimers = completedTimers;
            this.setTimers = setTimers;
            this.deletedTimers = deletedTimers;
        }

        /** Returns an update with a {@code null} key and no timers of any kind. */
        public static TimerUpdate empty() {
            List<TimerInternals.TimerData> none = Collections.emptyList();
            return new TimerUpdate(null, none, none, none);
        }

        /** Starts a builder for an update on {@code key}. */
        public static TimerUpdateBuilder builder(StructuralKey<?> key) {
            return new TimerUpdateBuilder(key);
        }

        @VisibleForTesting
        StructuralKey<?> getKey() {
            return this.key;
        }

        @VisibleForTesting
        Iterable<? extends TimerInternals.TimerData> getCompletedTimers() {
            return this.completedTimers;
        }

        @VisibleForTesting
        Iterable<? extends TimerInternals.TimerData> getSetTimers() {
            return this.setTimers;
        }

        @VisibleForTesting
        Iterable<? extends TimerInternals.TimerData> getDeletedTimers() {
            return this.deletedTimers;
        }

        /**
         * Returns a copy of this update whose completed timers are replaced by
         * {@code completedTimers}; key, set timers and deleted timers are carried over.
         */
        public TimerUpdate withCompletedTimers(Iterable<TimerInternals.TimerData> completedTimers) {
            return new TimerUpdate(this.key, completedTimers, this.setTimers, this.deletedTimers);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.key, this.completedTimers, this.setTimers, this.deletedTimers);
        }

        @Override
        public boolean equals(Object other) {
            // instanceof is false for null, so no separate null check is needed.
            if (!(other instanceof TimerUpdate)) {
                return false;
            }
            TimerUpdate that = (TimerUpdate)other;
            boolean sameKey = Objects.equals(this.key, that.key);
            return sameKey
                && Objects.equals(this.completedTimers, that.completedTimers)
                && Objects.equals(this.setTimers, that.setTimers)
                && Objects.equals(this.deletedTimers, that.deletedTimers);
        }

        /**
         * Mutable builder for {@link TimerUpdate}. Setting a timer cancels an earlier deletion
         * of the same timer and vice versa, so a timer is never in both sets at once.
         */
        public static final class TimerUpdateBuilder {
            private final StructuralKey<?> key;
            private final Collection<TimerInternals.TimerData> completedTimers = new HashSet<TimerInternals.TimerData>();
            private final Collection<TimerInternals.TimerData> setTimers = new HashSet<TimerInternals.TimerData>();
            private final Collection<TimerInternals.TimerData> deletedTimers = new HashSet<TimerInternals.TimerData>();

            private TimerUpdateBuilder(StructuralKey<?> key) {
                this.key = key;
            }

            /** Adds every element of {@code completedTimers} to the completed set. */
            public TimerUpdateBuilder withCompletedTimers(Iterable<TimerInternals.TimerData> completedTimers) {
                Iterables.addAll(this.completedTimers, completedTimers);
                return this;
            }

            /** Records {@code setTimer} as set and drops it from the deleted set. */
            public TimerUpdateBuilder setTimer(TimerInternals.TimerData setTimer) {
                this.deletedTimers.remove(setTimer);
                this.setTimers.add(setTimer);
                return this;
            }

            /** Records {@code deletedTimer} as deleted and drops it from the set timers. */
            public TimerUpdateBuilder deletedTimer(TimerInternals.TimerData deletedTimer) {
                this.deletedTimers.add(deletedTimer);
                this.setTimers.remove(deletedTimer);
                return this;
            }

            /** Snapshots the three timer sets into an immutable {@link TimerUpdate}. */
            public TimerUpdate build() {
                ImmutableSet<TimerInternals.TimerData> completed = ImmutableSet.copyOf(this.completedTimers);
                ImmutableSet<TimerInternals.TimerData> set = ImmutableSet.copyOf(this.setTimers);
                ImmutableSet<TimerInternals.TimerData> deleted = ImmutableSet.copyOf(this.deletedTimers);
                return new TimerUpdate(this.key, completed, set, deleted);
            }
        }
    }

    /**
     * The four watermarks tracked for one transform (event-time input/output and
     * synchronized-processing-time input/output), plus the operations the enclosing manager
     * uses to update them.
     */
    public class TransformWatermarks {
        private final AppliedPTransformInputWatermark inputWatermark;
        private final AppliedPTransformOutputWatermark outputWatermark;
        private final SynchronizedProcessingTimeInputWatermark synchronizedProcessingInputWatermark;
        private final SynchronizedProcessingTimeOutputWatermark synchronizedProcessingOutputWatermark;
        // Highest values handed out so far by the two synchronized-processing-time getters.
        private Instant latestSynchronizedInputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
        private Instant latestSynchronizedOutputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;

        private TransformWatermarks(AppliedPTransformInputWatermark inputWatermark, AppliedPTransformOutputWatermark outputWatermark, SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark, SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) {
            this.inputWatermark = inputWatermark;
            this.outputWatermark = outputWatermark;
            this.synchronizedProcessingInputWatermark = inputSynchProcessingWatermark;
            this.synchronizedProcessingOutputWatermark = outputSynchProcessingWatermark;
        }

        /** Returns the current event-time input watermark; fails if it is {@code null}. */
        public Instant getInputWatermark() {
            Instant current = this.inputWatermark.get();
            return Preconditions.checkNotNull(current);
        }

        /** Returns the current event-time output watermark. */
        public Instant getOutputWatermark() {
            return this.outputWatermark.get();
        }

        /**
         * Returns the synchronized processing input time: the earlier of the clock's current
         * time and the input watermark, but never earlier than a value returned previously.
         */
        public synchronized Instant getSynchronizedProcessingInputTime() {
            Instant now = InMemoryWatermarkManager.this.clock.now();
            Instant upstream = this.synchronizedProcessingInputWatermark.get();
            Instant candidate = INSTANT_ORDERING.min(now, upstream);
            this.latestSynchronizedInputWm = INSTANT_ORDERING.max(this.latestSynchronizedInputWm, candidate);
            return this.latestSynchronizedInputWm;
        }

        /**
         * Returns the synchronized processing output time: the earlier of the clock's current
         * time and the output watermark, but never earlier than a value returned previously.
         */
        public synchronized Instant getSynchronizedProcessingOutputTime() {
            Instant now = InMemoryWatermarkManager.this.clock.now();
            Instant downstream = this.synchronizedProcessingOutputWatermark.get();
            Instant candidate = INSTANT_ORDERING.min(now, downstream);
            this.latestSynchronizedOutputWm = INSTANT_ORDERING.max(this.latestSynchronizedOutputWm, candidate);
            return this.latestSynchronizedOutputWm;
        }

        /**
         * Refreshes all four watermarks, inputs before outputs, and returns the union of the
         * two output-watermark results.
         */
        private WatermarkUpdate refresh() {
            this.inputWatermark.refresh();
            this.synchronizedProcessingInputWatermark.refresh();
            WatermarkUpdate eventTimeResult = this.outputWatermark.refresh();
            WatermarkUpdate processingTimeResult = this.synchronizedProcessingOutputWatermark.refresh();
            return eventTimeResult.union(processingTimeResult);
        }

        /** Forwards the hold for {@code key} to the event-time output watermark. */
        private void setEventTimeHold(Object key, Instant newHold) {
            this.outputWatermark.updateHold(key, newHold);
        }

        /** Removes the bundle (and its exploded elements) from both input watermarks. */
        private void removePending(InProcessPipelineRunner.CommittedBundle<?> bundle) {
            Iterable<? extends WindowedValue<?>> elements = elementsFromBundle(bundle);
            this.inputWatermark.removePendingElements(elements);
            this.synchronizedProcessingInputWatermark.removePending(bundle);
        }

        /** Adds the bundle (and its exploded elements) to both input watermarks. */
        private void addPending(InProcessPipelineRunner.CommittedBundle<?> bundle) {
            Iterable<? extends WindowedValue<?>> elements = elementsFromBundle(bundle);
            this.inputWatermark.addPendingElements(elements);
            this.synchronizedProcessingInputWatermark.addPending(bundle);
        }

        /** Flattens the bundle's elements through {@code EXPLODE_WINDOWS_FN}. */
        private Iterable<? extends WindowedValue<?>> elementsFromBundle(InProcessPipelineRunner.CommittedBundle<?> bundle) {
            return FluentIterable.from(bundle.getElements()).transformAndConcat(EXPLODE_WINDOWS_FN);
        }

        /**
         * Extracts fired timers from all three domains (event time, processing time at the
         * clock's current time, synchronized processing time at the synchronized input time)
         * and regroups them per key.
         */
        private Map<StructuralKey<?>, FiredTimers> extractFiredTimers() {
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> eventTimeFired = this.inputWatermark.extractFiredEventTimeTimers();
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> processingFired = this.synchronizedProcessingInputWatermark.extractFiredDomainTimers(TimeDomain.PROCESSING_TIME, InMemoryWatermarkManager.this.clock.now());
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> synchronizedFired = this.synchronizedProcessingInputWatermark.extractFiredDomainTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime());

            Map<StructuralKey<?>, Map<TimeDomain, List<TimerInternals.TimerData>>> byKeyAndDomain = new HashMap<>();
            groupFiredTimers(byKeyAndDomain, eventTimeFired, processingFired, synchronizedFired);

            Map<StructuralKey<?>, FiredTimers> firedByKey = new HashMap<>();
            for (Map.Entry<StructuralKey<?>, Map<TimeDomain, List<TimerInternals.TimerData>>> entry : byKeyAndDomain.entrySet()) {
                firedByKey.put(entry.getKey(), new FiredTimers(entry.getValue()));
            }
            return firedByKey;
        }

        /**
         * Merges each per-key timer map into {@code groupedToMutate}, filing every list under
         * the domain of its first timer. Lists are expected to be non-empty.
         */
        @SafeVarargs
        private final void groupFiredTimers(Map<StructuralKey<?>, Map<TimeDomain, List<TimerInternals.TimerData>>> groupedToMutate, Map<StructuralKey<?>, List<TimerInternals.TimerData>> ... timersToGroup) {
            for (Map<StructuralKey<?>, List<TimerInternals.TimerData>> timersOfOneDomain : timersToGroup) {
                for (Map.Entry<StructuralKey<?>, List<TimerInternals.TimerData>> entry : timersOfOneDomain.entrySet()) {
                    StructuralKey<?> key = entry.getKey();
                    List<TimerInternals.TimerData> fired = entry.getValue();
                    Map<TimeDomain, List<TimerInternals.TimerData>> domainsForKey = groupedToMutate.get(key);
                    if (domainsForKey == null) {
                        domainsForKey = new HashMap<TimeDomain, List<TimerInternals.TimerData>>();
                        groupedToMutate.put(key, domainsForKey);
                    }
                    domainsForKey.put(fired.get(0).getDomain(), fired);
                }
            }
        }

        /** Applies the timer update to both input watermarks. */
        private void updateTimers(TimerUpdate update) {
            this.inputWatermark.updateTimers(update);
            this.synchronizedProcessingInputWatermark.updateTimers(update);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(TransformWatermarks.class)
                .add("inputWatermark", this.inputWatermark)
                .add("outputWatermark", this.outputWatermark)
                .add("inputProcessingTime", this.synchronizedProcessingInputWatermark)
                .add("outputProcessingTime", this.synchronizedProcessingOutputWatermark)
                .toString();
        }
    }

    /**
     * Tracks at most one hold per key and exposes the minimum across all keys.
     *
     * <p>{@code keyedHolds} maps each key to its current hold; {@code allHolds} contains the same
     * holds in a priority queue so the minimum can be read from the head. The two structures are
     * kept in step by {@link #updateHold} and {@link #removeHold}.
     */
    private static class PerKeyHolds {
        private final Map<Object, KeyedHold> keyedHolds = new HashMap<Object, KeyedHold>();
        private final PriorityQueue<KeyedHold> allHolds = new PriorityQueue<>();

        private PerKeyHolds() {
        }

        /**
         * Returns the timestamp of the earliest hold, or the value of {@code THE_END_OF_TIME}
         * when no hold is registered.
         */
        public Instant getMinHold() {
            return this.allHolds.isEmpty() ? THE_END_OF_TIME.get() : this.allHolds.peek().getTimestamp();
        }

        /**
         * Replaces the hold for {@code key} with {@code newHold}.
         *
         * @param key the key the hold belongs to; may be {@code null}
         * @param newHold the new hold timestamp; {@code KeyedHold.of} substitutes the value of
         *     {@code THE_END_OF_TIME} when this is {@code null}
         */
        public void updateHold(@Nullable Object key, Instant newHold) {
            this.removeHold(key);
            KeyedHold newKeyedHold = KeyedHold.of(key, newHold);
            this.keyedHolds.put(key, newKeyedHold);
            this.allHolds.offer(newKeyedHold);
        }

        /**
         * Removes the hold registered for {@code key}, if any, from both the per-key map and the
         * priority queue.
         */
        public void removeHold(Object key) {
            // Map.remove (rather than get) so the map does not retain a stale hold that is no
            // longer present in the queue.
            KeyedHold oldHold = this.keyedHolds.remove(key);
            if (oldHold != null) {
                this.allHolds.remove(oldHold);
            }
        }
    }

    /**
     * A (key, timestamp) pair. Ordered by timestamp first and then by an arbitrary but
     * consistent ordering of keys with {@code null} keys last.
     */
    private static final class KeyedHold
    implements Comparable<KeyedHold> {
        private static final Ordering<Object> KEY_ORDERING = Ordering.arbitrary().nullsLast();
        private final Object key;
        private final Instant timestamp;

        private KeyedHold(Object key, Instant timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        /**
         * Creates a hold for {@code key}; a {@code null} timestamp is replaced by the value of
         * {@code THE_END_OF_TIME}.
         */
        public static KeyedHold of(Object key, Instant timestamp) {
            Instant endOfTime = THE_END_OF_TIME.get();
            Instant effective = timestamp != null ? timestamp : Preconditions.checkNotNull(endOfTime);
            return new KeyedHold(key, effective);
        }

        public Instant getTimestamp() {
            return this.timestamp;
        }

        @Override
        public int compareTo(KeyedHold that) {
            return ComparisonChain.start()
                .compare(this.timestamp, that.timestamp)
                .compare(this.key, that.key, KEY_ORDERING)
                .result();
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.timestamp, this.key);
        }

        @Override
        public boolean equals(Object other) {
            // instanceof is false for null, so no separate null check is needed.
            if (!(other instanceof KeyedHold)) {
                return false;
            }
            KeyedHold that = (KeyedHold)other;
            if (!Objects.equals(this.timestamp, that.timestamp)) {
                return false;
            }
            return Objects.equals(this.key, that.key);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(KeyedHold.class)
                .add("key", this.key)
                .add("hold", this.timestamp)
                .toString();
        }
    }

    /**
     * Output-side synchronized processing time watermark. Its value is recomputed on
     * {@link #refresh()} as the earlier of the paired input watermark and that watermark's
     * earliest timer timestamp; it starts at {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
     */
    private static class SynchronizedProcessingTimeOutputWatermark
    implements Watermark {
        private final SynchronizedProcessingTimeInputWatermark inputWm;
        private final AtomicReference<Instant> latestRefresh = new AtomicReference<Instant>(BoundedWindow.TIMESTAMP_MIN_VALUE);

        public SynchronizedProcessingTimeOutputWatermark(SynchronizedProcessingTimeInputWatermark inputWm) {
            this.inputWm = inputWm;
        }

        /** Returns the value computed by the most recent refresh. */
        @Override
        public Instant get() {
            return this.latestRefresh.get();
        }

        /** Recomputes the watermark and reports how it relates to the previous value. */
        @Override
        public synchronized WatermarkUpdate refresh() {
            Instant previous = this.latestRefresh.get();
            Instant inputTime = this.inputWm.get();
            Instant earliestTimer = this.inputWm.getEarliestTimerTimestamp();
            Instant current = INSTANT_ORDERING.min(inputTime, earliestTimer);
            this.latestRefresh.set(current);
            return WatermarkUpdate.fromTimestamps(previous, current);
        }

        @Override
        public synchronized String toString() {
            return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class)
                .add("latestRefresh", this.latestRefresh)
                .toString();
        }
    }

    private static class SynchronizedProcessingTimeInputWatermark
    implements Watermark {
        // Upstream watermarks whose minimum bounds this watermark (see constructor and refresh()).
        private final Collection<? extends Watermark> inputWms;
        // Bundles registered via addPending and not yet removed; each contributes its
        // synchronized processing output watermark to the minimum computed in refresh().
        private final Collection<InProcessPipelineRunner.CommittedBundle<?>> pendingBundles;
        // Per-key sorted timers in the PROCESSING_TIME domain.
        private final Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> processingTimers;
        // Per-key sorted timers in the SYNCHRONIZED_PROCESSING_TIME domain.
        private final Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> synchronizedProcessingTimers;
        // Timers that extractFiredDomainTimers has handed out and that updateTimers has not yet
        // seen reported as completed.
        private final PriorityQueue<TimerInternals.TimerData> pendingTimers;
        // Current watermark value; initialized in the constructor and replaced by refresh().
        private AtomicReference<Instant> earliestHold;

        /**
         * Creates the watermark with empty bundle and timer collections and an initial value
         * equal to the minimum of the given input watermarks, capped at
         * {@code BoundedWindow.TIMESTAMP_MAX_VALUE}.
         */
        public SynchronizedProcessingTimeInputWatermark(Collection<? extends Watermark> inputWms) {
            this.inputWms = inputWms;
            this.pendingBundles = new HashSet<>();
            this.processingTimers = new HashMap<>();
            this.synchronizedProcessingTimers = new HashMap<>();
            this.pendingTimers = new PriorityQueue<>();

            Instant earliestInput = BoundedWindow.TIMESTAMP_MAX_VALUE;
            for (Watermark upstream : inputWms) {
                earliestInput = INSTANT_ORDERING.min(earliestInput, upstream.get());
            }
            this.earliestHold = new AtomicReference<Instant>(earliestInput);
        }

        /** Returns the value stored by the constructor or the most recent {@code refresh()}. */
        @Override
        public Instant get() {
            Instant current = this.earliestHold.get();
            return current;
        }

        /**
         * Recomputes the watermark as the minimum over every input watermark and the
         * synchronized processing output watermark of every pending bundle, starting from the
         * value of {@code THE_END_OF_TIME}, and reports how it relates to the previous value.
         */
        @Override
        public synchronized WatermarkUpdate refresh() {
            Instant previous = this.earliestHold.get();
            Instant earliest = THE_END_OF_TIME.get();
            for (Watermark upstream : this.inputWms) {
                earliest = INSTANT_ORDERING.min(earliest, upstream.get());
            }
            for (InProcessPipelineRunner.CommittedBundle<?> pendingBundle : this.pendingBundles) {
                earliest = INSTANT_ORDERING.min(earliest, pendingBundle.getSynchronizedProcessingOutputWatermark());
            }
            this.earliestHold.set(earliest);
            return WatermarkUpdate.fromTimestamps(previous, earliest);
        }

        /** Registers {@code bundle} as pending so that the next refresh takes it into account. */
        public synchronized void addPending(InProcessPipelineRunner.CommittedBundle<?> bundle) {
            Collection<InProcessPipelineRunner.CommittedBundle<?>> pending = this.pendingBundles;
            pending.add(bundle);
        }

        /** Drops {@code bundle} from the pending set; a bundle that is not pending is ignored. */
        public synchronized void removePending(InProcessPipelineRunner.CommittedBundle<?> bundle) {
            Collection<InProcessPipelineRunner.CommittedBundle<?>> pending = this.pendingBundles;
            pending.remove(bundle);
        }

        /**
         * Returns the earliest timestamp among the first timer of every per-key processing-time
         * and synchronized-processing-time set and the head of the pending-timer queue, or the
         * value of {@code THE_END_OF_TIME} when there are no timers at all.
         */
        public synchronized Instant getEarliestTimerTimestamp() {
            Instant result = THE_END_OF_TIME.get();
            for (NavigableSet<TimerInternals.TimerData> perKey : this.processingTimers.values()) {
                if (!perKey.isEmpty()) {
                    result = INSTANT_ORDERING.min(perKey.first().getTimestamp(), result);
                }
            }
            for (NavigableSet<TimerInternals.TimerData> perKey : this.synchronizedProcessingTimers.values()) {
                if (!perKey.isEmpty()) {
                    result = INSTANT_ORDERING.min(perKey.first().getTimestamp(), result);
                }
            }
            TimerInternals.TimerData headOfPending = this.pendingTimers.peek();
            if (headOfPending != null) {
                result = INSTANT_ORDERING.min(headOfPending.getTimestamp(), result);
            }
            return result;
        }

        /**
         * Applies a timer update for the update's key: set timers are added to the per-domain
         * set for that key, completed timers are removed from the pending queue, and deleted
         * timers are removed from the per-domain set. Timers whose domain has no entry in the
         * key's timer map are skipped.
         */
        private synchronized void updateTimers(TimerUpdate update) {
            Map<TimeDomain, NavigableSet<TimerInternals.TimerData>> timersByDomain = this.timerMap(update.key);

            for (TimerInternals.TimerData toSet : update.setTimers) {
                NavigableSet<TimerInternals.TimerData> domainTimers = timersByDomain.get(toSet.getDomain());
                if (domainTimers != null) {
                    domainTimers.add(toSet);
                }
            }

            for (TimerInternals.TimerData done : update.completedTimers) {
                this.pendingTimers.remove(done);
            }

            for (TimerInternals.TimerData toDelete : update.deletedTimers) {
                NavigableSet<TimerInternals.TimerData> domainTimers = timersByDomain.get(toDelete.getDomain());
                if (domainTimers != null) {
                    domainTimers.remove(toDelete);
                }
            }
        }

        /**
         * Extracts the timers of the given processing-time domain that fire at {@code firingTime}
         * and records them as pending.
         *
         * <p>For {@code SYNCHRONIZED_PROCESSING_TIME} the effective firing time is the minimum of
         * {@code firingTime} and the current {@code earliestHold}. Every extracted timer is added
         * to {@code pendingTimers} before the result is returned.
         *
         * @param domain either {@code PROCESSING_TIME} or {@code SYNCHRONIZED_PROCESSING_TIME}
         * @param firingTime the time passed to {@code extractFiredTimers} as the firing bound
         * @return the fired timers, grouped by key
         * @throws IllegalArgumentException if {@code domain} is any other time domain
         */
        private synchronized Map<StructuralKey<?>, List<TimerInternals.TimerData>> extractFiredDomainTimers(TimeDomain domain, Instant firingTime) {
            // Fully parameterized (was a raw Map) so the loop below needs no unchecked cast.
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> firedTimers;
            switch (domain) {
                case PROCESSING_TIME: {
                    firedTimers = InMemoryWatermarkManager.extractFiredTimers(firingTime, this.processingTimers);
                    break;
                }
                case SYNCHRONIZED_PROCESSING_TIME: {
                    // Synchronized timers are bounded by the earliest hold as well as firingTime.
                    Instant boundedFiringTime = INSTANT_ORDERING.min(firingTime, this.earliestHold.get());
                    firedTimers = InMemoryWatermarkManager.extractFiredTimers(boundedFiringTime, this.synchronizedProcessingTimers);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Called getFiredTimers on a Synchronized Processing Time watermark and gave a non-processing time domain " + domain);
                }
            }
            for (List<TimerInternals.TimerData> timersForKey : firedTimers.values()) {
                this.pendingTimers.addAll(timersForKey);
            }
            return firedTimers;
        }

        /**
         * Returns the timer queues for {@code key}, indexed by time domain.
         *
         * <p>A new empty {@link TreeSet} is created and stored in {@code processingTimers} or
         * {@code synchronizedProcessingTimers} when {@code key} has no queue there yet. The
         * returned map only has entries for {@code PROCESSING_TIME} and
         * {@code SYNCHRONIZED_PROCESSING_TIME}; any other domain looks up as {@code null}.
         *
         * @param key the key whose timer queues are requested
         * @return a fresh map from time domain to the live queue for {@code key}
         */
        private Map<TimeDomain, NavigableSet<TimerInternals.TimerData>> timerMap(StructuralKey<?> key) {
            NavigableSet<TimerInternals.TimerData> processing = processingTimers.get(key);
            if (processing == null) {
                processing = new TreeSet<TimerInternals.TimerData>();
                processingTimers.put(key, processing);
            }

            NavigableSet<TimerInternals.TimerData> synchronizedProcessing = synchronizedProcessingTimers.get(key);
            if (synchronizedProcessing == null) {
                synchronizedProcessing = new TreeSet<TimerInternals.TimerData>();
                synchronizedProcessingTimers.put(key, synchronizedProcessing);
            }

            Map<TimeDomain, NavigableSet<TimerInternals.TimerData>> queuesByDomain =
                new EnumMap<TimeDomain, NavigableSet<TimerInternals.TimerData>>(TimeDomain.class);
            queuesByDomain.put(TimeDomain.PROCESSING_TIME, processing);
            queuesByDomain.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedProcessing);
            return queuesByDomain;
        }

        /** Renders this watermark as its class name plus the current {@code earliestHold}. */
        @Override
        public synchronized String toString() {
            return MoreObjects.toStringHelper(SynchronizedProcessingTimeInputWatermark.class)
                .add("earliestHold", earliestHold)
                .toString();
        }
    }

    /**
     * Output watermark of an applied transform.
     *
     * <p>On each {@link #refresh()} the value becomes the minimum of the input watermark and the
     * minimum per-key hold, except that it never moves below its previous value.
     */
    private static class AppliedPTransformOutputWatermark
    implements Watermark {
        /** Input watermark of the same transform; read on every refresh. */
        private final Watermark inputWatermark;
        /** Per-key holds that keep the output watermark back. */
        private final PerKeyHolds holds;
        /** Last computed value; starts at {@code BoundedWindow.TIMESTAMP_MIN_VALUE}. */
        private AtomicReference<Instant> currentWatermark;

        /**
         * Creates an output watermark with no holds.
         *
         * @param inputWatermark the input watermark this output watermark follows
         */
        public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark inputWatermark) {
            this.currentWatermark = new AtomicReference<Instant>(BoundedWindow.TIMESTAMP_MIN_VALUE);
            this.holds = new PerKeyHolds();
            this.inputWatermark = inputWatermark;
        }

        /**
         * Sets or clears the hold for {@code key}.
         *
         * @param key the key the hold belongs to
         * @param newHold the new hold, or {@code null} to remove the hold for {@code key}
         */
        public synchronized void updateHold(Object key, Instant newHold) {
            if (newHold != null) {
                holds.updateHold(key, newHold);
                return;
            }
            holds.removeHold(key);
        }

        /** Returns the value computed by the most recent {@link #refresh()}. */
        @Override
        public Instant get() {
            return currentWatermark.get();
        }

        /**
         * Recomputes the watermark from the input watermark and the minimum hold.
         *
         * @return the result of {@code WatermarkUpdate.fromTimestamps(previous, updated)}
         */
        @Override
        public synchronized WatermarkUpdate refresh() {
            Instant previous = currentWatermark.get();
            Instant heldInput = INSTANT_ORDERING.min(inputWatermark.get(), holds.getMinHold());
            // Taking the max with the previous value keeps the watermark from moving backwards.
            Instant updated = INSTANT_ORDERING.max(previous, heldInput);
            currentWatermark.set(updated);
            return WatermarkUpdate.fromTimestamps(previous, updated);
        }

        /** Renders the holds and the current watermark value. */
        @Override
        public synchronized String toString() {
            return MoreObjects.toStringHelper(AppliedPTransformOutputWatermark.class)
                .add("holds", holds)
                .add("currentWatermark", currentWatermark)
                .toString();
        }
    }

    /**
     * Input watermark of an applied transform.
     *
     * <p>On each {@link #refresh()} the value becomes the minimum of all upstream watermarks and
     * the timestamp of the earliest pending element, except that it never moves below its
     * previous value. The class also holds the event-time timers set for each key.
     */
    private static class AppliedPTransformInputWatermark
    implements Watermark {
        /** Upstream watermarks this watermark is bounded by. */
        private final Collection<? extends Watermark> inputWatermarks;
        /** Elements not yet finished, ordered by {@code WindowedValueByTimestampComparator}. */
        private final SortedMultiset<WindowedValue<?>> pendingElements;
        /** Event-time timers, grouped by key. */
        private final Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> objectTimers;
        /** Last computed value; starts at {@code BoundedWindow.TIMESTAMP_MIN_VALUE}. */
        private AtomicReference<Instant> currentWatermark;

        /**
         * Creates an input watermark with no pending elements and no timers.
         *
         * @param inputWatermarks the upstream watermarks to take the minimum of
         */
        public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) {
            this.inputWatermarks = inputWatermarks;
            this.pendingElements = TreeMultiset.create(new WindowedValueByTimestampComparator());
            // Parameterized explicitly; the raw HashMap forced an unchecked conversion.
            this.objectTimers = new HashMap<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>>();
            this.currentWatermark = new AtomicReference<Instant>(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        /** Returns the value computed by the most recent {@link #refresh()}. */
        @Override
        public Instant get() {
            return this.currentWatermark.get();
        }

        /**
         * Recomputes the watermark from the upstream watermarks and the pending elements.
         *
         * @return the result of {@code WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark)}
         */
        @Override
        public synchronized WatermarkUpdate refresh() {
            Instant oldWatermark = this.currentWatermark.get();
            Instant minInputWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
            for (Watermark watermark : this.inputWatermarks) {
                minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, watermark.get());
            }
            if (!this.pendingElements.isEmpty()) {
                // The earliest pending element holds the watermark at its timestamp.
                Instant earliestPending = this.pendingElements.firstEntry().getElement().getTimestamp();
                minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, earliestPending);
            }
            // Taking the max with the old value keeps the watermark from moving backwards.
            Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
            this.currentWatermark.set(newWatermark);
            return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
        }

        /** Adds every element of {@code newPending} to the pending elements. */
        private synchronized void addPendingElements(Iterable<? extends WindowedValue<?>> newPending) {
            for (WindowedValue<?> pendingElement : newPending) {
                this.pendingElements.add(pendingElement);
            }
        }

        /** Removes each element of {@code finishedElements} from the pending elements. */
        private synchronized void removePendingElements(Iterable<? extends WindowedValue<?>> finishedElements) {
            for (WindowedValue<?> finishedElement : finishedElements) {
                this.pendingElements.remove(finishedElement);
            }
        }

        /**
         * Applies the event-time part of {@code update} to the timers held for {@code update.key}.
         *
         * <p>Set timers are added and deleted timers are removed; timers of any domain other than
         * {@code EVENT_TIME} are ignored. A timer set is created for the key if none exists yet.
         *
         * @param update the timers that were set and deleted for a single key
         */
        private synchronized void updateTimers(TimerUpdate update) {
            NavigableSet<TimerInternals.TimerData> keyTimers = this.objectTimers.get(update.key);
            if (keyTimers == null) {
                keyTimers = new TreeSet<TimerInternals.TimerData>();
                this.objectTimers.put(update.key, keyTimers);
            }
            for (TimerInternals.TimerData timer : update.setTimers) {
                if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
                    keyTimers.add(timer);
                }
            }
            for (TimerInternals.TimerData timer : update.deletedTimers) {
                if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
                    keyTimers.remove(timer);
                }
            }
        }

        /**
         * Delegates to {@code InMemoryWatermarkManager.extractFiredTimers} with the current
         * watermark value and the per-key event-time timers.
         *
         * @return the fired timers, grouped by key
         */
        private synchronized Map<StructuralKey<?>, List<TimerInternals.TimerData>> extractFiredEventTimeTimers() {
            return InMemoryWatermarkManager.extractFiredTimers(this.currentWatermark.get(), this.objectTimers);
        }

        /** Renders the pending elements and the current watermark value. */
        @Override
        public synchronized String toString() {
            return MoreObjects.toStringHelper(AppliedPTransformInputWatermark.class)
                .add("pendingElements", this.pendingElements)
                .add("currentWatermark", this.currentWatermark)
                .toString();
        }
    }

    private static enum WatermarkUpdate {
        ADVANCED(true),
        NO_CHANGE(false);

        private final boolean advanced;

        private WatermarkUpdate(boolean advanced) {
            this.advanced = advanced;
        }

        public boolean isAdvanced() {
            return this.advanced;
        }

        public WatermarkUpdate union(WatermarkUpdate that) {
            if (this.advanced) {
                return this;
            }
            return that;
        }

        public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTime) {
            if (currentTime.isAfter((ReadableInstant)oldTime)) {
                return ADVANCED;
            }
            return NO_CHANGE;
        }
    }

    private static interface Watermark {
        public Instant get();

        public WatermarkUpdate refresh();
    }
}

