/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark;

import com.clearspring.analytics.util.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList;
import org.apache.hadoop.hive.ql.exec.spark.HiveKVResultCache;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.junit.Assert;
import org.junit.Test;
import scala.Tuple2;

public class TestHiveKVResultCache {
    /**
     * Adds one key/value pair to a fresh cache and checks that exactly that
     * pair is read back, with nothing following it.
     */
    @Test
    public void testSimple() throws Exception {
        HiveKVResultCache cache = new HiveKVResultCache();
        HiveKey expectedKey = new HiveKey("key".getBytes(), "key".hashCode());
        BytesWritable expectedValue = new BytesWritable("value".getBytes());
        cache.add(expectedKey, expectedValue);

        boolean hasFirst = cache.hasNext();
        Assert.assertTrue("KV result cache should have at least one element", hasFirst);

        Tuple2 row = cache.next();
        HiveKey actualKey = (HiveKey) row._1();
        boolean keyMatches = actualKey.equals(expectedKey);
        Assert.assertTrue("Incorrect key", keyMatches);
        BytesWritable actualValue = (BytesWritable) row._2();
        boolean valueMatches = actualValue.equals(expectedValue);
        Assert.assertTrue("Incorrect value", valueMatches);

        // The single inserted record must have been the only one.
        Assert.assertFalse("Cache shouldn't have more records", cache.hasNext());
    }

    /**
     * Fills and drains the same cache three times (large, single record,
     * large again) to check that it can be reused after {@code clear()}.
     */
    @Test
    public void testSpilling() throws Exception {
        HiveKVResultCache cache = new HiveKVResultCache();
        // NOTE(review): 3072 is presumably chosen to exceed the cache's
        // in-memory capacity so that records spill -- confirm against
        // HiveKVResultCache.
        final int recordCount = 3072;
        this.testSpillingHelper(cache, recordCount);
        this.testSpillingHelper(cache, 1);
        this.testSpillingHelper(cache, recordCount);
    }

    /**
     * Inserts {@code numRecords} sequentially numbered records into the cache,
     * reads them all back, and asserts they come out in insertion order with
     * unchanged contents. The cache is cleared before returning.
     *
     * @param cache      cache under test; expected to be empty on entry
     * @param numRecords number of {@code key_i}/{@code value_i} pairs to insert
     */
    private void testSpillingHelper(HiveKVResultCache cache, int numRecords) {
        for (int i = 0; i < numRecords; ++i) {
            String key = "key_" + i;
            String value = "value_" + i;
            cache.add(new HiveKey(key.getBytes(), key.hashCode()), new BytesWritable(value.getBytes()));
        }
        int recordsSeen = 0;
        while (cache.hasNext()) {
            String expectedKey = "key_" + recordsSeen;
            String expectedValue = "value_" + recordsSeen;
            Tuple2 row = cache.next();
            // Decode only the valid getLength() bytes: the backing array
            // returned by getBytes() may be longer than the stored data.
            String actualKey = TestHiveKVResultCache.bytesWritableToString((BytesWritable) row._1());
            String actualValue = TestHiveKVResultCache.bytesWritableToString((BytesWritable) row._2());
            Assert.assertEquals("Unexpected key at position: " + recordsSeen, expectedKey, actualKey);
            Assert.assertEquals("Unexpected value at position: " + recordsSeen, expectedValue, actualValue);
            ++recordsSeen;
        }
        Assert.assertEquals("Retrieved record count doesn't match inserted record count", (long) numRecords, (long) recordsSeen);
        cache.clear();
    }

    /**
     * Runs {@code scanAndVerify} over 10000 rows for a series of
     * threshold/separate combinations, always with prefixes "a" and "b".
     */
    @Test
    public void testResultList() throws Exception {
        // Each entry is {threshold, separate}, executed in this order.
        final int[][] scenarios = {
            {0, 0},
            {511, 0},
            {1022, 0},
            {511, 10},
            {1022, 10},
            {512, 0},
            {1024, 0},
            {512, 3},
            {3072, 10},
            {3584, 5},
            {4608, 19},
            {1, 0},
            {1, 1},
        };
        for (int[] scenario : scenarios) {
            int threshold = scenario[0];
            int separate = scenario[1];
            TestHiveKVResultCache.scanAndVerify(10000L, threshold, separate, "a", "b");
        }
    }

