/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.schemas.io;

import java.io.ByteArrayOutputStream;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.io.Failure;
import org.apache.beam.sdk.schemas.io.GenericDlq;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.WithFailures;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;

/**
 * Wraps a {@link SimpleFunction} so that elements whose mapping throws are converted into
 * {@link Failure} records and sent to a dead-letter transform, while successfully mapped
 * elements form the output collection.
 *
 * @param <InputT> element type accepted by the wrapped function
 * @param <OutputT> element type produced by the wrapped function
 */
@Internal
@Experimental(Experimental.Kind.SCHEMAS)
public class DeadLetteredTransform<InputT, OutputT>
        extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
    /** Function applied to every input element. */
    private final SimpleFunction<InputT, OutputT> transform;
    /** Sink that consumes the {@link Failure} records produced for failed elements. */
    private final PTransform<PCollection<Failure>, PDone> deadLetter;

    /**
     * Creates a transform whose dead-letter sink is obtained from
     * {@link GenericDlq#getDlqTransform(String)}.
     *
     * @param transform function applied to every input element
     * @param deadLetterConfig configuration string handed to {@code GenericDlq}; its format is
     *     defined there, not in this class
     */
    public DeadLetteredTransform(SimpleFunction<InputT, OutputT> transform, String deadLetterConfig) {
        this(transform, GenericDlq.getDlqTransform(deadLetterConfig));
    }

    /**
     * Creates a transform with an explicitly supplied dead-letter sink.
     *
     * @param transform function applied to every input element
     * @param deadLetter transform that receives the failure collection
     */
    @VisibleForTesting
    DeadLetteredTransform(
            SimpleFunction<InputT, OutputT> transform,
            PTransform<PCollection<Failure>, PDone> deadLetter) {
        this.transform = transform;
        this.deadLetter = deadLetter;
    }

    /**
     * Performs the expansion with the wildcard element type of the input captured as a named
     * type variable, so the input's coder and the exception handler share one concrete type.
     *
     * @param input collection to map
     * @param <RealInputT> actual element type of {@code input}
     * @return the successfully mapped elements
     */
    private <RealInputT extends InputT> PCollection<OutputT> expandInternal(
            PCollection<RealInputT> input) {
        // The input's own coder is used to serialize elements that fail, so the dead-letter
        // payload is the encoded form of the original element.
        Coder<RealInputT> coder = input.getCoder();
        // A typed local selects the SerializableFunction overload of via() unambiguously.
        SerializableFunction<RealInputT, OutputT> localTransform = this.transform::apply;
        MapElements.MapWithFailures<RealInputT, OutputT, Failure> mapWithFailures =
                MapElements.into(this.transform.getOutputTypeDescriptor())
                        .via(localTransform)
                        .exceptionsInto(TypeDescriptor.of(Failure.class))
                        .exceptionsVia(x -> {
                            // There is no catch here: an exception thrown while encoding
                            // propagates out of the handler.
                            try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
                                coder.encode(x.element(), os);
                                // Error text is the exception message, a blank line, then the
                                // full stack trace.
                                return Failure.newBuilder()
                                        .setPayload(os.toByteArray())
                                        .setError(String.format(
                                                "%s%n%n%s",
                                                x.exception().getMessage(),
                                                ExceptionUtils.getStackTrace(x.exception())))
                                        .build();
                            }
                        });
        WithFailures.Result<PCollection<OutputT>, Failure> result = mapWithFailures.expand(input);
        result.failures().apply(this.deadLetter);
        return result.output();
    }

    /**
     * Maps {@code input} through the wrapped function, applies the dead-letter transform to the
     * resulting failure collection, and returns the successful outputs.
     *
     * @param input collection to map
     * @return the successfully mapped elements
     */
    @Override
    public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
        return this.expandInternal(input);
    }
}

