/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.List;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.sink.ChannelComputer;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StateUtils;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;

public abstract class TableWriteOperator<IN>
extends PrepareCommitOperator<IN, Committable> {
    protected FileStoreTable table;
    private final StoreSinkWrite.Provider storeSinkWriteProvider;
    private final String initialCommitUser;
    private transient StoreSinkWriteState state;
    protected transient StoreSinkWrite write;

    public TableWriteOperator(FileStoreTable table, StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser) {
        super(Options.fromMap(table.options()));
        this.table = table;
        this.storeSinkWriteProvider = storeSinkWriteProvider;
        this.initialCommitUser = initialCommitUser;
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        String commitUser = StateUtils.getSingleValueFromState(context, "commit_user_state", String.class, this.initialCommitUser);
        boolean containLogSystem = this.containLogSystem();
        int numTasks = this.getRuntimeContext().getNumberOfParallelSubtasks();
        StoreSinkWriteState.StateValueFilter stateFilter = (tableName, partition, bucket) -> {
            int task = containLogSystem ? ChannelComputer.select(bucket, numTasks) : ChannelComputer.select(partition, bucket, numTasks);
            return task == this.getRuntimeContext().getIndexOfThisSubtask();
        };
        this.initStateAndWriter(context, stateFilter, this.getContainingTask().getEnvironment().getIOManager(), commitUser);
    }

    @VisibleForTesting
    void initStateAndWriter(StateInitializationContext context, StoreSinkWriteState.StateValueFilter stateFilter, IOManager ioManager, String commitUser) throws Exception {
        this.state = new StoreSinkWriteState(context, stateFilter);
        this.write = this.storeSinkWriteProvider.provide(this.table, commitUser, this.state, ioManager, this.memoryPool, (MetricGroup)this.getMetricGroup());
    }

    protected abstract boolean containLogSystem();

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.write.snapshotState();
        this.state.snapshotState();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (this.write != null) {
            this.write.close();
        }
    }

    @Override
    protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) throws IOException {
        return this.write.prepareCommit(waitCompaction, checkpointId);
    }

    @VisibleForTesting
    public StoreSinkWrite getWrite() {
        return this.write;
    }
}

