/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.pdata.stream.window;

import com.antgroup.geaflow.api.function.base.AggregateFunction;
import com.antgroup.geaflow.api.function.base.KeySelector;
import com.antgroup.geaflow.api.function.base.ReduceFunction;
import com.antgroup.geaflow.api.pdata.stream.view.PIncStreamView;
import com.antgroup.geaflow.api.pdata.stream.window.PWindowKeyStream;
import com.antgroup.geaflow.api.pdata.stream.window.PWindowStream;
import com.antgroup.geaflow.common.config.keys.FrameworkConfigKeys;
import com.antgroup.geaflow.common.encoder.EncoderResolver;
import com.antgroup.geaflow.common.encoder.IEncoder;
import com.antgroup.geaflow.context.AbstractPipelineContext;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.base.AbstractOperator;
import com.antgroup.geaflow.operator.impl.window.WindowAggregateOperator;
import com.antgroup.geaflow.operator.impl.window.WindowReduceOperator;
import com.antgroup.geaflow.partitioner.IPartitioner;
import com.antgroup.geaflow.partitioner.impl.KeyPartitioner;
import com.antgroup.geaflow.pdata.stream.view.IncStreamView;
import com.antgroup.geaflow.pdata.stream.window.WindowDataStream;
import com.antgroup.geaflow.pipeline.context.IPipelineContext;
import com.google.common.base.Preconditions;
import java.util.Map;

public class WindowKeyDataStream<KEY, T>
extends WindowDataStream<T>
implements PWindowKeyStream<KEY, T> {
    private KeySelector<T, KEY> keySelector;
    private boolean materializeDisable;

    public WindowKeyDataStream(IPipelineContext context, WindowDataStream dataStream, AbstractOperator operator, KeySelector<T, KEY> keySelector) {
        super(context, dataStream, (Operator)operator);
        this.keySelector = keySelector;
        this.materializeDisable = ((AbstractPipelineContext)context).getConfig().getBoolean(FrameworkConfigKeys.INC_STREAM_MATERIALIZE_DISABLE);
    }

    public <ACC, OUT> PWindowStream<OUT> aggregate(AggregateFunction<T, ACC, OUT> aggregateFunction) {
        if (!this.materializeDisable) {
            return this.materialize().aggregate(aggregateFunction);
        }
        Preconditions.checkArgument((aggregateFunction != null ? 1 : 0) != 0, (Object)" aggregate Function must not be null");
        IEncoder resultEncoder = EncoderResolver.resolveFunction(AggregateFunction.class, aggregateFunction, (int)2);
        return new WindowDataStream(this.context, this, (Operator)new WindowAggregateOperator(aggregateFunction, this.keySelector)).withEncoder(resultEncoder);
    }

    public PWindowStream<T> reduce(ReduceFunction<T> reduceFunction) {
        if (!this.materializeDisable) {
            return this.materialize().reduce(reduceFunction);
        }
        Preconditions.checkArgument((reduceFunction != null ? 1 : 0) != 0, (Object)" Reduce Function must not be null");
        return new WindowDataStream(this.context, this, (Operator)new WindowReduceOperator(reduceFunction, this.keySelector)).withEncoder(this.encoder);
    }

    public PIncStreamView<T> materialize() {
        IncStreamView incStreamView = new IncStreamView(this.context, this.keySelector);
        return incStreamView.append(this);
    }

    public PWindowKeyStream<KEY, T> withConfig(Map config) {
        this.opArgs.setConfig(config);
        return this;
    }

    public PWindowKeyStream<KEY, T> withConfig(String key, String value) {
        this.opArgs.getConfig().put(key, value);
        return this;
    }

    public PWindowKeyStream<KEY, T> withName(String name) {
        this.setName(name);
        return this;
    }

    public PWindowKeyStream<KEY, T> withParallelism(int parallelism) {
        this.setParallelism(parallelism);
        return this;
    }

    @Override
    public IPartitioner<T> getPartition() {
        return new KeyPartitioner(this.getId());
    }

    public WindowKeyDataStream<KEY, T> withEncoder(IEncoder<T> encoder) {
        this.encoder = encoder;
        return this;
    }
}

