/*
 * Decompiled with CFR 0.152.
 */
package stream.scotty.demo.flink;

import java.io.Serializable;
import java.util.Random;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class DemoSource
extends RichSourceFunction<Tuple2<Integer, Integer>>
implements Serializable {
    private Random key;
    private Random value;
    private boolean canceled = false;
    private long watermarkDelay = 1000L;
    public long lastWaterMarkTs = 0L;

    public DemoSource() {
    }

    public DemoSource(long watermarkDelay) {
        this.watermarkDelay = watermarkDelay;
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.key = new Random(42L);
        this.value = new Random(43L);
    }

    public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
        while (!this.canceled) {
            ctx.collectWithTimestamp((Object)new Tuple2((Object)1, (Object)this.value.nextInt(10)), System.currentTimeMillis());
            if (this.lastWaterMarkTs + 1000L < System.currentTimeMillis()) {
                long watermark = System.currentTimeMillis() - this.watermarkDelay;
                ctx.emitWatermark(new Watermark(watermark));
                this.lastWaterMarkTs = System.currentTimeMillis();
            }
            Thread.sleep(1L);
        }
    }

    public void cancel() {
        this.canceled = true;
    }
}

