/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigquery.connector.common;

import com.google.cloud.bigquery.connector.common.BigQueryStorageReadRowsTracer;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.parquet.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drains several {@link ArrowReader}s concurrently and exposes their record batches one at a
 * time through {@link #next()}.
 *
 * <p>A daemon thread started by the constructor repeatedly submits "load next batch" tasks to
 * the supplied {@link ExecutorService}, one per reader that still has data. Each task unloads
 * the batch it read into an {@link ArrowRecordBatch} and places it on a bounded queue.
 * {@link #next()} takes from that queue and loads the batch into the supplied
 * {@link VectorLoader}.
 *
 * <p>The queue carries three kinds of objects: {@link ArrowRecordBatch} instances, any
 * {@link Throwable} raised while reading, and a private sentinel marking the end of all readers.
 * A semaphore with one permit per reader limits how many read tasks may be outstanding; a permit
 * is taken before each task is submitted and given back by {@link #next()} for every object it
 * dequeues (or by the task itself when its reader turns out to be exhausted).
 */
public class ParallelArrowReader
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(ParallelArrowReader.class);
    /** Queue marker meaning "every reader is exhausted; no more batches will arrive". */
    private static final Object DONE_SENTINEL = new Object();
    /** Hand-off queue between the read tasks / reader thread and {@link #next()}. */
    private final BlockingQueue<Object> queue;
    /** Bounds the number of outstanding read tasks (one permit per reader). */
    private final Semaphore queueSemaphore;
    private final List<ArrowReader> readers;
    private final ExecutorService executor;
    private final VectorLoader loader;
    private final BigQueryStorageReadRowsTracer rootTracer;
    /** One tracer per reader, forked from {@link #rootTracer}. */
    private final BigQueryStorageReadRowsTracer[] tracers;
    /** Number of readers still expected to produce data; forced to 0 on error or close. */
    private final AtomicInteger readersReady;
    private Thread readerThread;

    /**
     * Creates the reader and immediately starts the background thread that schedules reads.
     *
     * @param list the Arrow readers to drain in parallel
     * @param executorService executor that runs the per-reader load tasks; it is shut down by
     *     {@link #close()}
     * @param vectorLoader loader that receives each batch returned through {@link #next()}
     * @param bigQueryStorageReadRowsTracer root tracer; a child tracer named
     *     {@code reader-thread-<index>} is forked from it for every reader
     */
    public ParallelArrowReader(List<ArrowReader> list, ExecutorService executorService, VectorLoader vectorLoader, BigQueryStorageReadRowsTracer bigQueryStorageReadRowsTracer) {
        this.readers = list;
        // Capacity is two more than the permit count below; presumably headroom for the
        // sentinel and an error object on top of in-flight batches -- TODO confirm.
        this.queue = new ArrayBlockingQueue<Object>(list.size() + 2);
        this.executor = executorService;
        this.loader = vectorLoader;
        this.rootTracer = bigQueryStorageReadRowsTracer;
        this.queueSemaphore = new Semaphore(list.size());
        this.readersReady = new AtomicInteger(list.size());
        this.tracers = new BigQueryStorageReadRowsTracer[list.size()];
        for (int i = 0; i < list.size(); ++i) {
            this.tracers[i] = this.rootTracer.forkWithPrefix("reader-thread-" + i);
        }
        this.start();
    }

    /**
     * Blocks until the next queued object is available and, if it is a record batch, loads it
     * into the {@link VectorLoader} given at construction time.
     *
     * @return {@code true} if a batch was loaded; {@code false} if the end-of-data sentinel was
     *     received or the calling thread was interrupted while waiting (in which case the
     *     thread's interrupt flag is set again before returning)
     * @throws IOException if a read task or the reader thread queued a failure; a queued
     *     {@code IOException} is rethrown as is, any other {@code Throwable} is wrapped
     */
    public boolean next() throws IOException {
        this.rootTracer.nextBatchNeeded();
        this.rootTracer.readRowsResponseRequested();
        ArrowRecordBatch batch;
        try {
            Object item = this.queue.take();
            // Every dequeued object frees one slot for another read task.
            this.queueSemaphore.release();
            if (item == DONE_SENTINEL) {
                return false;
            }
            if (item instanceof Throwable) {
                if (item instanceof IOException) {
                    throw (IOException)item;
                }
                throw new IOException((Throwable)item);
            }
            Preconditions.checkState(item instanceof ArrowRecordBatch, "Expected future object");
            batch = (ArrowRecordBatch)item;
        }
        catch (InterruptedException interruptedException) {
            log.info("Interrupted when waiting for next batch.");
            // Keep the interruption observable to the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
            return false;
        }
        this.rootTracer.readRowsResponseObtained(0L);
        try {
            this.rootTracer.rowsParseStarted();
            this.loader.load(batch);
            this.rootTracer.rowsParseFinished(batch.getLength());
        }
        finally {
            // Release the batch's buffers even if loading fails.
            batch.close();
        }
        return true;
    }

    /** Starts the daemon thread running {@link #consumeReaders()} and marks the stream started. */
    private void start() {
        this.readerThread = new Thread(this::consumeReaders);
        this.readerThread.setDaemon(true);
        this.readerThread.start();
        this.rootTracer.startStream();
    }

    /**
     * Body of the reader thread: keeps submitting one load task per non-exhausted reader until
     * {@link #readersReady} drops to zero, then queues the end-of-data sentinel. Any failure in
     * this loop (including interruption while waiting for a semaphore permit) is queued for
     * {@link #next()} instead of the sentinel.
     */
    private void consumeReaders() {
        try {
            int readerCount = this.readers.size();
            // Per-reader state, indexed by the reader's position in the list.
            AtomicBoolean[] hasData = new AtomicBoolean[readerCount];
            long[] lastBytesRead = new long[readerCount];
            VectorUnloader[] unloaders = new VectorUnloader[readerCount];
            VectorSchemaRoot[] roots = new VectorSchemaRoot[readerCount];
            for (int i = 0; i < hasData.length; ++i) {
                hasData[i] = new AtomicBoolean();
                hasData[i].set(true);
                lastBytesRead[i] = 0L;
                roots[i] = this.readers.get(i).getVectorSchemaRoot();
                unloaders[i] = new VectorUnloader(roots[i], true, false);
                this.tracers[i].startStream();
            }
            while (this.readersReady.get() > 0) {
                for (int readerIdx = 0; readerIdx < this.readers.size(); ++readerIdx) {
                    if (!hasData[readerIdx].get()) continue;
                    ArrowReader reader = this.readers.get(readerIdx);
                    final int idx = readerIdx;
                    // Blocks until a permit is available, throttling task submission.
                    this.queueSemaphore.acquire();
                    this.executor.submit(() -> {
                        // Tasks for the same reader are serialized on that reader's root.
                        synchronized (roots[idx]) {
                            if (!hasData[idx].get()) {
                                return;
                            }
                            try {
                                this.tracers[idx].readRowsResponseRequested();
                                hasData[idx].set(reader.loadNextBatch());
                                if (!hasData[idx].get()) {
                                    // Nothing will be queued for this task, so next() will
                                    // not return the permit on its behalf.
                                    this.queueSemaphore.release();
                                }
                                long incrementalBytesRead = reader.bytesRead() - lastBytesRead[idx];
                                this.tracers[idx].readRowsResponseObtained(incrementalBytesRead);
                                lastBytesRead[idx] = reader.bytesRead();
                            }
                            catch (Throwable throwable) {
                                log.info("Exception caught while consuming reader.", throwable);
                                hasData[idx].set(false);
                                // Stop the scheduling loop for all readers.
                                this.readersReady.set(0);
                                Preconditions.checkState(this.queue.offer(throwable), "Expected space in queue");
                            }
                            if (!hasData[idx].get()) {
                                this.readersReady.addAndGet(-1);
                                return;
                            }
                            int rowCount = 0;
                            try {
                                rowCount = reader.getVectorSchemaRoot().getRowCount();
                            }
                            catch (IOException iOException) {
                                this.queue.offer(iOException);
                            }
                            // The parse-timing hooks are used here to time the unload step.
                            this.tracers[idx].rowsParseStarted();
                            ArrowRecordBatch batch = unloaders[idx].getRecordBatch();
                            this.tracers[idx].rowsParseFinished(rowCount);
                            try {
                                Preconditions.checkState(this.queue.offer(batch), "Expected space in queue");
                            }
                            catch (Exception exception) {
                                // The batch never reached the queue, so nobody else will close it.
                                batch.close();
                                throw exception;
                            }
                        }
                    });
                }
            }
        }
        catch (Throwable throwable) {
            log.info("Read ahead caught exceptions", throwable);
            Preconditions.checkState(this.queue.offer(throwable), "Expected available capacity");
            return;
        }
        Preconditions.checkState(this.queue.offer(DONE_SENTINEL), "Expected available capacity");
    }

    /**
     * Stops the reader thread and the executor (waiting up to 10 seconds for each), closes any
     * record batches still sitting in the queue, finishes all tracers and closes the delegate
     * readers. If the calling thread is interrupted while waiting, cleanup still runs to the
     * end and the interrupt flag is restored before returning.
     */
    @Override
    public void close() {
        boolean interrupted = false;
        this.rootTracer.finished();
        if (this.readerThread != null) {
            // Make the scheduling loop's condition false, then wake it if it is blocked.
            this.readersReady.set(0);
            this.readerThread.interrupt();
            try {
                this.readerThread.join(10000L);
            }
            catch (InterruptedException interruptedException) {
                interrupted = true;
                log.info("Interrupted while waiting for reader thread to finish.");
            }
            if (this.readerThread.isAlive()) {
                log.warn("Reader thread did not shutdown in 10 seconds.");
            } else {
                log.info("Reader thread stopped.  Queue size: {}", this.queue.size());
            }
        }
        this.executor.shutdownNow();
        try {
            if (!this.executor.awaitTermination(10L, TimeUnit.SECONDS)) {
                log.warn("executor did not terminate after 10 seconds");
            }
        }
        catch (InterruptedException interruptedException) {
            interrupted = true;
            log.info("Interrupted when awaiting executor termination");
        }
        // Batches that were produced but never consumed still own buffers; release them.
        for (Object leftover : this.queue) {
            if (leftover instanceof ArrowRecordBatch) {
                ((ArrowRecordBatch)leftover).close();
            }
        }
        for (BigQueryStorageReadRowsTracer tracer : this.tracers) {
            tracer.finished();
        }
        for (ArrowReader reader : this.readers) {
            try {
                reader.close(false);
            }
            catch (Exception exception) {
                log.info("Trouble closing delegate readers", exception);
            }
        }
        if (interrupted) {
            // Restored only after cleanup so the waits above are not cut short.
            Thread.currentThread().interrupt();
        }
    }
}

