/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.cookbook;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

public class JoinExamples {
    private static final @UnknownKeyFor @NonNull @Initialized String GDELT_EVENTS_TABLE = "apache-beam-testing.samples.gdelt_sample";
    private static final @UnknownKeyFor @NonNull @Initialized String COUNTRY_CODES = "gdelt-bq:full.crosswalk_geocountrycodetohuman";

    static @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized String> joinEvents(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized TableRow> eventsTable, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized TableRow> countryCodes) throws @UnknownKeyFor @NonNull @Initialized Exception {
        final TupleTag eventInfoTag = new TupleTag();
        final TupleTag countryInfoTag = new TupleTag();
        PCollection eventInfo = (PCollection)eventsTable.apply((PTransform)ParDo.of((DoFn)new ExtractEventDataFn()));
        PCollection countryInfo = (PCollection)countryCodes.apply((PTransform)ParDo.of((DoFn)new ExtractCountryInfoFn()));
        PCollection kvpCollection = (PCollection)KeyedPCollectionTuple.of((TupleTag)eventInfoTag, (PCollection)eventInfo).and(countryInfoTag, countryInfo).apply((PTransform)CoGroupByKey.create());
        PCollection finalResultCollection = (PCollection)kvpCollection.apply("Process", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, CoGbkResult>, KV<String, String>>(){

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c) {
                KV e = (KV)c.element();
                String countryCode = (String)e.getKey();
                String countryName = (String)((CoGbkResult)e.getValue()).getOnly(countryInfoTag);
                for (String eventInfo : ((CoGbkResult)((KV)c.element()).getValue()).getAll(eventInfoTag)) {
                    c.output((Object)KV.of((Object)countryCode, (Object)("Country name: " + countryName + ", Event info: " + eventInfo)));
                }
            }
        }));
        PCollection formattedResults = (PCollection)finalResultCollection.apply("Format", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, String>, String>(){

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c) {
                String outputstring = "Country code: " + (String)((KV)c.element()).getKey() + ", " + (String)((KV)c.element()).getValue();
                c.output((Object)outputstring);
            }
        }));
        return formattedResults;
    }

    public static void main(@UnknownKeyFor @NonNull @Initialized String @UnknownKeyFor @NonNull @Initialized [] args) throws @UnknownKeyFor @NonNull @Initialized Exception {
        Options options = (Options)PipelineOptionsFactory.fromArgs((String[])args).withValidation().as(Options.class);
        JoinExamples.runJoinExamples(options);
    }

    static void runJoinExamples(@UnknownKeyFor @NonNull @Initialized Options options) throws @UnknownKeyFor @NonNull @Initialized Exception {
        Pipeline p = Pipeline.create((PipelineOptions)options);
        PCollection eventsTable = (PCollection)p.apply("Read BQ Events", (PTransform)BigQueryIO.readTableRows().from(GDELT_EVENTS_TABLE));
        PCollection countryCodes = (PCollection)p.apply("Read BQ Country Codes", (PTransform)BigQueryIO.readTableRows().from(COUNTRY_CODES));
        PCollection<String> formattedResults = JoinExamples.joinEvents((PCollection<TableRow>)eventsTable, (PCollection<TableRow>)countryCodes);
        formattedResults.apply((PTransform)TextIO.write().to(options.getOutput()));
        p.run().waitUntilFinish();
    }

    public static interface Options
    extends PipelineOptions {
        @Description(value="Path of the file to write to")
        @Validation.Required
        public @UnknownKeyFor @NonNull @Initialized String getOutput();

        public void setOutput(@UnknownKeyFor @NonNull @Initialized String var1);
    }

    static class ExtractCountryInfoFn
    extends DoFn<TableRow, KV<String, String>> {
        ExtractCountryInfoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c) {
            TableRow row = (TableRow)c.element();
            String countryCode = (String)row.get((Object)"FIPSCC");
            String countryName = (String)row.get((Object)"HumanName");
            c.output((Object)KV.of((Object)countryCode, (Object)countryName));
        }
    }

    static class ExtractEventDataFn
    extends DoFn<TableRow, KV<String, String>> {
        ExtractEventDataFn() {
        }

        @DoFn.ProcessElement
        public void processElement(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c) {
            TableRow row = (TableRow)c.element();
            String countryCode = (String)row.get((Object)"ActionGeo_CountryCode");
            String sqlDate = (String)row.get((Object)"SQLDATE");
            String actor1Name = (String)row.get((Object)"Actor1Name");
            String sourceUrl = (String)row.get((Object)"SOURCEURL");
            String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
            c.output((Object)KV.of((Object)countryCode, (Object)eventInfo));
        }
    }
}

