/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.AsyncLookupSinkWrite;
import org.apache.paimon.flink.sink.GlobalFullCompactionSinkWrite;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StateUtils;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.flink.sink.StoreSinkWriteStateImpl;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializationUtils;

public class MultiTablesStoreCompactOperator
extends PrepareCommitOperator<RowData, MultiTableCommittable> {
    private static final long serialVersionUID = 1L;
    private StoreSinkWrite.Provider storeSinkWriteProvider;
    private final CheckpointConfig checkpointConfig;
    private final boolean isStreaming;
    private final boolean ignorePreviousFiles;
    private final boolean fullCompaction;
    private final String initialCommitUser;
    private transient StoreSinkWriteState state;
    private transient DataFileMetaSerializer dataFileMetaSerializer;
    private final CatalogLoader catalogLoader;
    protected Catalog catalog;
    protected Map<Identifier, FileStoreTable> tables;
    protected Map<Identifier, StoreSinkWrite> writes;
    protected String commitUser;

    private MultiTablesStoreCompactOperator(StreamOperatorParameters<MultiTableCommittable> parameters, CatalogLoader catalogLoader, String initialCommitUser, CheckpointConfig checkpointConfig, boolean isStreaming, boolean ignorePreviousFiles, boolean fullCompaction, Options options) {
        super(parameters, options);
        this.catalogLoader = catalogLoader;
        this.initialCommitUser = initialCommitUser;
        this.checkpointConfig = checkpointConfig;
        this.isStreaming = isStreaming;
        this.ignorePreviousFiles = ignorePreviousFiles;
        this.fullCompaction = fullCompaction;
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.catalog = this.catalogLoader.load();
        this.commitUser = StateUtils.getSingleValueFromState(context, "commit_user_state", String.class, this.initialCommitUser);
        this.state = new StoreSinkWriteStateImpl(RuntimeContextUtils.getIndexOfThisSubtask((RuntimeContext)this.getRuntimeContext()), context, (tableName, partition, bucket) -> ChannelComputer.select(partition, bucket, RuntimeContextUtils.getNumberOfParallelSubtasks((RuntimeContext)this.getRuntimeContext())) == RuntimeContextUtils.getIndexOfThisSubtask((RuntimeContext)this.getRuntimeContext()));
        this.tables = new HashMap<Identifier, FileStoreTable>();
        this.writes = new HashMap<Identifier, StoreSinkWrite>();
    }

    public void open() throws Exception {
        super.open();
        this.dataFileMetaSerializer = new DataFileMetaSerializer();
    }

    public void processElement(StreamRecord<RowData> element) throws Exception {
        String tableName;
        RowData record = (RowData)element.getValue();
        long snapshotId = record.getLong(0);
        BinaryRow partition = SerializationUtils.deserializeBinaryRow(record.getBinary(1));
        int bucket = record.getInt(2);
        byte[] serializedFiles = record.getBinary(3);
        List<DataFileMeta> files = this.dataFileMetaSerializer.deserializeList(serializedFiles);
        String databaseName = record.getString(4).toString();
        Identifier tableId = Identifier.create(databaseName, tableName = record.getString(5).toString());
        FileStoreTable table = this.getTable(tableId);
        Preconditions.checkArgument(!table.coreOptions().writeOnly(), CoreOptions.WRITE_ONLY.key() + " should not be true for MultiTablesStoreCompactOperator.");
        this.storeSinkWriteProvider = this.createWriteProvider(table, this.checkpointConfig, this.isStreaming, this.ignorePreviousFiles);
        StoreSinkWrite write = this.writes.computeIfAbsent(tableId, id -> this.storeSinkWriteProvider.provide(table, this.commitUser, this.state, this.getContainingTask().getEnvironment().getIOManager(), this.memoryPool, (MetricGroup)this.getMetricGroup()));
        if (write.streamingMode()) {
            write.notifyNewFiles(snapshotId, partition, bucket, files);
            write.compact(partition, bucket, false);
        } else {
            Preconditions.checkArgument(files.isEmpty(), "Batch compact job does not concern what files are compacted. They only need to know what buckets are compacted.");
            write.compact(partition, bucket, this.fullCompaction);
        }
    }

    @Override
    protected List<MultiTableCommittable> prepareCommit(boolean waitCompaction, long checkpointId) throws IOException {
        LinkedList<MultiTableCommittable> committables = new LinkedList<MultiTableCommittable>();
        for (Map.Entry<Identifier, StoreSinkWrite> entry : this.writes.entrySet()) {
            Identifier key = entry.getKey();
            StoreSinkWrite write = entry.getValue();
            committables.addAll(write.prepareCommit(waitCompaction, checkpointId).stream().map(committable -> MultiTableCommittable.fromCommittable(key, committable)).collect(Collectors.toList()));
        }
        return committables;
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        for (StoreSinkWrite write : this.writes.values()) {
            write.snapshotState();
        }
        this.state.snapshotState();
    }

    @Override
    public void close() throws Exception {
        super.close();
        for (StoreSinkWrite write : this.writes.values()) {
            write.close();
        }
        if (this.catalog != null) {
            this.catalog.close();
            this.catalog = null;
        }
    }

    private FileStoreTable getTable(Identifier tableId) throws InterruptedException {
        Table table = this.tables.get(tableId);
        if (table == null) {
            while (true) {
                try {
                    table = (FileStoreTable)this.catalog.getTable(tableId);
                    table = table.copy((Map)this.options.toMap());
                    HashMap<String, String> dynamicOptions = new HashMap<String, String>(){
                        {
                            this.put(CoreOptions.WRITE_ONLY.key(), "false");
                        }
                    };
                    if (this.isStreaming) {
                        dynamicOptions.put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
                        dynamicOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
                        dynamicOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false");
                    }
                    table = table.copy(dynamicOptions);
                    this.tables.put(tableId, (FileStoreTable)table);
                }
                catch (Catalog.TableNotExistException tableNotExistException) {
                    Thread.sleep(500L);
                    continue;
                }
                break;
            }
        }
        return table;
    }

    private StoreSinkWrite.Provider createWriteProvider(FileStoreTable fileStoreTable, CheckpointConfig checkpointConfig, boolean isStreaming, boolean ignorePreviousFiles) {
        boolean waitCompaction;
        Options options = fileStoreTable.coreOptions().toConfiguration();
        CoreOptions.ChangelogProducer changelogProducer = fileStoreTable.coreOptions().changelogProducer();
        CoreOptions coreOptions = fileStoreTable.coreOptions();
        if (coreOptions.writeOnly()) {
            waitCompaction = false;
        } else {
            waitCompaction = coreOptions.prepareCommitWaitCompaction();
            int deltaCommits = -1;
            if (options.contains(CoreOptions.FULL_COMPACTION_DELTA_COMMITS)) {
                deltaCommits = options.get(CoreOptions.FULL_COMPACTION_DELTA_COMMITS);
            } else if (options.contains(FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
                long fullCompactionThresholdMs = options.get(FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL).toMillis();
                deltaCommits = (int)(fullCompactionThresholdMs / checkpointConfig.getCheckpointInterval());
            }
            if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) {
                int finalDeltaCommits = Math.max(deltaCommits, 1);
                return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> new GlobalFullCompactionSinkWrite(table, commitUser, state, ioManager, ignorePreviousFiles, waitCompaction, finalDeltaCommits, isStreaming, memoryPool, metricGroup);
            }
        }
        if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) {
            return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> new AsyncLookupSinkWrite(table, commitUser, state, ioManager, ignorePreviousFiles, waitCompaction, isStreaming, memoryPool, metricGroup);
        }
        return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> new StoreSinkWriteImpl(table, commitUser, state, ioManager, ignorePreviousFiles, waitCompaction, isStreaming, memoryPool, metricGroup);
    }

    public static class Factory
    extends PrepareCommitOperator.Factory<RowData, MultiTableCommittable> {
        private final CatalogLoader catalogLoader;
        private final CheckpointConfig checkpointConfig;
        private final boolean isStreaming;
        private final boolean ignorePreviousFiles;
        private final boolean fullCompaction;
        private final String initialCommitUser;

        public Factory(CatalogLoader catalogLoader, String initialCommitUser, CheckpointConfig checkpointConfig, boolean isStreaming, boolean ignorePreviousFiles, boolean fullCompaction, Options options) {
            super(options);
            this.catalogLoader = catalogLoader;
            this.initialCommitUser = initialCommitUser;
            this.checkpointConfig = checkpointConfig;
            this.isStreaming = isStreaming;
            this.ignorePreviousFiles = ignorePreviousFiles;
            this.fullCompaction = fullCompaction;
        }

        public <T extends StreamOperator<MultiTableCommittable>> T createStreamOperator(StreamOperatorParameters<MultiTableCommittable> parameters) {
            return (T)((Object)new MultiTablesStoreCompactOperator(parameters, this.catalogLoader, this.initialCommitUser, this.checkpointConfig, this.isStreaming, this.ignorePreviousFiles, this.fullCompaction, this.options));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return MultiTablesStoreCompactOperator.class;
        }
    }
}

