/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.flink.ProcessRecordAttributesUtil;
import org.apache.paimon.flink.sink.StateUtils;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.index.BucketAssigner;
import org.apache.paimon.index.HashBucketAssigner;
import org.apache.paimon.index.SimpleHashBucketAssigner;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.utils.MathUtils;
import org.apache.paimon.utils.SerializableFunction;

public class HashBucketAssignerOperator<T>
extends AbstractStreamOperator<Tuple2<T, Integer>>
implements OneInputStreamOperator<T, Tuple2<T, Integer>> {
    private static final long serialVersionUID = 1L;
    private final String initialCommitUser;
    private final FileStoreTable table;
    private final Integer numAssigners;
    private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction;
    private final boolean overwrite;
    private transient BucketAssigner assigner;
    private transient PartitionKeyExtractor<T> extractor;

    public HashBucketAssignerOperator(String commitUser, Table table, Integer numAssigners, SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction, boolean overwrite) {
        this.initialCommitUser = commitUser;
        this.table = (FileStoreTable)table;
        this.numAssigners = numAssigners;
        this.extractorFunction = extractorFunction;
        this.overwrite = overwrite;
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        String commitUser = StateUtils.getSingleValueFromState(context, "commit_user_state", String.class, this.initialCommitUser);
        int numberTasks = RuntimeContextUtils.getNumberOfParallelSubtasks((RuntimeContext)this.getRuntimeContext());
        int taskId = RuntimeContextUtils.getIndexOfThisSubtask((RuntimeContext)this.getRuntimeContext());
        long targetRowNum = this.table.coreOptions().dynamicBucketTargetRowNum();
        this.assigner = this.overwrite ? new SimpleHashBucketAssigner(numberTasks, taskId, targetRowNum) : new HashBucketAssigner(this.table.snapshotManager(), commitUser, this.table.store().newIndexFileHandler(), numberTasks, MathUtils.min(this.numAssigners, numberTasks), taskId, targetRowNum);
        this.extractor = (PartitionKeyExtractor)this.extractorFunction.apply(this.table.schema());
    }

    public void processElement(StreamRecord<T> streamRecord) throws Exception {
        Object value = streamRecord.getValue();
        int bucket = this.assigner.assign(this.extractor.partition(value), this.extractor.trimmedPrimaryKey(value).hashCode());
        this.output.collect((Object)new StreamRecord((Object)new Tuple2(value, (Object)bucket)));
    }

    public void processRecordAttributes(RecordAttributes recordAttributes) {
        ProcessRecordAttributesUtil.processWithOutput(recordAttributes, this.output);
    }

    public void prepareSnapshotPreBarrier(long checkpointId) {
        this.assigner.prepareCommit(checkpointId);
    }
}

