/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline.transform;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.Functions;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Partitioner;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.TimestampKind;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.function.KeyedWindowResultFunction;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.datamodel.KeyedWindowResult;
import com.hazelcast.jet.impl.JetEvent;
import com.hazelcast.jet.impl.pipeline.Planner;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.impl.pipeline.transform.WindowAggregateTransform;
import com.hazelcast.jet.pipeline.SessionWindowDefinition;
import com.hazelcast.jet.pipeline.SlidingWindowDefinition;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;

public class WindowGroupTransform<K, R>
extends AbstractTransform {
    private static final KeyedWindowResultFunction JET_EVENT_KEYED_WINDOW_RESULT_FN = (winStart, winEnd, key, windowResult, isEarly) -> JetEvent.jetEvent(winEnd - 1L, new KeyedWindowResult<Object, Object>(winStart, winEnd, key, windowResult, isEarly));
    @Nonnull
    private final WindowDefinition wDef;
    @Nonnull
    private final List<FunctionEx<?, ? extends K>> keyFns;
    @Nonnull
    private final AggregateOperation<?, ? extends R> aggrOp;

    public WindowGroupTransform(@Nonnull List<Transform> upstream, @Nonnull WindowDefinition wDef, @Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull AggregateOperation<?, ? extends R> aggrOp) {
        super(WindowGroupTransform.createName(wDef), upstream);
        this.wDef = wDef;
        this.keyFns = keyFns;
        this.aggrOp = aggrOp;
    }

    private static String createName(WindowDefinition wDef) {
        return WindowAggregateTransform.createName(wDef);
    }

    @Override
    public long preferredWatermarkStride() {
        return WindowAggregateTransform.preferredWatermarkStride(this.wDef);
    }

    @Override
    public void addToDag(Planner p) {
        if (this.wDef instanceof SessionWindowDefinition) {
            this.addSessionWindow(p, (SessionWindowDefinition)this.wDef);
        } else if (this.aggrOp.combineFn() == null || this.wDef.earlyResultsPeriod() > 0L || this.shouldRebalanceAnyInput()) {
            this.addSlidingWindowSingleStage(p, (SlidingWindowDefinition)this.wDef);
        } else {
            this.addSlidingWindowTwoStage(p, (SlidingWindowDefinition)this.wDef);
        }
    }

    private void addSlidingWindowSingleStage(Planner p, SlidingWindowDefinition wDef) {
        Planner.PlannerVertex pv = p.addVertex((Transform)this, this.name(), this.localParallelism(), Processors.aggregateToSlidingWindowP(this.keyFns, Collections.nCopies(this.keyFns.size(), JetEvent::timestamp), TimestampKind.EVENT, SlidingWindowPolicy.slidingWinPolicy(wDef.windowSize(), wDef.slideBy()), wDef.earlyResultsPeriod(), this.aggrOp, WindowGroupTransform.jetEventOfKeyedWindowResultFn()));
        p.addEdges((Transform)this, pv.v, (e, ord) -> e.distributed().partitioned(this.keyFns.get(ord)));
    }

    private void addSlidingWindowTwoStage(Planner p, SlidingWindowDefinition wDef) {
        SlidingWindowPolicy winPolicy = SlidingWindowPolicy.slidingWinPolicy(wDef.windowSize(), wDef.slideBy());
        Vertex v1 = p.dag.newVertex(this.name() + "-prepare", Processors.accumulateByFrameP(this.keyFns, Collections.nCopies(this.keyFns.size(), JetEvent::timestamp), TimestampKind.EVENT, winPolicy, this.aggrOp));
        v1.localParallelism(this.localParallelism());
        Planner.PlannerVertex pv2 = p.addVertex((Transform)this, this.name(), this.localParallelism(), Processors.combineToSlidingWindowP(winPolicy, this.aggrOp, WindowGroupTransform.jetEventOfKeyedWindowResultFn()));
        p.addEdges((Transform)this, v1, (e, ord) -> e.partitioned(this.keyFns.get(ord), Partitioner.HASH_CODE));
        p.dag.edge(Edge.between(v1, pv2.v).distributed().partitioned(Functions.entryKey()));
    }

    private void addSessionWindow(Planner p, SessionWindowDefinition wDef) {
        Planner.PlannerVertex pv = p.addVertex((Transform)this, this.name(), this.localParallelism(), Processors.aggregateToSessionWindowP(wDef.sessionTimeout(), wDef.earlyResultsPeriod(), Collections.nCopies(this.keyFns.size(), JetEvent::timestamp), this.keyFns, this.aggrOp, WindowGroupTransform.jetEventOfKeyedWindowResultFn()));
        p.addEdges((Transform)this, pv.v, (e, ord) -> e.distributed().partitioned(this.keyFns.get(ord)));
    }

    private static <K, R> KeyedWindowResultFunction<K, R, JetEvent<? extends KeyedWindowResult<K, ? extends R>>> jetEventOfKeyedWindowResultFn() {
        return JET_EVENT_KEYED_WINDOW_RESULT_FN;
    }
}

