/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.portable;

import java.util.ArrayList;
import java.util.Collection;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.splittabledofn.SDFFeederViaStateAndTimers;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.wire.WireCoders;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.FnDataReceiver;
import org.apache.beam.runners.direct.portable.BundleFactory;
import org.apache.beam.runners.direct.portable.BundleFactoryOutputReceiverFactory;
import org.apache.beam.runners.direct.portable.CommittedBundle;
import org.apache.beam.runners.direct.portable.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.runners.direct.portable.DirectTimerInternals;
import org.apache.beam.runners.direct.portable.StepStateAndTimers;
import org.apache.beam.runners.direct.portable.StepTransformResult;
import org.apache.beam.runners.direct.portable.TransformEvaluator;
import org.apache.beam.runners.direct.portable.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.portable.TransformResult;
import org.apache.beam.runners.direct.portable.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;

/**
 * A {@link TransformEvaluatorFactory} whose evaluators run an {@link ExecutableStage} through a
 * {@link RemoteBundle} obtained from a {@link JobBundleFactory}, feeding the stage
 * element/restriction pairs via an {@link SDFFeederViaStateAndTimers}.
 */
class SplittableRemoteStageEvaluatorFactory implements TransformEvaluatorFactory {
    /** URN constant for the splittable remote stage transform. */
    public static final String URN = "beam:directrunner:transforms:splittable_remote_stage:v1";
    /** URN constant for the "feed SDF" transform; not referenced elsewhere in this class. */
    public static final String FEED_SDF_URN = "beam:directrunner:transforms:feed_sdf:v1";

    private final BundleFactory bundleFactory;
    private final JobBundleFactory jobBundleFactory;
    private final StepStateAndTimers.Provider stateAndTimersProvider;

    /**
     * Creates the factory.
     *
     * @param bundleFactory handed to each evaluator for creating output bundles
     * @param jobBundleFactory source of the remote bundles; closed by {@link #cleanup()}
     * @param stepStateAndTimers provider of per-step, per-key state and timers
     */
    SplittableRemoteStageEvaluatorFactory(BundleFactory bundleFactory, JobBundleFactory jobBundleFactory, StepStateAndTimers.Provider stepStateAndTimers) {
        this.bundleFactory = bundleFactory;
        this.jobBundleFactory = jobBundleFactory;
        this.stateAndTimersProvider = stepStateAndTimers;
    }

    /**
     * Builds a new evaluator for the given transform application, using the state and timers
     * looked up for that application and the key of the input bundle.
     *
     * @param application the transform node to evaluate
     * @param inputBundle the bundle whose key selects the state and timers
     * @return a freshly constructed evaluator (this implementation never returns {@code null})
     * @throws Exception if the evaluator cannot be constructed
     */
    @Override
    @Nullable
    public <InputT> TransformEvaluator<InputT> forApplication(PipelineNode.PTransformNode application, CommittedBundle<?> inputBundle) throws Exception {
        StepStateAndTimers<?> keyedStateAndTimers = this.stateAndTimersProvider.forStepAndKey(application, inputBundle.getKey());
        // The evaluator is instantiated raw: the caller-chosen InputT cannot be checked here.
        TransformEvaluator<InputT> evaluator = new SplittableRemoteStageEvaluator(this.bundleFactory, this.jobBundleFactory, keyedStateAndTimers, application);
        return evaluator;
    }

    /**
     * Closes the underlying {@link JobBundleFactory}.
     *
     * @throws Exception if closing the job bundle factory fails
     */
    @Override
    public void cleanup() throws Exception {
        this.jobBundleFactory.close();
    }

