/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.compact;

import java.util.Collections;
import java.util.HashMap;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import org.apache.paimon.append.AppendCompactTask;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.operation.AppendFileStoreWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileStorePathFactory;

public class AppendPreCommitCompactWorkerOperator
extends AbstractStreamOperator<Committable>
implements OneInputStreamOperator<Either<Committable, Tuple2<Long, AppendCompactTask>>, Committable> {
    private final FileStoreTable table;
    private transient AppendFileStoreWrite write;
    private transient FileStorePathFactory pathFactory;
    private transient FileIO fileIO;

    public AppendPreCommitCompactWorkerOperator(FileStoreTable table) {
        this.table = table;
    }

    public void open() throws Exception {
        super.open();
        this.write = (AppendFileStoreWrite)this.table.store().newWrite(null);
        this.pathFactory = this.table.store().pathFactory();
        this.fileIO = this.table.fileIO();
    }

    public void processElement(StreamRecord<Either<Committable, Tuple2<Long, AppendCompactTask>>> record) throws Exception {
        if (((Either)record.getValue()).isLeft()) {
            this.output.collect((Object)new StreamRecord(((Either)record.getValue()).left()));
        } else {
            long checkpointId = (Long)((Tuple2)((Either)record.getValue()).right()).f0;
            CommitMessage message = this.doCompact((AppendCompactTask)((Tuple2)((Either)record.getValue()).right()).f1);
            this.output.collect((Object)new StreamRecord((Object)new Committable(checkpointId, Committable.Kind.FILE, message)));
        }
    }

    private CommitMessage doCompact(AppendCompactTask task) throws Exception {
        CommitMessageImpl message = (CommitMessageImpl)task.doCompact(this.table, this.write);
        HashMap<String, DataFileMeta> toDelete = new HashMap<String, DataFileMeta>();
        for (DataFileMeta meta : message.compactIncrement().compactBefore()) {
            toDelete.put(meta.fileName(), meta);
        }
        for (DataFileMeta meta : message.compactIncrement().compactAfter()) {
            toDelete.remove(meta.fileName());
        }
        DataFilePathFactory dataFilePathFactory = this.pathFactory.createDataFilePathFactory(task.partition(), message.bucket());
        for (DataFileMeta meta : toDelete.values()) {
            this.fileIO.deleteQuietly(dataFilePathFactory.toPath(meta));
        }
        return new CommitMessageImpl(message.partition(), message.bucket(), message.totalBuckets(), new DataIncrement(message.compactIncrement().compactAfter(), Collections.emptyList(), Collections.emptyList()), CompactIncrement.emptyIncrement());
    }

    public void close() throws Exception {
        if (this.write != null) {
            this.write.close();
        }
    }
}

