/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.flink.sink.NoopStoreSinkWriteState;
import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
import org.apache.paimon.flink.sink.RestoreCommittableStateManager;
import org.apache.paimon.flink.sink.RowDataStoreWriteOperator;
import org.apache.paimon.flink.sink.StoreCommitter;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.TableCommitImpl;

public abstract class FlinkWriteSink<T>
extends FlinkSink<T> {
    private static final long serialVersionUID = 1L;
    @Nullable
    private final Map<String, String> overwritePartition;

    public FlinkWriteSink(FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
        super(table, overwritePartition != null);
        this.overwritePartition = overwritePartition;
    }

    @Override
    protected Committer.Factory<Committable, ManifestCommittable> createCommitterFactory() {
        return context -> new StoreCommitter(this.table, ((TableCommitImpl)this.table.newCommit(context.commitUser()).withOverwrite((Map)this.overwritePartition)).ignoreEmptyCommit(!context.streamingCheckpointEnabled()), context);
    }

    @Override
    protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() {
        Options options = this.table.coreOptions().toConfiguration();
        return new RestoreAndFailCommittableStateManager<ManifestCommittable>(ManifestCommittableSerializer::new, options.get(FlinkConnectorOptions.PARTITION_MARK_DONE_RECOVER_FROM_STATE));
    }

    protected static OneInputStreamOperatorFactory<InternalRow, Committable> createNoStateRowWriteOperatorFactory(FileStoreTable table, final LogSinkFunction logSinkFunction, final StoreSinkWrite.Provider writeProvider, final String commitUser) {
        return new RowDataStoreWriteOperator.Factory(table, logSinkFunction, writeProvider, commitUser){

            public StreamOperator createStreamOperator(StreamOperatorParameters parameters) {
                return new RowDataStoreWriteOperator(parameters, this.table, logSinkFunction, writeProvider, commitUser){

                    @Override
                    protected StoreSinkWriteState createState(int subtaskId, StateInitializationContext context, StoreSinkWriteState.StateValueFilter stateFilter) {
                        return new NoopStoreSinkWriteState(subtaskId, stateFilter);
                    }

                    @Override
                    protected String getCommitUser(StateInitializationContext context) throws Exception {
                        return commitUser;
                    }
                };
            }
        };
    }

    protected static CommittableStateManager<ManifestCommittable> createRestoreOnlyCommittableStateManager(FileStoreTable table) {
        Options options = table.coreOptions().toConfiguration();
        return new RestoreCommittableStateManager<ManifestCommittable>(ManifestCommittableSerializer::new, options.get(FlinkConnectorOptions.PARTITION_MARK_DONE_RECOVER_FROM_STATE));
    }
}

