/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.utils.CloseableIterator;

public class ReadOperator
extends AbstractStreamOperator<RowData>
implements OneInputStreamOperator<Split, RowData> {
    private static final long serialVersionUID = 1L;
    private final ReadBuilder readBuilder;
    private transient TableRead read;
    private transient StreamRecord<RowData> reuseRecord;
    private transient FlinkRowData reuseRow;
    private transient IOManager ioManager;
    private transient FileStoreSourceReaderMetrics sourceReaderMetrics;

    public ReadOperator(ReadBuilder readBuilder) {
        this.readBuilder = readBuilder;
    }

    public void open() throws Exception {
        super.open();
        this.sourceReaderMetrics = new FileStoreSourceReaderMetrics((MetricGroup)this.getMetricGroup());
        this.getMetricGroup().gauge("currentEmitEventTimeLag", () -> {
            long eventTime = this.sourceReaderMetrics.getLatestFileCreationTime();
            if (eventTime == -1L) {
                return -1L;
            }
            return System.currentTimeMillis() - eventTime;
        });
        this.ioManager = IOManager.create(this.getContainingTask().getEnvironment().getIOManager().getSpillingDirectoriesPaths());
        this.read = this.readBuilder.newRead().withIOManager(this.ioManager);
        this.reuseRow = new FlinkRowData(null);
        this.reuseRecord = new StreamRecord((Object)this.reuseRow);
    }

    public void processElement(StreamRecord<Split> record) throws Exception {
        Split split = (Split)record.getValue();
        long eventTime = ((DataSplit)split).getLatestFileCreationEpochMillis().orElse(-1L);
        this.sourceReaderMetrics.recordSnapshotUpdate(eventTime);
        try (CloseableIterator<InternalRow> iterator = this.read.createReader(split).toCloseableIterator();){
            while (iterator.hasNext()) {
                this.reuseRow.replace((InternalRow)iterator.next());
                this.output.collect(this.reuseRecord);
            }
        }
    }

    public void close() throws Exception {
        super.close();
        if (this.ioManager != null) {
            this.ioManager.close();
        }
    }
}