    /**
     * Scans a generated result list and verifies its contents.
     *
     * <p>Checks that exactly {@code rows} rows were produced, that every key
     * carries one of the two prefixes with an id inside the range expected for
     * that prefix, that each value matches its key, and that no id is missing
     * or duplicated within either prefix group.
     *
     * @param rows      total number of rows expected
     * @param threshold passed through to the result list
     * @param separate  percentage of rows expected to carry {@code prefix2}
     * @param prefix1   key/value prefix of the primary rows
     * @param prefix2   key/value prefix of the separate rows
     */
    private static void scanAndVerify(long rows, int threshold, int separate, String prefix1, String prefix2) {
        List<Tuple2<HiveKey, BytesWritable>> collected = new ArrayList<Tuple2<HiveKey, BytesWritable>>((int) rows);
        TestHiveKVResultCache.scanResultList(rows, threshold, separate, collected, prefix1, prefix2);
        Assert.assertEquals(rows, (long) collected.size());

        final long expectedPrimary = rows * (long) (100 - separate) / 100L;
        final long expectedSeparate = rows - expectedPrimary;
        HashSet<Long> primaryIds = new HashSet<Long>();
        HashSet<Long> separateIds = new HashSet<Long>();

        for (Tuple2<HiveKey, BytesWritable> row : collected) {
            String keyText = TestHiveKVResultCache.bytesWritableToString((BytesWritable) row._1);
            String valueText = TestHiveKVResultCache.bytesWritableToString((BytesWritable) row._2);

            // Keys look like "<prefix>_key_<id>"; skip the 5-character "_key_" infix.
            int firstUnderscore = keyText.indexOf('_');
            String rowPrefix = keyText.substring(0, firstUnderscore);
            Long rowId = Long.valueOf(keyText.substring(rowPrefix.length() + 5));

            boolean isPrimary = rowPrefix.equals(prefix1);
            if (!isPrimary) {
                Assert.assertEquals((Object) prefix2, (Object) rowPrefix);
            }
            long upperBound = isPrimary ? expectedPrimary : expectedSeparate;
            boolean idInRange = rowId >= 0L && rowId < upperBound;
            Assert.assertTrue(idInRange);
            HashSet<Long> seenIds = isPrimary ? primaryIds : separateIds;
            seenIds.add(rowId);

            String expectedValue = rowPrefix + "_value_" + rowId;
            Assert.assertEquals((Object) expectedValue, (Object) valueText);
        }

        // Set sizes equal to the expected counts means every id appeared exactly once.
        Assert.assertEquals(expectedSeparate, (long) separateIds.size());
        Assert.assertEquals(expectedPrimary, (long) primaryIds.size());
    }

    /**
     * Decodes the first {@code getLength()} bytes of the writable's backing
     * array into a String using the platform default charset.
     *
     * @param bw writable to decode
     * @return the decoded text
     */
    private static String bytesWritableToString(BytesWritable bw) {
        // Only the leading getLength() bytes are decoded; anything beyond
        // that in the backing array is ignored.
        return new String(bw.getBytes(), 0, bw.getLength());
    }

    /**
     * Builds a {@code MyHiveFunctionResultList} over a synthetic input
     * iterator, drains it, and returns how long the drain took.
     *
     * @param rows      total number of rows the result list should produce
     * @param threshold rows emitted per input record; 0 means a single input
     *                  record is supplied
     * @param separate  percentage of rows produced by the separate generator
     * @param output    receives every produced row, or {@code null} to discard
     * @param prefix1   prefix for primary rows
     * @param prefix2   prefix for separate rows
     * @return elapsed wall-clock time of the drain loop in milliseconds
     */
    private static long scanResultList(long rows, int threshold, int separate, List<Tuple2<HiveKey, BytesWritable>> output, String prefix1, String prefix2) {
        final long iteratorCount = threshold == 0 ? 1L : rows * (long)(100 - separate) / 100L / (long)threshold;
        MyHiveFunctionResultList resultList = new MyHiveFunctionResultList(new Iterator(){
            // Number of input records handed out so far.
            private int i = 0;

            @Override
            public boolean hasNext() {
                // Must stay side-effect free so repeated calls do not skip records.
                return (long)this.i < iteratorCount;
            }

            @Override
            public Object next() {
                if (!this.hasNext()) {
                    throw new java.util.NoSuchElementException();
                }
                // Yields 1, 2, ..., iteratorCount.
                return ++this.i;
            }

            @Override
            public void remove() {
                // Intentionally a no-op.
            }
        });
        resultList.init(rows, threshold, separate, prefix1, prefix2);
        long startTime = System.currentTimeMillis();
        while (resultList.hasNext()) {
            Tuple2 item = resultList.next();
            if (output == null) continue;
            output.add((Tuple2<HiveKey, BytesWritable>)item);
        }
        long endTime = System.currentTimeMillis();
        return endTime - startTime;
    }

