/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.core.construction.graph;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Predicate;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.direct_java.runners.core.construction.ModelCoders;
import org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation;
import org.apache.beam.repackaged.direct_java.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;

public class SplittableParDoExpander {
    public static ProtoOverrides.TransformReplacement createSizedReplacement() {
        return SizedReplacement.INSTANCE;
    }

    private static String getOrAddDoubleCoder(RunnerApi.ComponentsOrBuilder existingComponents, RunnerApi.MessageWithComponents.Builder out) {
        for (Map.Entry coder : existingComponents.getCodersMap().entrySet()) {
            if (!ModelCoders.DOUBLE_CODER_URN.equals(((RunnerApi.Coder)coder.getValue()).getSpec().getUrn())) continue;
            return (String)coder.getKey();
        }
        String doubleCoderId = SplittableParDoExpander.generateUniqueId("DoubleCoder", arg_0 -> ((RunnerApi.ComponentsOrBuilder)existingComponents).containsCoders(arg_0));
        out.getComponentsBuilder().putCoders(doubleCoderId, RunnerApi.Coder.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.DOUBLE_CODER_URN)).build());
        return doubleCoderId;
    }

    private static String generateUniquePCollectonName(String prefix, RunnerApi.ComponentsOrBuilder existingComponents) {
        return SplittableParDoExpander.generateUniqueId(prefix, input -> {
            for (RunnerApi.PCollection pc : existingComponents.getPcollectionsMap().values()) {
                if (!input.equals(pc.getUniqueName())) continue;
                return true;
            }
            return false;
        });
    }

    private static String generateUniqueId(String prefix, Predicate<String> isExistingId) {
        int i = 0;
        while (isExistingId.test(prefix + i)) {
            ++i;
        }
        return prefix + i;
    }

    private static class SizedReplacement
    implements ProtoOverrides.TransformReplacement {
        private static final SizedReplacement INSTANCE = new SizedReplacement();

        private SizedReplacement() {
        }

        @Override
        public RunnerApi.MessageWithComponents getReplacement(String transformId, RunnerApi.ComponentsOrBuilder existingComponents) {
            try {
                RunnerApi.MessageWithComponents.Builder rval = RunnerApi.MessageWithComponents.newBuilder();
                RunnerApi.PTransform splittableParDo = existingComponents.getTransformsOrThrow(transformId);
                RunnerApi.ParDoPayload payload = RunnerApi.ParDoPayload.parseFrom((ByteString)splittableParDo.getSpec().getPayload());
                if (payload.getRestrictionCoderId() == null || payload.getRestrictionCoderId().isEmpty()) {
                    return null;
                }
                String mainInputName = ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)splittableParDo);
                String mainInputPCollectionId = splittableParDo.getInputsOrThrow(mainInputName);
                RunnerApi.PCollection mainInputPCollection = existingComponents.getPcollectionsOrThrow(mainInputPCollectionId);
                Map sideInputs = Maps.filterKeys((Map)splittableParDo.getInputsMap(), input -> payload.containsSideInputs(input));
                String pairWithRestrictionOutCoderId = SplittableParDoExpander.generateUniqueId(mainInputPCollection.getCoderId() + "/PairWithRestriction", arg_0 -> ((RunnerApi.ComponentsOrBuilder)existingComponents).containsCoders(arg_0));
                rval.getComponentsBuilder().putCoders(pairWithRestrictionOutCoderId, ModelCoders.kvCoder(mainInputPCollection.getCoderId(), payload.getRestrictionCoderId()));
                String pairWithRestrictionOutId = SplittableParDoExpander.generateUniqueId(mainInputPCollectionId + "/PairWithRestriction", arg_0 -> ((RunnerApi.ComponentsOrBuilder)existingComponents).containsPcollections(arg_0));
                rval.getComponentsBuilder().putPcollections(pairWithRestrictionOutId, RunnerApi.PCollection.newBuilder().setCoderId(pairWithRestrictionOutCoderId).setIsBounded(mainInputPCollection.getIsBounded()).setWindowingStrategyId(mainInputPCollection.getWindowingStrategyId()).setUniqueName(SplittableParDoExpander.generateUniquePCollectonName(mainInputPCollection.getUniqueName() + "/PairWithRestriction", existingComponents)).build());
                String splitAndSizeOutCoderId = SplittableParDoExpander.generateUniqueId(mainInputPCollection.getCoderId() + "/SplitAndSize", arg_0 -> ((RunnerApi.ComponentsOrBuilder)existingComponents).containsCoders(arg_0));
                rval.getComponentsBuilder().putCoders(splitAndSizeOutCoderId, ModelCoders.kvCoder(pairWithRestrictionOutCoderId, SplittableParDoExpander.getOrAddDoubleCoder(existingComponents, rval)));
                String splitAndSizeOutId = SplittableParDoExpander.generateUniqueId(mainInputPCollectionId + "/SplitAndSize", arg_0 -> ((RunnerApi.ComponentsOrBuilder)existingComponents).containsPcollections(arg_0));
                rval.getComponentsBuilder().putPcollections(splitAndSizeOutId, RunnerApi.PCollection.newBuilder().setCoderId(splitAndSizeOutCoderId).setIsBounded(mainInputPCollection.getIsBounded()).setWindowingStrategyId(mainInputPCollection.getWindowingStrategyId()).setUniqueName(SplittableParDoExpander.generateUniquePCollectonName(mainInputPCollection.getUniqueName() + "/SplitAndSize", existingComponents)).build());
                String pairWithRestrictionId = SplittableParDoExpander.generateUniqueId(transformId + "/PairWithRestriction", arg_0 -> ((RunnerApi.ComponentsOrBuilder)existingComponents).containsTransforms(arg_0));
                RunnerApi.PTransform.Builder pairWithRestriction = RunnerApi.PTransform.newBuilder();
                pairWithRestriction.putAllInputs(splittableParDo.getInputsMap());
                pairWithRestriction.putOutputs("out", pairWithRestrictionOutId);
                pairWithRestriction.setUniqueName(SplittableParDoExpander.generateUniquePCollectonName(splittableParDo.getUniqueName() + "/PairWithRestriction", existingComponents));
                pairWithRestriction.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:sdf_pair_with_restriction:v1").setPayload(splittableParDo.getSpec().getPayload()));
                pairWithRestriction.setEnvironmentId(splittableParDo.getEnvironmentId());
                rval.getComponentsBuilder().putTransforms(pairWithRestrictionId, pairWithRestriction.build());
                String splitAndSizeId = SplittableParDoExpander.generateUniqueId(transformId + "/SplitAndSize", arg_0 -> ((RunnerApi.ComponentsOrBuilder)existingComponents).containsTransforms(arg_0));
                RunnerApi.PTransform.Builder splitAndSize = RunnerApi.PTransform.newBuilder();
                splitAndSize.putInputs(mainInputName, pairWithRestrictionOutId);
                splitAndSize.putAllInputs(sideInputs);
                splitAndSize.putOutputs("out", splitAndSizeOutId);
                splitAndSize.setUniqueName(SplittableParDoExpander.generateUniquePCollectonName(splittableParDo.getUniqueName() + "/SplitAndSize", existingComponents));
                splitAndSize.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:sdf_split_and_size_restrictions:v1").setPayload(splittableParDo.getSpec().getPayload()));
                splitAndSize.setEnvironmentId(splittableParDo.getEnvironmentId());
                rval.getComponentsBuilder().putTransforms(splitAndSizeId, splitAndSize.build());
                String processSizedElementsAndRestrictionsId = SplittableParDoExpander.generateUniqueId(transformId + "/ProcessSizedElementsAndRestrictions", arg_0 -> ((RunnerApi.ComponentsOrBuilder)existingComponents).containsTransforms(arg_0));
                RunnerApi.PTransform.Builder processSizedElementsAndRestrictions = RunnerApi.PTransform.newBuilder();
                processSizedElementsAndRestrictions.putInputs(mainInputName, splitAndSizeOutId);
                processSizedElementsAndRestrictions.putAllInputs(sideInputs);
                processSizedElementsAndRestrictions.putAllOutputs(splittableParDo.getOutputsMap());
                processSizedElementsAndRestrictions.setUniqueName(SplittableParDoExpander.generateUniquePCollectonName(splittableParDo.getUniqueName() + "/ProcessSizedElementsAndRestrictions", existingComponents));
                processSizedElementsAndRestrictions.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:sdf_process_sized_element_and_restrictions:v1").setPayload(splittableParDo.getSpec().getPayload()));
                processSizedElementsAndRestrictions.setEnvironmentId(splittableParDo.getEnvironmentId());
                rval.getComponentsBuilder().putTransforms(processSizedElementsAndRestrictionsId, processSizedElementsAndRestrictions.build());
                RunnerApi.PTransform.Builder newCompositeRoot = splittableParDo.toBuilder().clearSpec().addAllSubtransforms(Arrays.asList(pairWithRestrictionId, splitAndSizeId, processSizedElementsAndRestrictionsId));
                rval.setPtransform(newCompositeRoot);
                return rval.build();
            }
            catch (IOException e) {
                throw new RuntimeException("Unable to perform expansion for transform " + transformId, e);
            }
        }
    }
}

