/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction;

import com.google.auto.service.AutoService;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.java.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * Translates {@link TestStream} transforms to and from their {@link RunnerApi} protocol buffer
 * representation.
 */
public class TestStreamTranslation {
    /** URN placed in the {@link RunnerApi.FunctionSpec} of a translated {@link TestStream}. */
    private static final String TEST_STREAM_TRANSFORM_URN = "urn:beam:transform:teststream:v1";

    /**
     * Converts a {@link TestStream} into a {@link RunnerApi.TestStreamPayload}.
     *
     * <p>The stream's value coder is registered with {@code components}; the returned payload
     * carries the resulting coder id followed by every event of the stream, in order.
     *
     * @param <T> the element type of the stream
     * @param transform the test stream to translate
     * @param components the components with which the value coder is registered
     * @return a payload holding the coder id and the translated events
     * @throws IOException propagated from coder registration or event translation
     */
    static <T> RunnerApi.TestStreamPayload testStreamToPayload(TestStream<T> transform, SdkComponents components) throws IOException {
        // Look the coder up once; it is used both for registration and for every event.
        Coder<T> valueCoder = transform.getValueCoder();
        String coderId = components.registerCoder(valueCoder);
        RunnerApi.TestStreamPayload.Builder builder =
            RunnerApi.TestStreamPayload.newBuilder().setCoderId(coderId);
        for (TestStream.Event<T> event : transform.getEvents()) {
            builder.addEvents(toProto(event, valueCoder));
        }
        return builder.build();
    }

    /**
     * Rebuilds a {@link TestStream} from its payload.
     *
     * <p>The coder is looked up in {@code components} by the id stored in the payload, so the
     * element type is not known statically and the result is returned as {@code TestStream<?>}.
     *
     * @param testStreamPayload the payload to decode
     * @param components the components containing the coder referenced by the payload
     * @return the reconstructed test stream
     * @throws IOException propagated from coder translation or event decoding
     */
    private static TestStream<?> fromProto(RunnerApi.TestStreamPayload testStreamPayload, RunnerApi.Components components) throws IOException {
        Coder<?> coder =
            CoderTranslation.fromProto(
                components.getCodersOrThrow(testStreamPayload.getCoderId()), components);
        // Delegating to a generic helper captures the wildcard, keeping the decoding type-safe.
        return decodeTestStream(testStreamPayload, coder);
    }

    /**
     * Decodes every event in {@code testStreamPayload} with {@code coder} and assembles the
     * resulting {@link TestStream}.
     */
    private static <T> TestStream<T> decodeTestStream(RunnerApi.TestStreamPayload testStreamPayload, Coder<T> coder) throws IOException {
        List<TestStream.Event<T>> events = new ArrayList<>();
        for (RunnerApi.TestStreamPayload.Event protoEvent : testStreamPayload.getEventsList()) {
            events.add(fromProto(protoEvent, coder));
        }
        return TestStream.fromRawEvents(coder, events);
    }

    /**
     * Recovers the {@link TestStream} behind an applied transform by translating the application
     * to its proto form and decoding the resulting payload.
     *
     * @param <T> the element type produced by the applied transform
     * @param application the applied transform, expected to translate to the test stream URN
     * @return the test stream reconstructed from the translated payload
     * @throws IllegalArgumentException if the translated transform does not carry the test stream
     *     URN
     * @throws IOException propagated from translation, payload unpacking, or event decoding
     */
    public static <T> TestStream<T> getTestStream(AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> application) throws IOException {
        SdkComponents sdkComponents = SdkComponents.create();
        RunnerApi.PTransform transformProto = PTransformTranslation.toProto(application, sdkComponents);
        String urn = transformProto.getSpec().getUrn();
        Preconditions.checkArgument(
            TEST_STREAM_TRANSFORM_URN.equals(urn),
            "Attempt to get %s from a transform with wrong URN %s",
            TestStream.class.getSimpleName(),
            urn);
        RunnerApi.TestStreamPayload testStreamPayload =
            transformProto.getSpec().getParameter().unpack(RunnerApi.TestStreamPayload.class);
        // The payload was produced from an application yielding PCollection<T>, so the decoded
        // stream is treated as a TestStream<T>; the compiler cannot verify this.
        @SuppressWarnings("unchecked")
        TestStream<T> testStream =
            (TestStream<T>) fromProto(testStreamPayload, sdkComponents.toComponents());
        return testStream;
    }

