/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.ProcessRecordAttributesUtil;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StateUtils;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.flink.sink.StoreSinkWriteStateImpl;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;

public abstract class TableWriteOperator<IN>
extends PrepareCommitOperator<IN, Committable> {
    protected FileStoreTable table;
    private final StoreSinkWrite.Provider storeSinkWriteProvider;
    private final String initialCommitUser;
    private transient StoreSinkWriteState state;
    protected transient StoreSinkWrite write;

    public TableWriteOperator(StreamOperatorParameters<Committable> parameters, FileStoreTable table, StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser) {
        super(parameters, Options.fromMap(table.options()));
        this.table = table;
        this.storeSinkWriteProvider = storeSinkWriteProvider;
        this.initialCommitUser = initialCommitUser;
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        boolean containLogSystem = this.containLogSystem();
        int numTasks = RuntimeContextUtils.getNumberOfParallelSubtasks((RuntimeContext)this.getRuntimeContext());
        StoreSinkWriteState.StateValueFilter stateFilter = (tableName, partition, bucket) -> {
            int task = containLogSystem ? ChannelComputer.select(bucket, numTasks) : ChannelComputer.select(partition, bucket, numTasks);
            return task == RuntimeContextUtils.getIndexOfThisSubtask((RuntimeContext)this.getRuntimeContext());
        };
        this.state = this.createState(context, stateFilter);
        this.write = this.storeSinkWriteProvider.provide(this.table, this.getCommitUser(context), this.state, this.getContainingTask().getEnvironment().getIOManager(), this.memoryPool, (MetricGroup)this.getMetricGroup());
    }

    protected StoreSinkWriteState createState(StateInitializationContext context, StoreSinkWriteState.StateValueFilter stateFilter) throws Exception {
        return new StoreSinkWriteStateImpl(context, stateFilter);
    }

    protected String getCommitUser(StateInitializationContext context) throws Exception {
        return StateUtils.getSingleValueFromState(context, "commit_user_state", String.class, this.initialCommitUser);
    }

    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        ProcessRecordAttributesUtil.processWithWrite(recordAttributes, this.write);
        super.processRecordAttributes(recordAttributes);
    }

    protected abstract boolean containLogSystem();

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.write.snapshotState();
        this.state.snapshotState();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (this.write != null) {
            this.write.close();
        }
    }

    @Override
    protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) throws IOException {
        return this.write.prepareCommit(waitCompaction, checkpointId);
    }

    @VisibleForTesting
    public StoreSinkWrite getWrite() {
        return this.write;
    }

    protected static abstract class Factory<IN>
    extends PrepareCommitOperator.Factory<IN, Committable> {
        protected final FileStoreTable table;
        protected final StoreSinkWrite.Provider storeSinkWriteProvider;
        protected final String initialCommitUser;

        protected Factory(FileStoreTable table, StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser) {
            super(Options.fromMap(table.options()));
            this.table = table;
            this.storeSinkWriteProvider = storeSinkWriteProvider;
            this.initialCommitUser = initialCommitUser;
        }
    }
}

