/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.BeamFnTimerClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.StateBackedIterable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnDataReadRunner<OutputT> {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class);
    private final String pTransformId;
    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private final FnDataReceiver<WindowedValue<OutputT>> consumer;
    private final Supplier<String> processBundleInstructionIdSupplier;
    private final BeamFnDataClient beamFnDataClient;
    private final Coder<WindowedValue<OutputT>> coder;
    private final Object splittingLock = new Object();
    private long index;
    private long stopIndex;
    private InboundDataClient readFuture;

    BeamFnDataReadRunner(String pTransformId, RunnerApi.PTransform grpcReadNode, final Supplier<String> processBundleInstructionIdSupplier, Map<String, RunnerApi.Coder> coders, BeamFnDataClient beamFnDataClient, final BeamFnStateClient beamFnStateClient, Consumer<PTransformRunnerFactory.ProgressRequestCallback> addProgressRequestCallback, FnDataReceiver<WindowedValue<OutputT>> consumer) throws IOException {
        this.pTransformId = pTransformId;
        BeamFnApi.RemoteGrpcPort port = RemoteGrpcPortRead.fromPTransform(grpcReadNode).getPort();
        this.apiServiceDescriptor = port.getApiServiceDescriptor();
        this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
        this.beamFnDataClient = beamFnDataClient;
        this.consumer = consumer;
        RehydratedComponents components = RehydratedComponents.forComponents(RunnerApi.Components.newBuilder().putAllCoders(coders).build());
        this.coder = CoderTranslation.fromProto(coders.get(port.getCoderId()), components, new StateBackedIterable.StateBackedIterableTranslationContext(){

            @Override
            public BeamFnStateClient getStateClient() {
                return beamFnStateClient;
            }

            @Override
            public Supplier<String> getCurrentInstructionId() {
                return processBundleInstructionIdSupplier;
            }
        });
        addProgressRequestCallback.accept(() -> {
            Object object = this.splittingLock;
            synchronized (object) {
                return ImmutableList.of(new SimpleMonitoringInfoBuilder().setUrn(MonitoringInfoConstants.Urns.DATA_CHANNEL_READ_INDEX).setLabel("PTRANSFORM", pTransformId).setInt64SumValue(this.index).build());
            }
        });
        this.clearSplitIndices();
    }

    public void registerInputLocation() {
        this.readFuture = this.beamFnDataClient.receive(this.apiServiceDescriptor, LogicalEndpoint.data(this.processBundleInstructionIdSupplier.get(), this.pTransformId), this.coder, this::forwardElementToConsumer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void forwardElementToConsumer(WindowedValue<OutputT> element) throws Exception {
        Object object = this.splittingLock;
        synchronized (object) {
            if (this.index == this.stopIndex - 1L) {
                return;
            }
            ++this.index;
        }
        this.consumer.accept(element);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void trySplit(BeamFnApi.ProcessBundleSplitRequest request, BeamFnApi.ProcessBundleSplitResponse.Builder response) {
        BeamFnApi.ProcessBundleSplitRequest.DesiredSplit desiredSplit = request.getDesiredSplitsMap().get(this.pTransformId);
        if (desiredSplit == null) {
            return;
        }
        long totalBufferSize = desiredSplit.getEstimatedInputElements();
        ArrayList<Long> allowedSplitPoints = new ArrayList<Long>(desiredSplit.getAllowedSplitPointsList());
        HandlesSplits splittingConsumer = null;
        if (this.consumer instanceof HandlesSplits) {
            splittingConsumer = (HandlesSplits)((Object)this.consumer);
        }
        Object object = this.splittingLock;
        synchronized (object) {
            long newStopIndex;
            double keepOfElementRemainder;
            if (this.index == this.stopIndex) {
                return;
            }
            if (!request.getInstructionId().equals(this.processBundleInstructionIdSupplier.get())) {
                return;
            }
            if (totalBufferSize < this.index + 1L) {
                totalBufferSize = this.index + 1L;
            } else if (totalBufferSize > this.stopIndex) {
                totalBufferSize = this.stopIndex;
            }
            double currentElementProgress = 1.0;
            if (this.index >= 0L) {
                currentElementProgress = splittingConsumer != null ? splittingConsumer.getProgress() : 0.5;
            }
            double remainder = (double)(totalBufferSize - this.index) - currentElementProgress;
            double keep = remainder * desiredSplit.getFractionOfRemainder();
            if (currentElementProgress < 1.0 && (keepOfElementRemainder = keep / (1.0 - currentElementProgress)) < 1.0 && this.isValidSplitPoint(allowedSplitPoints, this.index) && this.isValidSplitPoint(allowedSplitPoints, this.index + 1L)) {
                HandlesSplits.SplitResult splitResult;
                HandlesSplits.SplitResult splitResult2 = splitResult = splittingConsumer != null ? splittingConsumer.trySplit(keepOfElementRemainder) : null;
                if (splitResult != null) {
                    this.stopIndex = this.index + 1L;
                    response.addAllPrimaryRoots(splitResult.getPrimaryRoots()).addAllResidualRoots(splitResult.getResidualRoots()).addChannelSplitsBuilder().setLastPrimaryElement(this.index - 1L).setFirstResidualElement(this.stopIndex);
                    return;
                }
            }
            if (!this.isValidSplitPoint(allowedSplitPoints, newStopIndex = this.index + Math.max(1L, Math.round(currentElementProgress + keep)))) {
                Collections.sort(allowedSplitPoints);
                int closestSplitPointIndex = -(Collections.binarySearch(allowedSplitPoints, newStopIndex) + 1);
                if (closestSplitPointIndex == 0) {
                    newStopIndex = (Long)allowedSplitPoints.get(0);
                } else if (closestSplitPointIndex == allowedSplitPoints.size()) {
                    newStopIndex = (Long)allowedSplitPoints.get(closestSplitPointIndex - 1);
                } else {
                    long prevPoint = (Long)allowedSplitPoints.get(closestSplitPointIndex - 1);
                    long nextPoint = (Long)allowedSplitPoints.get(closestSplitPointIndex);
                    newStopIndex = this.index < prevPoint && newStopIndex - prevPoint < nextPoint - newStopIndex ? prevPoint : nextPoint;
                }
            }
            if (newStopIndex < this.stopIndex && newStopIndex > this.index) {
                this.stopIndex = newStopIndex;
                response.addChannelSplitsBuilder().setLastPrimaryElement(this.stopIndex - 1L).setFirstResidualElement(this.stopIndex);
                return;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void blockTillReadFinishes() throws Exception {
        LOG.debug("Waiting for process bundle instruction {} and transform {} to close.", (Object)this.processBundleInstructionIdSupplier.get(), (Object)this.pTransformId);
        this.readFuture.awaitCompletion();
        Object object = this.splittingLock;
        synchronized (object) {
            ++this.index;
            this.stopIndex = this.index;
        }
    }

    public void reset() {
        Preconditions.checkArgument(this.processBundleInstructionIdSupplier.get() == null, "Cannot reset an active bundle processor.");
        this.clearSplitIndices();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void clearSplitIndices() {
        Object object = this.splittingLock;
        synchronized (object) {
            this.index = -1L;
            this.stopIndex = Long.MAX_VALUE;
        }
    }

    private boolean isValidSplitPoint(List<Long> allowedSplitPoints, long index) {
        return allowedSplitPoints.isEmpty() || allowedSplitPoints.contains(index);
    }

    static class Factory<OutputT>
    implements PTransformRunnerFactory<BeamFnDataReadRunner<OutputT>> {
        Factory() {
        }

        @Override
        public BeamFnDataReadRunner<OutputT> createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, BeamFnTimerClient beamFnTimerClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, Map<String, RunnerApi.PCollection> pCollections, Map<String, RunnerApi.Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, PCollectionConsumerRegistry pCollectionConsumerRegistry, PTransformFunctionRegistry startFunctionRegistry, PTransformFunctionRegistry finishFunctionRegistry, Consumer<ThrowingRunnable> addResetFunction, Consumer<ThrowingRunnable> tearDownFunctions, Consumer<PTransformRunnerFactory.ProgressRequestCallback> addProgressRequestCallback, BundleSplitListener splitListener, DoFn.BundleFinalizer bundleFinalizer) throws IOException {
            FnDataReceiver consumer = pCollectionConsumerRegistry.getMultiplexingConsumer(Iterables.getOnlyElement(pTransform.getOutputsMap().values()));
            BeamFnDataReadRunner runner = new BeamFnDataReadRunner(pTransformId, pTransform, processBundleInstructionId, coders, beamFnDataClient, beamFnStateClient, addProgressRequestCallback, consumer);
            startFunctionRegistry.register(pTransformId, runner::registerInputLocation);
            finishFunctionRegistry.register(pTransformId, runner::blockTillReadFinishes);
            addResetFunction.accept(runner::reset);
            return runner;
        }
    }

    public static class Registrar
    implements PTransformRunnerFactory.Registrar {
        @Override
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of("beam:runner:source:v1", new Factory());
        }
    }
}