    /**
     * Evaluator that forwards one element/restriction pair per keyed work item to the main input
     * receiver of a remote bundle, and collects the bundle's outputs.
     */
    private static class SplittableRemoteStageEvaluator<InputT, RestrictionT>
            implements TransformEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> {
        private final PipelineNode.PTransformNode transform;
        private final ExecutableStage stage;
        private final CopyOnAccessInMemoryStateInternals<byte[]> stateInternals;
        private final DirectTimerInternals timerInternals;
        private final RemoteBundle bundle;
        private final FnDataReceiver<WindowedValue<?>> mainInput;
        private final Collection<UncommittedBundle<?>> outputs;
        private final SDFFeederViaStateAndTimers<InputT, RestrictionT> feeder;

        /**
         * Parses the executable stage from the transform's payload, derives the element,
         * restriction and window coders from the stage's input PCollection, and opens the
         * remote bundle whose single input receiver becomes the main input.
         *
         * @throws Exception if the payload cannot be parsed or the bundle cannot be opened
         */
        private SplittableRemoteStageEvaluator(BundleFactory bundleFactory, JobBundleFactory jobBundleFactory, StepStateAndTimers<byte[]> stateAndTimers, PipelineNode.PTransformNode transform) throws Exception {
            this.stateInternals = stateAndTimers.stateInternals();
            this.timerInternals = stateAndTimers.timerInternals();
            this.transform = transform;

            RunnerApi.ExecutableStagePayload stagePayload = RunnerApi.ExecutableStagePayload.parseFrom((ByteString)transform.getTransform().getSpec().getPayload());
            ExecutableStage parsedStage = ExecutableStage.fromPayload(stagePayload);
            this.stage = parsedStage;

            Collection<UncommittedBundle<?>> collectedOutputs = new ArrayList<>();
            this.outputs = collectedOutputs;

            // The stage input is expected to be a windowed KV of (element, restriction).
            WindowedValue.FullWindowedValueCoder inputCoder = (WindowedValue.FullWindowedValueCoder)WireCoders.instantiateRunnerWireCoder(parsedStage.getInputPCollection(), parsedStage.getComponents());
            KvCoder elementAndRestrictionCoder = (KvCoder)inputCoder.getValueCoder();
            this.feeder = new SDFFeederViaStateAndTimers(this.stateInternals, this.timerInternals, elementAndRestrictionCoder.getKeyCoder(), elementAndRestrictionCoder.getValueCoder(), (Coder<BoundedWindow>)inputCoder.getWindowCoder());

            RemoteBundle remoteBundle = jobBundleFactory.forStage(parsedStage).getBundle(
                    BundleFactoryOutputReceiverFactory.create(bundleFactory, parsedStage.getComponents(), collectedOutputs::add),
                    StateRequestHandler.unsupported(),
                    SplittableRemoteStageEvaluator.ignoringProgress());
            this.bundle = remoteBundle;
            this.mainInput = (FnDataReceiver)Iterables.getOnlyElement(remoteBundle.getInputReceivers().values());
        }

        /** Returns a progress handler that discards both progress and completion callbacks. */
        private static BundleProgressHandler ignoringProgress() {
            return new BundleProgressHandler(){

                @Override
                public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {
                    // Intentionally a no-op.
                }

                @Override
                public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
                    // Intentionally a no-op.
                }
            };
        }

        /**
         * Sends one element/restriction pair to the remote stage. When the work item carries an
         * element, the feeder is seeded with it and that element is forwarded; when it carries
         * no element, the feeder is resumed from the work item's only timer and the resumed
         * pair is forwarded instead.
         *
         * @param windowedWorkItem the keyed work item holding either one element or one timer
         * @throws Exception if the feeder or the input receiver fails
         */
        @Override
        public void processElement(WindowedValue<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> windowedWorkItem) throws Exception {
            KeyedWorkItem workItem = (KeyedWorkItem)windowedWorkItem.getValue();
            WindowedValue<KV<InputT, RestrictionT>> seedElement = (WindowedValue<KV<InputT, RestrictionT>>)Iterables.getOnlyElement(workItem.elementsIterable(), null);
            if (seedElement == null) {
                // No element present: this work item is a timer firing, so resume from it.
                TimerInternals.TimerData wakeupTimer = (TimerInternals.TimerData)Iterables.getOnlyElement(workItem.timersIterable());
                WindowedValue<KV<InputT, RestrictionT>> resumed = this.feeder.resume(wakeupTimer);
                this.mainInput.accept(resumed);
                return;
            }
            this.feeder.seed(seedElement);
            this.mainInput.accept(seedElement);
        }

        /**
         * Closes the remote bundle, commits the feeder and the state internals, and assembles
         * the transform result from the committed state's earliest watermark hold, the
         * collected output bundles, the committed state and the timer update.
         *
         * @return the result of processing this bundle
         * @throws Exception if closing the bundle or committing fails
         */
        @Override
        public TransformResult<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> finishBundle() throws Exception {
            this.bundle.close();
            this.feeder.commit();
            CopyOnAccessInMemoryStateInternals<byte[]> committedState = this.stateInternals.commit();
            StepTransformResult.Builder resultBuilder = StepTransformResult.withHold(this.transform, committedState.getEarliestWatermarkHold());
            // Reassign after every step so this does not rely on the builder mutating in place.
            resultBuilder = resultBuilder.addOutput(this.outputs);
            resultBuilder = resultBuilder.withState(committedState);
            resultBuilder = resultBuilder.withTimerUpdate(this.timerInternals.getTimerUpdate());
            return resultBuilder.build();
        }
    }
}

