/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import java.nio.ByteBuffer;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.joda.time.Duration;

public class DedupingOperator<T>
extends AbstractStreamOperator<WindowedValue<T>>
implements OneInputStreamOperator<WindowedValue<ValueWithRecordId<T>>, WindowedValue<T>>,
Triggerable<ByteBuffer, VoidNamespace> {
    private static final long MAX_RETENTION_SINCE_ACCESS = Duration.standardMinutes((long)10L).getMillis();
    private ValueStateDescriptor<Long> dedupingStateDescriptor = new ValueStateDescriptor("dedup-cache", (TypeSerializer)LongSerializer.INSTANCE);
    private transient InternalTimerService<VoidNamespace> timerService;

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.timerService = this.getInternalTimerService("dedup-cleanup-timer", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
    }

    public void processElement(StreamRecord<WindowedValue<ValueWithRecordId<T>>> streamRecord) throws Exception {
        ValueState dedupingState = (ValueState)this.getPartitionedState((StateDescriptor)this.dedupingStateDescriptor);
        Long lastSeenTimestamp = (Long)dedupingState.value();
        if (lastSeenTimestamp == null) {
            WindowedValue value = (WindowedValue)streamRecord.getValue();
            this.output.collect((Object)streamRecord.replace((Object)value.withValue(((ValueWithRecordId)value.getValue()).getValue())));
        }
        long currentProcessingTime = this.timerService.currentProcessingTime();
        dedupingState.update((Object)currentProcessingTime);
        this.timerService.registerProcessingTimeTimer((Object)VoidNamespace.INSTANCE, currentProcessingTime + MAX_RETENTION_SINCE_ACCESS);
    }

    public void onEventTime(InternalTimer<ByteBuffer, VoidNamespace> internalTimer) {
    }

    public void onProcessingTime(InternalTimer<ByteBuffer, VoidNamespace> internalTimer) throws Exception {
        ValueState dedupingState = (ValueState)this.getPartitionedState((StateDescriptor)this.dedupingStateDescriptor);
        Long lastSeenTimestamp = (Long)dedupingState.value();
        if (lastSeenTimestamp != null && lastSeenTimestamp.equals(internalTimer.getTimestamp() - MAX_RETENTION_SINCE_ACCESS)) {
            dedupingState.clear();
        }
    }
}

