/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferedElement;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferedElements;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferingElementsHandler;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.KeyedBufferingElementsHandler;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.NonKeyedBufferingElementsHandler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;

/**
 * A {@link DoFnRunner} that does not process its input right away. Elements and timers are
 * written to a {@link BufferingElementsHandler} and are only handed to the wrapped runner once
 * the checkpoint that covers them has been reported as completed.
 *
 * <p>Buffers are addressed by an integer index. {@link #checkpoint(long)} records which index
 * belongs to which checkpoint id and then rotates to the next index, and
 * {@link #checkpointCompleted(long)} replays and clears the buffers of all recorded checkpoints
 * up to and including the completed one.
 *
 * @param <InputT> input element type of the wrapped runner
 * @param <OutputT> output element type of the wrapped runner
 */
public class BufferingDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
    /** The runner that eventually processes the buffered elements and timers. */
    private final DoFnRunner<InputT, OutputT> underlying;
    /** Checkpoints that were started via {@link #checkpoint(long)} but not yet replayed. */
    private final ListState<CheckpointIdentifier> notYetAcknowledgedSnapshots;
    /** Creates (or looks up) the buffer handler that belongs to a given buffer index. */
    private final BufferingElementsHandlerFactory bufferingElementsHandlerFactory;
    /** Number of buffer indices the runner rotates through. */
    final int numCheckpointBuffers;
    /** Index of the buffer that currently receives elements. */
    int currentStateIndex;
    /** Handler for the buffer at {@link #currentStateIndex}. */
    private BufferingElementsHandler currentBufferingElementsHandler;

    /**
     * Creates a new buffering runner around {@code doFnRunner}.
     *
     * @param doFnRunner runner that receives the elements once they are released
     * @param stateName prefix of the state names used for the element buffers
     * @param windowedInputCoder coder for the buffered windowed input values
     * @param windowCoder coder for the windows of buffered timers/elements
     * @param operatorStateBackend backend holding the pending-checkpoint list and, if no keyed
     *     backend is given, the element buffers
     * @param keyedStateBackend if non-null, the element buffers are kept in this backend instead
     * @param maxConcurrentCheckpoints must be greater than 0 and less than {@link Short#MAX_VALUE}
     * @param pipelineOptions options passed to the {@link CoderTypeSerializer} of the buffers
     * @return the new runner
     * @throws IllegalArgumentException if {@code maxConcurrentCheckpoints} is out of bounds
     * @throws Exception if accessing the state backends fails
     */
    // Raw Coder/KeyedStateBackend types are part of the existing signature.
    @SuppressWarnings("rawtypes")
    public static <InputT, OutputT> BufferingDoFnRunner<InputT, OutputT> create(DoFnRunner<InputT, OutputT> doFnRunner, String stateName, Coder windowedInputCoder, Coder windowCoder, OperatorStateBackend operatorStateBackend, @Nullable KeyedStateBackend<Object> keyedStateBackend, int maxConcurrentCheckpoints, SerializablePipelineOptions pipelineOptions) throws Exception {
        return new BufferingDoFnRunner<>(doFnRunner, stateName, windowedInputCoder, windowCoder, operatorStateBackend, keyedStateBackend, maxConcurrentCheckpoints, pipelineOptions);
    }

    /**
     * Validates the checkpoint bound, registers the pending-checkpoint state, restores the buffer
     * index from it and selects the first buffer to write to. See {@link #create} for the
     * parameters.
     */
    // The coders arrive as raw types and have to be narrowed for BufferedElements.Coder.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private BufferingDoFnRunner(DoFnRunner<InputT, OutputT> underlying, String stateName, Coder inputCoder, Coder windowCoder, OperatorStateBackend operatorStateBackend, @Nullable KeyedStateBackend keyedStateBackend, int maxConcurrentCheckpoints, SerializablePipelineOptions pipelineOptions) throws Exception {
        Preconditions.checkArgument(maxConcurrentCheckpoints > 0 && maxConcurrentCheckpoints < Short.MAX_VALUE, "Maximum number of concurrent checkpoints not within the bounds of 0 and %s", Short.MAX_VALUE);
        this.underlying = underlying;
        this.notYetAcknowledgedSnapshots = operatorStateBackend.getUnionListState(new ListStateDescriptor<>("notYetAcknowledgedSnapshots", CheckpointIdentifier.class));
        this.bufferingElementsHandlerFactory = stateId -> {
            // Each buffer index gets its own state, named "<stateName><index>".
            ListStateDescriptor<BufferedElement> stateDescriptor = new ListStateDescriptor<>(stateName + stateId, new CoderTypeSerializer<BufferedElement>(new BufferedElements.Coder((Coder<WindowedValue>) inputCoder, (Coder<BoundedWindow>) windowCoder, null), pipelineOptions));
            if (keyedStateBackend != null) {
                return KeyedBufferingElementsHandler.create(keyedStateBackend, stateDescriptor);
            }
            return NonKeyedBufferingElementsHandler.create(operatorStateBackend.getListState(stateDescriptor));
        };
        // initializeState also sets currentStateIndex, which the rotation below relies on.
        this.numCheckpointBuffers = initializeState(maxConcurrentCheckpoints);
        this.currentBufferingElementsHandler = this.bufferingElementsHandlerFactory.get(rotateAndGetStateIndex());
    }

    /**
     * Restores {@link #currentStateIndex} from the pending checkpoints found in state and
     * computes how many buffer indices are needed.
     *
     * <p>{@code currentStateIndex} becomes the buffer index of the last pending entry, or
     * {@code -1} if there is none.
     *
     * @param maxConcurrentCheckpoints the configured maximum of concurrent checkpoints
     * @return {@code max(maxConcurrentCheckpoints, highest pending buffer index) + 1}
     * @throws Exception if reading the pending-checkpoint state fails
     */
    private int initializeState(int maxConcurrentCheckpoints) throws Exception {
        List<CheckpointIdentifier> pendingSnapshots = new ArrayList<>();
        Iterables.addAll(pendingSnapshots, this.notYetAcknowledgedSnapshots.get());
        int lastUsedIndex = -1;
        int maxIndex = 0;
        if (!pendingSnapshots.isEmpty()) {
            for (CheckpointIdentifier checkpointIdentifier : pendingSnapshots) {
                maxIndex = Math.max(maxIndex, checkpointIdentifier.internalId);
            }
            lastUsedIndex = pendingSnapshots.get(pendingSnapshots.size() - 1).internalId;
        }
        this.currentStateIndex = lastUsedIndex;
        // Taking the maximum keeps every buffer index referenced by a pending checkpoint
        // inside the rotation range, even if it exceeds the configured bound.
        return Math.max(maxConcurrentCheckpoints, maxIndex) + 1;
    }

    /** No-op; the wrapped runner's bundle is started in {@link #checkpointCompleted(long)}. */
    @Override
    public void startBundle() {
    }

    /** Buffers {@code elem} in the current buffer instead of processing it. */
    @Override
    public void processElement(WindowedValue<InputT> elem) {
        this.currentBufferingElementsHandler.buffer(new BufferedElements.Element(elem));
    }

    /** Buffers the timer firing in the current buffer instead of delivering it. */
    @Override
    public <KeyT> void onTimer(String timerId, String timerFamilyId, KeyT key, BoundedWindow window, Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
        this.currentBufferingElementsHandler.buffer(new BufferedElements.Timer<KeyT>(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain));
    }

    /** No-op; the wrapped runner's bundle is finished in {@link #checkpointCompleted(long)}. */
    @Override
    public void finishBundle() {
    }

    /** No-op; window expiration is neither buffered nor forwarded to the wrapped runner. */
    @Override
    public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
    }

    /** Returns the {@link DoFn} of the wrapped runner. */
    @Override
    public DoFn<InputT, OutputT> getFn() {
        return this.underlying.getFn();
    }

    /**
     * Associates the current buffer with {@code checkpointId} and switches to the next buffer
     * for subsequent elements.
     *
     * @param checkpointId id of the checkpoint being taken
     * @throws Exception if updating state or obtaining the next buffer fails
     */
    public void checkpoint(long checkpointId) throws Exception {
        addToBeAcknowledgedCheckpoint(checkpointId, getStateIndex());
        int newStateIndex = rotateAndGetStateIndex();
        this.currentBufferingElementsHandler = this.bufferingElementsHandlerFactory.get(newStateIndex);
    }

    /**
     * Replays the buffers of all pending checkpoints with an id less than or equal to
     * {@code checkpointId}, in ascending checkpoint-id order, and clears each buffer afterwards.
     *
     * <p>A bundle is started and finished on the wrapped runner only for buffers that actually
     * contain elements.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if state access or processing of a buffered element fails
     */
    public void checkpointCompleted(long checkpointId) throws Exception {
        List<CheckpointIdentifier> allToAck = gatherToBeAcknowledgedCheckpoints(checkpointId);
        for (CheckpointIdentifier toBeAcked : allToAck) {
            BufferingElementsHandler bufferingElementsHandler = this.bufferingElementsHandlerFactory.get(toBeAcked.internalId);
            Iterator<BufferedElement> iterator = bufferingElementsHandler.getElements().iterator();
            boolean hasElements = iterator.hasNext();
            if (hasElements) {
                this.underlying.startBundle();
            }
            while (iterator.hasNext()) {
                BufferedElement bufferedElement = iterator.next();
                bufferedElement.processWith(this.underlying);
            }
            if (hasElements) {
                this.underlying.finishBundle();
            }
            bufferingElementsHandler.clear();
        }
    }

    /** Appends a (buffer index, checkpoint id) entry to the pending-checkpoint state. */
    private void addToBeAcknowledgedCheckpoint(long checkpointId, int internalId) throws Exception {
        this.notYetAcknowledgedSnapshots.addAll(Collections.singletonList(new CheckpointIdentifier(internalId, checkpointId)));
    }

    /**
     * Removes all entries with an id up to and including {@code checkpointId} from the
     * pending-checkpoint state and returns them.
     *
     * @param checkpointId id of the completed checkpoint
     * @return the removed entries, sorted by ascending checkpoint id
     * @throws Exception if reading or updating the state fails
     */
    private List<CheckpointIdentifier> gatherToBeAcknowledgedCheckpoints(long checkpointId) throws Exception {
        List<CheckpointIdentifier> toBeAcknowledged = new ArrayList<>();
        List<CheckpointIdentifier> remaining = new ArrayList<>();
        for (CheckpointIdentifier element : this.notYetAcknowledgedSnapshots.get()) {
            if (element.checkpointId <= checkpointId) {
                toBeAcknowledged.add(element);
            } else {
                remaining.add(element);
            }
        }
        // Entries for later checkpoints stay pending.
        this.notYetAcknowledgedSnapshots.update(remaining);
        toBeAcknowledged.sort(Comparator.comparingLong(o -> o.checkpointId));
        return toBeAcknowledged;
    }

    /** Advances {@link #currentStateIndex} by one, wrapping at {@link #numCheckpointBuffers}. */
    private int rotateAndGetStateIndex() {
        this.currentStateIndex = (this.currentStateIndex + 1) % this.numCheckpointBuffers;
        return this.currentStateIndex;
    }

    /** Returns the index of the buffer that currently receives elements. */
    private int getStateIndex() {
        return this.currentStateIndex;
    }

    /** Pairs a buffer index with the id of the checkpoint that buffer was closed for. */
    static class CheckpointIdentifier {
        /** Index of the element buffer. */
        final int internalId;
        /** Id of the checkpoint passed to {@link BufferingDoFnRunner#checkpoint(long)}. */
        final long checkpointId;

        CheckpointIdentifier(int internalId, long checkpointId) {
            this.internalId = internalId;
            this.checkpointId = checkpointId;
        }
    }

    /** Supplies the {@link BufferingElementsHandler} for a buffer index. */
    @FunctionalInterface
    private static interface BufferingElementsHandlerFactory {
        /**
         * @param var1 the buffer index
         * @return the handler backing that buffer
         * @throws Exception if the backing state cannot be obtained
         */
        public BufferingElementsHandler get(int var1) throws Exception;
    }
}

