/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.partitioner;

import java.util.Objects;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.BaseAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.sink.bootstrap.IndexRecord;
import org.apache.hudi.sink.partitioner.BucketAssigner;
import org.apache.hudi.sink.partitioner.BucketAssigners;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.FlinkWriteClients;

public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
extends KeyedProcessFunction<K, I, O>
implements CheckpointedFunction,
CheckpointListener {
    private ValueState<HoodieRecordGlobalLocation> indexState;
    private BucketAssigner bucketAssigner;
    private final Configuration conf;
    private final boolean isChangingRecords;
    private PayloadCreation payloadCreation;
    private final boolean globalIndex;

    public BucketAssignFunction(Configuration conf) {
        this.conf = conf;
        this.isChangingRecords = WriteOperationType.isChangingRecords(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
        this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED) && !conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true);
        HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(new SerializableConfiguration(HadoopConfigurations.getHadoopConf(this.conf)), new FlinkTaskContextSupplier(this.getRuntimeContext()));
        this.bucketAssigner = BucketAssigners.create(this.getRuntimeContext().getIndexOfThisSubtask(), this.getRuntimeContext().getMaxNumberOfParallelSubtasks(), this.getRuntimeContext().getNumberOfParallelSubtasks(), this.ignoreSmallFiles(), HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)), context, writeConfig);
        this.payloadCreation = PayloadCreation.instance(this.conf);
    }

    private boolean ignoreSmallFiles() {
        WriteOperationType operationType = WriteOperationType.fromValue(this.conf.getString(FlinkOptions.OPERATION));
        return WriteOperationType.isOverwrite(operationType);
    }

    public void snapshotState(FunctionSnapshotContext context) {
        this.bucketAssigner.reset();
    }

    public void initializeState(FunctionInitializationContext context) {
        ValueStateDescriptor indexStateDesc = new ValueStateDescriptor("indexState", TypeInformation.of(HoodieRecordGlobalLocation.class));
        double ttl = this.conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24.0 * 60.0 * 60.0 * 1000.0;
        if (ttl > 0.0) {
            indexStateDesc.enableTimeToLive(StateTtlConfig.newBuilder((Time)Time.milliseconds((long)((long)ttl))).build());
        }
        this.indexState = context.getKeyedStateStore().getState(indexStateDesc);
    }

    public void processElement(I value, KeyedProcessFunction.Context ctx, Collector<O> out) throws Exception {
        if (value instanceof IndexRecord) {
            IndexRecord indexRecord = (IndexRecord)value;
            this.indexState.update((Object)((HoodieRecordGlobalLocation)indexRecord.getCurrentLocation()));
        } else {
            this.processRecord((HoodieRecord)value, out);
        }
    }

    private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
        HoodieRecordLocation location;
        HoodieKey hoodieKey = record.getKey();
        String recordKey = hoodieKey.getRecordKey();
        String partitionPath = hoodieKey.getPartitionPath();
        HoodieRecordGlobalLocation oldLoc = (HoodieRecordGlobalLocation)this.indexState.value();
        if (this.isChangingRecords && oldLoc != null) {
            if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
                if (this.globalIndex) {
                    HoodieAvroRecord deleteRecord = new HoodieAvroRecord(new HoodieKey(recordKey, oldLoc.getPartitionPath()), this.payloadCreation.createDeletePayload((BaseAvroPayload)record.getData()));
                    deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
                    deleteRecord.seal();
                    out.collect(deleteRecord);
                }
                location = this.getNewRecordLocation(partitionPath);
            } else {
                location = oldLoc.toLocal("U");
                this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
            }
        } else {
            location = this.getNewRecordLocation(partitionPath);
        }
        if (this.isChangingRecords) {
            this.updateIndexState(partitionPath, location);
        }
        record.setCurrentLocation(location);
        out.collect(record);
    }

    private HoodieRecordLocation getNewRecordLocation(String partitionPath) {
        HoodieRecordLocation location;
        BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
        switch (bucketInfo.getBucketType()) {
            case INSERT: {
                location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
                break;
            }
            case UPDATE: {
                location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
                break;
            }
            default: {
                throw new AssertionError();
            }
        }
        return location;
    }

    private void updateIndexState(String partitionPath, HoodieRecordLocation localLoc) throws Exception {
        this.indexState.update((Object)HoodieRecordGlobalLocation.fromLocal(partitionPath, localLoc));
    }

    public void notifyCheckpointComplete(long checkpointId) {
        this.bucketAssigner.reload(checkpointId);
    }

    public void close() throws Exception {
        this.bucketAssigner.close();
    }
}

