/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import javax.annotation.Nullable;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.utils.CloseableIterator;

public class ReadOperator
extends AbstractStreamOperator<RowData>
implements OneInputStreamOperator<Split, RowData> {
    private static final long serialVersionUID = 1L;
    private final ReadBuilder readBuilder;
    @Nullable
    private final NestedProjectedRowData nestedProjectedRowData;
    private transient TableRead read;
    private transient StreamRecord<RowData> reuseRecord;
    private transient FlinkRowData reuseRow;
    private transient IOManager ioManager;
    private transient FileStoreSourceReaderMetrics sourceReaderMetrics;
    private transient long emitEventTimeLag = -1L;
    private transient long idleStartTime = Long.MAX_VALUE;
    private transient Counter numRecordsIn;

    public ReadOperator(ReadBuilder readBuilder, @Nullable NestedProjectedRowData nestedProjectedRowData) {
        this.readBuilder = readBuilder;
        this.nestedProjectedRowData = nestedProjectedRowData;
    }

    public void open() throws Exception {
        super.open();
        this.sourceReaderMetrics = new FileStoreSourceReaderMetrics((MetricGroup)this.getMetricGroup());
        this.getMetricGroup().gauge("currentEmitEventTimeLag", () -> this.emitEventTimeLag);
        this.getMetricGroup().gauge("sourceIdleTime", this::getIdleTime);
        this.numRecordsIn = InternalSourceReaderMetricGroup.wrap((OperatorMetricGroup)this.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        this.ioManager = IOManager.create(this.getContainingTask().getEnvironment().getIOManager().getSpillingDirectoriesPaths());
        this.read = this.readBuilder.newRead().withIOManager(this.ioManager);
        this.reuseRow = new FlinkRowData(null);
        this.reuseRecord = this.nestedProjectedRowData != null ? new StreamRecord((Object)this.nestedProjectedRowData) : new StreamRecord((Object)this.reuseRow);
        this.idlingStarted();
    }

    public void processElement(StreamRecord<Split> record) throws Exception {
        Split split = (Split)record.getValue();
        long eventTime = ((DataSplit)split).earliestFileCreationEpochMillis().orElse(-1L);
        this.sourceReaderMetrics.recordSnapshotUpdate(eventTime);
        this.idleStartTime = Long.MAX_VALUE;
        boolean firstRecord = true;
        try (CloseableIterator<InternalRow> iterator = this.read.createReader(split).toCloseableIterator();){
            while (iterator.hasNext()) {
                this.emitEventTimeLag = System.currentTimeMillis() - eventTime;
                if (firstRecord) {
                    firstRecord = false;
                } else {
                    this.numRecordsIn.inc();
                }
                this.reuseRow.replace((InternalRow)iterator.next());
                if (this.nestedProjectedRowData != null) {
                    this.nestedProjectedRowData.replaceRow(this.reuseRow);
                }
                this.output.collect(this.reuseRecord);
            }
        }
        this.idlingStarted();
    }

    public void close() throws Exception {
        super.close();
        if (this.ioManager != null) {
            this.ioManager.close();
        }
    }

    private void idlingStarted() {
        if (!this.isIdling()) {
            this.idleStartTime = System.currentTimeMillis();
        }
    }

    private boolean isIdling() {
        return this.idleStartTime != Long.MAX_VALUE;
    }

    private long getIdleTime() {
        return this.isIdling() ? System.currentTimeMillis() - this.idleStartTime : 0L;
    }
}

