/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StateUtils;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.flink.sink.StoreSinkWriteStateImpl;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializationUtils;

public class StoreCompactOperator
extends PrepareCommitOperator<RowData, Committable> {
    private final FileStoreTable table;
    private final StoreSinkWrite.Provider storeSinkWriteProvider;
    private final String initialCommitUser;
    private final boolean fullCompaction;
    private transient StoreSinkWriteState state;
    private transient StoreSinkWrite write;
    private transient DataFileMetaSerializer dataFileMetaSerializer;
    private transient Set<Pair<BinaryRow, Integer>> waitToCompact;

    private StoreCompactOperator(StreamOperatorParameters<Committable> parameters, FileStoreTable table, StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser, boolean fullCompaction) {
        super(parameters, Options.fromMap(table.options()));
        Preconditions.checkArgument(!table.coreOptions().writeOnly(), CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
        this.table = table;
        this.storeSinkWriteProvider = storeSinkWriteProvider;
        this.initialCommitUser = initialCommitUser;
        this.fullCompaction = fullCompaction;
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        String commitUser = StateUtils.getSingleValueFromState(context, "commit_user_state", String.class, this.initialCommitUser);
        this.state = new StoreSinkWriteStateImpl(context, (tableName, partition, bucket) -> ChannelComputer.select(partition, bucket, RuntimeContextUtils.getNumberOfParallelSubtasks((RuntimeContext)this.getRuntimeContext())) == RuntimeContextUtils.getIndexOfThisSubtask((RuntimeContext)this.getRuntimeContext()));
        this.write = this.storeSinkWriteProvider.provide(this.table, commitUser, this.state, this.getContainingTask().getEnvironment().getIOManager(), this.memoryPool, (MetricGroup)this.getMetricGroup());
    }

    public void open() throws Exception {
        super.open();
        this.dataFileMetaSerializer = new DataFileMetaSerializer();
        this.waitToCompact = new LinkedHashSet<Pair<BinaryRow, Integer>>();
    }

    public void processElement(StreamRecord<RowData> element) throws Exception {
        RowData record = (RowData)element.getValue();
        long snapshotId = record.getLong(0);
        BinaryRow partition = SerializationUtils.deserializeBinaryRow(record.getBinary(1));
        int bucket = record.getInt(2);
        byte[] serializedFiles = record.getBinary(3);
        List<DataFileMeta> files = this.dataFileMetaSerializer.deserializeList(serializedFiles);
        if (this.write.streamingMode()) {
            this.write.notifyNewFiles(snapshotId, partition, bucket, files);
        } else {
            Preconditions.checkArgument(files.isEmpty(), "Batch compact job does not concern what files are compacted. They only need to know what buckets are compacted.");
        }
        this.waitToCompact.add(Pair.of(partition, bucket));
    }

    @Override
    protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) throws IOException {
        try {
            for (Pair<BinaryRow, Integer> partitionBucket : this.waitToCompact) {
                this.write.compact(partitionBucket.getKey(), partitionBucket.getRight(), this.fullCompaction);
            }
        }
        catch (Exception e) {
            throw new RuntimeException("Exception happens while executing compaction.", e);
        }
        this.waitToCompact.clear();
        return this.write.prepareCommit(waitCompaction, checkpointId);
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.write.snapshotState();
        this.state.snapshotState();
    }

    @Override
    public void close() throws Exception {
        super.close();
        this.write.close();
    }

    public static class Factory
    extends PrepareCommitOperator.Factory<RowData, Committable> {
        private final FileStoreTable table;
        private final StoreSinkWrite.Provider storeSinkWriteProvider;
        private final String initialCommitUser;
        private final boolean fullCompaction;

        public Factory(FileStoreTable table, StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser, boolean fullCompaction) {
            super(Options.fromMap(table.options()));
            Preconditions.checkArgument(!table.coreOptions().writeOnly(), CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
            this.table = table;
            this.storeSinkWriteProvider = storeSinkWriteProvider;
            this.initialCommitUser = initialCommitUser;
            this.fullCompaction = fullCompaction;
        }

        public <T extends StreamOperator<Committable>> T createStreamOperator(StreamOperatorParameters<Committable> parameters) {
            return (T)((Object)new StoreCompactOperator(parameters, this.table, this.storeSinkWriteProvider, this.initialCommitUser, this.fullCompaction));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return StoreCompactOperator.class;
        }
    }
}