    /**
     * Times five scan configurations for the given row count and returns the
     * elapsed milliseconds of each, in this order: no threshold; threshold;
     * 10x threshold; threshold with {@code extra}; 10x threshold with
     * {@code extra}. Produced rows are discarded.
     *
     * @param rows      number of rows per scan
     * @param threshold base threshold
     * @param extra     "separate" percentage used by the last two scans
     * @return array of five timings in milliseconds
     */
    private static long[] scanResultList(long rows, int threshold, int extra) {
        final int wideThreshold = threshold * 10;
        long[] timings = new long[5];
        timings[0] = TestHiveKVResultCache.scanResultList(rows, 0, 0, null, "a", "b");
        timings[1] = TestHiveKVResultCache.scanResultList(rows, threshold, 0, null, "c", "d");
        timings[2] = TestHiveKVResultCache.scanResultList(rows, wideThreshold, 0, null, "e", "f");
        timings[3] = TestHiveKVResultCache.scanResultList(rows, threshold, extra, null, "g", "h");
        timings[4] = TestHiveKVResultCache.scanResultList(rows, wideThreshold, extra, null, "i", "j");
        return timings;
    }

    /**
     * Command-line timing harness.
     *
     * <p>Optional arguments: {@code rows} (default 1000000), {@code threshold}
     * (default 512) and {@code extra} (default 5). Runs the five-scan timing
     * twice with the results discarded, then five more times, and prints the
     * per-scan average in milliseconds as one tab-separated line.
     *
     * @param args optional {@code [rows [threshold [extra]]]}
     */
    public static void main(String[] args) throws Exception {
        long rows = args.length > 0 ? Long.parseLong(args[0]) : 1000000L;
        int threshold = args.length > 1 ? Integer.parseInt(args[1]) : 512;
        int extra = args.length > 2 ? Integer.parseInt(args[2]) : 5;

        // Two runs whose timings are thrown away (presumably warm-up).
        for (int discarded = 0; discarded < 2; ++discarded) {
            TestHiveKVResultCache.scanResultList(rows, threshold, extra);
        }

        final int count = 5;
        long[] totals = new long[count];
        for (int run = 0; run < count; ++run) {
            long[] sample = TestHiveKVResultCache.scanResultList(rows, threshold, extra);
            for (int k = 0; k < count; ++k) {
                totals[k] += sample[k];
            }
        }
        for (int k = 0; k < count; ++k) {
            totals[k] /= (long) count;
        }
        System.out.println(totals[0] + "\t" + totals[1] + "\t" + totals[2] + "\t" + totals[3] + "\t" + totals[4]);
    }

    /**
     * Result list used by the tests to generate synthetic rows.
     *
     * <p>"Primary" rows (prefix {@code prefix1}) are collected on the thread
     * that drives the list. When a non-zero share of "separate" rows is
     * configured, those rows (prefix {@code prefix2}) are collected from a
     * second thread, so both threads call {@code collect} on the same list.
     */
    private static class MyHiveFunctionResultList extends HiveBaseFunctionResultList {
        private static final long serialVersionUID = -1L;
        /** Number of rows to emit with {@code prefix1}. */
        private long primaryRows;
        /** Primary rows emitted per input record. */
        private int thresholdRows;
        /** Number of rows to emit with {@code prefix2}. */
        private long separateRows;
        /** Thread emitting the separate rows; {@code null} when there are none. */
        private Thread separateRowGenerator;
        private long rowsEmitted;
        private long separateRowsEmitted;
        private String prefix1;
        private String prefix2;
        /** Signals the generator: FALSE = emit one batch, TRUE = finish up. */
        private LinkedBlockingQueue<Boolean> queue;

