/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.functions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.TimerReference;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkExecutableStageFunction<InputT>
extends AbstractRichFunction
implements MapPartitionFunction<WindowedValue<InputT>, RawUnionValue>,
GroupReduceFunction<WindowedValue<InputT>, RawUnionValue> {
    // Shared logger; also used by the nested receiver factories.
    private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutableStageFunction.class);
    // Payload from which the ExecutableStage is rebuilt in open().
    private final RunnerApi.ExecutableStagePayload stagePayload;
    // Passed to contextFactory.get(...) in open() to obtain the stage context.
    private final JobInfo jobInfo;
    // Maps an output PCollection id to the union tag used when wrapping elements in RawUnionValue.
    private final Map<String, Integer> outputMap;
    // Supplies the FlinkExecutableStageContext in open().
    private final FlinkExecutableStageContext.Factory contextFactory;
    // Window coder handed to the timer receiver, which uses it to build window state namespaces.
    private final Coder windowCoder;
    // Taken from stagePayload.getInput(); passed as the first argument to container.updateMetrics(...).
    private final String stageName;
    // The transient fields below are not set by the constructor; unless noted otherwise they are assigned in open().
    private transient RuntimeContext runtimeContext;
    private transient FlinkMetricContainer container;
    private transient StateRequestHandler stateRequestHandler;
    // Set to null again by close(); a non-null value is what close() uses to decide there is something to release.
    private transient FlinkExecutableStageContext stageContext;
    private transient StageBundleFactory stageBundleFactory;
    private transient BundleProgressHandler progressHandler;
    // Only created (in getStateRequestHandler) when the stage declares user state; otherwise stays null.
    private transient InMemoryBagUserStateFactory bagUserStateHandlerFactory;
    private transient ExecutableStage executableStage;
    // Key of the most recently received timer element in reduce(); reused as the key when timers are fired.
    private transient Object currentTimerKey;

    /**
     * Captures the configuration needed to execute a stage; all runtime resources are
     * created later in {@link #open(Configuration)}.
     *
     * @param stagePayload payload describing the executable stage; its {@code getInput()} value
     *     is also used as the stage name
     * @param jobInfo job information handed to the context factory in {@code open}
     * @param outputMap output PCollection id to union tag
     * @param contextFactory factory for the stage context
     * @param windowCoder coder used to build window namespaces for timers
     */
    public FlinkExecutableStageFunction(
            RunnerApi.ExecutableStagePayload stagePayload,
            JobInfo jobInfo,
            Map<String, Integer> outputMap,
            FlinkExecutableStageContext.Factory contextFactory,
            Coder windowCoder) {
        // Payload-derived values first, then the remaining collaborators.
        this.stagePayload = stagePayload;
        this.stageName = stagePayload.getInput();
        this.jobInfo = jobInfo;
        this.contextFactory = contextFactory;
        this.outputMap = outputMap;
        this.windowCoder = windowCoder;
    }

    /**
     * Builds the runtime state of this function: the executable stage, the metric container,
     * the stage context and bundle factory, the state request handler, and a progress handler
     * that forwards monitoring infos to the metric container.
     *
     * @param parameters Flink configuration (not read by this method)
     * @throws Exception if any of the collaborators fails to initialize
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Install default pipeline options for FileSystems before anything else runs.
        FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

        executableStage = ExecutableStage.fromPayload(stagePayload);

        RuntimeContext flinkContext = getRuntimeContext();
        runtimeContext = flinkContext;
        container = new FlinkMetricContainer(flinkContext);

        stageContext = contextFactory.get(jobInfo);
        stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
        stateRequestHandler =
                getStateRequestHandler(
                        executableStage,
                        stageBundleFactory.getProcessBundleDescriptor(),
                        runtimeContext);

        // Both progress callbacks report monitoring infos under this stage's name.
        progressHandler = new BundleProgressHandler() {
            @Override
            public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {
                container.updateMetrics(stageName, progress.getMonitoringInfosList());
            }

            @Override
            public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
                container.updateMetrics(stageName, response.getMonitoringInfosList());
            }
        };
    }

    /**
     * Builds the state request handler for the stage by combining a side-input handler and a
     * bag-user-state handler, dispatched on the state key type.
     *
     * <p>Side inputs are looked up through {@link RuntimeContext#getBroadcastVariable(String)}.
     * User state is only supported when the stage declares at least one user state; in that
     * case {@code bagUserStateHandlerFactory} is (re)created as a side effect.
     *
     * @param executableStage stage whose side inputs and user states are inspected
     * @param processBundleDescriptor descriptor used to build the user state handler
     * @param runtimeContext Flink runtime context supplying broadcast variables
     * @return a handler delegating {@code MULTIMAP_SIDE_INPUT} and {@code BAG_USER_STATE} requests
     * @throws RuntimeException if the side input handler cannot be created
     */
    private StateRequestHandler getStateRequestHandler(ExecutableStage executableStage, ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor, RuntimeContext runtimeContext) {
        // A method reference (not a lambda) is used on purpose: it is also valid if the getter's
        // functional method is itself generic, which a lambda expression cannot implement.
        BatchSideInputHandlerFactory sideInputHandlerFactory =
                BatchSideInputHandlerFactory.forStage(executableStage, runtimeContext::getBroadcastVariable);

        final StateRequestHandler sideInputHandler;
        try {
            sideInputHandler =
                    StateRequestHandlers.forSideInputHandlerFactory(
                            ProcessBundleDescriptors.getSideInputs(executableStage), sideInputHandlerFactory);
        } catch (IOException e) {
            throw new RuntimeException("Failed to setup state handler", e);
        }

        final StateRequestHandler userStateHandler;
        if (!executableStage.getUserStates().isEmpty()) {
            bagUserStateHandlerFactory = new InMemoryBagUserStateFactory();
            userStateHandler =
                    StateRequestHandlers.forBagUserStateHandlerFactory(
                            processBundleDescriptor, bagUserStateHandlerFactory);
        } else {
            // No user state declared: any user state request is rejected.
            userStateHandler = StateRequestHandler.unsupported();
        }

        EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlerMap =
                new EnumMap<>(BeamFnApi.StateKey.TypeCase.class);
        handlerMap.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, sideInputHandler);
        handlerMap.put(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE, userStateHandler);
        return StateRequestHandlers.delegateBasedUponType(handlerMap);
    }

    /**
     * Processes a partition in a single remote bundle. No timer receiver is installed on this
     * path, so only PCollections present in the output map can be received.
     *
     * @param iterable input elements of the partition
     * @param collector sink for the tagged outputs
     * @throws Exception if bundle creation, element processing, or bundle close fails
     */
    @Override
    public void mapPartition(Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> collector) throws Exception {
        OutputReceiverFactory outputs = new ReceiverFactory(collector, outputMap);
        // Closing the bundle (end of the try block) completes it.
        try (RemoteBundle remoteBundle = stageBundleFactory.getBundle(outputs, stateRequestHandler, progressHandler)) {
            processElements(iterable, remoteBundle);
        }
    }

    /**
     * Processes one group of elements and then fires every timer that was set while doing so.
     *
     * <p>Each invocation is treated as a new key: in-memory user state (if any) is reset and a
     * fresh {@link InMemoryTimerInternals} is created. Elements are processed in a first bundle;
     * afterwards the input watermark and both processing-time clocks are advanced to
     * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} and the timers are delivered in a second bundle.
     * Timers set while firing are picked up by the same timer internals.
     *
     * @param iterable input elements of the group
     * @param collector sink for the tagged outputs
     * @throws Exception if a bundle cannot be created, processed, or closed
     */
    @Override
    public void reduce(Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> collector) throws Exception {
        if (bagUserStateHandlerFactory != null) {
            bagUserStateHandlerFactory.resetForNewKey();
        }

        InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
        timerInternals.advanceProcessingTime(Instant.now());
        timerInternals.advanceSynchronizedProcessingTime(Instant.now());

        ReceiverFactory receiverFactory =
                new ReceiverFactory(
                        collector,
                        outputMap,
                        new TimerReceiverFactory(
                                stageBundleFactory,
                                executableStage.getTimers(),
                                stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs(),
                                (timerElement, timerData) -> {
                                    // Remember the key of the timer element; fireTimer() reuses it.
                                    currentTimerKey = ((KV) timerElement.getValue()).getKey();
                                    timerInternals.setTimer(timerData);
                                },
                                windowCoder));

        // First bundle: the actual input elements.
        try (RemoteBundle bundle = stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) {
            processElements(iterable, bundle);
        }

        // Move every clock to the end of time so that all pending timers become eligible.
        timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
        timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);

        // Second bundle: deliver the timers to their input receivers.
        try (RemoteBundle bundle = stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) {
            fireEligibleTimers(timerInternals, (timerId, timerValue) -> {
                FnDataReceiver fnTimerReceiver = bundle.getInputReceivers().get(timerId);
                Preconditions.checkNotNull(fnTimerReceiver, "No FnDataReceiver found for %s", timerId);
                try {
                    fnTimerReceiver.accept(timerValue);
                } catch (Exception e) {
                    // Keep the original failure as the cause so it is not lost.
                    throw new RuntimeException(String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue), e);
                }
            });
        }
    }

    /**
     * Feeds every element of {@code iterable} to the bundle's receiver for the stage's main
     * input PCollection.
     *
     * @param iterable elements to send
     * @param bundle open bundle; must not be null
     * @throws Exception if the receiver rejects an element
     */
    private void processElements(Iterable<WindowedValue<InputT>> iterable, RemoteBundle bundle) throws Exception {
        Preconditions.checkArgument(bundle != null, "RemoteBundle must not be null");

        String mainInputId = executableStage.getInputPCollection().getId();
        FnDataReceiver inputReceiver =
                Preconditions.checkNotNull(
                        bundle.getInputReceivers().get(mainInputId),
                        "Main input receiver for %s could not be initialized",
                        mainInputId);

        Iterator<WindowedValue<InputT>> elements = iterable.iterator();
        while (elements.hasNext()) {
            inputReceiver.accept(elements.next());
        }
    }

    /**
     * Drains all eligible timers from {@code timerInternals} and fires them, in the order
     * event time, processing time, synchronized processing time. The whole pass is repeated
     * until a pass fires nothing, so timers added while firing are also delivered.
     *
     * @param timerInternals source of eligible timers
     * @param timerConsumer receives the timer id and the timer element to deliver
     */
    private void fireEligibleTimers(InMemoryTimerInternals timerInternals, BiConsumer<String, WindowedValue> timerConsumer) {
        boolean firedInLastPass = true;
        while (firedInLastPass) {
            firedInLastPass = false;

            for (TimerInternals.TimerData next = timerInternals.removeNextEventTimer();
                    next != null;
                    next = timerInternals.removeNextEventTimer()) {
                firedInLastPass = true;
                fireTimer(next, timerConsumer);
            }

            for (TimerInternals.TimerData next = timerInternals.removeNextProcessingTimer();
                    next != null;
                    next = timerInternals.removeNextProcessingTimer()) {
                firedInLastPass = true;
                fireTimer(next, timerConsumer);
            }

            for (TimerInternals.TimerData next = timerInternals.removeNextSynchronizedProcessingTimer();
                    next != null;
                    next = timerInternals.removeNextSynchronizedProcessingTimer()) {
                firedInLastPass = true;
                fireTimer(next, timerConsumer);
            }
        }
    }

    /**
     * Converts a single timer into a windowed timer element and hands it to the consumer.
     *
     * <p>The element is keyed by {@code currentTimerKey}, carries an empty byte-array payload,
     * uses the timer's timestamp, the timer's window, and {@code PaneInfo.NO_FIRING}.
     *
     * @param timer timer to fire; its namespace must be a window namespace
     * @param timerConsumer receives the timer id and the constructed element
     * @throws IllegalArgumentException if the timer's namespace is not a window namespace
     */
    private void fireTimer(TimerInternals.TimerData timer, BiConsumer<String, WindowedValue> timerConsumer) {
        StateNamespace timerNamespace = timer.getNamespace();
        Preconditions.checkArgument(timerNamespace instanceof StateNamespaces.WindowNamespace);
        BoundedWindow firingWindow = ((StateNamespaces.WindowNamespace) timerNamespace).getWindow();

        Instant firingTime = timer.getTimestamp();
        Object keyedTimer = KV.of(currentTimerKey, Timer.of(firingTime, new byte[0]));
        WindowedValue timerValue =
                WindowedValue.of(keyedTimer, firingTime, Collections.singleton(firingWindow), PaneInfo.NO_FIRING);

        timerConsumer.accept(timer.getTimerId(), timerValue);
    }

    /**
     * Releases the stage context and the stage bundle factory.
     *
     * <p>Nothing is closed when {@code stageContext} is already null, so a repeated call after
     * a successful close is a no-op. If closing fails, the error is logged and rethrown and
     * {@code stageContext} is left untouched.
     *
     * @throws Exception if closing either resource fails
     */
    @Override
    public void close() throws Exception {
        if (stageContext != null) {
            // Resources are closed in reverse declaration order: the stage context first, then
            // the bundle factory. A null resource is skipped by try-with-resources.
            try (AutoCloseable bundleFactoryCloser = stageBundleFactory;
                    AutoCloseable contextCloser = stageContext) {
                // Intentionally empty: the block exists only to close both resources.
            } catch (Exception e) {
                LOG.error("Error in close: ", e);
                throw e;
            }
        }
        stageContext = null;
    }

    private static class InMemoryBagUserStateFactory
    implements StateRequestHandlers.BagUserStateHandlerFactory {
        private List<InMemorySingleKeyBagState> handlers = new ArrayList<InMemorySingleKeyBagState>();

        private InMemoryBagUserStateFactory() {
        }

        public <K, V, W extends BoundedWindow> StateRequestHandlers.BagUserStateHandler<K, V, W> forUserState(String pTransformId, String userStateId, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder) {
            InMemorySingleKeyBagState bagUserStateHandler = new InMemorySingleKeyBagState(userStateId, valueCoder, windowCoder);
            this.handlers.add(bagUserStateHandler);
            return bagUserStateHandler;
        }

        void resetForNewKey() {
            for (InMemorySingleKeyBagState stateBags : this.handlers) {
                stateBags.reset();
            }
        }

        static class InMemorySingleKeyBagState<K, V, W extends BoundedWindow>
        implements StateRequestHandlers.BagUserStateHandler<K, V, W> {
            private final StateTag<BagState<V>> stateTag;
            private final Coder<W> windowCoder;
            private volatile StateInternals stateInternals;

            InMemorySingleKeyBagState(String userStateId, Coder<V> valueCoder, Coder<W> windowCoder) {
                this.windowCoder = windowCoder;
                this.stateTag = StateTags.bag((String)userStateId, valueCoder);
            }

            public Iterable<V> get(K key, W window) {
                this.initStateInternals(key);
                StateNamespace namespace = StateNamespaces.window(this.windowCoder, window);
                BagState bagState = (BagState)this.stateInternals.state(namespace, this.stateTag);
                return bagState.read();
            }

            public void append(K key, W window, Iterator<V> values) {
                this.initStateInternals(key);
                StateNamespace namespace = StateNamespaces.window(this.windowCoder, window);
                BagState bagState = (BagState)this.stateInternals.state(namespace, this.stateTag);
                while (values.hasNext()) {
                    bagState.add(values.next());
                }
            }

            public void clear(K key, W window) {
                this.initStateInternals(key);
                StateNamespace namespace = StateNamespaces.window(this.windowCoder, window);
                BagState bagState = (BagState)this.stateInternals.state(namespace, this.stateTag);
                bagState.clear();
            }

            private void initStateInternals(K key) {
                if (this.stateInternals == null) {
                    this.stateInternals = InMemoryStateInternals.forKey(key);
                }
            }

            void reset() {
                this.stateInternals = null;
            }
        }
    }

    private static class TimerReceiverFactory
    implements OutputReceiverFactory {
        private final StageBundleFactory stageBundleFactory;
        private final HashMap<String, ProcessBundleDescriptors.TimerSpec> timerOutputIdToSpecMap;
        private final Map<String, Map<String, ProcessBundleDescriptors.TimerSpec>> timerSpecMap;
        private final BiConsumer<WindowedValue, TimerInternals.TimerData> timerDataConsumer;
        private final Coder windowCoder;

        TimerReceiverFactory(StageBundleFactory stageBundleFactory, Collection<TimerReference> timerReferenceCollection, Map<String, Map<String, ProcessBundleDescriptors.TimerSpec>> timerSpecMap, BiConsumer<WindowedValue, TimerInternals.TimerData> timerDataConsumer, Coder windowCoder) {
            this.stageBundleFactory = stageBundleFactory;
            this.timerOutputIdToSpecMap = new HashMap();
            for (Map transformTimerMap : stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs().values()) {
                for (ProcessBundleDescriptors.TimerSpec timerSpec : transformTimerMap.values()) {
                    this.timerOutputIdToSpecMap.put(timerSpec.outputCollectionId(), timerSpec);
                }
            }
            this.timerSpecMap = timerSpecMap;
            this.timerDataConsumer = timerDataConsumer;
            this.windowCoder = windowCoder;
        }

        public <OutputT> FnDataReceiver<OutputT> create(String pCollectionId) {
            ProcessBundleDescriptors.TimerSpec timerSpec = this.timerOutputIdToSpecMap.get(pCollectionId);
            return receivedElement -> {
                WindowedValue windowedValue = (WindowedValue)receivedElement;
                Timer timer = (Timer)Preconditions.checkNotNull((Object)((Timer)((KV)windowedValue.getValue()).getValue()), (String)"Received null Timer from SDK harness: %s", (Object[])new Object[]{receivedElement});
                LOG.debug("Timer received: {} {}", (Object)pCollectionId, (Object)timer);
                for (Object window : windowedValue.getWindows()) {
                    StateNamespace namespace = StateNamespaces.window((Coder)this.windowCoder, (BoundedWindow)((BoundedWindow)window));
                    TimeDomain timeDomain = timerSpec.getTimerSpec().getTimeDomain();
                    String timerId = timerSpec.inputCollectionId();
                    TimerInternals.TimerData timerData = TimerInternals.TimerData.of((String)timerId, (StateNamespace)namespace, (Instant)timer.getTimestamp(), (TimeDomain)timeDomain);
                    this.timerDataConsumer.accept(windowedValue, timerData);
                }
            };
        }
    }

    private static class ReceiverFactory
    implements OutputReceiverFactory {
        private final Object collectorLock = new Object();
        @GuardedBy(value="collectorLock")
        private final Collector<RawUnionValue> collector;
        private final Map<String, Integer> outputMap;
        @Nullable
        private final TimerReceiverFactory timerReceiverFactory;

        ReceiverFactory(Collector<RawUnionValue> collector, Map<String, Integer> outputMap) {
            this(collector, outputMap, null);
        }

        ReceiverFactory(Collector<RawUnionValue> collector, Map<String, Integer> outputMap, @Nullable TimerReceiverFactory timerReceiverFactory) {
            this.collector = collector;
            this.outputMap = outputMap;
            this.timerReceiverFactory = timerReceiverFactory;
        }

        public <OutputT> FnDataReceiver<OutputT> create(String collectionId) {
            Integer unionTag = this.outputMap.get(collectionId);
            if (unionTag != null) {
                int tagInt = unionTag;
                return receivedElement -> {
                    Object object = this.collectorLock;
                    synchronized (object) {
                        this.collector.collect((Object)new RawUnionValue(tagInt, receivedElement));
                    }
                };
            }
            if (this.timerReceiverFactory != null) {
                return this.timerReceiverFactory.create(collectionId);
            }
            throw new IllegalStateException(String.format(Locale.ENGLISH, "Unknown PCollectionId %s", collectionId));
        }
    }
}

