/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.paimon.append.AppendCompactTask;
import org.apache.paimon.flink.sink.AppendOnlySingleTableCompactionWorkerOperator;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.NoopCommittableStateManager;
import org.apache.paimon.flink.sink.StoreCommitter;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;

/**
 * A {@link FlinkSink} whose input records are {@link AppendCompactTask}s for a single
 * {@link FileStoreTable}.
 *
 * <p>The write operator is an {@link AppendOnlySingleTableCompactionWorkerOperator}, committables
 * are committed through a {@link StoreCommitter}, and the committable state manager is a
 * {@link NoopCommittableStateManager}.
 */
public class AppendTableCompactSink extends FlinkSink<AppendCompactTask> {
    /** Table handed to the write operator factory and used to create the commit. */
    private final FileStoreTable table;

    /** Forwarded unchanged to the compaction worker operator factory. */
    private final boolean isStreaming;

    /**
     * Creates the sink for the given table.
     *
     * @param table table passed to the superclass and to the operators this sink creates
     * @param isStreaming flag forwarded to the compaction worker operator factory
     */
    public AppendTableCompactSink(FileStoreTable table, boolean isStreaming) {
        // The second superclass argument is always true here; its meaning is defined by
        // FlinkSink, which is not visible in this file.
        super(table, true);
        this.table = table;
        this.isStreaming = isStreaming;
    }

    /**
     * Builds an {@code AppendTableCompactSink} for {@code table} and attaches it to {@code input}.
     *
     * <p>The streaming flag is derived from {@code input} via the inherited
     * {@code isStreaming(DataStream)} helper.
     *
     * @param table table the sink is created for
     * @param input stream of compaction tasks
     * @return the result of {@code sinkFrom(input)} on the new sink
     */
    public static DataStreamSink<?> sink(FileStoreTable table, DataStream<AppendCompactTask> input) {
        return new AppendTableCompactSink(table, isStreaming(input)).sinkFrom(input);
    }

    /**
     * Returns the factory of the compaction worker operator for this sink's table.
     *
     * @param writeProvider not used by this implementation
     * @param commitUser commit user passed to the operator factory
     * @return a new {@link AppendOnlySingleTableCompactionWorkerOperator.Factory}
     */
    @Override
    protected OneInputStreamOperatorFactory<AppendCompactTask, Committable> createWriteOperatorFactory(
            StoreSinkWrite.Provider writeProvider, String commitUser) {
        return new AppendOnlySingleTableCompactionWorkerOperator.Factory(table, commitUser, isStreaming);
    }

    /**
     * Returns a factory that builds a {@link StoreCommitter} from the table and a commit obtained
     * via {@code table.newCommit(...)} for the context's commit user.
     */
    @Override
    protected Committer.Factory<Committable, ManifestCommittable> createCommitterFactory() {
        return committerContext ->
                new StoreCommitter(
                        table, table.newCommit(committerContext.commitUser()), committerContext);
    }

    /** Returns a new {@link NoopCommittableStateManager}. */
    @Override
    protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() {
        return new NoopCommittableStateManager();
    }
}