        MyHiveFunctionResultList(Iterator inputIterator) {
            super(inputIterator);
        }

        /**
         * Configures the row counts and, if any separate rows are required,
         * starts the generator thread.
         *
         * @param rows      total rows to produce; must be positive
         * @param threshold primary rows emitted per input record; must be
         *                  positive unless {@code separate} is 0
         * @param separate  percentage (0-99) of rows emitted as separate rows
         * @param p1        prefix for primary rows
         * @param p2        prefix for separate rows
         */
        void init(long rows, int threshold, int separate, String p1, String p2) {
            Preconditions.checkArgument((threshold > 0 || separate == 0) && separate < 100 && separate >= 0 && rows > 0L);
            this.primaryRows = rows * (long)(100 - separate) / 100L;
            this.separateRows = rows - this.primaryRows;
            this.thresholdRows = threshold;
            this.prefix1 = p1;
            this.prefix2 = p2;
            if (this.separateRows > 0L) {
                // Everything the generator reads is assigned before start().
                this.queue = new LinkedBlockingQueue<Boolean>();
                this.separateRowGenerator = new Thread(new Runnable(){

                    @Override
                    public void run() {
                        MyHiveFunctionResultList.this.generateSeparateRows();
                    }
                });
                this.separateRowGenerator.start();
            }
        }

        /**
         * Body of the generator thread: emits one batch of separate rows per
         * FALSE signal, then emits whatever is left once TRUE arrives (or the
         * wait is interrupted).
         */
        private void generateSeparateRows() {
            // primaryRows is 0 for small inputs (e.g. rows=1, separate=50);
            // in that case skip batching and emit everything in the final loop.
            long separateBatchSize = this.primaryRows > 0L
                    ? (long)this.thresholdRows * this.separateRows / this.primaryRows
                    : 0L;
            boolean interrupted = false;
            try {
                while (!this.queue.take().booleanValue()) {
                    for (long i = 0L; i < separateBatchSize; ++i) {
                        this.collect(this.prefix2, this.separateRowsEmitted++);
                    }
                }
            }
            catch (InterruptedException e) {
                interrupted = true;
            }
            while (this.separateRowsEmitted < this.separateRows) {
                this.collect(this.prefix2, this.separateRowsEmitted++);
            }
            if (interrupted) {
                // Restore the flag only after the remaining rows are written.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Collects the row {@code <prefix>_key_<id>} / {@code <prefix>_value_<id>}.
         *
         * @throws IllegalStateException if the underlying collect fails with
         *                               an {@link IOException}
         */
        public void collect(String prefix, long id) {
            String k = prefix + "_key_" + id;
            String v = prefix + "_value_" + id;
            HiveKey key = new HiveKey(k.getBytes(), k.hashCode());
            BytesWritable value = new BytesWritable(v.getBytes());
            try {
                this.collect(key, value);
            }
            catch (IOException e) {
                // Surface the failure instead of silently dropping the row.
                throw new IllegalStateException("Failed to collect row " + k, e);
            }
        }

        /**
         * Emits {@code thresholdRows} primary rows for each input record and
         * then asks the generator thread, if any, for one batch. The input
         * record itself is not used.
         */
        protected void processNextRecord(Object inputRecord) throws IOException {
            for (int i = 0; i < this.thresholdRows; ++i) {
                this.collect(this.prefix1, this.rowsEmitted++);
            }
            if (this.separateRowGenerator != null) {
                this.queue.add(Boolean.FALSE);
            }
        }

        protected boolean processingDone() {
            return false;
        }

        /**
         * Emits any primary rows still outstanding, tells the generator thread
         * to finish, and waits for it to terminate.
         */
        protected void closeRecordProcessor() {
            while (this.rowsEmitted < this.primaryRows) {
                this.collect(this.prefix1, this.rowsEmitted++);
            }
            if (this.separateRowGenerator != null) {
                this.queue.add(Boolean.TRUE);
                try {
                    this.separateRowGenerator.join();
                }
                catch (InterruptedException e) {
                    // Preserve the interrupt for whoever is driving this list.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

