/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.partition;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.sink.partition.PartitionListener;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.PartitionHandler;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;
import org.apache.paimon.utils.PartitionStatisticsReporter;

public class ReportPartStatsListener
implements PartitionListener {
    private static final ListStateDescriptor<Map<String, Long>> PENDING_REPORT_STATE_DESC = new ListStateDescriptor("pending-report-hms-partition", (TypeSerializer)new MapSerializer((TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)LongSerializer.INSTANCE));
    private final InternalRowPartitionComputer partitionComputer;
    private final PartitionStatisticsReporter partitionStatisticsReporter;
    private final ListState<Map<String, Long>> pendingPartitionsState;
    private final Map<String, Long> pendingPartitions;
    private final long idleTime;

    private ReportPartStatsListener(InternalRowPartitionComputer partitionComputer, PartitionStatisticsReporter partitionStatisticsReporter, OperatorStateStore store, boolean isRestored, long idleTime) throws Exception {
        Iterator it;
        this.partitionComputer = partitionComputer;
        this.partitionStatisticsReporter = partitionStatisticsReporter;
        this.pendingPartitionsState = store.getListState(PENDING_REPORT_STATE_DESC);
        this.pendingPartitions = new HashMap<String, Long>();
        if (isRestored && (it = ((Iterable)this.pendingPartitionsState.get()).iterator()).hasNext()) {
            Map state = (Map)it.next();
            this.pendingPartitions.putAll(state);
        }
        this.idleTime = idleTime;
    }

    @Override
    public void notifyCommittable(List<ManifestCommittable> committables) {
        HashSet<String> partition = new HashSet<String>();
        boolean endInput = false;
        for (ManifestCommittable committable : committables) {
            for (CommitMessage commitMessage : committable.fileCommittables()) {
                CommitMessageImpl message = (CommitMessageImpl)commitMessage;
                if (message.newFilesIncrement().isEmpty() && message.compactIncrement().isEmpty()) continue;
                partition.add(PartitionPathUtils.generatePartitionPath(this.partitionComputer.generatePartValues(message.partition())));
            }
            if (committable.identifier() != Long.MAX_VALUE) continue;
            endInput = true;
        }
        long current = System.currentTimeMillis();
        partition.forEach(p -> this.pendingPartitions.put((String)p, current));
        try {
            Map<String, Long> partitions = this.reportPartition(endInput);
            for (Map.Entry<String, Long> entry : partitions.entrySet()) {
                this.partitionStatisticsReporter.report(entry.getKey(), entry.getValue());
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Map<String, Long> reportPartition(boolean endInput) {
        if (endInput) {
            return this.pendingPartitions;
        }
        Iterator<Map.Entry<String, Long>> iterator2 = this.pendingPartitions.entrySet().iterator();
        HashMap<String, Long> result = new HashMap<String, Long>();
        long current = System.currentTimeMillis();
        while (iterator2.hasNext()) {
            Map.Entry<String, Long> entry = iterator2.next();
            if (current - entry.getValue() <= this.idleTime) continue;
            result.put(entry.getKey(), entry.getValue());
            iterator2.remove();
        }
        return result;
    }

    @Override
    public void snapshotState() throws Exception {
        this.pendingPartitionsState.update(Collections.singletonList(this.pendingPartitions));
    }

    public static Optional<ReportPartStatsListener> create(boolean isRestored, OperatorStateStore stateStore, FileStoreTable table) throws Exception {
        CoreOptions coreOptions = table.coreOptions();
        Options options = coreOptions.toConfiguration();
        if (options.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis() <= 0L) {
            return Optional.empty();
        }
        if (table.partitionKeys().isEmpty()) {
            return Optional.empty();
        }
        if (!coreOptions.partitionedTableInMetastore()) {
            return Optional.empty();
        }
        PartitionHandler partitionHandler = table.catalogEnvironment().partitionHandler();
        if (partitionHandler == null) {
            return Optional.empty();
        }
        InternalRowPartitionComputer partitionComputer = new InternalRowPartitionComputer(coreOptions.partitionDefaultName(), table.schema().logicalPartitionType(), table.partitionKeys().toArray(new String[0]), coreOptions.legacyPartitionName());
        return Optional.of(new ReportPartStatsListener(partitionComputer, new PartitionStatisticsReporter(table, partitionHandler), stateStore, isRestored, options.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis()));
    }

    @Override
    public void close() throws IOException {
        if (this.partitionStatisticsReporter != null) {
            this.partitionStatisticsReporter.close();
        }
    }
}

