/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.multilanguage;

import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.python.PythonExternalTransform;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.PythonCallableSource;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class PythonDataframeWordCount {
    public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

    static void runWordCount(WordCountOptions options) {
        Pipeline p = Pipeline.create((PipelineOptions)options);
        ((PCollection)((PCollection)((PCollection)((PCollection)p.apply("ReadLines", (PTransform)TextIO.read().from(options.getInputFile()))).apply((PTransform)ParDo.of((DoFn)new ExtractWordsFn()))).setRowSchema(ExtractWordsFn.SCHEMA).apply((PTransform)PythonExternalTransform.from((String)"apache_beam.dataframe.transforms.DataframeTransform", (String)options.getExpansionService()).withKwarg("func", (Object)PythonCallableSource.of((String)"lambda df: df.groupby('word').sum()")).withKwarg("include_indexes", (Object)true))).apply((PTransform)MapElements.via((SimpleFunction)new FormatAsTextFn()))).apply("WriteCounts", (PTransform)TextIO.write().to(options.getOutput()));
        p.run().waitUntilFinish();
    }

    public static void main(String[] args) {
        WordCountOptions options = (WordCountOptions)PipelineOptionsFactory.fromArgs((String[])args).withValidation().as(WordCountOptions.class);
        PythonDataframeWordCount.runWordCount(options);
    }

    public static interface WordCountOptions
    extends PipelineOptions {
        @Description(value="Path of the file to read from")
        @Default.String(value="gs://apache-beam-samples/shakespeare/kinglear.txt")
        public String getInputFile();

        public void setInputFile(String var1);

        @Description(value="Path of the file to write to")
        @Validation.Required
        public String getOutput();

        public void setOutput(String var1);

        @Description(value="URL of Python expansion service")
        public String getExpansionService();

        public void setExpansionService(String var1);
    }

    public static class FormatAsTextFn
    extends SimpleFunction<Row, String> {
        public String apply(Row input) {
            return input.getString("word") + ": " + input.getInt32("count");
        }
    }

    static class ExtractWordsFn
    extends DoFn<String, Row> {
        public static final Schema SCHEMA = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"word", (Schema.FieldType)Schema.FieldType.STRING), Schema.Field.of((String)"count", (Schema.FieldType)Schema.FieldType.INT32)});
        private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, (String)"emptyLines");
        private final Distribution lineLenDist = Metrics.distribution(ExtractWordsFn.class, (String)"lineLenDistro");

        ExtractWordsFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element String element, DoFn.OutputReceiver<Row> receiver) {
            String[] words;
            this.lineLenDist.update((long)element.length());
            if (element.trim().isEmpty()) {
                this.emptyLines.inc();
            }
            for (String word : words = element.split(PythonDataframeWordCount.TOKENIZER_PATTERN, -1)) {
                if (word.isEmpty()) continue;
                receiver.output((Object)Row.withSchema((Schema)SCHEMA).withFieldValue("word", (Object)word).withFieldValue("count", (Object)1).build());
            }
        }
    }
}

