/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.operator.impl.io;

import com.antgroup.geaflow.api.function.io.SourceFunction;
import com.antgroup.geaflow.api.window.IWindow;
import com.antgroup.geaflow.api.window.impl.AllWindow;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.task.TaskArgs;
import com.antgroup.geaflow.operator.OpArgs;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.base.io.SourceOperator;
import com.antgroup.geaflow.operator.base.window.AbstractStreamOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source operator that pulls data from a {@link SourceFunction} one window at a time and
 * forwards every collected element through {@code collectValue}.
 *
 * <p>The operator type recorded in {@code opArgs} is {@code SINGLE_WINDOW_SOURCE} when the
 * operator is built with an {@link AllWindow}, and {@code MULTI_WINDOW_SOURCE} otherwise.
 *
 * @param <OUT> type of the elements produced by the source function
 */
public class WindowSourceOperator<OUT>
        extends AbstractStreamOperator<SourceFunction<OUT>>
        implements SourceOperator<OUT, Boolean> {

    private static final Logger LOGGER = LoggerFactory.getLogger(WindowSourceOperator.class);

    /** Context handed to the source function on each fetch; created in {@link #open}. */
    protected transient SourceFunction.SourceContext<OUT> sourceCxt;

    /**
     * Window passed to the source function on each fetch. Stays {@code null} unless the
     * two-argument constructor is used or the field is assigned elsewhere (e.g. by a subclass).
     */
    protected IWindow<OUT> windowFunction;

    /** Creates an operator without a source function, typed as a multi-window source. */
    public WindowSourceOperator() {
        this.opArgs.setOpType(OpArgs.OpType.MULTI_WINDOW_SOURCE);
    }

    /**
     * Creates an operator around the given source function, typed as a multi-window source.
     *
     * @param sourceFunction the function that produces the data
     */
    public WindowSourceOperator(SourceFunction<OUT> sourceFunction) {
        super(sourceFunction);
        this.opArgs.setOpType(OpArgs.OpType.MULTI_WINDOW_SOURCE);
    }

    /**
     * Creates an operator around the given source function and window.
     *
     * @param sourceFunction the function that produces the data
     * @param windowFunction the window handed to the source function on every fetch; an
     *                       {@link AllWindow} marks the operator as a single-window source
     */
    public WindowSourceOperator(SourceFunction<OUT> sourceFunction, IWindow<OUT> windowFunction) {
        this(sourceFunction);
        this.windowFunction = windowFunction;
        OpArgs.OpType opType = windowFunction instanceof AllWindow
            ? OpArgs.OpType.SINGLE_WINDOW_SOURCE
            : OpArgs.OpType.MULTI_WINDOW_SOURCE;
        this.opArgs.setOpType(opType);
    }

    /**
     * Opens the operator, creates the source context and initialises the source function with
     * the parallelism and task index taken from the runtime context's task arguments.
     *
     * @param opContext operator context supplying the runtime context
     */
    @Override
    public void open(Operator.OpContext opContext) {
        super.open(opContext);
        this.sourceCxt = new StreamSourceContext();
        TaskArgs taskArgs = opContext.getRuntimeContext().getTaskArgs();
        this.function.init(taskArgs.getParallelism(), taskArgs.getTaskIndex());
    }

    /**
     * Initialises the window with {@code windowId} and asks the source function to fetch data
     * for it.
     *
     * @param windowId id of the window to fetch
     * @return the value returned by the source function's {@code fetch}
     *         (NOTE(review): presumably signals whether the source has more data - confirm
     *         against the {@code SourceFunction} contract)
     * @throws GeaflowRuntimeException wrapping any exception raised while fetching, including
     *         the case where no window function has been configured
     */
    @Override
    public Boolean emit(long windowId) throws Exception {
        try {
            if (this.windowFunction == null) {
                // Fail with an explicit message instead of an anonymous NullPointerException.
                throw new IllegalStateException(
                    "windowFunction is not set; cannot emit window " + windowId);
            }
            this.windowFunction.initWindow(windowId);
            return this.function.fetch(this.windowFunction, this.sourceCxt);
        } catch (Exception e) {
            // A fixed format string keeps the log line meaningful even when the exception
            // has no message; SLF4J treats the trailing throwable as the stack trace.
            LOGGER.error("failed to emit window {}", windowId, e);
            throw new GeaflowRuntimeException(e);
        }
    }

    /** Closes the operator first and then the underlying source function. */
    @Override
    public void close() {
        super.close();
        this.function.close();
    }

    /** Source context that routes every collected element to the enclosing operator. */
    class StreamSourceContext implements SourceFunction.SourceContext<OUT> {

        /**
         * Forwards {@code element} to the enclosing operator's {@code collectValue}.
         *
         * @param element the element produced by the source function
         * @return always {@code true}
         * @throws Exception if forwarding the element fails
         */
        @Override
        public boolean collect(OUT element) throws Exception {
            WindowSourceOperator.this.collectValue(element);
            return true;
        }
    }
}

