/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.service;

import java.util.ArrayList;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.system.FileMonitorTable;
import org.apache.paimon.utils.SerializationUtils;

public class QueryFileMonitor
extends RichSourceFunction<InternalRow> {
    private static final long serialVersionUID = 1L;
    private final Table table;
    private final long monitorInterval;
    private transient SourceFunction.SourceContext<InternalRow> ctx;
    private transient StreamTableScan scan;
    private transient TableRead read;
    private volatile boolean isRunning = true;

    public QueryFileMonitor(Table table) {
        this.table = table;
        this.monitorInterval = Options.fromMap(table.options()).get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
    }

    public void open(Configuration parameters) throws Exception {
        FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable)this.table);
        ReadBuilder readBuilder = monitorTable.newReadBuilder();
        this.scan = readBuilder.newStreamScan();
        this.read = readBuilder.newRead();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<InternalRow> ctx) throws Exception {
        this.ctx = ctx;
        while (this.isRunning) {
            boolean isEmpty;
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                if (!this.isRunning) {
                    return;
                }
                isEmpty = this.doScan();
            }
            if (!isEmpty) continue;
            Thread.sleep(this.monitorInterval);
        }
    }

    private boolean doScan() throws Exception {
        ArrayList records = new ArrayList();
        this.read.createReader(this.scan.plan()).forEachRemaining(records::add);
        records.forEach(arg_0 -> this.ctx.collect(arg_0));
        return records.isEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel() {
        if (this.ctx != null) {
            Object object = this.ctx.getCheckpointLock();
            synchronized (object) {
                this.isRunning = false;
            }
        } else {
            this.isRunning = false;
        }
    }

    public static DataStream<InternalRow> build(StreamExecutionEnvironment env, Table table) {
        return env.addSource((SourceFunction)new QueryFileMonitor(table), "FileMonitor-" + table.name(), InternalTypeInfo.fromRowType(FileMonitorTable.getRowType()));
    }

    public static ChannelComputer<InternalRow> createChannelComputer() {
        return new FileMonitorChannelComputer();
    }

    private static class FileMonitorChannelComputer
    implements ChannelComputer<InternalRow> {
        private int numChannels;

        private FileMonitorChannelComputer() {
        }

        @Override
        public void setup(int numChannels) {
            this.numChannels = numChannels;
        }

        @Override
        public int channel(InternalRow row) {
            BinaryRow partition = SerializationUtils.deserializeBinaryRow(row.getBinary(1));
            int bucket = row.getInt(2);
            return ChannelComputer.select(partition, bucket, this.numChannels);
        }
    }
}

