/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.connectors.flink;

import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Stream operator that re-emits timestamped elements in ascending timestamp order.
 *
 * <p>Elements whose timestamp is greater than the last seen watermark are buffered in map
 * state under their timestamp. When an event-time timer fires, every buffered timestamp that
 * is less than or equal to the timer service's current watermark is drained, smallest
 * timestamp first; elements sharing a timestamp are emitted in the order they were buffered.
 * Elements without a timestamp are forwarded immediately, and elements whose timestamp is not
 * greater than the last seen watermark are dropped.
 *
 * @param <K> key type of the timers delivered to this operator
 * @param <T> type of the elements that are buffered and emitted
 */
public class EventTimeOrderingOperator<K, T>
        extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T>,
        Triggerable<K, VoidNamespace>,
        InputTypeConfigurable {
    private static final long serialVersionUID = 1L;

    /** Name under which the buffered elements are registered as map state. */
    private static final String EVENT_QUEUE_STATE_NAME = "eventQueue";

    /**
     * Timestamp of the most recent watermark passed to {@link #processWatermark(Watermark)}.
     * Used to decide whether an incoming element is late.
     */
    @VisibleForTesting
    long lastWatermark = Long.MIN_VALUE;

    /** Serializer for input elements; set by {@link #setInputType(TypeInformation, ExecutionConfig)}. */
    private TypeSerializer<T> inputSerializer;

    /** Timer service used to register event-time timers; created in {@link #open()}. */
    private transient InternalTimerService<VoidNamespace> internalTimerService;

    /** Buffered elements, grouped by their timestamp. */
    private transient MapState<Long, List<T>> elementQueueState;

    /** Creates the operator and sets its chaining strategy to {@link ChainingStrategy#ALWAYS}. */
    public EventTimeOrderingOperator() {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    /**
     * Creates the serializer used to copy and persist buffered elements.
     *
     * @param type type information of the input elements
     * @param executionConfig execution config handed to the serializer factory
     */
    public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
        // The caller only supplies a wildcard type; it is assumed to describe T.
        @SuppressWarnings("unchecked")
        TypeSerializer<T> serializer = (TypeSerializer<T>) type.createSerializer(executionConfig);
        this.inputSerializer = serializer;
    }

    /**
     * Initializes the operator state and, if not already present, obtains the map state that
     * holds the buffered elements.
     *
     * @param context state initialization context, passed through to the superclass
     * @throws Exception if the superclass initialization or the state lookup fails
     */
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        if (this.elementQueueState == null) {
            this.elementQueueState = this.getRuntimeContext().getMapState(
                    new MapStateDescriptor<>(
                            EVENT_QUEUE_STATE_NAME,
                            LongSerializer.INSTANCE,
                            new ListSerializer<>(this.inputSerializer)));
        }
    }

    /**
     * Opens the operator and obtains the timer service named {@code "ordering-timers"},
     * registering this operator as the timer callback.
     *
     * @throws Exception if the superclass fails to open
     */
    public void open() throws Exception {
        super.open();
        this.internalTimerService =
                this.getInternalTimerService("ordering-timers", VoidNamespaceSerializer.INSTANCE, this);
    }

    /**
     * Handles one input element: forwards it if it has no timestamp, buffers it if its
     * timestamp is greater than the last seen watermark, and drops it otherwise.
     *
     * @param element the incoming record
     * @throws Exception if accessing state fails
     */
    public void processElement(StreamRecord<T> element) throws Exception {
        if (!element.hasTimestamp()) {
            // Nothing to order by, so pass the record straight through.
            this.output.collect(element);
            return;
        }
        // Records at or behind the last watermark are late and are silently discarded.
        if (element.getTimestamp() > this.lastWatermark) {
            this.saveRegisterWatermarkTimer();
            this.bufferEvent(element);
        }
    }

    /**
     * Delegates watermark handling to the superclass and then records the watermark's
     * timestamp as the new lateness threshold.
     *
     * @param mark the incoming watermark
     * @throws Exception if the superclass handling fails
     */
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        this.lastWatermark = mark.getTimestamp();
    }

    /**
     * Appends the element's value to the list stored under its timestamp.
     *
     * @param element a record that carries a timestamp
     * @throws Exception if accessing state fails
     */
    private void bufferEvent(StreamRecord<T> element) throws Exception {
        assert element.hasTimestamp();
        long timestamp = element.getTimestamp();
        // Use the declared List<T> type: the state contract does not promise an ArrayList.
        List<T> elementsForTimestamp = this.elementQueueState.get(timestamp);
        if (elementsForTimestamp == null) {
            elementsForTimestamp = new ArrayList<>(1);
        }
        if (this.getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()) {
            // With object reuse the value may be mutated after this call, so store a copy.
            elementsForTimestamp.add(this.inputSerializer.copy(element.getValue()));
        } else {
            elementsForTimestamp.add(element.getValue());
        }
        this.elementQueueState.put(timestamp, elementsForTimestamp);
    }

    /**
     * Registers an event-time timer one unit past the timer service's current watermark,
     * unless that would overflow {@code long}.
     */
    private void saveRegisterWatermarkTimer() {
        long currentWatermark = this.internalTimerService.currentWatermark();
        // currentWatermark + 1 would wrap around at Long.MAX_VALUE; skip registration then.
        if (currentWatermark < Long.MAX_VALUE) {
            this.internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1L);
        }
    }

    /**
     * Emits all buffered elements whose timestamp is less than or equal to the current
     * watermark, in ascending timestamp order, and removes them from state. If elements
     * remain buffered afterwards, a new timer is registered; otherwise the state is cleared.
     *
     * @param timer the timer that fired (not inspected)
     * @throws Exception if accessing state fails
     */
    public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        long currentWatermark = this.internalTimerService.currentWatermark();
        PriorityQueue<Long> sortedTimestamps = this.getSortedTimestamps();
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= currentWatermark) {
            long timestamp = sortedTimestamps.poll();
            for (T event : this.elementQueueState.get(timestamp)) {
                this.output.collect(new StreamRecord<>(event, timestamp));
            }
            this.elementQueueState.remove(timestamp);
        }
        if (sortedTimestamps.isEmpty()) {
            this.elementQueueState.clear();
        } else {
            // Some timestamps are still ahead of the watermark; make sure we are called again.
            this.saveRegisterWatermarkTimer();
        }
    }

    /**
     * No-op: this operator does not act on processing-time timers.
     *
     * @param timer the timer that fired (ignored)
     * @throws Exception never thrown by this implementation
     */
    public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
    }

    /**
     * Collects every timestamp currently present in the buffer state into a min-ordered
     * queue. The queue is rebuilt from state on each call.
     *
     * @return the buffered timestamps, smallest first
     * @throws Exception if accessing state fails
     */
    private PriorityQueue<Long> getSortedTimestamps() throws Exception {
        PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
        for (Long timestamp : this.elementQueueState.keys()) {
            sortedTimestamps.offer(timestamp);
        }
        return sortedTimestamps;
    }
}

