/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferedElement;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferedElements;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferingElementsHandler;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.KeyedBufferingElementsHandler;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.NonKeyedBufferingElementsHandler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.joda.time.Instant;

/**
 * A {@link DoFnRunner} wrapper that holds back elements and timers instead of processing them
 * right away.
 *
 * <p>{@link #processElement} and {@link #onTimer} never call the wrapped runner; they only append
 * to the currently active {@link BufferingElementsHandler}. {@link #checkpoint(long)} records the
 * active buffer under the given checkpoint id and switches to a fresh, randomly named buffer.
 * {@link #checkpointCompleted(long)} replays every buffer recorded for that checkpoint id (or an
 * earlier one) through the wrapped runner and then clears it.
 *
 * <p>NOTE(review): {@code checkpoint} / {@code checkpointCompleted} are presumably driven by the
 * enclosing Flink operator's snapshot and checkpoint-complete callbacks - confirm against the
 * caller.
 *
 * @param <InputT> input element type of the wrapped runner
 * @param <OutputT> output element type of the wrapped runner
 */
public class BufferingDoFnRunner<InputT, OutputT>
implements DoFnRunner<InputT, OutputT> {
    /** The runner that buffered elements and timers are eventually replayed into. */
    private final DoFnRunner<InputT, OutputT> underlying;
    /** Union list state of (buffer id, checkpoint id) pairs whose buffers were not replayed yet. */
    private final ListState<CheckpointElement> notYetAcknowledgedSnapshots;
    /** Creates (or re-opens) the buffering handler that belongs to a given buffer id. */
    private final BufferingElementsHandlerFactory bufferingElementsHandlerFactory;
    /** Id of the buffer that currently receives new elements and timers. */
    private String currentStateId;
    /** Handler backing {@link #currentStateId}. */
    private BufferingElementsHandler currentBufferingElementsHandler;

    /**
     * Creates a buffering wrapper around {@code doFnRunner}.
     *
     * @param doFnRunner runner that receives the buffered data once it is replayed
     * @param stateName prefix of the state name of every buffer; the buffer id is appended to it
     * @param windowedInputCoder coder handed to {@code BufferedElements.Coder} for buffered elements
     * @param windowCoder coder handed to {@code BufferedElements.Coder} for windows
     * @param operatorStateBackend backend holding the list of pending checkpoints and, when no
     *     keyed backend is given, the buffers themselves
     * @param keyedStateBackend if non-null, buffers are created through
     *     {@link KeyedBufferingElementsHandler}; otherwise through
     *     {@link NonKeyedBufferingElementsHandler}
     * @return the new runner, with an empty active buffer already set up
     * @throws Exception if the state cannot be obtained from the backends
     */
    public static <InputT, OutputT> BufferingDoFnRunner<InputT, OutputT> create(DoFnRunner<InputT, OutputT> doFnRunner, String stateName, Coder windowedInputCoder, Coder windowCoder, OperatorStateBackend operatorStateBackend, @Nullable KeyedStateBackend<Object> keyedStateBackend) throws Exception {
        return new BufferingDoFnRunner<>(doFnRunner, stateName, windowedInputCoder, windowCoder, operatorStateBackend, keyedStateBackend);
    }

    // The coders and the keyed backend arrive as raw types through the public factory signature,
    // so the unchecked conversions are confined to this constructor.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private BufferingDoFnRunner(DoFnRunner<InputT, OutputT> underlying, String stateName, Coder inputCoder, Coder windowCoder, OperatorStateBackend operatorStateBackend, @Nullable KeyedStateBackend keyedStateBackend) throws Exception {
        this.underlying = underlying;
        this.notYetAcknowledgedSnapshots = operatorStateBackend.getUnionListState(new ListStateDescriptor<>("notYetAcknowledgedSnapshots", CheckpointElement.class));
        this.bufferingElementsHandlerFactory = stateId -> {
            // Every buffer lives in its own list state named <stateName><stateId>.
            ListStateDescriptor<BufferedElement> stateDescriptor = new ListStateDescriptor<>(stateName + stateId, new CoderTypeSerializer<BufferedElement>(new BufferedElements.Coder(inputCoder, windowCoder)));
            if (keyedStateBackend != null) {
                return KeyedBufferingElementsHandler.create(keyedStateBackend, stateDescriptor);
            }
            return NonKeyedBufferingElementsHandler.create(operatorStateBackend.getListState(stateDescriptor));
        };
        this.currentStateId = BufferingDoFnRunner.generateNewId();
        this.currentBufferingElementsHandler = this.bufferingElementsHandlerFactory.get(this.currentStateId);
    }

    /** Does nothing; the wrapped runner's bundle is started in {@link #checkpointCompleted(long)}. */
    public void startBundle() {
    }

    /**
     * Appends {@code elem} to the active buffer; the wrapped runner is not invoked here.
     *
     * @param elem the element to buffer
     */
    public void processElement(WindowedValue<InputT> elem) {
        this.currentBufferingElementsHandler.buffer(new BufferedElements.Element(elem));
    }

    /**
     * Appends the timer firing to the active buffer; the wrapped runner is not invoked here.
     *
     * @param timerId id of the timer
     * @param window window the timer belongs to
     * @param timestamp timestamp of the timer
     * @param timeDomain time domain of the timer
     */
    public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
        this.currentBufferingElementsHandler.buffer(new BufferedElements.Timer(timerId, window, timestamp, timeDomain));
    }

    /** Does nothing; the wrapped runner's bundle is finished in {@link #checkpointCompleted(long)}. */
    public void finishBundle() {
    }

    /** Returns the {@link DoFn} of the wrapped runner. */
    public DoFn<InputT, OutputT> getFn() {
        return this.underlying.getFn();
    }

    /**
     * Records the active buffer as belonging to {@code checkpointId} and switches to a new, empty
     * buffer with a freshly generated id.
     *
     * @param checkpointId id of the checkpoint the current buffer is associated with
     * @throws Exception if the state backends fail
     */
    public void checkpoint(long checkpointId) throws Exception {
        this.addToBeAcknowledgedCheckpoint(checkpointId, this.currentStateId);
        this.currentStateId = BufferingDoFnRunner.generateNewId();
        this.currentBufferingElementsHandler = this.bufferingElementsHandlerFactory.get(this.currentStateId);
    }

    /**
     * Replays, in ascending checkpoint-id order, every recorded buffer whose checkpoint id is less
     * than or equal to {@code checkpointId}, and clears each buffer after it was replayed.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if the state backends fail
     */
    public void checkpointCompleted(long checkpointId) throws Exception {
        for (CheckpointElement acknowledged : this.removeToBeAcknowledgedCheckpoints(checkpointId)) {
            this.replayAndClear(acknowledged.internalId);
        }
    }

    /** Feeds the buffer with the given id through the wrapped runner as one bundle, then clears it. */
    private void replayAndClear(String internalId) throws Exception {
        BufferingElementsHandler bufferingElementsHandler = this.bufferingElementsHandlerFactory.get(internalId);
        Iterator<BufferedElement> iterator = bufferingElementsHandler.getElements().iterator();
        // Only open a bundle on the wrapped runner when there is something to process.
        boolean hasElements = iterator.hasNext();
        if (hasElements) {
            this.underlying.startBundle();
        }
        while (iterator.hasNext()) {
            iterator.next().processWith(this.underlying);
        }
        if (hasElements) {
            this.underlying.finishBundle();
        }
        bufferingElementsHandler.clear();
    }

    /** Appends the (buffer id, checkpoint id) pair to the list of pending checkpoints. */
    private void addToBeAcknowledgedCheckpoint(long checkpointId, String internalId) throws Exception {
        this.notYetAcknowledgedSnapshots.addAll(Collections.singletonList(new CheckpointElement(internalId, checkpointId)));
    }

    /**
     * Removes all pending entries with a checkpoint id less than or equal to {@code checkpointId}
     * from the state and returns them.
     *
     * @return the removed entries, sorted by ascending checkpoint id
     */
    private List<CheckpointElement> removeToBeAcknowledgedCheckpoints(long checkpointId) throws Exception {
        List<CheckpointElement> toBeAcknowledged = new ArrayList<>();
        List<CheckpointElement> stillPending = new ArrayList<>();
        for (CheckpointElement element : this.notYetAcknowledgedSnapshots.get()) {
            if (element.checkpointId <= checkpointId) {
                toBeAcknowledged.add(element);
            } else {
                stillPending.add(element);
            }
        }
        // Persist the removal before the caller starts replaying the returned entries.
        this.notYetAcknowledgedSnapshots.update(stillPending);
        toBeAcknowledged.sort(Comparator.comparingLong(element -> element.checkpointId));
        return toBeAcknowledged;
    }

    /** Returns a random UUID string used as the id of a new buffer. */
    private static String generateNewId() {
        return UUID.randomUUID().toString();
    }

    /**
     * Links a buffer id to the checkpoint it was recorded for. Instances are the element type of
     * the {@code notYetAcknowledgedSnapshots} list state.
     */
    private static class CheckpointElement {
        /** Id of the buffer; appended to the state name prefix to locate the buffer's state. */
        final String internalId;
        /** Id of the checkpoint the buffer was recorded for. */
        final long checkpointId;

        CheckpointElement(String internalId, long checkpointId) {
            this.internalId = internalId;
            this.checkpointId = checkpointId;
        }
    }

    /** Creates the buffering handler for a buffer id; may fail with a state backend exception. */
    @FunctionalInterface
    private static interface BufferingElementsHandlerFactory {
        public BufferingElementsHandler get(String stateId) throws Exception;
    }
}

