/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.flink.bigquery.source.split.reader;

import com.codahale.metrics.Reservoir;
import com.codahale.metrics.SlidingWindowReservoir;
import com.google.cloud.bigquery.storage.v1.AvroRows;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.services.BigQueryServices;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
import com.google.cloud.flink.bigquery.source.reader.BigQuerySourceReaderContext;
import com.google.cloud.flink.bigquery.source.split.BigQuerySourceSplit;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class BigQuerySourceSplitReader
implements SplitReader<GenericRecord, BigQuerySourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceSplitReader.class);
    private final BigQueryReadOptions readOptions;
    private final BigQuerySourceReaderContext readerContext;
    private final transient Optional<Histogram> readSplitTimeMetric;
    private final Queue<BigQuerySourceSplit> assignedSplits = new ArrayDeque<BigQuerySourceSplit>();
    private final Configuration configuration;
    private Boolean closed = false;
    private Schema avroSchema = null;
    private Long readSoFar = 0L;
    private Long splitStartFetch;
    private Iterator<ReadRowsResponse> readStreamIterator = null;

    public BigQuerySourceSplitReader(BigQueryReadOptions readOptions, BigQuerySourceReaderContext readerContext) {
        this.readOptions = readOptions;
        this.readerContext = readerContext;
        this.configuration = readerContext.getConfiguration();
        this.readSplitTimeMetric = Optional.ofNullable(readerContext.metricGroup()).map(mgroup -> (DropwizardHistogramWrapper)mgroup.histogram("bq.split.read.time.ms", (Histogram)new DropwizardHistogramWrapper(new com.codahale.metrics.Histogram((Reservoir)new SlidingWindowReservoir(500)))));
    }

    Long offsetToFetch(BigQuerySourceSplit split) {
        if (split.getOffset() > 0L) {
            this.readSoFar = split.getOffset();
            this.splitStartFetch = System.currentTimeMillis();
        } else if (this.readSoFar == 0L) {
            this.splitStartFetch = System.currentTimeMillis();
        }
        LOG.debug("[subtask #{}] Offset to fetch from {} for stream {}.", new Object[]{this.readerContext.getIndexOfSubtask(), this.readSoFar, split.getStreamName()});
        return this.readSoFar;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    BigQueryServices.BigQueryServerStream<ReadRowsResponse> retrieveReadStream(BigQuerySourceSplit split) throws IOException {
        try (BigQueryServices.StorageReadClient client = BigQueryServicesFactory.instance((BigQueryConnectOptions)this.readOptions.getBigQueryConnectOptions()).storageRead();){
            ReadRowsRequest readRequest = ReadRowsRequest.newBuilder().setReadStream(split.getStreamName()).setOffset(this.offsetToFetch(split).longValue()).build();
            BigQueryServices.BigQueryServerStream bigQueryServerStream = client.readRows(readRequest);
            return bigQueryServerStream;
        }
        catch (Exception ex) {
            throw new IOException(String.format("[subtask #%d][hostname %s] Problems while opening the stream %s from BigQuery with connection info %s. Current split offset %d, reader offset %d.", this.readerContext.getIndexOfSubtask(), this.readerContext.getLocalHostName(), Optional.ofNullable(split.getStreamName()).orElse("NA"), this.readOptions.toString(), split.getOffset(), this.readSoFar), ex);
        }
    }

    public RecordsWithSplitIds<GenericRecord> fetch() throws IOException {
        if (this.closed.booleanValue()) {
            throw new IllegalStateException("Can't fetch records from a closed split reader.");
        }
        RecordsBySplits.Builder respBuilder = new RecordsBySplits.Builder();
        if (this.assignedSplits.isEmpty()) {
            return respBuilder.build();
        }
        if (this.readerContext.willExceedLimit(0)) {
            LOG.info("Completing reading because we are over limit (context reader count {}).", (Object)this.readerContext.currentReadCount());
            respBuilder.addFinishedSplits((Collection)this.assignedSplits.stream().map(split -> split.splitId()).collect(Collectors.toList()));
            this.assignedSplits.clear();
            return respBuilder.build();
        }
        BigQuerySourceSplit assignedSplit = this.assignedSplits.peek();
        int maxRecordsPerSplitFetch = this.readOptions.getMaxRecordsPerSplitFetch();
        int read = 0;
        Long fetchStartTime = System.currentTimeMillis();
        Boolean truncated = false;
        try {
            if (this.readStreamIterator == null) {
                this.readStreamIterator = this.retrieveReadStream(assignedSplit).iterator();
            }
            Long itStartTime = System.currentTimeMillis();
            while (this.readStreamIterator.hasNext()) {
                ReadRowsResponse response = this.readStreamIterator.next();
                if (!response.hasAvroRows()) {
                    LOG.info("[subtask #{}][hostname {}] The response contained no avro records for stream {}.", new Object[]{this.readerContext.getIndexOfSubtask(), this.readerContext.getLocalHostName(), assignedSplit.getStreamName()});
                }
                if (this.avroSchema == null) {
                    if (response.hasAvroSchema()) {
                        this.avroSchema = new Schema.Parser().parse(response.getAvroSchema().getSchema());
                    } else {
                        throw new IllegalArgumentException("Avro schema not initialized and not available in the response.");
                    }
                }
                Long decodeStart = System.currentTimeMillis();
                List<GenericRecord> recordList = GenericRecordReader.create(this.avroSchema).processRows(response.getAvroRows());
                Long decodeTimeMS = System.currentTimeMillis() - decodeStart;
                LOG.debug("[subtask #{}][hostname %s] Iteration decoded records in {}ms from stream {}.", new Object[]{this.readerContext.getIndexOfSubtask(), decodeTimeMS, assignedSplit.getStreamName()});
                for (GenericRecord record : recordList) {
                    respBuilder.add((SourceSplit)assignedSplit, (Object)record);
                    if (!this.readerContext.willExceedLimit(++read)) continue;
                    break;
                }
                if (this.readerContext.willExceedLimit(read)) break;
                Long itTimeMs = System.currentTimeMillis() - itStartTime;
                LOG.debug("[subtask #{}][hostname {}] Completed reading iteration in {}ms, so far read {} from stream {}.", new Object[]{this.readerContext.getIndexOfSubtask(), this.readerContext.getLocalHostName(), itTimeMs, this.readSoFar + (long)read, assignedSplit.getStreamName()});
                itStartTime = System.currentTimeMillis();
                if (read + recordList.size() <= maxRecordsPerSplitFetch) continue;
                truncated = true;
                break;
            }
            this.readSoFar = this.readSoFar + (long)read;
            if (!truncated.booleanValue()) {
                this.readerContext.updateReadCount(this.readSoFar);
                Long splitTimeMs = System.currentTimeMillis() - this.splitStartFetch;
                this.readSplitTimeMetric.ifPresent(m -> m.update(splitTimeMs.longValue()));
                LOG.info("[subtask #{}][hostname {}] Completed reading split, {} records in {}ms on stream {}.", new Object[]{this.readerContext.getIndexOfSubtask(), this.readerContext.getLocalHostName(), this.readSoFar, splitTimeMs, assignedSplit.splitId()});
                this.readSoFar = 0L;
                this.assignedSplits.poll();
                this.readStreamIterator = null;
                respBuilder.addFinishedSplit(assignedSplit.splitId());
            } else {
                Long fetchTimeMs = System.currentTimeMillis() - fetchStartTime;
                LOG.debug("[subtask #{}][hostname {}] Completed a partial fetch in {}ms, so far read {} from stream {}.", new Object[]{this.readerContext.getIndexOfSubtask(), this.readerContext.getLocalHostName(), fetchTimeMs, this.readSoFar, assignedSplit.getStreamName()});
            }
            return respBuilder.build();
        }
        catch (Exception ex) {
            LOG.error(String.format("[subtask #%d][hostname %s] Problems while reading stream %s from BigQuery with connection info %s. Current split offset %d, reader offset %d. Flink options %s.", this.readerContext.getIndexOfSubtask(), Optional.ofNullable(this.readerContext.getLocalHostName()).orElse("NA"), Optional.ofNullable(assignedSplit.getStreamName()).orElse("NA"), this.readOptions.toString(), assignedSplit.getOffset(), this.readSoFar, this.configuration.toString()), (Throwable)ex);
            this.readStreamIterator = null;
            return new RecordsBySplits.Builder().build();
        }
    }

    public void handleSplitsChanges(SplitsChange<BigQuerySourceSplit> splitsChanges) {
        LOG.debug("Handle split changes {}.", splitsChanges);
        if (!(splitsChanges instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChanges.getClass()));
        }
        this.assignedSplits.addAll(splitsChanges.splits());
    }

    public void wakeUp() {
        LOG.debug("[subtask #{}][hostname %{}] Wake up called.", (Object)this.readerContext.getIndexOfSubtask(), (Object)this.readerContext.getLocalHostName());
    }

    public void close() throws Exception {
        LOG.debug("[subtask #{}][hostname {}] Close called, assigned splits {}.", new Object[]{this.readerContext.getIndexOfSubtask(), this.readerContext.getLocalHostName(), this.assignedSplits.toString()});
        if (!this.closed.booleanValue()) {
            this.closed = true;
            this.readSoFar = 0L;
            this.readStreamIterator = null;
        }
    }

    static class GenericRecordReader {
        private final Schema schema;

        private GenericRecordReader(Schema schema) {
            Preconditions.checkNotNull((Object)schema, (String)"The provided avro schema reference is null.");
            this.schema = schema;
        }

        public static GenericRecordReader create(Schema schema) {
            return new GenericRecordReader(schema);
        }

        public List<GenericRecord> processRows(AvroRows avroRows) throws IOException {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), null);
            GenericDatumReader datumReader = new GenericDatumReader(this.schema);
            ArrayList<GenericRecord> records = new ArrayList<GenericRecord>();
            while (!decoder.isEnd()) {
                GenericRecord row = (GenericRecord)datumReader.read(null, (Decoder)decoder);
                records.add(row);
            }
            return records;
        }
    }
}

