/*
 * Decompiled with CFR 0.152.
 */
package com.playtika.janusgraph.aerospike.operations;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.ScanCallback;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator;
import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
import org.janusgraph.diskstorage.util.RecordIterator;
import org.janusgraph.diskstorage.util.StaticArrayBuffer;
import org.janusgraph.diskstorage.util.StaticArrayEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AerospikeKeyIterator
implements KeyIterator,
ScanCallback {
    private static Logger logger = LoggerFactory.getLogger(AerospikeKeyIterator.class);
    private final SliceQuery query;
    private final BlockingQueue<KeyRecord> queue = new LinkedBlockingQueue<KeyRecord>(100);
    private KeyRecord next;
    private Iterator<Entry> entriesIt;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private Thread thread;
    private static final KeyRecord TERMINATE_VALUE = new KeyRecord(null, null);

    public AerospikeKeyIterator(SliceQuery query) {
        this.query = query;
    }

    public RecordIterator<Entry> getEntries() {
        return new RecordIterator<Entry>(){

            public boolean hasNext() {
                return AerospikeKeyIterator.this.entriesIt.hasNext();
            }

            public Entry next() {
                return AerospikeKeyIterator.this.entriesIt.next();
            }

            public void close() {
            }
        };
    }

    public void close() {
        this.closed.set(true);
        try {
            this.queue.put(TERMINATE_VALUE);
            this.thread.interrupt();
        }
        catch (InterruptedException e) {
            throw new RuntimeException();
        }
    }

    public boolean hasNext() {
        try {
            if (this.next == TERMINATE_VALUE) {
                return false;
            }
            return this.next != null || (this.next = this.takeNext()) != TERMINATE_VALUE;
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public StaticBuffer next() {
        if (this.next == null) {
            try {
                this.next = this.takeNext();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        if (this.next == TERMINATE_VALUE) {
            throw new NoSuchElementException();
        }
        try {
            StaticArrayBuffer staticArrayBuffer = AerospikeKeyIterator.keyToBuffer(this.next.key);
            return staticArrayBuffer;
        }
        finally {
            this.next = null;
        }
    }

    private KeyRecord takeNext() throws InterruptedException {
        do {
            this.next = this.queue.take();
            if (this.next == TERMINATE_VALUE) {
                this.entriesIt = null;
                break;
            }
            this.entriesIt = this.entriesIt(this.next);
        } while (!this.entriesIt.hasNext());
        return this.next;
    }

    private Iterator<Entry> entriesIt(KeyRecord keyRecord) {
        return keyRecord.record.getMap("entries").entrySet().stream().map(o -> {
            Map.Entry entry = o;
            StaticArrayBuffer column = StaticArrayBuffer.of((ByteBuffer)((ByteBuffer)entry.getKey()));
            StaticArrayBuffer value = StaticArrayBuffer.of((byte[])((byte[])entry.getValue()));
            return StaticArrayEntry.of((StaticBuffer)column, (StaticBuffer)value);
        }).filter(entry -> this.query.contains(entry.getColumn())).limit(this.query.getLimit()).iterator();
    }

    public void scanCallback(Key key, Record record) throws AerospikeException {
        if (this.closed.get()) {
            logger.info("AerospikeKeyIterator get closed, terminate scan");
            throw new AerospikeException.ScanTerminated();
        }
        try {
            this.queue.put(new KeyRecord(key, record));
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static StaticArrayBuffer keyToBuffer(Key key) {
        return new StaticArrayBuffer((byte[])key.userKey.getObject());
    }

    public void setThread(Thread thread) {
        this.thread = thread;
    }

    private static class KeyRecord {
        final Key key;
        final Record record;

        private KeyRecord(Key key, Record record) {
            this.key = key;
            this.record = record;
        }
    }
}

