/*
 * Decompiled with CFR 0.152.
 */
package io.brackit.query.block;

import io.brackit.query.BrackitQueryContext;
import io.brackit.query.QueryContext;
import io.brackit.query.QueryException;
import io.brackit.query.Tuple;
import io.brackit.query.atomic.Atomic;
import io.brackit.query.atomic.Int32;
import io.brackit.query.atomic.Str;
import io.brackit.query.block.Block;
import io.brackit.query.block.BlockChain;
import io.brackit.query.block.ChainedSink;
import io.brackit.query.block.ConcurrentSink;
import io.brackit.query.block.FJControl;
import io.brackit.query.block.ForBind;
import io.brackit.query.block.LetBind;
import io.brackit.query.block.SerialSink;
import io.brackit.query.block.Sink;
import io.brackit.query.compiler.translator.Reference;
import io.brackit.query.expr.BlockExpr;
import io.brackit.query.expr.PrintExpr;
import io.brackit.query.expr.RangeExpr;
import io.brackit.query.expr.SequenceExpr;
import io.brackit.query.function.FunctionExpr;
import io.brackit.query.function.bit.Delay;
import io.brackit.query.jdm.Item;
import io.brackit.query.jdm.Iter;
import io.brackit.query.jdm.Sequence;
import io.brackit.query.operator.TupleImpl;
import io.brackit.query.util.aggregator.Aggregate;
import io.brackit.query.util.aggregator.Grouping;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

