/*
 * Decompiled with CFR 0.152.
 */
package com.huawei.dli.sdk.read.impl;

import com.huawei.dli.sdk.exception.DLIException;
import com.huawei.dli.sdk.meta.Row;
import com.huawei.dli.sdk.read.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ResultSet} that merges several underlying readers into a single stream of rows.
 *
 * <p>{@link #init()} starts one {@link ProducerTask} per reader on a fixed-size thread pool. Each
 * task copies the rows of its reader into a shared bounded queue, and {@link #read()} takes rows
 * out of that queue. Rows of different readers are returned in whatever order the producer
 * threads enqueued them.
 *
 * <p>{@code fetchCount} is a plain, unsynchronized field, so {@link #read()} and
 * {@link #hasNext()} are not safe to call from several consumer threads at once.
 */
public class CachedResultSet
implements ResultSet {
    private static final Logger log = LoggerFactory.getLogger(CachedResultSet.class);
    /** Seconds a single poll of the buffer waits before the producer state is re-checked. */
    private static final int POLL_WAIT_SECONDS = 3;
    /** The buffer size is logged once every this many fetched rows. */
    private static final int DOTTING_THRESHOLD = 5000;
    /** Maximum number of rows the buffer holds at once. */
    private static final int BUFFER_CAPACITY = 10000;
    private final ArrayBlockingQueue<Row> buffQueue = new ArrayBlockingQueue<>(BUFFER_CAPACITY);
    private final List<ResultSet> readers;
    private final long resultCount;
    /** One future per submitted producer; used to detect producer completion and failure. */
    private final List<Future<Void>> producerFutures = new ArrayList<>();
    private ExecutorService threadPool;
    private long fetchCount = 0L;

    /**
     * Creates a result set on top of the given readers.
     *
     * @param readers     the underlying readers whose rows are merged
     * @param resultCount the total number of rows this result set is expected to deliver;
     *                    {@link #hasNext()} returns {@code false} once that many rows were read
     */
    public CachedResultSet(List<ResultSet> readers, long resultCount) {
        this.readers = readers;
        this.resultCount = resultCount;
    }

    /**
     * Initializes every underlying reader and then starts one producer thread per reader.
     * Does nothing when there are no readers.
     *
     * @throws DLIException if initializing one of the readers fails
     */
    @Override
    public void init() throws DLIException {
        if (this.readers.isEmpty()) {
            return;
        }
        for (ResultSet reader : this.readers) {
            reader.init();
        }
        this.threadPool = Executors.newFixedThreadPool(this.readers.size());
        for (ResultSet reader : this.readers) {
            // Keep the future so that read() can notice a finished or failed producer.
            this.producerFutures.add(this.threadPool.submit(new ProducerTask(this.buffQueue, reader)));
        }
    }

    /**
     * Returns the next buffered row, waiting for the producers while the buffer is empty.
     *
     * <p>An empty poll is never counted as a fetched row and {@code null} is never returned:
     * the method keeps waiting as long as a producer is still running, and fails once no
     * producer can deliver the missing rows any more.
     *
     * @return the next row, never {@code null}
     * @throws DLIException if {@link #hasNext()} is {@code false}, if a producer failed, if all
     *                      producers finished before the expected number of rows was delivered,
     *                      or if the calling thread was interrupted while waiting
     */
    @Override
    public Row read() throws DLIException {
        if (!this.hasNext()) {
            throw new DLIException("There is no new data to fetch, please check");
        }
        Row row;
        try {
            row = this.buffQueue.poll(POLL_WAIT_SECONDS, TimeUnit.SECONDS);
            while (row == null) {
                this.failIfProducerFailed();
                if (this.allProducersDone()) {
                    // A producer may have enqueued its last row between the timed-out poll and
                    // the completion check, so look once more before giving up.
                    row = this.buffQueue.poll();
                    if (row == null) {
                        throw new DLIException("Fetch data failed: readers finished after "
                                + this.fetchCount + " of " + this.resultCount + " expected rows");
                    }
                } else {
                    row = this.buffQueue.poll(POLL_WAIT_SECONDS, TimeUnit.SECONDS);
                }
            }
        }
        catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new DLIException("Fetch data failed", e);
        }
        if (++this.fetchCount % DOTTING_THRESHOLD == 0L) {
            log.info("Current buffer size: {}", this.buffQueue.size());
        }
        return row;
    }

    /**
     * Stops the producer threads, closes every underlying reader and empties the buffer.
     *
     * <p>The pool is shut down first so that producers blocked on a full buffer are interrupted
     * instead of refilling the buffer after it was cleared. Every reader is closed even if an
     * earlier one fails; the first failure is rethrown with later ones attached as suppressed.
     *
     * @throws DLIException if closing one or more readers fails
     */
    @Override
    public void close() throws DLIException {
        if (this.threadPool != null) {
            this.threadPool.shutdownNow();
        }
        DLIException failure = null;
        try {
            for (ResultSet r : this.readers) {
                try {
                    r.close();
                }
                catch (DLIException e) {
                    if (failure == null) {
                        failure = e;
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
        }
        finally {
            this.buffQueue.clear();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Tells whether fewer rows than the expected result count have been read so far.
     *
     * @return {@code true} while the number of fetched rows is below the expected result count
     */
    @Override
    public boolean hasNext() throws DLIException {
        return this.fetchCount < this.resultCount;
    }

    /** Returns {@code true} when every submitted producer has terminated (or none was started). */
    private boolean allProducersDone() {
        for (Future<Void> producer : this.producerFutures) {
            if (!producer.isDone()) {
                return false;
            }
        }
        return true;
    }

    /** Rethrows, wrapped in a {@link DLIException}, the failure of any producer that has terminated. */
    private void failIfProducerFailed() throws DLIException, InterruptedException {
        for (Future<Void> producer : this.producerFutures) {
            if (!producer.isDone()) {
                continue;
            }
            try {
                // The future is already done, so this returns or throws without blocking.
                producer.get();
            }
            catch (ExecutionException e) {
                throw new DLIException("Fetch data failed", e);
            }
        }
    }

    /**
     * Copies all rows of one reader into the shared buffer.
     */
    static class ProducerTask
    implements Callable<Void> {
        /** Seconds to wait for free space in the buffer before the task gives up. */
        private static final int OFFER_WAIT_SECONDS = 28;
        private final ArrayBlockingQueue<Row> buffer;
        private final ResultSet reader;

        /**
         * @param buffer the queue the rows are written to
         * @param reader the reader the rows are taken from
         */
        ProducerTask(ArrayBlockingQueue<Row> buffer, ResultSet reader) {
            this.buffer = buffer;
            this.reader = reader;
        }

        /**
         * Reads rows until the reader reports no more data, offering each one to the buffer.
         *
         * @return always {@code null}
         * @throws Exception a {@link DLIException} if the buffer stays full for
         *                   {@link #OFFER_WAIT_SECONDS} seconds, or whatever the reader or the
         *                   blocking offer throws
         */
        @Override
        public Void call() throws Exception {
            while (this.reader.hasNext()) {
                Row row = this.reader.read();
                if (!this.buffer.offer(row, OFFER_WAIT_SECONDS, TimeUnit.SECONDS)) {
                    throw new DLIException("Write data to buff timeout!!!");
                }
            }
            return null;
        }
    }
}

