/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class KafkaStreaming {
    // Kafka topic that the producer pipeline writes to and the consumer pipeline reads from.
    private static final String TOPIC_NAME = "my-topic";

    // Timing knobs for the example. NOTE(review): the unit is presumably seconds (the
    // pipelines build their durations with Duration.standardSeconds) - confirm.
    private static final int ALLOWED_LATENESS_TIME = 1;
    private static final int TIME_OUTPUT_AFTER_FIRST_ELEMENT = 10;
    private static final int WINDOW_TIME = 30;

    // NOTE(review): presumably the number of messages generated per window - confirm.
    private static final int MESSAGES_COUNT = 100;

    // Pool of player names the score generator picks from at random.
    private static final String[] NAMES = {"Alice", "Bob", "Charlie", "David"};

    // Formats window start/end instants as hours:minutes:seconds for console output.
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("HH:mm:ss");

    /**
     * Entry point of the example.
     *
     * <p>Parses the pipeline options, schedules the {@link KafkaProducer} pipeline to start at
     * the next whole multiple of the window length (in epoch milliseconds), and then runs the
     * {@link KafkaConsumer} pipeline on the calling thread.
     *
     * @param args command-line arguments forwarded to {@link PipelineOptionsFactory}
     */
    public static void main(String[] args) {
        long windowMillis = Duration.standardSeconds(WINDOW_TIME).getMillis();

        // Read the clock exactly once: using two separate readings in the same expression can
        // yield a start time that is slightly off the window boundary.
        long nowMillis = Instant.now().getMillis();
        Instant nextWindowStart =
                new Instant(nowMillis + windowMillis - (nowMillis + windowMillis) % windowMillis);

        KafkaStreamingOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaStreamingOptions.class);

        // The producer is a TimerTask; it is started once, at the computed boundary.
        Timer timer = new Timer();
        KafkaProducer producer = new KafkaProducer(options);
        timer.schedule(producer, nextWindowStart.toDate());

        // The consumer's run() waits for its pipeline to finish before returning.
        KafkaConsumer kafkaConsumer = new KafkaConsumer(options);
        kafkaConsumer.run();
    }

    public static class IntermittentlyFailingIntegerDeserializer
    implements Deserializer<Integer> {
        public static final @UnknownKeyFor @NonNull @Initialized IntegerDeserializer INTEGER_DESERIALIZER = new IntegerDeserializer();
        public @UnknownKeyFor @NonNull @Initialized int deserializeCount = 0;

        public @UnknownKeyFor @NonNull @Initialized Integer deserialize(@UnknownKeyFor @NonNull @Initialized String topic, @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] data) {
            ++this.deserializeCount;
            if (this.deserializeCount % 10 == 0) {
                throw new SerializationException("Expected Serialization Exception");
            }
            return INTEGER_DESERIALIZER.deserialize(topic, data);
        }
    }

    static class LogErrors
    extends PTransform<PCollection<BadRecord>, PCollection<BadRecord>> {
        LogErrors() {
        }

        public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized BadRecord> expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized BadRecord> input) {
            return (PCollection)input.apply("Log Errors", (PTransform)ParDo.of((DoFn)new LogErrorFn()));
        }

        static class LogErrorFn
        extends DoFn<BadRecord, BadRecord> {
            LogErrorFn() {
            }

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element @UnknownKeyFor @NonNull @Initialized BadRecord record, // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @NonNull @Initialized DoFn.OutputReceiver<@UnknownKeyFor @NonNull @Initialized BadRecord> receiver) {
                System.out.println(record);
                receiver.output((Object)record);
            }
        }
    }

    static class LogResults
    extends DoFn<Map<String, Integer>, Map<String, Integer>> {
        LogResults() {
        }

        @DoFn.ProcessElement
        public void processElement(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c, @UnknownKeyFor @NonNull @Initialized IntervalWindow w) throws @UnknownKeyFor @NonNull @Initialized Exception {
            Map map = (Map)c.element();
            if (map == null) {
                c.output((Object)((Map)c.element()));
                return;
            }
            String startTime = w.start().toString(dateTimeFormatter);
            String endTime = w.end().toString(dateTimeFormatter);
            PaneInfo.Timing timing = c.pane().getTiming();
            switch (timing) {
                case EARLY: {
                    System.out.println("Live score (running sum) for the current round:");
                    break;
                }
                case ON_TIME: {
                    System.out.println("Final score for the current round:");
                    break;
                }
                case LATE: {
                    System.out.printf("Late score for the round from %s to %s:%n", startTime, endTime);
                    break;
                }
                default: {
                    throw new RuntimeException("Unknown timing value");
                }
            }
            for (Map.Entry entry : map.entrySet()) {
                System.out.printf("%10s: %-10s%n", entry.getKey(), entry.getValue());
            }
            if (timing == PaneInfo.Timing.ON_TIME) {
                System.out.printf("======= End of round from %s to %s =======%n%n", startTime, endTime);
            } else {
                System.out.println();
            }
            c.output((Object)((Map)c.element()));
        }
    }

    static class WindowCombineFn
    extends Combine.CombineFn<KV<String, Integer>, Map<String, Integer>, Map<String, Integer>> {
        WindowCombineFn() {
        }

        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer> createAccumulator() {
            return new HashMap<String, Integer>();
        }

        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer> addInput(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer> mutableAccumulator, @UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer> input) {
            assert (input != null);
            assert (mutableAccumulator != null);
            mutableAccumulator.put((String)input.getKey(), (Integer)input.getValue());
            return mutableAccumulator;
        }

        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer> mergeAccumulators(@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer>> accumulators) {
            HashMap<String, Integer> result = new HashMap<String, Integer>();
            for (Map<String, Integer> acc : accumulators) {
                for (Map.Entry<String, Integer> kv : acc.entrySet()) {
                    if (result.containsKey(kv.getKey())) {
                        result.put(kv.getKey(), (Integer)result.get(kv.getKey()) + kv.getValue());
                        continue;
                    }
                    result.put(kv.getKey(), kv.getValue());
                }
            }
            return result;
        }

        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer> extractOutput(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer> accumulator) {
            return accumulator;
        }
    }

    /**
     * Builds and runs the pipeline that reads {@code (name, score)} records from Kafka, sums
     * the scores per name in fixed windows, and prints the results.
     */
    public static class KafkaConsumer {
        private final KafkaStreamingOptions options;

        /**
         * @param options pipeline options; {@code getKafkaHost()} supplies the bootstrap servers
         */
        public KafkaConsumer(KafkaStreamingOptions options) {
            this.options = options;
        }

        /**
         * Constructs the consumer pipeline, runs it and waits until it finishes, then prints
         * {@code "Pipeline finished"}.
         */
        public void run() {
            Pipeline pipeline = Pipeline.create(options);

            // Fixed windows of WINDOW_TIME seconds.
            Window<KV<String, Integer>> window =
                    Window.into(FixedWindows.of(Duration.standardSeconds(WINDOW_TIME)));

            // Fires TIME_OUTPUT_AFTER_FIRST_ELEMENT seconds of processing time after the first
            // element of a pane; wrapped in Repeatedly.forever below.
            Trigger trigger =
                    AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(TIME_OUTPUT_AFTER_FIRST_ELEMENT));

            Map<String, Object> consumerConfig = new HashMap<>();
            consumerConfig.put("auto.offset.reset", "latest");

            // Explicit <String, Integer> type arguments are required: without them the
            // deserializer classes and the config map do not type-check against the reader.
            PCollection<KV<String, Integer>> scores;
            // NOTE(review): the handler is closed right after the read is applied - presumably
            // Beam requires registered error handlers to be closed before the pipeline runs; confirm.
            try (ErrorHandler.BadRecordErrorHandler<PCollection<BadRecord>> errorHandler =
                    pipeline.registerBadRecordErrorHandler(new LogErrors())) {
                scores =
                        pipeline.apply(
                                KafkaIO.<String, Integer>read()
                                        .withBootstrapServers(options.getKafkaHost())
                                        .withTopic(KafkaStreaming.TOPIC_NAME)
                                        .withKeyDeserializer(StringDeserializer.class)
                                        .withValueDeserializer(IntermittentlyFailingIntegerDeserializer.class)
                                        .withConsumerConfigUpdates(consumerConfig)
                                        .withBadRecordErrorHandler(errorHandler)
                                        .withoutMetadata());
            }

            scores
                    .apply(
                            window
                                    .triggering(Repeatedly.forever(trigger))
                                    .withAllowedLateness(Duration.standardSeconds(ALLOWED_LATENESS_TIME))
                                    .accumulatingFiredPanes())
                    .apply(Sum.integersPerKey())
                    .apply(Combine.globally(new WindowCombineFn()).withoutDefaults())
                    .apply(ParDo.of(new LogResults()));

            pipeline.run().waitUntilFinish();
            System.out.println("Pipeline finished");
        }
    }

    /**
     * Timer task that builds and runs a pipeline writing randomly generated
     * {@code (name, score)} records to Kafka.
     */
    public static class KafkaProducer extends TimerTask {
        private final KafkaStreamingOptions options;

        /**
         * @param options pipeline options; {@code getKafkaHost()} supplies the bootstrap servers
         */
        public KafkaProducer(KafkaStreamingOptions options) {
            this.options = options;
        }

        /**
         * Constructs the producer pipeline, runs it and waits until it finishes.
         *
         * <p>The sequence source is rate-limited to {@code MESSAGES_COUNT} elements per
         * {@code WINDOW_TIME} seconds and stamps each element with the current wall-clock time.
         */
        @Override
        public void run() {
            Pipeline pipeline = Pipeline.create(options);

            PCollection<KV<String, Integer>> input =
                    pipeline
                            .apply(
                                    GenerateSequence.from(0)
                                            .withRate(MESSAGES_COUNT, Duration.standardSeconds(WINDOW_TIME))
                                            .withTimestampFn((Long n) -> new Instant(System.currentTimeMillis())))
                            .apply(ParDo.of(new RandomUserScoreGeneratorFn()));

            // Explicit <String, Integer> type arguments are required for the serializer classes
            // to type-check against the writer.
            input.apply(
                    KafkaIO.<String, Integer>write()
                            .withBootstrapServers(options.getKafkaHost())
                            .withTopic(KafkaStreaming.TOPIC_NAME)
                            .withKeySerializer(StringSerializer.class)
                            .withValueSerializer(IntegerSerializer.class)
                            .withProducerConfigUpdates(new HashMap<>()));

            pipeline.run().waitUntilFinish();
        }

        /** Emits one random {@code (name, score)} pair per input element, ignoring the input. */
        static class RandomUserScoreGeneratorFn extends DoFn<Object, KV<String, Integer>> {
            /** Upper bound (inclusive) of generated scores. */
            private static final int MAX_SCORE = 100;

            RandomUserScoreGeneratorFn() {
            }

            /**
             * @param c process context used only to emit the generated pair
             */
            @DoFn.ProcessElement
            public void processElement(ProcessContext c) {
                c.output(generate());
            }

            /**
             * Picks a random entry of {@code NAMES} and a random score.
             *
             * @return a pair of name and score, with the score in {@code [1, MAX_SCORE]}
             */
            public KV<String, Integer> generate() {
                Random random = new Random();
                String randomName = NAMES[random.nextInt(NAMES.length)];
                // nextInt(MAX_SCORE) yields 0..MAX_SCORE-1; shift by one to get 1..MAX_SCORE.
                int randomScore = random.nextInt(MAX_SCORE) + 1;
                return KV.of(randomName, randomScore);
            }
        }
    }

    /**
     * Pipeline options for this example. Adds the Kafka address that both pipelines pass to
     * {@code withBootstrapServers}.
     */
    public interface KafkaStreamingOptions extends PipelineOptions {
        /** @return the Kafka server address; defaults to {@code kafka_server:9092} */
        @Description("Kafka server host")
        @Default.String("kafka_server:9092")
        String getKafkaHost();

        /** @param value the Kafka server address to use */
        void setKafkaHost(String value);
    }
}

