/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.connectors.flink;

import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.TruncatedDataException;
import io.pravega.connectors.flink.AbstractStreamingReaderBuilder;
import io.pravega.connectors.flink.ReaderCheckpointHook;
import io.pravega.connectors.flink.serialization.DeserializerFromSchemaRegistry;
import io.pravega.connectors.flink.serialization.PravegaDeserializationSchema;
import io.pravega.connectors.flink.util.FlinkPravegaUtils;
import io.pravega.connectors.flink.watermark.AssignerWithTimeWindows;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkPravegaReader<T>
extends RichParallelSourceFunction<T>
implements ResultTypeQueryable<T>,
ExternallyInducedSource<T, Checkpoint> {
    private static final Logger log = LoggerFactory.getLogger(FlinkPravegaReader.class);
    protected static final String PRAVEGA_READER_METRICS_GROUP = "PravegaReader";
    protected static final String READER_GROUP_METRICS_GROUP = "readerGroup";
    protected static final String STREAM_METRICS_GROUP = "stream";
    protected static final String UNREAD_BYTES_METRICS_GAUGE = "unreadBytes";
    protected static final String READER_GROUP_NAME_METRICS_GAUGE = "readerGroupName";
    protected static final String SCOPE_NAME_METRICS_GAUGE = "scope";
    protected static final String ONLINE_READERS_METRICS_GAUGE = "onlineReaders";
    protected static final String STREAM_NAMES_METRICS_GAUGE = "streams";
    protected static final String SEGMENT_POSITIONS_METRICS_GAUGE = "segmentPositions";
    protected static final String SEPARATOR = ",";
    private static final long serialVersionUID = 1L;
    protected transient EventStreamClientFactory eventStreamClientFactory;
    protected transient ReaderGroupManager readerGroupManager = null;
    final String hookUid;
    final ClientConfig clientConfig;
    final ReaderGroupConfig readerGroupConfig;
    final String readerGroupScope;
    final String readerGroupName;
    final DeserializationSchema<T> deserializationSchema;
    final SerializedValue<AssignerWithTimeWindows<T>> assignerWithTimeWindows;
    final Time eventReadTimeout;
    final Time checkpointInitiateTimeout;
    final boolean enableMetrics;
    volatile boolean running = true;
    private transient ExternallyInducedSource.CheckpointTrigger checkpointTrigger;
    private transient ReaderGroup readerGroup = null;

    protected FlinkPravegaReader(String hookUid, ClientConfig clientConfig, ReaderGroupConfig readerGroupConfig, String readerGroupScope, String readerGroupName, DeserializationSchema<T> deserializationSchema, SerializedValue<AssignerWithTimeWindows<T>> assignerWithTimeWindows, Time eventReadTimeout, Time checkpointInitiateTimeout, boolean enableMetrics) {
        this.hookUid = (String)Preconditions.checkNotNull((Object)hookUid, (String)"hookUid");
        this.clientConfig = (ClientConfig)Preconditions.checkNotNull((Object)clientConfig, (String)"clientConfig");
        this.readerGroupConfig = (ReaderGroupConfig)Preconditions.checkNotNull((Object)readerGroupConfig, (String)"readerGroupConfig");
        this.readerGroupScope = (String)Preconditions.checkNotNull((Object)readerGroupScope, (String)"readerGroupScope");
        this.readerGroupName = (String)Preconditions.checkNotNull((Object)readerGroupName, (String)READER_GROUP_NAME_METRICS_GAUGE);
        this.deserializationSchema = (DeserializationSchema)Preconditions.checkNotNull(deserializationSchema, (String)"deserializationSchema");
        this.eventReadTimeout = (Time)Preconditions.checkNotNull((Object)eventReadTimeout, (String)"eventReadTimeout");
        this.checkpointInitiateTimeout = (Time)Preconditions.checkNotNull((Object)checkpointInitiateTimeout, (String)"checkpointInitiateTimeout");
        this.enableMetrics = enableMetrics;
        this.assignerWithTimeWindows = assignerWithTimeWindows;
    }

    void initialize() {
        this.createEventStreamClientFactory();
        log.info("Creating reader group: {}/{} for the Flink job", (Object)this.readerGroupScope, (Object)this.readerGroupName);
        this.createReaderGroupManager();
        this.createReaderGroup();
        if (this.isEventTimeMode()) {
            Preconditions.checkArgument((this.readerGroup.getStreamNames().size() == 1 ? 1 : 0) != 0, (Object)"Only 1 Pravega stream is allowed in the event-time mode");
        }
    }

    private boolean isEventTimeMode() {
        return this.assignerWithTimeWindows != null;
    }

    private long autoWatermarkInterval() {
        return this.getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
        RuntimeContext runtimeContext = this.getRuntimeContext();
        String readerId = FlinkPravegaUtils.getReaderName(runtimeContext.getTaskName(), runtimeContext.getIndexOfThisSubtask() + 1, runtimeContext.getNumberOfParallelSubtasks());
        log.info("{} : Creating Pravega reader with ID '{}' for controller URI: {}", new Object[]{runtimeContext.getTaskNameWithSubtasks(), readerId, this.clientConfig.getControllerURI()});
        try (EventStreamReader<T> pravegaReader = this.createEventStreamReader(readerId);){
            Function<EventRead, Object> deserFunc;
            log.info("Starting Pravega reader '{}' for controller URI {}", (Object)readerId, (Object)this.clientConfig.getControllerURI());
            long previousTimestamp = Long.MIN_VALUE;
            AssignerWithTimeWindows assigner = null;
            if (this.isEventTimeMode()) {
                assigner = (AssignerWithTimeWindows)this.assignerWithTimeWindows.deserializeValue(runtimeContext.getUserCodeClassLoader());
                PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(pravegaReader, ctx, runtimeContext.getUserCodeClassLoader(), ((StreamingRuntimeContext)runtimeContext).getProcessingTimeService());
                log.info("Periodic Watermark Emitter for Reader ID: {} has started with an interval of {}", (Object)readerId, (Object)this.autoWatermarkInterval());
                periodicEmitter.start();
            }
            Function<EventRead, Object> function = this.deserializationSchema instanceof PravegaDeserializationSchema ? ((PravegaDeserializationSchema)this.deserializationSchema)::extractEvent : (deserFunc = eventRead -> eventRead.getEvent());
            while (this.running) {
                EventRead<T> eventRead2;
                try {
                    eventRead2 = pravegaReader.readNextEvent(this.eventReadTimeout.toMilliseconds());
                }
                catch (TruncatedDataException e) {
                    continue;
                }
                Object event = deserFunc.apply(eventRead2);
                if (event != null) {
                    if (this.deserializationSchema.isEndOfStream(event)) {
                        log.info("Reached end of stream for reader: {}", (Object)readerId);
                        return;
                    }
                    Object object = ctx.getCheckpointLock();
                    synchronized (object) {
                        if (this.isEventTimeMode()) {
                            long currentTimestamp = assigner.extractTimestamp(event, previousTimestamp);
                            ctx.collectWithTimestamp(event, currentTimestamp);
                            previousTimestamp = currentTimestamp;
                        } else {
                            ctx.collect(event);
                        }
                    }
                }
                if (!eventRead2.isCheckpoint()) continue;
                this.triggerCheckpoint(eventRead2.getCheckpointName());
            }
        }
        catch (RuntimeException e) {
            log.error("Exception occurred while creating a Pravega EventStreamReader to read events", (Throwable)e);
            throw e;
        }
    }

    public void cancel() {
        this.running = false;
    }

    public TypeInformation<T> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }

    public void open(Configuration parameters) throws Exception {
        this.createEventStreamClientFactory();
        this.createReaderGroupManager();
        this.createReaderGroup();
        if (this.enableMetrics) {
            this.registerMetrics();
        }
        if (this.isEventTimeMode()) {
            Preconditions.checkArgument((this.autoWatermarkInterval() > 0L ? 1 : 0) != 0, (Object)"Periodic watermark interval should be positive, please use env.getConfig().setAutoWatermarkInterval() to set a positive number. Recommended value: 10000");
        }
    }

    public void close() throws Exception {
        Throwable ex = null;
        if (this.eventStreamClientFactory != null) {
            try {
                log.info("Closing Pravega eventStreamClientFactory");
                this.eventStreamClientFactory.close();
            }
            catch (Throwable e) {
                if (e instanceof InterruptedException) {
                    log.warn("Interrupted while waiting for eventStreamClientFactory to close, retrying ...");
                    this.eventStreamClientFactory.close();
                }
                ex = ExceptionUtils.firstOrSuppressed((Throwable)e, ex);
            }
        }
        if (this.readerGroupManager != null) {
            log.info("Closing Pravega ReaderGroupManager");
            try {
                this.readerGroupManager.close();
            }
            catch (Throwable e) {
                if (e instanceof InterruptedException) {
                    log.warn("Interrupted while waiting for ReaderGroupManager to close, retrying ...");
                    this.readerGroupManager.close();
                }
                ex = ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)ex);
            }
        }
        if (this.readerGroup != null) {
            try {
                log.info("Closing Pravega ReaderGroup");
                this.readerGroup.close();
            }
            catch (Throwable e) {
                if (e instanceof InterruptedException) {
                    log.warn("Interrupted while waiting for ReaderGroup to close, retrying ...");
                    this.readerGroup.close();
                }
                ex = ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)ex);
            }
        }
        if (ex != null && ex instanceof Exception) {
            throw (Exception)ex;
        }
    }

    public MasterTriggerRestoreHook<Checkpoint> createMasterTriggerRestoreHook() {
        return new ReaderCheckpointHook(this.hookUid, this.readerGroupName, this.readerGroupScope, this.checkpointInitiateTimeout, this.clientConfig, this.readerGroupConfig);
    }

    public void setCheckpointTrigger(ExternallyInducedSource.CheckpointTrigger checkpointTrigger) {
        this.checkpointTrigger = checkpointTrigger;
    }

    private void triggerCheckpoint(String checkpointIdentifier) throws FlinkException {
        long checkpointId;
        Preconditions.checkState((this.checkpointTrigger != null ? 1 : 0) != 0, (Object)"checkpoint trigger not set");
        log.debug("{} received checkpoint event for {}", (Object)this.getRuntimeContext().getTaskNameWithSubtasks(), (Object)checkpointIdentifier);
        try {
            checkpointId = ReaderCheckpointHook.parseCheckpointId(checkpointIdentifier);
        }
        catch (IllegalArgumentException e) {
            throw new FlinkException("Cannot trigger checkpoint due to invalid Pravega checkpoint name", e.getCause());
        }
        this.checkpointTrigger.triggerCheckpoint(checkpointId);
    }

    private void registerMetrics() {
        Preconditions.checkState((this.readerGroup != null ? 1 : 0) != 0, (Object)"Reader Group is not created");
        MetricGroup pravegaReaderMetricGroup = this.getRuntimeContext().getMetricGroup().addGroup(PRAVEGA_READER_METRICS_GROUP);
        MetricGroup readerGroupMetricGroup = pravegaReaderMetricGroup.addGroup(READER_GROUP_METRICS_GROUP);
        readerGroupMetricGroup.gauge(UNREAD_BYTES_METRICS_GAUGE, (Gauge)new UnreadBytesGauge(this.readerGroup));
        readerGroupMetricGroup.gauge(READER_GROUP_NAME_METRICS_GAUGE, (Gauge)new ReaderGroupNameGauge(this.readerGroup));
        readerGroupMetricGroup.gauge(SCOPE_NAME_METRICS_GAUGE, (Gauge)new ScopeNameGauge(this.readerGroup));
        readerGroupMetricGroup.gauge(ONLINE_READERS_METRICS_GAUGE, (Gauge)new OnlineReadersGauge(this.readerGroup));
        readerGroupMetricGroup.gauge(STREAM_NAMES_METRICS_GAUGE, (Gauge)new StreamNamesGauge(this.readerGroup));
        Set<String> streamNames = this.readerGroup.getStreamNames();
        for (String scopedStream : streamNames) {
            String[] streamInfo = scopedStream.split("/", 2);
            Preconditions.checkArgument((streamInfo.length == 2 ? 1 : 0) != 0, (Object)"not a fully qualified stream expected: scopeName/streamName");
            MetricGroup streamMetricGroup = readerGroupMetricGroup.addGroup("stream." + streamInfo[0] + "_" + streamInfo[1]);
            streamMetricGroup.gauge(SEGMENT_POSITIONS_METRICS_GAUGE, (Gauge)new SegmentPositionsGauge(this.readerGroup, streamInfo[0], streamInfo[1]));
        }
    }

    private ReaderGroup createReaderGroup() {
        this.readerGroupManager.createReaderGroup(this.readerGroupName, this.readerGroupConfig);
        this.readerGroup = this.readerGroupManager.getReaderGroup(this.readerGroupName);
        return this.readerGroup;
    }

    protected ReaderGroupManager createReaderGroupManager() {
        if (this.readerGroupManager == null) {
            this.readerGroupManager = ReaderGroupManager.withScope(this.readerGroupScope, this.clientConfig);
        }
        return this.readerGroupManager;
    }

    protected EventStreamClientFactory createEventStreamClientFactory() {
        if (this.eventStreamClientFactory == null) {
            this.eventStreamClientFactory = EventStreamClientFactory.withScope(this.readerGroupScope, this.clientConfig);
        }
        return this.eventStreamClientFactory;
    }

    protected EventStreamReader<T> createEventStreamReader(String readerId) {
        return FlinkPravegaUtils.createPravegaReader(readerId, this.readerGroupName, this.deserializationSchema, ReaderConfig.builder().build(), this.eventStreamClientFactory);
    }

    public static <T> Builder<T> builder() {
        return new Builder();
    }

    public static class Builder<T>
    extends AbstractStreamingReaderBuilder<T, Builder<T>> {
        private DeserializationSchema<T> deserializationSchema;
        private SerializedValue<AssignerWithTimeWindows<T>> assignerWithTimeWindows;

        @Override
        protected Builder<T> builder() {
            return this;
        }

        public Builder<T> withDeserializationSchema(DeserializationSchema<T> deserializationSchema) {
            this.deserializationSchema = deserializationSchema;
            return this.builder();
        }

        public Builder<T> withDeserializationSchemaFromRegistry(String groupId, Class<T> tClass) {
            this.deserializationSchema = new PravegaDeserializationSchema<T>(tClass, new DeserializerFromSchemaRegistry<T>(this.getPravegaConfig(), groupId, tClass));
            return this.builder();
        }

        public Builder<T> withTimestampAssigner(AssignerWithTimeWindows<T> assignerWithTimeWindows) {
            try {
                ClosureCleaner.clean(assignerWithTimeWindows, (ExecutionConfig.ClosureCleanerLevel)ExecutionConfig.ClosureCleanerLevel.RECURSIVE, (boolean)true);
                this.assignerWithTimeWindows = new SerializedValue(assignerWithTimeWindows);
            }
            catch (IOException e) {
                throw new IllegalArgumentException("The given assigner is not serializable", e);
            }
            return this;
        }

        @Override
        protected DeserializationSchema<T> getDeserializationSchema() {
            Preconditions.checkState((this.deserializationSchema != null ? 1 : 0) != 0, (Object)"Deserialization schema must not be null.");
            return this.deserializationSchema;
        }

        @Override
        protected SerializedValue<AssignerWithTimeWindows<T>> getAssignerWithTimeWindows() {
            return this.assignerWithTimeWindows;
        }

        public FlinkPravegaReader<T> build() {
            FlinkPravegaReader reader = this.buildSourceFunction();
            reader.initialize();
            return reader;
        }
    }

    private static class SegmentPositionsGauge
    implements Gauge<String> {
        private final ReaderGroup readerGroup;
        private final String scope;
        private final String stream;

        public SegmentPositionsGauge(ReaderGroup readerGroup, String scope, String stream) {
            this.readerGroup = readerGroup;
            this.scope = scope;
            this.stream = stream;
        }

        public String getValue() {
            StringBuilder builder = new StringBuilder();
            builder.append("scope=").append(this.scope).append(", ");
            builder.append("stream=").append(this.stream).append(", segments={");
            Map<Stream, StreamCut> streamCuts = this.readerGroup.getStreamCuts();
            Optional<Map.Entry> optionalStreamCutEntry = streamCuts.entrySet().stream().filter(e -> ((Stream)e.getKey()).getStreamName().equals(this.stream) && ((Stream)e.getKey()).getScope().equals(this.scope)).findFirst();
            if (optionalStreamCutEntry.isPresent()) {
                builder.append(((StreamCut)optionalStreamCutEntry.get().getValue()).toString());
            }
            builder.append("}");
            return builder.toString();
        }
    }

    private static class StreamNamesGauge
    implements Gauge<String> {
        private final ReaderGroup readerGroup;

        public StreamNamesGauge(ReaderGroup readerGroup) {
            this.readerGroup = readerGroup;
        }

        public String getValue() {
            return this.readerGroup.getStreamNames().stream().collect(Collectors.joining(FlinkPravegaReader.SEPARATOR));
        }
    }

    private static class OnlineReadersGauge
    implements Gauge<String> {
        private final ReaderGroup readerGroup;

        public OnlineReadersGauge(ReaderGroup readerGroup) {
            this.readerGroup = readerGroup;
        }

        public String getValue() {
            return this.readerGroup.getOnlineReaders().stream().collect(Collectors.joining(FlinkPravegaReader.SEPARATOR));
        }
    }

    private static class ScopeNameGauge
    implements Gauge<String> {
        private final ReaderGroup readerGroup;

        public ScopeNameGauge(ReaderGroup readerGroup) {
            this.readerGroup = readerGroup;
        }

        public String getValue() {
            return this.readerGroup.getScope();
        }
    }

    private static class ReaderGroupNameGauge
    implements Gauge<String> {
        private final ReaderGroup readerGroup;

        public ReaderGroupNameGauge(ReaderGroup readerGroup) {
            this.readerGroup = readerGroup;
        }

        public String getValue() {
            return this.readerGroup.getGroupName();
        }
    }

    private static class UnreadBytesGauge
    implements Gauge<Long> {
        private final ReaderGroup readerGroup;

        public UnreadBytesGauge(ReaderGroup readerGroup) {
            this.readerGroup = readerGroup;
        }

        public Long getValue() {
            return this.readerGroup.getMetrics().unreadBytes();
        }
    }

    private class PeriodicWatermarkEmitter
    implements ProcessingTimeCallback {
        private EventStreamReader<?> pravegaReader;
        private Stream stream;
        private final SourceFunction.SourceContext<?> ctx;
        private final ProcessingTimeService timerService;
        private long lastWatermarkTimestamp;
        private AssignerWithTimeWindows<?> userAssigner;

        protected PeriodicWatermarkEmitter(EventStreamReader<?> pravegaReader, SourceFunction.SourceContext<?> ctx, ClassLoader userCodeClassLoader, ProcessingTimeService timerService) throws Exception {
            this.pravegaReader = (EventStreamReader)Preconditions.checkNotNull(pravegaReader);
            this.stream = Stream.of(FlinkPravegaReader.this.readerGroup.getStreamNames().iterator().next());
            this.ctx = (SourceFunction.SourceContext)Preconditions.checkNotNull(ctx);
            this.timerService = (ProcessingTimeService)Preconditions.checkNotNull((Object)timerService);
            this.lastWatermarkTimestamp = Long.MIN_VALUE;
            this.userAssigner = (AssignerWithTimeWindows)FlinkPravegaReader.this.assignerWithTimeWindows.deserializeValue(userCodeClassLoader);
        }

        protected void start() {
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + FlinkPravegaReader.this.autoWatermarkInterval(), (ProcessingTimeCallback)this);
        }

        public void onProcessingTime(long timestamp) {
            Watermark watermark = this.userAssigner.getWatermark(this.pravegaReader.getCurrentTimeWindow(this.stream));
            if (watermark != null && watermark.getTimestamp() > this.lastWatermarkTimestamp) {
                this.lastWatermarkTimestamp = watermark.getTimestamp();
                log.debug("Emit watermark with timestamp: {}", (Object)watermark.getTimestamp());
                this.ctx.emitWatermark(watermark);
            }
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + FlinkPravegaReader.this.autoWatermarkInterval(), (ProcessingTimeCallback)this);
        }
    }
}

