/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.append.MultiTableAppendCompactTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiTableAppendCompactReadOperator
extends AbstractStreamOperator<MultiTableAppendCompactTask>
implements OneInputStreamOperator<MultiTableAppendCompactTask, MultiTableAppendCompactTask> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MultiTableAppendCompactReadOperator.class);
    private final CatalogLoader catalogLoader;
    private final Duration partitionIdleTime;
    private transient Catalog catalog;
    private transient Map<Identifier, FileStoreTable> tablesMap;

    public MultiTableAppendCompactReadOperator(CatalogLoader catalogLoader, Duration partitionIdleTime) {
        this.catalogLoader = catalogLoader;
        this.partitionIdleTime = partitionIdleTime;
    }

    public void open() throws Exception {
        super.open();
        this.tablesMap = new HashMap<Identifier, FileStoreTable>();
        this.catalog = this.catalogLoader.load();
    }

    public void processElement(StreamRecord<MultiTableAppendCompactTask> record) {
        FileStoreTable table;
        Map<BinaryRow, Long> partitionInfo;
        Identifier identifier = ((MultiTableAppendCompactTask)record.getValue()).tableIdentifier();
        BinaryRow partition = ((MultiTableAppendCompactTask)record.getValue()).partition();
        if (this.checkIsHistoryPartition(partition, partitionInfo = this.getPartitionInfo(table = this.getTable(identifier)))) {
            this.output.collect(record);
        }
    }

    private FileStoreTable getTable(Identifier tableId) {
        FileStoreTable table = this.tablesMap.get(tableId);
        if (table == null) {
            try {
                Table newTable = this.catalog.getTable(tableId);
                Preconditions.checkArgument(newTable instanceof FileStoreTable, "Only FileStoreTable supports compact action. The table type is '%s'.", newTable.getClass().getName());
                table = (FileStoreTable)newTable;
                this.tablesMap.put(tableId, table);
            }
            catch (Catalog.TableNotExistException e) {
                LOG.error(String.format("table: %s not found.", tableId.getFullName()));
            }
        }
        return table;
    }

    private Map<BinaryRow, Long> getPartitionInfo(FileStoreTable table) {
        List<PartitionEntry> partitions = table.newSnapshotReader().partitionEntries();
        return partitions.stream().collect(Collectors.toMap(PartitionEntry::partition, PartitionEntry::lastFileCreationTime));
    }

    private boolean checkIsHistoryPartition(BinaryRow partition, Map<BinaryRow, Long> partitionInfo) {
        long historyMilli = LocalDateTime.now().minus(this.partitionIdleTime).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
        return partitionInfo.get(partition) <= historyMilli;
    }

    public void close() throws Exception {
        super.close();
        if (this.catalog != null) {
            this.catalog.close();
        }
    }
}

