/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.operator.impl.window.incremental;

import com.antgroup.geaflow.api.function.base.AggregateFunction;
import com.antgroup.geaflow.api.function.base.KeySelector;
import com.antgroup.geaflow.api.trait.CheckpointTrait;
import com.antgroup.geaflow.api.trait.TransactionTrait;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.FrameworkConfigKeys;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.base.window.AbstractOneInputOperator;
import com.antgroup.geaflow.state.KeyValueState;
import com.antgroup.geaflow.state.StateFactory;
import com.antgroup.geaflow.state.descriptor.KeyValueStateDescriptor;
import com.antgroup.geaflow.utils.keygroup.IKeyGroupAssigner;
import com.antgroup.geaflow.utils.keygroup.KeyGroup;
import com.antgroup.geaflow.utils.keygroup.KeyGroupAssignerFactory;
import com.antgroup.geaflow.utils.keygroup.KeyGroupAssignment;
import java.util.HashSet;
import java.util.Set;

/**
 * Keyed incremental aggregation operator.
 *
 * <p>For every input record the operator derives a key with the supplied {@link KeySelector},
 * loads that key's accumulator from a {@link KeyValueState} (creating a fresh one through the
 * {@link AggregateFunction} when none is stored yet), folds the record into it and writes it
 * back. When {@link #finish(long)} is called, the result of every key touched since the previous
 * {@code finish} is emitted and the set of touched keys is reset.
 *
 * @param <KEY> type of the grouping key produced by the key selector
 * @param <IN> type of the input records
 * @param <ACC> type of the accumulator kept in state per key
 * @param <OUT> type of the emitted aggregation result
 */
public class IncrAggregateOperator<KEY, IN, ACC, OUT>
        extends AbstractOneInputOperator<IN, AggregateFunction<IN, ACC, OUT>>
        implements TransactionTrait, CheckpointTrait {

    /** Per-key accumulator store; built in {@link #open(Operator.OpContext)}. */
    private transient KeyValueState<KEY, ACC> aggregatingState;

    /** Extracts the grouping key from an input record. */
    private final KeySelector<IN, KEY> keySelector;

    /** Keys updated since the last {@link #finish(long)}; only these are emitted. */
    private Set<KEY> keySet;

    /**
     * Creates the operator.
     *
     * @param aggregateFunction function used to create, update and read accumulators
     * @param keySelector       function used to derive the grouping key of each record
     */
    public IncrAggregateOperator(AggregateFunction<IN, ACC, OUT> aggregateFunction,
                                 KeySelector<IN, KEY> keySelector) {
        super(aggregateFunction);
        this.keySelector = keySelector;
    }

    /**
     * Opens the operator and builds its key-value state.
     *
     * <p>The state descriptor is named after {@code getIdentify()} and uses the backend type
     * configured under {@link FrameworkConfigKeys#SYSTEM_STATE_BACKEND_TYPE}. The key group
     * handled by this task is computed from the task's max parallelism, parallelism and index.
     *
     * @param opContext operator context forwarded to the parent implementation
     */
    @Override
    public void open(Operator.OpContext opContext) {
        super.open(opContext);

        Configuration configuration = this.runtimeContext.getConfiguration();
        KeyValueStateDescriptor<KEY, ACC> descriptor = KeyValueStateDescriptor.build(
            this.getIdentify(),
            configuration.getString(FrameworkConfigKeys.SYSTEM_STATE_BACKEND_TYPE));

        int taskIndex = this.runtimeContext.getTaskArgs().getTaskIndex();
        int parallelism = this.runtimeContext.getTaskArgs().getParallelism();
        int maxParallelism = this.runtimeContext.getTaskArgs().getMaxParallelism();

        KeyGroup keyGroup = KeyGroupAssignment.computeKeyGroupRangeForOperatorIndex(
            maxParallelism, parallelism, taskIndex);
        descriptor.withKeyGroup(keyGroup);

        IKeyGroupAssigner keyGroupAssigner =
            KeyGroupAssignerFactory.createKeyGroupAssigner(keyGroup, taskIndex, maxParallelism);
        descriptor.withKeyGroupAssigner(keyGroupAssigner);

        this.aggregatingState = StateFactory.buildKeyValueState(descriptor, configuration);
        this.keySet = new HashSet<>();
    }

    /**
     * Folds one record into the accumulator of its key and marks the key as touched.
     *
     * @param value the input record
     * @throws Exception if key selection, aggregation or state access fails
     */
    @Override
    protected void process(IN value) throws Exception {
        KEY key = this.keySelector.getKey(value);
        ACC acc = this.aggregatingState.get(key);
        if (acc == null) {
            // First record seen for this key: start from a fresh accumulator.
            acc = this.function.createAccumulator();
        }
        this.keySet.add(key);
        // The accumulator is expected to be updated in place; any value returned by add(...)
        // is not used here.
        this.function.add(value, acc);
        this.aggregatingState.put(key, acc);
    }

    /**
     * Emits the current result for every key touched since the previous call, skipping
     * {@code null} results, and then clears the set of touched keys.
     *
     * <p>NOTE(review): presumably declared by {@link TransactionTrait}; add {@code @Override}
     * once confirmed.
     *
     * @param windowId id of the finished window (not used by this implementation)
     */
    public void finish(long windowId) {
        for (KEY key : this.keySet) {
            ACC acc = this.aggregatingState.get(key);
            OUT result = this.function.getResult(acc);
            if (result != null) {
                this.collectValue(result);
            }
        }
        this.keySet.clear();
    }

    /**
     * Sets the state's checkpoint id to {@code windowId} and recovers the state.
     *
     * <p>NOTE(review): presumably declared by {@link TransactionTrait}; add {@code @Override}
     * once confirmed.
     *
     * @param windowId id used as the checkpoint id to recover
     */
    public void rollback(long windowId) {
        this.aggregatingState.manage().operate().setCheckpointId(windowId);
        this.aggregatingState.manage().operate().recover();
    }

    /**
     * Sets the state's checkpoint id to {@code windowId}, then finishes and archives the state.
     *
     * <p>NOTE(review): presumably declared by {@link CheckpointTrait}; add {@code @Override}
     * once confirmed.
     *
     * @param windowId id used as the checkpoint id
     */
    public void checkpoint(long windowId) {
        this.aggregatingState.manage().operate().setCheckpointId(windowId);
        this.aggregatingState.manage().operate().finish();
        this.aggregatingState.manage().operate().archive();
    }
}

