/*
 * Decompiled with CFR 0.152.
 */
package com.fluxtion.runtime.dataflow.groupby;

import com.fluxtion.runtime.annotations.OnParentUpdate;
import com.fluxtion.runtime.annotations.OnTrigger;
import com.fluxtion.runtime.annotations.builder.AssignToField;
import com.fluxtion.runtime.dataflow.FlowFunction;
import com.fluxtion.runtime.dataflow.TriggeredFlowFunction;
import com.fluxtion.runtime.dataflow.aggregate.AggregateFlowFunction;
import com.fluxtion.runtime.dataflow.aggregate.function.BucketedSlidingWindow;
import com.fluxtion.runtime.dataflow.function.AbstractFlowFunction;
import com.fluxtion.runtime.dataflow.groupby.GroupBy;
import com.fluxtion.runtime.dataflow.groupby.GroupByFlowFunctionWrapper;
import com.fluxtion.runtime.partition.LambdaReflection;
import com.fluxtion.runtime.time.FixedRateTrigger;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class GroupByTimedSlidingWindow<T, K, V, R, S extends FlowFunction<T>, F extends AggregateFlowFunction<V, R, F>>
extends AbstractFlowFunction<T, GroupBy<K, R>, S>
implements TriggeredFlowFunction<GroupBy<K, R>> {
    private final LambdaReflection.SerializableSupplier<F> windowFunctionSupplier;
    private final LambdaReflection.SerializableFunction<T, K> keyFunction;
    private final LambdaReflection.SerializableFunction<T, V> valueFunction;
    private final int bucketSizeMillis;
    private final int bucketCount;
    public FixedRateTrigger rollTrigger;
    private transient Supplier<GroupByFlowFunctionWrapper<T, K, V, R, F>> groupBySupplier;
    private transient BucketedSlidingWindow<T, GroupBy<K, R>, GroupByFlowFunctionWrapper<T, K, V, R, F>> slidingCalculator;
    private final transient Map<K, R> mapOfValues = new HashMap<K, R>();
    private final transient MyGroupBy results = new MyGroupBy();

    public GroupByTimedSlidingWindow(S inputEventStream, LambdaReflection.SerializableSupplier<F> windowFunctionSupplier, @AssignToField(value="keyFunction") LambdaReflection.SerializableFunction<T, K> keyFunction, @AssignToField(value="valueFunction") LambdaReflection.SerializableFunction<T, V> valueFunction, @AssignToField(value="bucketSizeMillis") int bucketSizeMillis, @AssignToField(value="bucketCount") int bucketCount) {
        super(inputEventStream, null);
        this.windowFunctionSupplier = windowFunctionSupplier;
        this.keyFunction = keyFunction;
        this.valueFunction = valueFunction;
        this.bucketSizeMillis = bucketSizeMillis;
        this.bucketCount = bucketCount;
        this.resetTriggered = false;
        this.rollTrigger = FixedRateTrigger.atMillis(bucketSizeMillis);
        this.groupBySupplier = () -> new GroupByFlowFunctionWrapper(keyFunction, valueFunction, windowFunctionSupplier);
        this.slidingCalculator = new BucketedSlidingWindow(this.groupBySupplier, bucketCount);
    }

    @Override
    public GroupBy<K, R> get() {
        return this.results;
    }

    protected void cacheWindowValue() {
        GroupBy<K, R> value = this.slidingCalculator.get();
        this.mapOfValues.clear();
        this.mapOfValues.putAll(value.toMap());
    }

    protected void aggregateInputValue(S inputEventStream) {
        this.slidingCalculator.aggregate(inputEventStream.get());
    }

    @OnParentUpdate
    public void timeTriggerFired(FixedRateTrigger rollTrigger) {
        this.slidingCalculator.roll(rollTrigger.getTriggerCount());
        if (this.slidingCalculator.isAllBucketsFilled()) {
            this.cacheWindowValue();
            this.publishOverrideTriggered = !this.overridePublishTrigger & !this.overrideUpdateTrigger;
            this.inputStreamTriggered_1 = true;
            this.inputStreamTriggered = true;
        }
    }

    @Override
    @OnParentUpdate
    public void inputUpdated(S inputEventStream) {
        this.aggregateInputValue(inputEventStream);
        this.inputStreamTriggered_1 = false;
        this.inputStreamTriggered = false;
    }

    @Override
    @OnParentUpdate(value="updateTriggerNode")
    public void updateTriggerNodeUpdated(Object triggerNode) {
        super.updateTriggerNodeUpdated(triggerNode);
        this.cacheWindowValue();
    }

    @Override
    protected void resetOperation() {
        this.groupBySupplier = () -> new GroupByFlowFunctionWrapper(this.keyFunction, this.valueFunction, this.windowFunctionSupplier);
        this.slidingCalculator = new BucketedSlidingWindow(this.groupBySupplier, this.bucketCount);
        this.rollTrigger.init();
        this.mapOfValues.clear();
    }

    @Override
    public boolean isStatefulFunction() {
        return true;
    }

    @OnTrigger
    public boolean triggered() {
        return this.fireEventUpdateNotification();
    }

    private class MyGroupBy
    implements GroupBy<K, R> {
        private MyGroupBy() {
        }

        @Override
        public Map<K, R> toMap() {
            return GroupByTimedSlidingWindow.this.mapOfValues;
        }

        @Override
        public Collection<R> values() {
            return GroupByTimedSlidingWindow.this.mapOfValues.values();
        }

        @Override
        public R lastValue() {
            return ((GroupBy)GroupByTimedSlidingWindow.this.slidingCalculator.get()).lastValue();
        }

        @Override
        public GroupBy.KeyValue<K, R> lastKeyValue() {
            return ((GroupBy)GroupByTimedSlidingWindow.this.slidingCalculator.get()).lastKeyValue();
        }
    }
}

