/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.BeamFnTimerClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.StateBackedIterable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnDataWriteRunner<InputT> {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataWriteRunner.class);
    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private final String pTransformId;
    private final Coder<WindowedValue<InputT>> coder;
    private final BeamFnDataClient beamFnDataClientFactory;
    private final Supplier<String> processBundleInstructionIdSupplier;
    private CloseableFnDataReceiver<WindowedValue<InputT>> consumer;

    BeamFnDataWriteRunner(String pTransformId, RunnerApi.PTransform remoteWriteNode, final Supplier<String> processBundleInstructionIdSupplier, Map<String, RunnerApi.Coder> coders, BeamFnDataClient beamFnDataClientFactory, final BeamFnStateClient beamFnStateClient) throws IOException {
        this.pTransformId = pTransformId;
        BeamFnApi.RemoteGrpcPort port = RemoteGrpcPortWrite.fromPTransform(remoteWriteNode).getPort();
        this.apiServiceDescriptor = port.getApiServiceDescriptor();
        this.beamFnDataClientFactory = beamFnDataClientFactory;
        this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
        RehydratedComponents components = RehydratedComponents.forComponents(RunnerApi.Components.newBuilder().putAllCoders(coders).build());
        this.coder = CoderTranslation.fromProto(coders.get(port.getCoderId()), components, new StateBackedIterable.StateBackedIterableTranslationContext(){

            @Override
            public BeamFnStateClient getStateClient() {
                return beamFnStateClient;
            }

            @Override
            public Supplier<String> getCurrentInstructionId() {
                return processBundleInstructionIdSupplier;
            }
        });
    }

    public void registerForOutput() {
        this.consumer = this.beamFnDataClientFactory.send(this.apiServiceDescriptor, LogicalEndpoint.data(this.processBundleInstructionIdSupplier.get(), this.pTransformId), this.coder);
    }

    public void close() throws Exception {
        this.consumer.close();
    }

    public void consume(WindowedValue<InputT> value) throws Exception {
        this.consumer.accept(value);
    }

    static class Factory<InputT>
    implements PTransformRunnerFactory<BeamFnDataWriteRunner<InputT>> {
        Factory() {
        }

        @Override
        public BeamFnDataWriteRunner<InputT> createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, BeamFnTimerClient beamFnTimerClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, Map<String, RunnerApi.PCollection> pCollections, Map<String, RunnerApi.Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, PCollectionConsumerRegistry pCollectionConsumerRegistry, PTransformFunctionRegistry startFunctionRegistry, PTransformFunctionRegistry finishFunctionRegistry, Consumer<ThrowingRunnable> addResetFunction, Consumer<ThrowingRunnable> tearDownFunctions, Consumer<PTransformRunnerFactory.ProgressRequestCallback> addProgressRequestCallback, BundleSplitListener splitListener, DoFn.BundleFinalizer bundleFinalizer) throws IOException {
            BeamFnDataWriteRunner runner = new BeamFnDataWriteRunner(pTransformId, pTransform, processBundleInstructionId, coders, beamFnDataClient, beamFnStateClient);
            startFunctionRegistry.register(pTransformId, runner::registerForOutput);
            pCollectionConsumerRegistry.register(Iterables.getOnlyElement(pTransform.getInputsMap().values()), pTransformId, runner::consume, ((WindowedValue.WindowedValueCoder)runner.coder).getValueCoder());
            finishFunctionRegistry.register(pTransformId, runner::close);
            return runner;
        }
    }

    public static class Registrar
    implements PTransformRunnerFactory.Registrar {
        @Override
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of("beam:runner:sink:v1", new Factory());
        }
    }
}