public class GroupBy
implements Block {
    final int[] groupSpecs;
    final int[] addAggSpecs;
    final Aggregate defaultAgg;
    final Aggregate[] addAggs;
    final boolean sequential;

    public GroupBy(Aggregate dftAgg, Aggregate[] addAggs, int grpSpecCnt, boolean sequential) {
        this.defaultAgg = dftAgg;
        this.addAggs = addAggs;
        this.groupSpecs = new int[grpSpecCnt];
        this.addAggSpecs = new int[addAggs.length];
        this.sequential = sequential;
    }

    @Override
    public int outputWidth(int initSize) {
        return initSize + this.addAggs.length;
    }

    @Override
    public Sink create(QueryContext ctx, Sink sink) throws QueryException {
        if (this.sequential) {
            return new SequentialGroupBy(FJControl.PERMITS, sink);
        }
        return new HashGroupBy(sink);
    }

    public Reference group(int groupSpecNo) {
        return pos -> {
            this.groupSpecs[groupSpecNo] = pos;
        };
    }

    public Reference aggregate(int addAggNo) {
        return pos -> {
            this.addAggSpecs[addAggNo] = pos;
        };
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 20; ++i) {
            Item item;
            FJControl.resizePool(4);
            ForBind forBind = new ForBind(new RangeExpr(new Int32(1), new Int32(10000000)), false);
            ForBind forBind2 = new ForBind(new SequenceExpr(new Str("a"), new Str("b"), new Str("c")), false);
            forBind.bindVariable(true);
            forBind2.bindVariable(true);
            GroupBy groupBy = new GroupBy(Aggregate.SINGLE, new Aggregate[]{Aggregate.COUNT}, 1, false);
            groupBy.group(0).setPos(0);
            LetBind delay = new LetBind(new FunctionExpr(null, new Delay(), Int32.ONE));
            BlockChain block = new BlockChain(new Block[]{forBind2, forBind, delay, groupBy});
            long start = System.currentTimeMillis();
            Sequence res = new BlockExpr(block, new PrintExpr(), true).evaluate(new BrackitQueryContext(), new TupleImpl());
            Iter it = res.iterate();
            int cnt = 0;
            while ((item = it.next()) != null) {
                System.out.println(item);
                ++cnt;
            }
            it.close();
            System.out.println("---");
            System.out.print(cnt);
            System.out.println(" results");
            long end = System.currentTimeMillis();
            System.out.println(end - start + " ms");
        }
    }

    private class SequentialGroupBy
    extends SerialSink {
        final Sink sink;
        final Grouping grp;

        public SequentialGroupBy(int permits, Sink sink) {
            super(permits);
            this.sink = sink;
            this.grp = new Grouping(GroupBy.this.groupSpecs, GroupBy.this.addAggSpecs, GroupBy.this.defaultAgg, GroupBy.this.addAggs);
        }

        private SequentialGroupBy(Semaphore sem, Sink sink, Grouping grp) {
            super(sem);
            this.sink = sink;
            this.grp = grp;
        }

        @Override
        protected ChainedSink doPartition(Sink stopAt) {
            Grouping grp = new Grouping(GroupBy.this.groupSpecs, GroupBy.this.addAggSpecs, GroupBy.this.defaultAgg, GroupBy.this.addAggs);
            return new SequentialGroupBy(this.sem, this.sink.partition(stopAt), grp);
        }

        @Override
        protected SerialSink doFork() {
            return new SequentialGroupBy(this.sem, this.sink, this.grp);
        }

        @Override
        protected void doOutput(Tuple[] buf, int len) throws QueryException {
            for (int i = 0; i < len; ++i) {
                Tuple t = buf[i];
                if (this.grp.add(t)) continue;
                this.outputGroup();
                this.grp.add(t);
            }
        }

        private void outputGroup() throws QueryException {
            Tuple out = this.grp.emit();
            this.sink.output(new Tuple[]{out}, 1);
            this.grp.clear();
        }

        @Override
        protected void doFirstBegin() throws QueryException {
            this.sink.begin();
        }

        @Override
        protected void doFinalEnd() throws QueryException {
            if (this.grp.getSize() > 0) {
                this.outputGroup();
            }
            this.sink.end();
        }
    }

    private class HashGroupBy
    extends ConcurrentSink {
        final Sink sink;
        final ConcurrentHashMap<Key, Grouping> map;

        HashGroupBy(Sink sink) {
            this.sink = sink;
            this.map = new ConcurrentHashMap();
        }

        @Override
        public Sink partition(Sink stopAt) {
            return new HashGroupBy(this.sink.partition(stopAt));
        }

        @Override
        public void output(Tuple[] buf, int len) throws QueryException {
            for (int i = 0; i < len; ++i) {
                Grouping prev;
                Atomic[] gks = Grouping.groupingKeys(GroupBy.this.groupSpecs, buf[i]);
                Key key = new Key(gks);
                Grouping grp = this.map.get(key);
                if (grp == null && (prev = this.map.putIfAbsent(key, grp = new Grouping(GroupBy.this.groupSpecs, GroupBy.this.addAggSpecs, GroupBy.this.defaultAgg, GroupBy.this.addAggs))) != null) {
                    grp = prev;
                }
                grp.add(key.val, buf[i]);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void doEnd() throws QueryException {
            try {
                this.sink.begin();
                Iterator it = ((ConcurrentHashMap.KeySetView)this.map.keySet()).iterator();
                int bufSize = 20;
                Tuple[] buf = new Tuple[bufSize];
                int len = 0;
                while (it.hasNext()) {
                    Key key = (Key)it.next();
                    Grouping grp = this.map.get(key);
                    it.remove();
                    buf[len++] = this.emit(grp);
                    if (len != bufSize) continue;
                    this.sink.output(buf, len);
                    buf = new Tuple[bufSize];
                    len = 0;
                }
                if (len > 0) {
                    this.sink.output(buf, len);
                }
                this.sink.end();
            }
            finally {
                this.map.clear();
            }
        }

        @Override
        protected void doFail() throws QueryException {
            this.sink.fail();
            this.map.clear();
        }

        private Tuple emit(Grouping grp) throws QueryException {
            Tuple t = grp.emit();
            grp.clear();
            return t;
        }
    }

    private static class Key {
        final int hash;
        final Atomic[] val;

        Key(Atomic[] val) {
            this.val = val;
            this.hash = Arrays.hashCode(val);
        }

        public int hashCode() {
            return this.hash;
        }

        public String toString() {
            return Arrays.toString(this.val);
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj instanceof Key) {
                Key k = (Key)obj;
                for (int i = 0; i < this.val.length; ++i) {
                    Atomic a1 = this.val[i];
                    Atomic a2 = k.val[i];
                    if ((a1 != null || a2 == null) && a2 != null && a1.atomicCmp(a2) == 0) continue;
                    return false;
                }
                return true;
            }
            return false;
        }
    }
}

