/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.utils.MultiTablesCompactorUtil;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.system.CompactBucketsTable;
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializationUtils;

public class MultiTablesReadOperator
extends AbstractStreamOperator<RowData>
implements OneInputStreamOperator<Tuple2<Split, String>, RowData> {
    private static final long serialVersionUID = 1L;
    private final CatalogLoader catalogLoader;
    private final boolean isStreaming;
    private Duration partitionIdleTime = null;
    private transient Catalog catalog;
    private transient IOManager ioManager;
    private transient Map<Identifier, CompactBucketsTable> tablesMap;
    private transient Map<Identifier, TableRead> readsMap;
    private transient StreamRecord<RowData> reuseRecord;
    private transient FlinkRowData reuseRow;

    public MultiTablesReadOperator(CatalogLoader catalogLoader, boolean isStreaming) {
        this.catalogLoader = catalogLoader;
        this.isStreaming = isStreaming;
    }

    public MultiTablesReadOperator(CatalogLoader catalogLoader, boolean isStreaming, Duration partitionIdleTime) {
        this.catalogLoader = catalogLoader;
        this.isStreaming = isStreaming;
        this.partitionIdleTime = partitionIdleTime;
    }

    public void open() throws Exception {
        super.open();
        this.ioManager = IOManager.create(this.getContainingTask().getEnvironment().getIOManager().getSpillingDirectoriesPaths());
        this.tablesMap = new HashMap<Identifier, CompactBucketsTable>();
        this.readsMap = new HashMap<Identifier, TableRead>();
        this.catalog = this.catalogLoader.load();
        this.reuseRow = new FlinkRowData(null);
        this.reuseRecord = new StreamRecord((Object)this.reuseRow);
        if (this.isStreaming) {
            Preconditions.checkArgument(this.partitionIdleTime == null, "Streaming mode does not support partitionIdleTime");
        }
    }

    public void processElement(StreamRecord<Tuple2<Split, String>> record) throws Exception {
        Identifier identifier = Identifier.fromString((String)((Tuple2)record.getValue()).f1);
        TableRead read = this.getTableRead(identifier);
        Map<BinaryRow, Long> partitionInfo = this.getPartitionInfo(this.tablesMap.get(identifier));
        try (CloseableIterator<InternalRow> iterator2 = read.createReader((Split)((Tuple2)record.getValue()).f0).toCloseableIterator();){
            if (this.partitionIdleTime == null) {
                while (iterator2.hasNext()) {
                    this.reuseRow.replace((InternalRow)iterator2.next());
                    this.output.collect(this.reuseRecord);
                }
            } else {
                while (iterator2.hasNext()) {
                    InternalRow row = (InternalRow)iterator2.next();
                    if (!this.checkIsHistoryPartition(row, partitionInfo)) continue;
                    this.reuseRow.replace(row);
                    this.output.collect(this.reuseRecord);
                }
            }
        }
    }

    private TableRead getTableRead(Identifier tableId) {
        Table table = this.tablesMap.get(tableId);
        if (table == null) {
            try {
                Table newTable = this.catalog.getTable(tableId);
                Preconditions.checkArgument(newTable instanceof FileStoreTable, "Only FileStoreTable supports compact action. The table type is '%s'.", newTable.getClass().getName());
                table = new CompactBucketsTable((FileStoreTable)newTable, this.isStreaming, tableId.getDatabaseName()).copy((Map)MultiTablesCompactorUtil.compactOptions(this.isStreaming));
                this.tablesMap.put(tableId, (CompactBucketsTable)table);
                this.readsMap.put(tableId, table.newReadBuilder().newRead().withIOManager(this.ioManager));
            }
            catch (Catalog.TableNotExistException e) {
                LOG.error(String.format("table: %s not found.", tableId.getFullName()));
            }
        }
        return this.readsMap.get(tableId);
    }

    private Map<BinaryRow, Long> getPartitionInfo(CompactBucketsTable table) {
        List<PartitionEntry> partitions = table.newSnapshotReader().partitionEntries();
        return partitions.stream().collect(Collectors.toMap(PartitionEntry::partition, PartitionEntry::lastFileCreationTime));
    }

    private boolean checkIsHistoryPartition(InternalRow row, Map<BinaryRow, Long> partitionInfo) {
        BinaryRow partition = SerializationUtils.deserializeBinaryRow(row.getBinary(1));
        long historyMilli = LocalDateTime.now().minus(this.partitionIdleTime).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
        return partitionInfo.get(partition) <= historyMilli;
    }

    public void close() throws Exception {
        super.close();
        if (this.ioManager != null) {
            this.ioManager.close();
        }
        if (this.catalog != null) {
            this.catalog.close();
        }
    }
}

