/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.fn.harness.FlattenRunner;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.PTransformRunnerFactoryTestContext;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class FlattenRunnerTest {
    @Test
    public void testCreatingAndProcessingDoFlatten() throws Exception {
        String pTransformId = "pTransformId";
        String mainOutputId = "101";
        RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:flatten:v1").build();
        RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder().setSpec(functionSpec).putInputs("inputA", "inputATarget").putInputs("inputB", "inputBTarget").putInputs("inputC", "inputCTarget").putOutputs(mainOutputId, "mainOutputTarget").build();
        HashMap<String, RunnerApi.PCollection> pCollectionMap = new HashMap<String, RunnerApi.PCollection>();
        pCollectionMap.put("inputATarget", RunnerApi.PCollection.newBuilder().setUniqueName("inputATarget").setCoderId("coder-id").build());
        pCollectionMap.put("inputBTarget", RunnerApi.PCollection.newBuilder().setUniqueName("inputBTarget").setCoderId("coder-id").build());
        pCollectionMap.put("inputCTarget", RunnerApi.PCollection.newBuilder().setUniqueName("inputCTarget").setCoderId("coder-id").build());
        RunnerApi.Coder coder = CoderTranslation.toProto((Coder)StringUtf8Coder.of()).getCoder();
        PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(pTransformId, pTransform).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllPcollections(pCollectionMap).putAllCoders(Collections.singletonMap("coder-id", coder)).build()).build();
        ArrayList mainOutputValues = new ArrayList();
        context.addPCollectionConsumer("mainOutputTarget", mainOutputValues::add);
        new FlattenRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
        mainOutputValues.clear();
        MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder("inputATarget", "inputBTarget", "inputCTarget", "mainOutputTarget"));
        context.getPCollectionConsumer("inputATarget").accept((Object)WindowedValue.valueInGlobalWindow((Object)"A1"));
        context.getPCollectionConsumer("inputATarget").accept((Object)WindowedValue.valueInGlobalWindow((Object)"A2"));
        context.getPCollectionConsumer("inputBTarget").accept((Object)WindowedValue.valueInGlobalWindow((Object)"B"));
        context.getPCollectionConsumer("inputCTarget").accept((Object)WindowedValue.valueInGlobalWindow((Object)"C"));
        MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.valueInGlobalWindow((Object)"A1"), WindowedValue.valueInGlobalWindow((Object)"A2"), WindowedValue.valueInGlobalWindow((Object)"B"), WindowedValue.valueInGlobalWindow((Object)"C")));
        mainOutputValues.clear();
    }

    @Test
    public void testFlattenWithDuplicateInputCollectionProducesMultipleOutputs() throws Exception {
        String pTransformId = "pTransformId";
        String mainOutputId = "101";
        RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:flatten:v1").build();
        RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder().setSpec(functionSpec).putInputs("inputA", "inputATarget").putInputs("inputAAgain", "inputATarget").putOutputs(mainOutputId, "mainOutputTarget").build();
        RunnerApi.PCollection pCollection = RunnerApi.PCollection.newBuilder().setUniqueName("inputATarget").setCoderId("coder-id").build();
        RunnerApi.Coder coder = CoderTranslation.toProto((Coder)StringUtf8Coder.of()).getCoder();
        PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(pTransformId, pTransform).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllPcollections(Collections.singletonMap("inputATarget", pCollection)).putAllCoders(Collections.singletonMap("coder-id", coder)).build()).build();
        ArrayList mainOutputValues = new ArrayList();
        context.addPCollectionConsumer("mainOutputTarget", mainOutputValues::add);
        new FlattenRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
        mainOutputValues.clear();
        MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder("inputATarget", "mainOutputTarget"));
        MatcherAssert.assertThat(context.getPCollectionConsumers().get("inputATarget"), Matchers.hasSize(2));
        FnDataReceiver input = context.getPCollectionConsumer("inputATarget");
        input.accept((Object)WindowedValue.valueInGlobalWindow((Object)"A1"));
        input.accept((Object)WindowedValue.valueInGlobalWindow((Object)"A2"));
        MatcherAssert.assertThat(mainOutputValues, Matchers.containsInAnyOrder(WindowedValue.valueInGlobalWindow((Object)"A1"), WindowedValue.valueInGlobalWindow((Object)"A1"), WindowedValue.valueInGlobalWindow((Object)"A2"), WindowedValue.valueInGlobalWindow((Object)"A2")));
    }
}

