/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.transforms;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class WithTimestamps<T>
extends PTransform<PCollection<T>, PCollection<T>> {
    private final SerializableFunction<T, Instant> fn;
    private final Duration allowedTimestampSkew;

    public static <T> WithTimestamps<T> of(SerializableFunction<T, Instant> fn) {
        return new WithTimestamps<T>(fn, Duration.ZERO);
    }

    private WithTimestamps(SerializableFunction<T, Instant> fn, Duration allowedTimestampSkew) {
        this.fn = Preconditions.checkNotNull(fn, "WithTimestamps fn cannot be null");
        this.allowedTimestampSkew = allowedTimestampSkew;
    }

    public WithTimestamps<T> withAllowedTimestampSkew(Duration allowedTimestampSkew) {
        return new WithTimestamps<T>(this.fn, allowedTimestampSkew);
    }

    public Duration getAllowedTimestampSkew() {
        return this.allowedTimestampSkew;
    }

    @Override
    public PCollection<T> apply(PCollection<T> input) {
        return ((PCollection)input.apply(ParDo.named("AddTimestamps").of(new AddTimestampsDoFn<T>(this.fn, this.allowedTimestampSkew)))).setTypeDescriptorInternal(input.getTypeDescriptor());
    }

    private static class AddTimestampsDoFn<T>
    extends DoFn<T, T> {
        private final SerializableFunction<T, Instant> fn;
        private final Duration allowedTimestampSkew;

        public AddTimestampsDoFn(SerializableFunction<T, Instant> fn, Duration allowedTimestampSkew) {
            this.fn = fn;
            this.allowedTimestampSkew = allowedTimestampSkew;
        }

        @Override
        public void processElement(DoFn.ProcessContext c) {
            Instant timestamp = this.fn.apply(c.element());
            Preconditions.checkNotNull(timestamp, "Timestamps for WithTimestamps cannot be null. Timestamp provided by %s.", this.fn);
            c.outputWithTimestamp(c.element(), timestamp);
        }

        @Override
        public Duration getAllowedTimestampSkew() {
            return this.allowedTimestampSkew;
        }
    }
}

