/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.operator.impl.window.incremental;

import com.antgroup.geaflow.api.function.base.KeySelector;
import com.antgroup.geaflow.api.function.base.ReduceFunction;
import com.antgroup.geaflow.api.trait.CheckpointTrait;
import com.antgroup.geaflow.api.trait.TransactionTrait;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.FrameworkConfigKeys;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.base.window.AbstractOneInputOperator;
import com.antgroup.geaflow.state.KeyValueState;
import com.antgroup.geaflow.state.StateFactory;
import com.antgroup.geaflow.state.descriptor.KeyValueStateDescriptor;
import com.antgroup.geaflow.utils.keygroup.IKeyGroupAssigner;
import com.antgroup.geaflow.utils.keygroup.KeyGroup;
import com.antgroup.geaflow.utils.keygroup.KeyGroupAssignerFactory;
import com.antgroup.geaflow.utils.keygroup.KeyGroupAssignment;

/**
 * One-input operator that keeps a running, per-key reduction of the records it receives.
 *
 * <p>For every incoming record the key is extracted with the supplied {@link KeySelector},
 * the previously stored aggregate for that key (if any) is combined with the record through
 * the {@link ReduceFunction}, the result is written back to a {@link KeyValueState}, and the
 * result is emitted downstream.
 *
 * @param <KEY> type of the grouping key produced by the key selector
 * @param <T>   type of the records being reduced; also the type of the stored aggregate
 */
public class IncrReduceOperator<KEY, T>
        extends AbstractOneInputOperator<T, ReduceFunction<T>>
        implements TransactionTrait, CheckpointTrait {

    /** Per-key aggregate store; created in {@link #open(Operator.OpContext)}. */
    private transient KeyValueState<KEY, T> aggregatingState;

    /** Extracts the grouping key from each incoming record. */
    private final KeySelector<T, KEY> keySelector;

    /**
     * Creates the operator.
     *
     * @param function    reduce function used to merge the stored aggregate with a new record
     * @param keySelector extracts the grouping key from a record
     */
    public IncrReduceOperator(ReduceFunction<T> function, KeySelector<T, KEY> keySelector) {
        super(function);
        this.keySelector = keySelector;
    }

    /**
     * Opens the operator and builds the key-value state that holds the per-key aggregates.
     *
     * <p>The state descriptor is named after {@code getIdentify()}, uses the backend type
     * configured under {@link FrameworkConfigKeys#SYSTEM_STATE_BACKEND_TYPE}, and is bound to
     * the key group computed from this task's index, parallelism and max parallelism.
     *
     * @param opContext operator context, forwarded to the superclass
     */
    @Override
    public void open(Operator.OpContext opContext) {
        super.open(opContext);

        Configuration configuration = this.runtimeContext.getConfiguration();
        KeyValueStateDescriptor<KEY, T> descriptor = KeyValueStateDescriptor.build(
            this.getIdentify(),
            configuration.getString(FrameworkConfigKeys.SYSTEM_STATE_BACKEND_TYPE));

        int taskIndex = this.runtimeContext.getTaskArgs().getTaskIndex();
        int parallelism = this.runtimeContext.getTaskArgs().getParallelism();
        int maxParallelism = this.runtimeContext.getTaskArgs().getMaxParallelism();

        KeyGroup keyGroup = KeyGroupAssignment.computeKeyGroupRangeForOperatorIndex(
            maxParallelism, parallelism, taskIndex);
        descriptor.withKeyGroup(keyGroup);

        // NOTE(review): taskIndex is passed as the assigner's second argument exactly as in
        // the original code; confirm against KeyGroupAssignerFactory that this is intended.
        IKeyGroupAssigner keyGroupAssigner =
            KeyGroupAssignerFactory.createKeyGroupAssigner(keyGroup, taskIndex, maxParallelism);
        descriptor.withKeyGroupAssigner(keyGroupAssigner);

        this.aggregatingState = StateFactory.buildKeyValueState(descriptor, configuration);
    }

    /**
     * Folds {@code value} into the aggregate stored for its key and emits the new aggregate.
     *
     * <p>If no aggregate is stored yet, {@code value} itself becomes the aggregate. The
     * result is always written back to the state, but it is only emitted when non-null.
     *
     * @param value incoming record
     * @throws Exception declared by the overridden method; propagated from the calls made here
     */
    @Override
    protected void process(T value) throws Exception {
        KEY key = this.keySelector.getKey(value);
        T previous = this.aggregatingState.get(key);
        T reduced = previous == null ? value : this.function.reduce(previous, value);
        this.aggregatingState.put(key, reduced);
        if (reduced != null) {
            this.collectValue(reduced);
        }
    }

    /**
     * Intentionally a no-op in this operator.
     *
     * @param windowId id of the window being finished (unused)
     */
    public void finish(long windowId) {
    }

    /**
     * Tags the state with {@code windowId} as its checkpoint id, then finishes and archives it.
     *
     * @param windowId id used as the checkpoint id
     */
    public void checkpoint(long windowId) {
        this.aggregatingState.manage().operate().setCheckpointId(windowId);
        this.aggregatingState.manage().operate().finish();
        this.aggregatingState.manage().operate().archive();
    }

    /**
     * Sets {@code windowId} as the state's checkpoint id and asks the state to recover.
     *
     * @param windowId id used as the checkpoint id to recover
     */
    public void rollback(long windowId) {
        this.aggregatingState.manage().operate().setCheckpointId(windowId);
        this.aggregatingState.manage().operate().recover();
    }
}

