/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.FlinkWriteSink;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.table.FileStoreTable;

public abstract class UnawareBucketSink<T>
extends FlinkWriteSink<T> {
    protected final FileStoreTable table;
    protected final LogSinkFunction logSinkFunction;
    @Nullable
    protected final Integer parallelism;
    protected final boolean boundedInput;

    public UnawareBucketSink(FileStoreTable table, @Nullable Map<String, String> overwritePartitions, LogSinkFunction logSinkFunction, @Nullable Integer parallelism, boolean boundedInput) {
        super(table, overwritePartitions);
        this.table = table;
        this.logSinkFunction = logSinkFunction;
        this.parallelism = parallelism;
        this.boundedInput = boundedInput;
    }

    @Override
    public DataStream<Committable> doWrite(DataStream<T> input, String initialCommitUser, @Nullable Integer parallelism) {
        boolean isStreamingMode;
        DataStream written = super.doWrite(input, initialCommitUser, this.parallelism);
        boolean enableCompaction = !this.table.coreOptions().writeOnly();
        boolean bl = isStreamingMode = input.getExecutionEnvironment().getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        if (enableCompaction && isStreamingMode && !this.boundedInput) {
            UnawareBucketCompactionTopoBuilder builder = new UnawareBucketCompactionTopoBuilder(input.getExecutionEnvironment(), this.table.name(), this.table);
            builder.withContinuousMode(true);
            written = written.union(new DataStream[]{builder.fetchUncommitted(initialCommitUser)});
        }
        return written;
    }
}

