/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.paimon.flink.sink.AppendBypassCompactWorkerOperator;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
import org.apache.paimon.flink.sink.FlinkWriteSink;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.flink.source.AppendBypassCoordinateOperatorFactory;
import org.apache.paimon.table.FileStoreTable;

public abstract class UnawareBucketSink<T>
extends FlinkWriteSink<T> {
    protected final FileStoreTable table;
    protected final LogSinkFunction logSinkFunction;
    @Nullable
    protected final Integer parallelism;

    public UnawareBucketSink(FileStoreTable table, @Nullable Map<String, String> overwritePartitions, LogSinkFunction logSinkFunction, @Nullable Integer parallelism) {
        super(table, overwritePartitions);
        this.table = table;
        this.logSinkFunction = logSinkFunction;
        this.parallelism = parallelism;
    }

    @Override
    public DataStream<Committable> doWrite(DataStream<T> input, String initialCommitUser, @Nullable Integer parallelism) {
        boolean isStreamingMode;
        SingleOutputStreamOperator written = super.doWrite(input, initialCommitUser, this.parallelism);
        boolean enableCompaction = !this.table.coreOptions().writeOnly();
        boolean bl = isStreamingMode = input.getExecutionEnvironment().getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        if (enableCompaction && isStreamingMode) {
            written = written.transform("Compact Coordinator: " + this.table.name(), (TypeInformation)new EitherTypeInfo((TypeInformation)new CommittableTypeInfo(), (TypeInformation)new CompactionTaskTypeInfo()), new AppendBypassCoordinateOperatorFactory(this.table)).startNewChain().forceNonParallel().transform("Compact Worker: " + this.table.name(), (TypeInformation)new CommittableTypeInfo(), (OneInputStreamOperatorFactory)new AppendBypassCompactWorkerOperator.Factory(this.table, initialCommitUser)).startNewChain().setParallelism(written.getParallelism());
        }
        return written;
    }
}

