/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.core.construction;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers;
import org.apache.beam.repackaged.direct_java.runners.core.construction.ReadTranslation;
import org.apache.beam.repackaged.direct_java.runners.core.construction.ReplacementOutputs;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;

public class JavaReadViaImpulse {
    private static final long DEFAULT_BUNDLE_SIZE_BYTES = 0x4000000L;

    public static <T> PTransform<PBegin, PCollection<T>> bounded(BoundedSource<T> source) {
        return new BoundedReadViaImpulse(source);
    }

    public static PTransformOverride boundedOverride() {
        return PTransformOverride.of((PTransformMatcher)JavaReadViaImpulse.boundedMatcher(), new BoundedOverrideFactory());
    }

    private static PTransformMatcher boundedMatcher() {
        return PTransformMatchers.urnEqualTo("beam:transform:read:v1").and(transform -> ReadTranslation.sourceIsBounded(transform) == PCollection.IsBounded.BOUNDED);
    }

    @VisibleForTesting
    static class BoundedSourceCoder<T>
    extends CustomCoder<BoundedSource<T>> {
        private final Coder<BoundedSource<T>> coder = SerializableCoder.of(BoundedSource.class);

        BoundedSourceCoder() {
        }

        public void encode(BoundedSource<T> value, OutputStream outStream) throws CoderException, IOException {
            this.coder.encode(value, outStream);
        }

        public BoundedSource<T> decode(InputStream inStream) throws CoderException, IOException {
            return (BoundedSource)this.coder.decode(inStream);
        }
    }

    @VisibleForTesting
    static class ReadFromBoundedSourceFn<T>
    extends DoFn<BoundedSource<T>, T> {
        ReadFromBoundedSourceFn() {
        }

        @DoFn.ProcessElement
        public void readSource(DoFn.ProcessContext ctxt) throws IOException {
            try (BoundedSource.BoundedReader reader = ((BoundedSource)ctxt.element()).createReader(ctxt.getPipelineOptions());){
                boolean more = reader.start();
                while (more) {
                    ctxt.outputWithTimestamp(reader.getCurrent(), reader.getCurrentTimestamp());
                    more = reader.advance();
                }
            }
        }
    }

    @VisibleForTesting
    static class SplitBoundedSourceFn<T>
    extends DoFn<byte[], BoundedSource<T>> {
        private final BoundedSource<T> source;
        private final long bundleSize;

        public SplitBoundedSourceFn(BoundedSource<T> source, long bundleSize) {
            this.source = source;
            this.bundleSize = bundleSize;
        }

        @DoFn.ProcessElement
        public void splitSource(DoFn.ProcessContext ctxt) throws Exception {
            for (BoundedSource split : this.source.split(this.bundleSize, ctxt.getPipelineOptions())) {
                ctxt.output((Object)split);
            }
        }
    }

    private static class BoundedOverrideFactory<T>
    implements PTransformOverrideFactory<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> {
        private BoundedOverrideFactory() {
        }

        public PTransformOverrideFactory.PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform) {
            BoundedSource<T> source;
            PBegin input = PBegin.in((Pipeline)transform.getPipeline());
            try {
                source = ReadTranslation.boundedSourceFromTransform(transform);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            return PTransformOverrideFactory.PTransformReplacement.of((PInput)input, JavaReadViaImpulse.bounded(source));
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
            return ReplacementOutputs.singleton(outputs, newOutput);
        }
    }

    private static class BoundedReadViaImpulse<T>
    extends PTransform<PBegin, PCollection<T>> {
        private final BoundedSource<T> source;

        private BoundedReadViaImpulse(BoundedSource<T> source) {
            this.source = source;
        }

        public PCollection<T> expand(PBegin input) {
            return ((PCollection)((PCollection)((PCollection)((PCollection)input.apply((PTransform)Impulse.create())).apply((PTransform)ParDo.of(new SplitBoundedSourceFn<T>(this.source, 0x4000000L)))).setCoder(new BoundedSourceCoder()).apply((PTransform)Reshuffle.viaRandomKey())).apply((PTransform)ParDo.of(new ReadFromBoundedSourceFn()))).setCoder(this.source.getOutputCoder());
        }
    }
}

