/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.compact.UnawareBucketNewFilesCompactionCoordinatorOperator;
import org.apache.paimon.flink.compact.UnawareBucketNewFilesCompactionWorkerOperator;
import org.apache.paimon.flink.sink.AppendBypassCompactWorkerOperator;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
import org.apache.paimon.flink.sink.FlinkWriteSink;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.flink.source.AppendBypassCoordinateOperatorFactory;
import org.apache.paimon.flink.utils.ParallelismUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;

/**
 * Base class for sinks that write to a {@link FileStoreTable} through {@link FlinkWriteSink} and
 * then optionally append compaction stages to the resulting write topology.
 *
 * <p>Two independent compaction stages may be appended by {@link #doWrite}:
 *
 * <ul>
 *   <li>a "New Files Compact" coordinator/worker pair, when
 *       {@link FlinkConnectorOptions#PRECOMMIT_COMPACT} is enabled in the table options;
 *   <li>a "Compact" coordinator/worker pair, when the table is not write-only and the job runs
 *       in {@link RuntimeExecutionMode#STREAMING} mode.
 * </ul>
 *
 * @param <T> type of the records accepted by the sink
 */
public abstract class UnawareBucketSink<T> extends FlinkWriteSink<T> {
    /** Table that is written to; also the source of the options consulted by {@link #doWrite}. */
    protected final FileStoreTable table;

    /** Log sink function supplied at construction time; not used by this class itself. */
    protected final LogSinkFunction logSinkFunction;

    /** Writer parallelism supplied at construction time, or {@code null} if none was given. */
    @Nullable
    protected final Integer parallelism;

    /**
     * Creates the sink.
     *
     * @param table table to write to
     * @param overwritePartitions passed through unchanged to the {@link FlinkWriteSink}
     *     constructor; may be {@code null}
     * @param logSinkFunction log sink function stored in {@link #logSinkFunction}
     * @param parallelism parallelism handed to the parent's {@code doWrite}; may be {@code null}
     */
    public UnawareBucketSink(FileStoreTable table, @Nullable Map<String, String> overwritePartitions, LogSinkFunction logSinkFunction, @Nullable Integer parallelism) {
        super(table, overwritePartitions);
        this.table = table;
        this.logSinkFunction = logSinkFunction;
        this.parallelism = parallelism;
    }

    /**
     * Builds the write topology via the parent class and appends the compaction stages that the
     * table options and the runtime mode call for.
     *
     * @param input stream of records to write
     * @param initialCommitUser commit user forwarded to the parent writer and to the streaming
     *     compaction worker
     * @param parallelism ignored by this implementation: the parallelism given to the
     *     constructor ({@link #parallelism}) is forwarded to the parent instead.
     *     NOTE(review): confirm that shadowing the argument is intentional.
     * @return the stream of committables produced by the last stage of the topology
     */
    @Override
    public DataStream<Committable> doWrite(DataStream<T> input, String initialCommitUser, @Nullable Integer parallelism) {
        // The field, not the method argument, decides the writer parallelism.
        DataStream<Committable> written = super.doWrite(input, initialCommitUser, this.parallelism);

        Options options = new Options(this.table.options());
        if (options.get(FlinkConnectorOptions.PRECOMMIT_COMPACT)) {
            written = this.appendNewFilesCompaction(written);
        }

        boolean enableCompaction = !this.table.coreOptions().writeOnly();
        boolean isStreamingMode =
                input.getExecutionEnvironment().getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                        == RuntimeExecutionMode.STREAMING;
        if (enableCompaction && isStreamingMode) {
            written = this.appendStreamingCompaction(written, initialCommitUser);
        }
        return written;
    }

    /**
     * Appends the non-parallel "New Files Compact Coordinator" and the "New Files Compact Worker"
     * after {@code written}; the worker's parallelism is forwarded from {@code written}.
     */
    // Raw types are kept because the generic signatures of the project operators are not
    // visible from this class; the warning scope is limited to this helper.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private DataStream<Committable> appendNewFilesCompaction(DataStream<Committable> written) {
        SingleOutputStreamOperator coordinator =
                written.transform(
                                "New Files Compact Coordinator: " + this.table.name(),
                                (TypeInformation) new EitherTypeInfo(
                                        (TypeInformation) new CommittableTypeInfo(),
                                        (TypeInformation) new TupleTypeInfo(
                                                new TypeInformation[] {
                                                    BasicTypeInfo.LONG_TYPE_INFO,
                                                    new CompactionTaskTypeInfo()
                                                })),
                                (OneInputStreamOperator) new UnawareBucketNewFilesCompactionCoordinatorOperator(
                                        this.table.coreOptions()))
                        .startNewChain()
                        .forceNonParallel();
        SingleOutputStreamOperator worker =
                coordinator
                        .transform(
                                "New Files Compact Worker: " + this.table.name(),
                                (TypeInformation) new CommittableTypeInfo(),
                                (OneInputStreamOperator) new UnawareBucketNewFilesCompactionWorkerOperator(this.table))
                        .startNewChain();
        ParallelismUtils.forwardParallelism(worker, written);
        return worker;
    }

    /**
     * Appends the non-parallel "Compact Coordinator" and the "Compact Worker" after
     * {@code written}; the worker is given the parallelism of {@code written}.
     */
    // Raw types are kept because the generic signatures of the project operators are not
    // visible from this class; the warning scope is limited to this helper.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private DataStream<Committable> appendStreamingCompaction(DataStream<Committable> written, String initialCommitUser) {
        SingleOutputStreamOperator coordinator =
                written.transform(
                                "Compact Coordinator: " + this.table.name(),
                                (TypeInformation) new EitherTypeInfo(
                                        (TypeInformation) new CommittableTypeInfo(),
                                        (TypeInformation) new CompactionTaskTypeInfo()),
                                new AppendBypassCoordinateOperatorFactory(this.table))
                        .startNewChain()
                        .forceNonParallel();
        SingleOutputStreamOperator worker =
                coordinator
                        .transform(
                                "Compact Worker: " + this.table.name(),
                                (TypeInformation) new CommittableTypeInfo(),
                                (OneInputStreamOperatorFactory) new AppendBypassCompactWorkerOperator.Factory(
                                        this.table, initialCommitUser))
                        .startNewChain();
        ParallelismUtils.setParallelism(worker, written.getParallelism(), false);
        return worker;
    }
}

