/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.HashMultimap;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.SetMultimap;
import com.google.cloud.dataflow.sdk.runners.inprocess.BundleFactory;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.inprocess.StructuralKey;
import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
import com.google.cloud.dataflow.sdk.util.MutationDetector;
import com.google.cloud.dataflow.sdk.util.MutationDetectors;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.joda.time.Instant;

/**
 * A {@link BundleFactory} decorator that wraps every bundle created by another factory in a
 * bundle which verifies, at commit time, that the elements added to it were not mutated after
 * being added.
 *
 * <p>Bundle creation itself is delegated to the wrapped factory; this class only adds the
 * mutation check around the resulting {@link InProcessPipelineRunner.UncommittedBundle}.
 */
class ImmutabilityCheckingBundleFactory implements BundleFactory {
    /** The factory that actually creates bundles. */
    private final BundleFactory underlying;

    /**
     * Creates a factory that adds mutation checking to the bundles produced by {@code underlying}.
     *
     * @param underlying the factory to delegate bundle creation to; must not be null
     * @return a new checking factory wrapping {@code underlying}
     */
    public static ImmutabilityCheckingBundleFactory create(BundleFactory underlying) {
        return new ImmutabilityCheckingBundleFactory(underlying);
    }

    private ImmutabilityCheckingBundleFactory(BundleFactory underlying) {
        this.underlying = Preconditions.checkNotNull(underlying);
    }

    /**
     * Creates a root bundle via the wrapped factory and wraps it with mutation checking.
     *
     * @param output the {@link PCollection} the bundle belongs to
     * @return the mutation-checking bundle
     */
    @Override
    public <T> InProcessPipelineRunner.UncommittedBundle<T> createRootBundle(PCollection<T> output) {
        InProcessPipelineRunner.UncommittedBundle<T> delegate = underlying.createRootBundle(output);
        return new ImmutabilityEnforcingBundle<T>(delegate);
    }

    /**
     * Creates a bundle via the wrapped factory and wraps it with mutation checking.
     *
     * @param input the committed bundle passed through to the wrapped factory
     * @param output the {@link PCollection} the new bundle belongs to
     * @return the mutation-checking bundle
     */
    @Override
    public <T> InProcessPipelineRunner.UncommittedBundle<T> createBundle(InProcessPipelineRunner.CommittedBundle<?> input, PCollection<T> output) {
        InProcessPipelineRunner.UncommittedBundle<T> delegate = underlying.createBundle(input, output);
        return new ImmutabilityEnforcingBundle<T>(delegate);
    }

    /**
     * Creates a keyed bundle via the wrapped factory and wraps it with mutation checking.
     *
     * @param input the committed bundle passed through to the wrapped factory
     * @param key the key passed through to the wrapped factory
     * @param output the {@link PCollection} the new bundle belongs to
     * @return the mutation-checking bundle
     */
    @Override
    public <K, T> InProcessPipelineRunner.UncommittedBundle<T> createKeyedBundle(InProcessPipelineRunner.CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
        InProcessPipelineRunner.UncommittedBundle<T> delegate = underlying.createKeyedBundle(input, key, output);
        return new ImmutabilityEnforcingBundle<T>(delegate);
    }

    /**
     * An {@link InProcessPipelineRunner.UncommittedBundle} that forwards to another bundle while
     * keeping a {@link MutationDetector} for each added element, and verifies all detectors before
     * the wrapped bundle is committed.
     */
    private static class ImmutabilityEnforcingBundle<T>
    implements InProcessPipelineRunner.UncommittedBundle<T> {
        /** The bundle that actually stores the elements. */
        private final InProcessPipelineRunner.UncommittedBundle<T> underlying;
        /** Detectors registered for each element passed to {@link #add}. */
        private final SetMultimap<WindowedValue<T>, MutationDetector> mutationDetectors;
        /** Coder of the bundle's {@link PCollection}, handed to each new detector. */
        private final Coder<T> coder;

        /**
         * @param underlying the bundle to delegate to; must not be null
         */
        public ImmutabilityEnforcingBundle(InProcessPipelineRunner.UncommittedBundle<T> underlying) {
            this.underlying = Preconditions.checkNotNull(underlying);
            this.mutationDetectors = HashMultimap.create();
            // Read the coder from the delegate directly rather than through this.getPCollection(),
            // so no overridable instance method runs while the object is still being constructed.
            this.coder = underlying.getPCollection().getCoder();
        }

        /** Returns the {@link PCollection} of the wrapped bundle. */
        @Override
        public PCollection<T> getPCollection() {
            return underlying.getPCollection();
        }

        /**
         * Registers a mutation detector for the element's value, then adds the element to the
         * wrapped bundle.
         *
         * @param element the element to add
         * @return this bundle
         * @throws RuntimeException wrapping the {@link CoderException} if the detector cannot be
         *     created
         */
        @Override
        public InProcessPipelineRunner.UncommittedBundle<T> add(WindowedValue<T> element) {
            MutationDetector detector;
            try {
                detector = MutationDetectors.forValueWithCoder(element.getValue(), coder);
            }
            catch (CoderException e) {
                throw new RuntimeException(e);
            }
            mutationDetectors.put(element, detector);
            underlying.add(element);
            return this;
        }

        /**
         * Verifies every registered detector and, if none reports a mutation, commits the wrapped
         * bundle.
         *
         * <p>If a detector throws {@link IllegalMutationException}, the wrapped bundle is not
         * committed. Instead a new {@link IllegalMutationException} whose message names the
         * transform producing this bundle's {@link PCollection} is passed to
         * {@link UserCodeException#wrap} and the result is thrown.
         *
         * @param synchronizedProcessingTime passed through to the wrapped bundle's commit
         * @return the committed bundle produced by the wrapped bundle
         */
        @Override
        public InProcessPipelineRunner.CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
            for (MutationDetector detector : mutationDetectors.values()) {
                try {
                    detector.verifyUnmodified();
                }
                catch (IllegalMutationException exn) {
                    String message = String.format("PTransform %s mutated value %s after it was output (new value was %s). Values must not be mutated in any way after being output.", underlying.getPCollection().getProducingTransformInternal().getFullName(), exn.getSavedValue(), exn.getNewValue());
                    // Keep the original exception as the cause so the detector's own details survive.
                    throw UserCodeException.wrap(new IllegalMutationException(message, exn.getSavedValue(), exn.getNewValue(), exn));
                }
            }
            return underlying.commit(synchronizedProcessingTime);
        }
    }
}