    /**
     * Converts a single {@link TestStream.Event} into its proto form.
     *
     * <p>Watermark and processing-time events are stored as millisecond values; element events
     * store each element's timestamp in milliseconds together with the value encoded by
     * {@code coder}.
     *
     * @param <T> the element type of the event
     * @param event the event to translate
     * @param coder the coder used to encode element values
     * @return the proto representation of {@code event}
     * @throws IllegalArgumentException if the event type is not one of the handled types
     * @throws IOException propagated from encoding an element
     */
    static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> event, Coder<T> coder) throws IOException {
        switch (event.getType()) {
            case WATERMARK: {
                TestStream.WatermarkEvent<T> watermarkEvent = (TestStream.WatermarkEvent<T>) event;
                return RunnerApi.TestStreamPayload.Event.newBuilder()
                    .setWatermarkEvent(
                        RunnerApi.TestStreamPayload.Event.AdvanceWatermark.newBuilder()
                            .setNewWatermark(watermarkEvent.getWatermark().getMillis()))
                    .build();
            }
            case PROCESSING_TIME: {
                TestStream.ProcessingTimeEvent<T> processingTimeEvent =
                    (TestStream.ProcessingTimeEvent<T>) event;
                return RunnerApi.TestStreamPayload.Event.newBuilder()
                    .setProcessingTimeEvent(
                        RunnerApi.TestStreamPayload.Event.AdvanceProcessingTime.newBuilder()
                            .setAdvanceDuration(
                                processingTimeEvent.getProcessingTimeAdvance().getMillis()))
                    .build();
            }
            case ELEMENT: {
                RunnerApi.TestStreamPayload.Event.AddElements.Builder builder =
                    RunnerApi.TestStreamPayload.Event.AddElements.newBuilder();
                for (TimestampedValue<T> element : ((TestStream.ElementEvent<T>) event).getElements()) {
                    builder.addElements(
                        RunnerApi.TestStreamPayload.TimestampedElement.newBuilder()
                            .setTimestamp(element.getTimestamp().getMillis())
                            .setEncodedElement(
                                ByteString.copyFrom(
                                    CoderUtils.encodeToByteArray(coder, element.getValue()))));
                }
                return RunnerApi.TestStreamPayload.Event.newBuilder().setElementEvent(builder).build();
            }
            default:
                throw new IllegalArgumentException(
                    String.format(
                        "Unsupported type of %s: %s",
                        TestStream.Event.class.getCanonicalName(), event.getType()));
        }
    }

    /**
     * Converts a proto event back into a {@link TestStream.Event}.
     *
     * @param <T> the element type of the event
     * @param protoEvent the proto event to decode
     * @param coder the coder used to decode element values
     * @return the decoded event
     * @throws IllegalArgumentException if the proto event case is not one of the handled cases
     * @throws IOException propagated from decoding an element
     */
    static <T> TestStream.Event<T> fromProto(RunnerApi.TestStreamPayload.Event protoEvent, Coder<T> coder) throws IOException {
        switch (protoEvent.getEventCase()) {
            case WATERMARK_EVENT: {
                return TestStream.WatermarkEvent.advanceTo(
                    new Instant(protoEvent.getWatermarkEvent().getNewWatermark()));
            }
            case PROCESSING_TIME_EVENT: {
                return TestStream.ProcessingTimeEvent.advanceBy(
                    Duration.millis(protoEvent.getProcessingTimeEvent().getAdvanceDuration()));
            }
            case ELEMENT_EVENT: {
                List<TimestampedValue<T>> decodedElements = new ArrayList<>();
                for (RunnerApi.TestStreamPayload.TimestampedElement element :
                    protoEvent.getElementEvent().getElementsList()) {
                    T value =
                        CoderUtils.decodeFromByteArray(
                            coder, element.getEncodedElement().toByteArray());
                    decodedElements.add(
                        TimestampedValue.of(value, new Instant(element.getTimestamp())));
                }
                return TestStream.ElementEvent.add(decodedElements);
            }
            default:
                throw new IllegalArgumentException(
                    String.format(
                        "Unsupported type of %s: %s",
                        RunnerApi.TestStreamPayload.Event.class.getCanonicalName(),
                        protoEvent.getEventCase()));
        }
    }

    /** Registers {@link TestStreamTranslator} as the payload translator for {@link TestStream}. */
    @AutoService(TransformPayloadTranslatorRegistrar.class)
    public static class Registrar
    implements TransformPayloadTranslatorRegistrar {
        @Override
        public Map<? extends Class<? extends PTransform>, ? extends PTransformTranslation.TransformPayloadTranslator> getTransformPayloadTranslators() {
            return Collections.singletonMap(TestStream.class, new TestStreamTranslator());
        }
    }

    /** Translates a {@link TestStream} into a {@link RunnerApi.FunctionSpec}. */
    static class TestStreamTranslator
    implements PTransformTranslation.TransformPayloadTranslator<TestStream<?>> {
        @Override
        public String getUrn(TestStream<?> transform) {
            return TEST_STREAM_TRANSFORM_URN;
        }

        /**
         * Builds a function spec holding the test stream URN and the packed
         * {@link RunnerApi.TestStreamPayload} of the applied transform.
         */
        @Override
        public RunnerApi.FunctionSpec translate(AppliedPTransform<?, ?, TestStream<?>> transform, SdkComponents components) throws IOException {
            TestStream<?> testStream = transform.getTransform();
            return RunnerApi.FunctionSpec.newBuilder()
                .setUrn(getUrn(testStream))
                .setParameter(Any.pack(testStreamToPayload(testStream, components)))
                .build();
        }
    }
}

