/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.cookbook;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterEach;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class TriggerExample {
    /**
     * Window length constant, equal to the default of {@code TrafficFlowOptions#getWindowDuration()}.
     * NOTE(review): presumably expressed in minutes like that option - confirm.
     */
    public static final @UnknownKeyFor @NonNull @Initialized int WINDOW_DURATION = 30;

    /** A one-minute span; used as the delay of the processing-time triggers in {@code CalculateTotalFlow}. */
    public static final @UnknownKeyFor @NonNull @Initialized Duration ONE_MINUTE = Duration.standardMinutes(1L);

    /** A five-minute span; used as the delay of the second trigger of the "sequential" branch. */
    public static final @UnknownKeyFor @NonNull @Initialized Duration FIVE_MINUTES = Duration.standardMinutes(5L);

    /** A one-day span; used as the allowed lateness of the late-data aware branches. */
    public static final @UnknownKeyFor @NonNull @Initialized Duration ONE_DAY = Duration.standardDays(1L);

    /**
     * Program entry point: parses and validates the command line as {@link TrafficFlowOptions}
     * and hands the result to {@link #runTriggerExample(TrafficFlowOptions)}.
     *
     * @param args command-line arguments describing the pipeline options
     * @throws Exception if option parsing or the pipeline run fails
     */
    public static void main(@UnknownKeyFor @NonNull @Initialized String @UnknownKeyFor @NonNull @Initialized [] args) throws @UnknownKeyFor @NonNull @Initialized Exception {
        TrafficFlowOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(TrafficFlowOptions.class);
        runTriggerExample(options);
    }

    /**
     * Builds and runs the example pipeline.
     *
     * <p>Streaming mode is switched on, the text input named by {@code options.getInput()} is read,
     * each line is passed through {@link InsertDelays} and {@link ExtractFlowInfo}, and
     * {@link CalculateTotalFlow} produces one result collection per trigger configuration. Every
     * result collection is written to the same BigQuery table. The method then blocks in
     * {@code ExampleUtils#waitToFinish}.
     *
     * @param options pipeline options; mutated to enable streaming
     * @throws IOException declared by the signature; NOTE(review): presumably raised by the
     *     {@code ExampleUtils} helper calls - confirm
     */
    static void runTriggerExample(@UnknownKeyFor @NonNull @Initialized TrafficFlowOptions options) throws @UnknownKeyFor @NonNull @Initialized IOException {
        options.setStreaming(true);

        ExampleUtils exampleUtils = new ExampleUtils(options);
        exampleUtils.setup();

        Pipeline pipeline = Pipeline.create(options);
        TableReference tableRef = getTableReference(
            options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable());

        // One output collection per trigger configuration.
        PCollectionList resultList = pipeline
            .apply("ReadMyFile", TextIO.read().from(options.getInput()))
            .apply("InsertRandomDelays", ParDo.of(new InsertDelays()))
            .apply(ParDo.of(new ExtractFlowInfo()))
            .apply(new CalculateTotalFlow(options.getWindowDuration()));

        // The index keeps the transform names unique; all writes target the same table.
        for (int i = 0; i < resultList.size(); i++) {
            resultList.get(i).apply(
                "BigQuery Write Rows" + i,
                BigQueryIO.writeTableRows().to(tableRef).withSchema(getSchema()));
        }

        PipelineResult result = pipeline.run();
        exampleUtils.waitToFinish(result);
    }

    /**
     * Creates a BigQuery table reference from its three coordinates.
     *
     * @param project project id to store in the reference
     * @param dataset dataset id to store in the reference
     * @param table table id to store in the reference
     * @return a new {@link TableReference} carrying the given ids
     */
    private static @UnknownKeyFor @NonNull @Initialized TableReference getTableReference(@UnknownKeyFor @NonNull @Initialized String project, @UnknownKeyFor @NonNull @Initialized String dataset, @UnknownKeyFor @NonNull @Initialized String table) {
        return new TableReference()
            .setProjectId(project)
            .setDatasetId(dataset)
            .setTableId(table);
    }

    /**
     * Describes the output table: one column per value that {@code FormatTotalFlow} sets on a row.
     *
     * @return a new {@link TableSchema} listing the ten columns in declaration order
     */
    private static @UnknownKeyFor @NonNull @Initialized TableSchema getSchema() {
        // {column name, BigQuery type}; the order here is the order of the schema fields.
        String[][] columns = {
            {"trigger_type", "STRING"},
            {"freeway", "STRING"},
            {"total_flow", "INTEGER"},
            {"number_of_records", "INTEGER"},
            {"window", "STRING"},
            {"isFirst", "BOOLEAN"},
            {"isLast", "BOOLEAN"},
            {"timing", "STRING"},
            {"event_time", "TIMESTAMP"},
            {"processing_time", "TIMESTAMP"},
        };
        ArrayList<TableFieldSchema> fields = new ArrayList<TableFieldSchema>(columns.length);
        for (String[] column : columns) {
            fields.add(new TableFieldSchema().setName(column[0]).setType(column[1]));
        }
        return new TableSchema().setFields(fields);
    }

    /**
     * Parses a decimal integer without throwing.
     *
     * <p>NOTE(review): the return type is annotated {@code @NonNull}, yet {@code null} is returned
     * when parsing fails; callers must check for {@code null}.
     *
     * @param number text to parse
     * @return the parsed value, or {@code null} if {@link Integer#parseInt(String)} rejects the text
     */
    private static @UnknownKeyFor @NonNull @Initialized Integer tryIntegerParse(@UnknownKeyFor @NonNull @Initialized String number) {
        Integer parsed;
        try {
            parsed = Integer.parseInt(number);
        }
        catch (NumberFormatException ignored) {
            // Not a number: signalled to the caller as null rather than as an exception.
            parsed = null;
        }
        return parsed;
    }

    /**
     * Re-emits every input line with a new timestamp.
     *
     * <p>The timestamp is the current time, except that with probability {@link #THRESHOLD} it is
     * moved into the past by a random whole number of minutes in
     * {@code [MIN_DELAY, MAX_DELAY)}.
     */
    public static class InsertDelays extends DoFn<String, String> {
        /** Probability that an element receives a delayed (older) timestamp. */
        private static final @UnknownKeyFor @NonNull @Initialized double THRESHOLD = 0.001;
        /** Smallest delay in minutes (inclusive). */
        private static final @UnknownKeyFor @NonNull @Initialized int MIN_DELAY = 1;
        /** Upper bound of the delay in minutes (exclusive). */
        private static final @UnknownKeyFor @NonNull @Initialized int MAX_DELAY = 100;

        /**
         * Outputs the current element unchanged, stamped with "now" or with a randomly delayed time.
         *
         * @param c typed context giving access to the element and the output
         * @throws Exception declared for the Beam processing contract
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<String, String>. @UnknownKeyFor @NonNull @Initialized ProcessContext c) throws @UnknownKeyFor @NonNull @Initialized Exception {
            Instant timestamp = Instant.now();
            Random random = new Random();
            if (random.nextDouble() < THRESHOLD) {
                // nextInt(range) is in [0, range), so the delay is in [MIN_DELAY, MAX_DELAY).
                int range = MAX_DELAY - MIN_DELAY;
                int delayInMinutes = random.nextInt(range) + MIN_DELAY;
                long delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes);
                timestamp = new Instant(timestamp.getMillis() - delayInMillis);
            }
            c.outputWithTimestamp(c.element(), timestamp);
        }
    }

    /**
     * Pipeline options of this example: the input location and the fixed-window length, on top of
     * the inherited example, BigQuery-table and streaming options.
     */
    public interface TrafficFlowOptions extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {
        /** Returns the location of the input file. */
        @Description("Input file to read from")
        @Default.String("gs://apache-beam-samples/traffic_sensor/Freeways-5Minaa2010-01-01_to_2010-02-15.csv")
        @UnknownKeyFor @NonNull @Initialized String getInput();

        /** Sets the location of the input file. */
        void setInput(@UnknownKeyFor @NonNull @Initialized String var1);

        /** Returns the fixed-window length in minutes; defaults to 30. */
        @Description("Numeric value of window duration for fixed windows, in minutes")
        @Default.Integer(30)
        @UnknownKeyFor @NonNull @Initialized Integer getWindowDuration();

        /** Sets the fixed-window length in minutes. */
        void setWindowDuration(@UnknownKeyFor @NonNull @Initialized Integer var1);
    }

    /**
     * Turns one comma-separated input line into a {@code (freeway, totalFlow)} pair.
     *
     * <p>Lines are dropped silently when they are the header row (first field equals
     * {@code "timestamp"}), have fewer than {@link #VALID_NUM_FIELDS} fields, or carry a total flow
     * that is not a positive integer.
     */
    static class ExtractFlowInfo extends DoFn<String, KV<String, Integer>> {
        /** Minimum number of comma-separated fields a line must have to be processed. */
        private static final @UnknownKeyFor @NonNull @Initialized int VALID_NUM_FIELDS = 50;

        ExtractFlowInfo() {
        }

        /**
         * Parses the current line and emits its freeway/total-flow pair, if the line is usable.
         *
         * @param c typed context giving access to the line and the output
         * @throws Exception declared for the Beam processing contract
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<String, KV<String, Integer>>. @UnknownKeyFor @NonNull @Initialized ProcessContext c) throws @UnknownKeyFor @NonNull @Initialized Exception {
            // Limit -1 keeps trailing empty fields, so the field count reflects the line layout.
            String[] laneInfo = c.element().split(",", -1);
            if ("timestamp".equals(laneInfo[0])) {
                // Header row.
                return;
            }
            if (laneInfo.length < VALID_NUM_FIELDS) {
                // Incomplete row.
                return;
            }
            String freeway = laneInfo[2];
            Integer totalFlow = TriggerExample.tryIntegerParse(laneInfo[7]);
            // tryIntegerParse yields null for non-numeric text.
            if (totalFlow == null || totalFlow <= 0) {
                return;
            }
            c.output(KV.of(freeway, totalFlow));
        }
    }

    /**
     * Converts a {@code (freeway, "totalFlow,recordCount")} pair into a BigQuery {@link TableRow},
     * adding the trigger label, the window, the pane details and the event/processing times.
     */
    static class FormatTotalFlow extends DoFn<KV<String, String>, TableRow> {
        /** Label written to the {@code trigger_type} column of every row. */
        private final @UnknownKeyFor @NonNull @Initialized String triggerType;

        /**
         * @param triggerType label written to the {@code trigger_type} column
         */
        public FormatTotalFlow(@UnknownKeyFor @NonNull @Initialized String triggerType) {
            this.triggerType = triggerType;
        }

        /**
         * Builds and emits the row for the current element.
         *
         * @param c typed context giving access to the element, its pane and its timestamp
         * @param window window the element belongs to; stored via {@code toString()}
         * @throws Exception if the value is not of the form {@code "<int>,<long>"}
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<String, String>, TableRow>. @UnknownKeyFor @NonNull @Initialized ProcessContext c, @UnknownKeyFor @NonNull @Initialized BoundedWindow window) throws @UnknownKeyFor @NonNull @Initialized Exception {
            KV<String, String> element = c.element();
            // values[0] is the summed flow, values[1] the number of records.
            String[] values = element.getValue().split(",", -1);
            TableRow row = new TableRow()
                .set("trigger_type", this.triggerType)
                .set("freeway", element.getKey())
                .set("total_flow", Integer.parseInt(values[0]))
                .set("number_of_records", Long.parseLong(values[1]))
                .set("window", window.toString())
                .set("isFirst", c.pane().isFirst())
                .set("isLast", c.pane().isLast())
                .set("timing", c.pane().getTiming().toString())
                .set("event_time", c.timestamp().toString())
                .set("processing_time", Instant.now().toString());
            c.output(row);
        }
    }

    /**
     * Computes, per freeway key and pane, the sum of the flow values and the number of records,
     * and formats the result as BigQuery rows labelled with a trigger type.
     */
    static class TotalFlow extends PTransform<PCollection<KV<String, Integer>>, PCollection<TableRow>> {
        /** Label passed on to {@link FormatTotalFlow} for the {@code trigger_type} column. */
        private final @UnknownKeyFor @NonNull @Initialized String triggerType;

        /**
         * @param triggerType label identifying the trigger configuration of the input
         */
        public TotalFlow(@UnknownKeyFor @NonNull @Initialized String triggerType) {
            this.triggerType = triggerType;
        }

        /**
         * Groups the input by key, aggregates each group and converts the aggregates to rows.
         *
         * @param flowInfo {@code (freeway, flow)} pairs
         * @return one {@link TableRow} per grouped key and pane
         */
        public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized TableRow> expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer>> flowInfo) {
            PCollection<KV<String, Iterable<Integer>>> flowPerFreeway =
                flowInfo.apply(GroupByKey.<String, Integer>create());

            PCollection<KV<String, String>> results = flowPerFreeway.apply(ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, String>>() {

                /**
                 * Emits {@code (key, "sum,count")} for the grouped values of the current key.
                 */
                @DoFn.ProcessElement
                public void processElement(DoFn<KV<String, Iterable<Integer>>, KV<String, String>>. @UnknownKeyFor @NonNull @Initialized ProcessContext c) throws @UnknownKeyFor @NonNull @Initialized Exception {
                    int sum = 0;
                    long numberOfRecords = 0L;
                    for (Integer value : c.element().getValue()) {
                        sum += value;
                        numberOfRecords++;
                    }
                    // Both aggregates travel as one comma-separated string; FormatTotalFlow splits it.
                    c.output(KV.of(c.element().getKey(), sum + "," + numberOfRecords));
                }
            }));

            return results.apply(ParDo.of(new FormatTotalFlow(this.triggerType)));
        }
    }

    static class CalculateTotalFlow
    extends PTransform<PCollection<KV<String, Integer>>, PCollectionList<TableRow>> {
        private @UnknownKeyFor @NonNull @Initialized int windowDuration;

        CalculateTotalFlow(@UnknownKeyFor @NonNull @Initialized int windowDuration) {
            this.windowDuration = windowDuration;
        }

        public @UnknownKeyFor @NonNull @Initialized PCollectionList<@UnknownKeyFor @NonNull @Initialized TableRow> expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer>> flowInfo) {
            PCollection defaultTriggerResults = (PCollection)((PCollection)flowInfo.apply("Default", (PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)this.windowDuration))).triggering((Trigger)Repeatedly.forever((Trigger)AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.ZERO).discardingFiredPanes())).apply("Total Flow Default", (PTransform)new TotalFlow("default"));
            PCollection withAllowedLatenessResults = (PCollection)((PCollection)flowInfo.apply("WithLateData", (PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)this.windowDuration))).triggering((Trigger)Repeatedly.forever((Trigger)AfterWatermark.pastEndOfWindow())).discardingFiredPanes().withAllowedLateness(ONE_DAY))).apply("Total Flow WithAllowedLateness", (PTransform)new TotalFlow("withAllowedLateness"));
            PCollection speculativeResults = (PCollection)((PCollection)flowInfo.apply("Speculative", (PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)this.windowDuration))).triggering((Trigger)Repeatedly.forever((Trigger)AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))).accumulatingFiredPanes().withAllowedLateness(ONE_DAY))).apply("Total Flow Speculative", (PTransform)new TotalFlow("speculative"));
            PCollection sequentialResults = (PCollection)((PCollection)flowInfo.apply("Sequential", (PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)this.windowDuration))).triggering((Trigger)AfterEach.inOrder((Trigger[])new Trigger[]{Repeatedly.forever((Trigger)AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE)).orFinally((Trigger.OnceTrigger)AfterWatermark.pastEndOfWindow()), Repeatedly.forever((Trigger)AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES))})).accumulatingFiredPanes().withAllowedLateness(ONE_DAY))).apply("Total Flow Sequential", (PTransform)new TotalFlow("sequential"));
            PCollectionList resultsList = PCollectionList.of((PCollection)defaultTriggerResults).and(withAllowedLatenessResults).and(speculativeResults).and(sequentialResults);
            return resultsList;
        }
    }
}

