/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.sls.source;

import com.alibaba.ververica.connectors.common.source.resolver.RecordResolver;
import com.alibaba.ververica.connectors.sls.source.SlsSourceFunction;
import com.alibaba.ververica.connectors.sls.source.SourceRecord;
import java.io.IOException;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.Collector;

/**
 * Adapts a {@link SlsSourceFunction}, which emits {@link SourceRecord}s, into a parallel source
 * of {@link RowData}.
 *
 * <p>Lifecycle, checkpointing and cancellation calls are forwarded to the wrapped source
 * function. Records emitted by the wrapped source are passed through a {@link RecordResolver},
 * and the rows the resolver produces are forwarded to the {@code SourceContext<RowData>} that
 * was handed to {@link #run(SourceFunction.SourceContext)}.
 */
public class SLSSourceFunctionAdapter
        extends RichParallelSourceFunction<RowData>
        implements CheckpointedFunction, ResultTypeQueryable<RowData> {
    /** Wrapped source that produces the raw {@link SourceRecord}s. */
    private final SlsSourceFunction slsSourceFunction;
    /** Resolver that turns each {@link SourceRecord} into {@link RowData}. */
    private final RecordResolver<SourceRecord, RowData> slsRecordParser;

    /**
     * Creates an adapter around the given source and resolver.
     *
     * @param source the source function to delegate to
     * @param parser the resolver applied to every record emitted by {@code source}
     */
    public SLSSourceFunctionAdapter(SlsSourceFunction source, RecordResolver<SourceRecord, RowData> parser) {
        this.slsSourceFunction = source;
        this.slsRecordParser = parser;
    }

    /**
     * Stores the runtime context on this function and passes the same context to the wrapped
     * source function.
     *
     * @param t the runtime context
     */
    public void setRuntimeContext(RuntimeContext t) {
        super.setRuntimeContext(t);
        this.slsSourceFunction.setRuntimeContext(t);
    }

    /**
     * Opens the wrapped source first, then opens the resolver with a {@link FunctionContext}
     * built from this function's runtime context.
     *
     * @param parameters configuration forwarded to the wrapped source
     * @throws Exception if opening the source or the resolver fails
     */
    public void open(Configuration parameters) throws Exception {
        this.slsSourceFunction.open(parameters);
        this.slsRecordParser.open(new FunctionContext(this.getRuntimeContext()));
    }

    /**
     * Returns the type information reported by the resolver.
     *
     * @return the produced row type
     */
    public TypeInformation<RowData> getProducedType() {
        return this.slsRecordParser.getProducedType();
    }

    /**
     * Delegates the snapshot to the wrapped source function.
     *
     * @param functionSnapshotContext the snapshot context
     * @throws Exception if the wrapped source fails to snapshot its state
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.slsSourceFunction.snapshotState(functionSnapshotContext);
    }

    /**
     * Delegates state initialization to the wrapped source function.
     *
     * @param functionInitializationContext the initialization context
     * @throws Exception if the wrapped source fails to initialize its state
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.slsSourceFunction.initializeState(functionInitializationContext);
    }

    /**
     * Runs the wrapped source against a {@link SourceContextWrapper} so that every
     * {@link SourceRecord} it emits is resolved into {@link RowData} before reaching
     * {@code sourceContext}.
     *
     * @param sourceContext the context that receives the resolved rows
     * @throws Exception if the wrapped source fails
     */
    public void run(SourceFunction.SourceContext<RowData> sourceContext) throws Exception {
        SourceContextWrapper sourceContextWrapper = new SourceContextWrapper(sourceContext, this.slsRecordParser);
        this.slsSourceFunction.run(sourceContextWrapper);
    }

    /** Delegates cancellation to the wrapped source function. */
    public void cancel() {
        this.slsSourceFunction.cancel();
    }

    /**
     * Closes the wrapped source and then the resolver. The resolver is closed in a
     * {@code finally} block so it is released even when closing the source throws.
     *
     * @throws IOException if closing either component fails
     */
    public void close() throws IOException {
        try {
            this.slsSourceFunction.close();
        }
        finally {
            this.slsRecordParser.close();
        }
    }

    /**
     * A {@code SourceContext<SourceRecord>} that resolves each incoming record with a
     * {@link RecordResolver} and forwards the resulting rows to a wrapped
     * {@code SourceContext<RowData>}. All other operations are delegated unchanged.
     */
    public static class SourceContextWrapper
            implements SourceFunction.SourceContext<SourceRecord> {
        private final SourceFunction.SourceContext<RowData> sourceContext;
        private final RecordResolver<SourceRecord, RowData> parser;
        /** Forwards rows to the wrapped context without a timestamp. */
        private final Collector<RowData> recordEntryCollector;
        /** Forwards rows to the wrapped context with the most recently set timestamp. */
        private final SourceRecordWithTimeStampCollector recordEntryWithTimestampCollector;

        /**
         * Creates a wrapper around the given context.
         *
         * @param sourceContext the downstream context that receives resolved rows
         * @param parser the resolver applied to every collected record
         */
        public SourceContextWrapper(final SourceFunction.SourceContext<RowData> sourceContext, RecordResolver<SourceRecord, RowData> parser) {
            this.sourceContext = sourceContext;
            this.parser = parser;
            this.recordEntryCollector = new Collector<RowData>(){

                public void collect(RowData r) {
                    // Pass the row as RowData; an Object-typed argument does not match
                    // SourceContext<RowData>.collect.
                    sourceContext.collect(r);
                }

                public void close() {
                    // Nothing to release; the wrapped context is closed by SourceContextWrapper.close().
                }
            };
            this.recordEntryWithTimestampCollector = new SourceRecordWithTimeStampCollector(sourceContext);
        }

        /**
         * Resolves the record and forwards every row the resolver hands to the collector to
         * the wrapped context.
         *
         * @param recordEntry the record to resolve
         */
        public void collect(SourceRecord recordEntry) {
            this.parser.parse(recordEntry, this.recordEntryCollector);
        }

        /**
         * Resolves the record and forwards every row the resolver hands to the collector to
         * the wrapped context, tagged with timestamp {@code l}.
         *
         * <p>NOTE(review): the timestamp is stored on a single reused collector instance, so
         * this presumably relies on callers not invoking it concurrently - confirm against the
         * wrapped source.
         *
         * @param recordEntry the record to resolve
         * @param l the timestamp attached to every row produced from {@code recordEntry}
         */
        public void collectWithTimestamp(SourceRecord recordEntry, long l) {
            this.recordEntryWithTimestampCollector.setTimestamp(l);
            this.parser.parse(recordEntry, this.recordEntryWithTimestampCollector);
        }

        /**
         * Forwards the watermark to the wrapped context.
         *
         * @param watermark the watermark to emit
         */
        public void emitWatermark(Watermark watermark) {
            this.sourceContext.emitWatermark(watermark);
        }

        /** Forwards the idle notification to the wrapped context. */
        public void markAsTemporarilyIdle() {
            this.sourceContext.markAsTemporarilyIdle();
        }

        /**
         * Returns the checkpoint lock of the wrapped context.
         *
         * @return the wrapped context's checkpoint lock
         */
        public Object getCheckpointLock() {
            return this.sourceContext.getCheckpointLock();
        }

        /** Closes the wrapped context. */
        public void close() {
            this.sourceContext.close();
        }
    }

    /**
     * A {@link Collector} that forwards each row to a {@code SourceContext<RowData>} together
     * with a timestamp set beforehand through {@link #setTimestamp(long)}.
     */
    public static class SourceRecordWithTimeStampCollector
            implements Collector<RowData> {
        private final SourceFunction.SourceContext<RowData> sourceContext;
        /** Timestamp attached to collected rows; {@code 0L} until {@link #setTimestamp(long)} is called. */
        private long timestamp = 0L;

        /**
         * Creates a collector that emits into the given context.
         *
         * @param sourceContext the context that receives the timestamped rows
         */
        public SourceRecordWithTimeStampCollector(SourceFunction.SourceContext<RowData> sourceContext) {
            this.sourceContext = sourceContext;
        }

        /**
         * Sets the timestamp used for subsequently collected rows.
         *
         * @param t the timestamp to attach
         */
        public void setTimestamp(long t) {
            this.timestamp = t;
        }

        /**
         * Forwards the row to the context with the currently stored timestamp.
         *
         * @param data the row to emit
         */
        public void collect(RowData data) {
            // Pass the row as RowData; an Object-typed argument does not match
            // SourceContext<RowData>.collectWithTimestamp.
            this.sourceContext.collectWithTimestamp(data, this.timestamp);
        }

        /** No-op; this collector does not own the context it writes to. */
        public void close() {
        }
    }
}

