/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.BeamFnDataWriteRunner;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.PTransformRunnerFactoryTestContext;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(value=JUnit4.class)
public class BeamFnDataWriteRunnerTest {
    private static final String ELEM_CODER_ID = "string-coder-id";
    private static final Coder<String> ELEM_CODER = StringUtf8Coder.of();
    private static final String WIRE_CODER_ID = "windowed-string-coder-id";
    private static final Coder<WindowedValue<String>> WIRE_CODER = WindowedValue.getFullCoder(ELEM_CODER, (Coder)GlobalWindow.Coder.INSTANCE);
    private static final RunnerApi.Coder WIRE_CODER_SPEC;
    private static final RunnerApi.Components COMPONENTS;
    private static final BeamFnApi.RemoteGrpcPort PORT_SPEC;
    private static final String TRANSFORM_ID = "1";
    @Mock
    private BeamFnDataClient mockBeamFnDataClient;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
    }

    private BeamFnDataOutboundAggregator createRecordingAggregator(final Map<String, List<WindowedValue<String>>> output, final Supplier<String> bundleId) {
        PipelineOptions options = PipelineOptionsFactory.create();
        ((ExperimentalOptions)options.as(ExperimentalOptions.class)).setExperiments(Arrays.asList("data_buffer_size_limit=0"));
        return new BeamFnDataOutboundAggregator(options, bundleId, (StreamObserver)new StreamObserver<BeamFnApi.Elements>(){

            public void onNext(BeamFnApi.Elements elements) {
                for (BeamFnApi.Elements.Data data : elements.getDataList()) {
                    try {
                        ((List)output.get(bundleId.get())).add((WindowedValue)WIRE_CODER.decode(data.getData().newInput()));
                    }
                    catch (IOException e) {
                        throw new RuntimeException("Failed to decode output.");
                    }
                }
            }

            public void onError(Throwable throwable) {
            }

            public void onCompleted() {
            }
        }, false);
    }

    @Test
    public void testReuseForMultipleBundles() throws Exception {
        AtomicReference<String> bundleId = new AtomicReference<String>("0");
        String localInputId = "inputPC";
        RunnerApi.PTransform pTransform = RemoteGrpcPortWrite.writeToPort((String)localInputId, (BeamFnApi.RemoteGrpcPort)PORT_SPEC).toPTransform();
        ArrayList output0 = new ArrayList();
        ArrayList output1 = new ArrayList();
        HashMap<Endpoints.ApiServiceDescriptor, BeamFnDataOutboundAggregator> aggregators = new HashMap<Endpoints.ApiServiceDescriptor, BeamFnDataOutboundAggregator>();
        BeamFnDataOutboundAggregator aggregator = this.createRecordingAggregator((Map<String, List<WindowedValue<String>>>)ImmutableMap.of((Object)"0", output0, (Object)TRANSFORM_ID, output1), bundleId::get);
        aggregators.put(PORT_SPEC.getApiServiceDescriptor(), aggregator);
        PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TRANSFORM_ID, pTransform).beamFnDataClient(this.mockBeamFnDataClient).processBundleInstructionIdSupplier(bundleId::get).outboundAggregators(aggregators).components(RunnerApi.Components.newBuilder().putAllPcollections((Map)ImmutableMap.of((Object)localInputId, (Object)RunnerApi.PCollection.newBuilder().setCoderId(ELEM_CODER_ID).build())).putAllCoders(COMPONENTS.getCodersMap()).putAllWindowingStrategies(COMPONENTS.getWindowingStrategiesMap()).build()).build();
        new BeamFnDataWriteRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
        MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(localInputId));
        FnDataReceiver pCollectionConsumer = context.getPCollectionConsumer(localInputId);
        pCollectionConsumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)"ABC"));
        pCollectionConsumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)"DEF"));
        MatcherAssert.assertThat(output0, Matchers.contains(WindowedValue.valueInGlobalWindow((Object)"ABC"), WindowedValue.valueInGlobalWindow((Object)"DEF")));
        output0.clear();
        bundleId.set(TRANSFORM_ID);
        pCollectionConsumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)"GHI"));
        pCollectionConsumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)"JKL"));
        MatcherAssert.assertThat(output1, Matchers.contains(WindowedValue.valueInGlobalWindow((Object)"GHI"), WindowedValue.valueInGlobalWindow((Object)"JKL")));
        Mockito.verifyNoMoreInteractions(this.mockBeamFnDataClient);
    }

    @Test
    public void testRegistration() {
        for (PTransformRunnerFactory.Registrar registrar : ServiceLoader.load(PTransformRunnerFactory.Registrar.class)) {
            if (!(registrar instanceof BeamFnDataWriteRunner.Registrar)) continue;
            MatcherAssert.assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey("beam:runner:sink:v1"));
            return;
        }
        Assert.fail("Expected registrar not found.");
    }

    static {
        PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder().setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).setCoderId(WIRE_CODER_ID).build();
        try {
            RunnerApi.MessageWithComponents coderAndComponents = CoderTranslation.toProto(WIRE_CODER);
            WIRE_CODER_SPEC = coderAndComponents.getCoder();
            COMPONENTS = coderAndComponents.getComponents().toBuilder().putCoders(WIRE_CODER_ID, WIRE_CODER_SPEC).putCoders(ELEM_CODER_ID, CoderTranslation.toProto(ELEM_CODER).getCoder()).build();
        }
        catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}

