/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.processor.impl;

import com.antgroup.geaflow.api.context.RuntimeContext;
import com.antgroup.geaflow.api.trait.CheckpointTrait;
import com.antgroup.geaflow.api.trait.TransactionTrait;
import com.antgroup.geaflow.collector.ICollector;
import com.antgroup.geaflow.common.config.keys.FrameworkConfigKeys;
import com.antgroup.geaflow.common.utils.CheckpointUtil;
import com.antgroup.geaflow.model.record.BatchRecord;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.base.AbstractOperator;
import com.antgroup.geaflow.processor.impl.AbstractProcessor;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for stream processors that forward window completion and rollback to
 * every {@link TransactionTrait} operator reachable from the processor's operator.
 *
 * <p>{@link #process(Object)}, {@link #finish(long)} and {@link #rollback(long)} all
 * run while holding {@link #lock}, so they never execute concurrently with one another
 * on the same processor instance.
 *
 * @param <T> type of the value handed to {@link #process(Object)}
 * @param <R> type of the result produced by {@link #process(Object)}
 * @param <OP> type of the operator driven by this processor
 */
public abstract class AbstractStreamProcessor<T, R, OP extends Operator>
        extends AbstractProcessor<T, R, OP>
        implements TransactionTrait {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamProcessor.class);

    /** Monitor held while processing, finishing and rolling back. */
    protected Object lock = new Object();

    /** Transactional operators collected from the operator chain, in discovery order. */
    protected List<TransactionTrait> transactionOpList = new ArrayList<>();

    /** Value of {@code BATCH_NUMBER_PER_CHECKPOINT}, loaded in {@link #open}. */
    protected long checkpointDuration;

    /**
     * Creates the processor and registers every transactional operator reachable
     * from {@code operator}.
     *
     * @param operator the head operator of the chain; may be {@code null}, in which
     *                 case no transactional operator is registered
     */
    public AbstractStreamProcessor(OP operator) {
        super(operator);
        this.addIfTransactionTrait(operator);
    }

    /**
     * Opens the processor through the parent class and then reads the checkpoint
     * interval from the runtime configuration.
     *
     * @param collectors     collectors passed through to the parent {@code open}
     * @param runtimeContext runtime context passed through to the parent {@code open}
     */
    @Override
    public void open(List<ICollector> collectors, RuntimeContext runtimeContext) {
        super.open(collectors, runtimeContext);
        this.checkpointDuration = this.runtimeContext.getConfiguration()
            .getLong(FrameworkConfigKeys.BATCH_NUMBER_PER_CHECKPOINT);
    }

    /**
     * Finishes the given window on every registered transactional operator, then on
     * the parent processor.
     *
     * <p>After an operator has finished, it is additionally checkpointed when
     * {@link CheckpointUtil#needDoCheckpoint} accepts the window for the configured
     * interval and the operator implements {@link CheckpointTrait}.
     *
     * @param windowId id of the window being finished
     */
    @Override
    public void finish(long windowId) {
        synchronized (this.lock) {
            LOGGER.info("{} do finish {}", this.runtimeContext.getTaskArgs().getTaskId(), windowId);
            for (TransactionTrait transactionTrait : this.transactionOpList) {
                transactionTrait.finish(windowId);
                // Keep the checkpoint decision after finish() and ahead of the type test,
                // matching the order in which the helper has always been invoked.
                if (CheckpointUtil.needDoCheckpoint(windowId, this.checkpointDuration)
                    && transactionTrait instanceof CheckpointTrait) {
                    ((CheckpointTrait) transactionTrait).checkpoint(windowId);
                }
            }
            super.finish(windowId);
        }
    }

    /**
     * Rolls back the given window on every registered transactional operator.
     *
     * @param windowId id of the window to roll back to
     */
    public void rollback(long windowId) {
        synchronized (this.lock) {
            LOGGER.info("do rollback {}", windowId);
            for (TransactionTrait transactionTrait : this.transactionOpList) {
                transactionTrait.rollback(windowId);
            }
        }
    }

    /**
     * Processes one value by delegating to {@link #processElement(BatchRecord)} while
     * holding {@link #lock}.
     *
     * @param value the value to process; it is cast to {@link BatchRecord}
     * @return the result of {@link #processElement(BatchRecord)}
     */
    @Override
    public R process(T value) {
        synchronized (this.lock) {
            return this.processElement((BatchRecord) value);
        }
    }

    /**
     * Registers {@code operator} if it is a {@link TransactionTrait} and then visits
     * its next operators recursively.
     *
     * <p>Only {@link AbstractOperator} exposes next operators here, so any other
     * operator implementation is treated as a leaf instead of being downcast blindly.
     *
     * @param operator operator to inspect; {@code null} is ignored
     */
    protected void addIfTransactionTrait(Operator operator) {
        if (operator == null) {
            return;
        }
        if (operator instanceof TransactionTrait) {
            this.transactionOpList.add((TransactionTrait) operator);
        }
        if (!(operator instanceof AbstractOperator)) {
            // No successor list is reachable for this operator; nothing left to traverse.
            return;
        }
        for (Object subOperator : ((AbstractOperator) operator).getNextOperators()) {
            this.addIfTransactionTrait((Operator) subOperator);
        }
    }

    /**
     * Handles a single batch record; invoked by {@link #process(Object)} with
     * {@link #lock} held.
     *
     * @param var1 the record to process
     * @return the processing result
     */
    protected abstract R processElement(BatchRecord var1);
}

