/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigtable.dataflow;

import com.google.api.client.util.Lists;
import com.google.bigtable.repackaged.com.google.cloud.config.BigtableOptions;
import com.google.bigtable.repackaged.com.google.cloud.config.BulkOptions;
import com.google.bigtable.repackaged.com.google.cloud.grpc.BigtableSession;
import com.google.bigtable.repackaged.com.google.cloud.grpc.BigtableTableName;
import com.google.bigtable.repackaged.com.google.cloud.grpc.scanner.ResultScanner;
import com.google.bigtable.repackaged.com.google.cloud.hbase.adapters.Adapters;
import com.google.bigtable.repackaged.com.google.com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.repackaged.com.google.com.google.bigtable.v2.Row;
import com.google.bigtable.repackaged.com.google.com.google.bigtable.v2.RowRange;
import com.google.bigtable.repackaged.com.google.com.google.bigtable.v2.SampleRowKeysRequest;
import com.google.bigtable.repackaged.com.google.com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.bigtable.repackaged.com.google.protobuf.BigtableZeroCopyByteStringUtil;
import com.google.cloud.bigtable.dataflow.AbstractCloudBigtableTableDoFn;
import com.google.cloud.bigtable.dataflow.CloudBigtableConfiguration;
import com.google.cloud.bigtable.dataflow.CloudBigtableScanConfiguration;
import com.google.cloud.bigtable.dataflow.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.dataflow.coders.HBaseMutationCoder;
import com.google.cloud.bigtable.dataflow.coders.HBaseResultArrayCoder;
import com.google.cloud.bigtable.dataflow.coders.HBaseResultCoder;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.range.ByteKey;
import com.google.cloud.dataflow.sdk.io.range.ByteKeyRange;
import com.google.cloud.dataflow.sdk.io.range.ByteKeyRangeTracker;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloudBigtableIO {
    /** Coder for a single HBase {@link Result}; returned by {@code getCoder(CoderType.RESULT)}. */
    private static final AtomicCoder<Result> RESULT_CODER = new HBaseResultCoder();
    /** Coder for a batch of HBase {@link Result}s; returned by {@code getCoder(CoderType.RESULT_ARRAY)}. */
    private static final AtomicCoder<Result[]> RESULT_ARRAY_CODER = new HBaseResultArrayCoder();
    /**
     * Coder registered for {@link Put}, {@link Delete} and {@link Mutation} by
     * {@code initializeForWrite}. Kept as a raw type because one instance is registered for
     * all three classes.
     */
    @SuppressWarnings("rawtypes")
    private static final AtomicCoder HBASE_MUTATION_CODER = new HBaseMutationCoder();
    /**
     * Row-at-a-time {@link ScanIterator}: each scanned {@link Row} is adapted to an HBase
     * {@link Result}, and a {@code null} result marks the end of the scan.
     */
    static final ScanIterator<Result> RESULT_ADVANCER = new ScanIterator<Result>(){
        private static final long serialVersionUID = 1L;

        /** Fetches the next row and adapts it; passes a {@code null} row through as {@code null}. */
        @Override
        public Result next(ResultScanner<Row> resultScanner) throws IOException {
            Row fetched = resultScanner.next();
            if (fetched == null) {
                return null;
            }
            return Adapters.ROW_ADAPTER.adaptResponse(fetched);
        }

        /** A {@code null} result means the scanner is exhausted. */
        @Override
        public boolean isCompletionMarker(Result result) {
            return null == result;
        }

        /** Each non-null result counts as exactly one row. */
        @Override
        public long getRowCount(Result result) {
            if (result == null) {
                return 0L;
            }
            return 1L;
        }

        /** Returns a copy of the result's row key, or {@code null} when there is no result. */
        @Override
        public ByteString getLatestKey(Result result) {
            if (result == null) {
                return null;
            }
            byte[] rowKey = result.getRow();
            return ByteString.copyFrom(rowKey);
        }
    };

    /**
     * Looks up the shared coder instance for the requested output shape.
     *
     * @param type which kind of read output needs a coder
     * @return the single-result coder for {@code RESULT}, the array coder for {@code RESULT_ARRAY}
     * @throws IllegalArgumentException if {@code type} is neither of the two handled constants
     */
    public static Coder getCoder(CoderType type) {
        if (type == CoderType.RESULT) {
            return RESULT_CODER;
        }
        if (type == CoderType.RESULT_ARRAY) {
            return RESULT_ARRAY_CODER;
        }
        throw new IllegalArgumentException("Can't get a coder for type: " + type.name());
    }

    /**
     * Builds a copy of {@code configuration} whose read request is restricted to
     * {@code [startKey, stopKey)}.
     *
     * <p>The new range overwrites the row range at index 0 of the request.
     * NOTE(review): this presumes the incoming request already carries at least one row
     * range - confirm against how the scan configuration builds its request.
     *
     * @param configuration the scan configuration to narrow
     * @param startKey inclusive start of the new range
     * @param stopKey exclusive end of the new range
     * @return a new configuration carrying the narrowed request
     */
    private static CloudBigtableScanConfiguration augmentConfiguration(CloudBigtableScanConfiguration configuration, byte[] startKey, byte[] stopKey) {
        ReadRowsRequest.Builder requestBuilder = configuration.getRequest().toBuilder();
        RowRange.Builder narrowedRange = RowRange.newBuilder()
                .setStartKeyClosed(BigtableZeroCopyByteStringUtil.wrap(startKey))
                .setEndKeyOpen(BigtableZeroCopyByteStringUtil.wrap(stopKey));
        requestBuilder.getRowsBuilder().setRowRanges(0, narrowedRange);
        ReadRowsRequest narrowedRequest = requestBuilder.build();
        return configuration.toBuilder().withRequest(narrowedRequest).build();
    }

    /**
     * Registers the shared HBase mutation coder for {@link Put}, {@link Delete} and
     * {@link Mutation} on the pipeline's coder registry.
     *
     * @param p the pipeline to prepare for writes
     * @return the same pipeline instance, to allow chaining
     */
    public static Pipeline initializeForWrite(Pipeline p) {
        Coder mutationCoder = (Coder)HBASE_MUTATION_CODER;
        CoderRegistry coders = p.getCoderRegistry();
        coders.registerCoder(Put.class, mutationCoder);
        coders.registerCoder(Delete.class, mutationCoder);
        coders.registerCoder(Mutation.class, mutationCoder);
        return p;
    }

    /**
     * Creates a transform that writes every {@link Mutation} of its input to one table.
     *
     * <p>If the configuration has any value under
     * {@code google.bigtable.dataflow.singletable.serial}, mutations are written one at a time
     * through a {@link Table}; otherwise they go through a {@link BufferedMutator}.
     *
     * @param config table-level configuration; project, instance and table ids are validated
     * @return the write transform
     */
    public static PTransform<PCollection<Mutation>, PDone> writeToTable(CloudBigtableTableConfiguration config) {
        CloudBigtableIO.validateTableConfig(config);
        boolean serialWrites = config.getConfiguration().get((Object)"google.bigtable.dataflow.singletable.serial") != null;
        AbstractCloudBigtableTableDoFn<Mutation, Void> writeFn;
        if (serialWrites) {
            writeFn = new CloudBigtableSingleTableSerialWriteFn(config);
        } else {
            writeFn = new CloudBigtableSingleTableBufferedWriteFn(config);
        }
        return new CloudBigtableWriteTransform<Mutation>(writeFn);
    }

    /**
     * Creates a transform that writes grouped mutations to several tables. Each input element
     * pairs a table name (key) with the mutations destined for that table (value).
     *
     * @param config connection-level configuration; project and instance ids are validated
     * @return the write transform
     */
    public static PTransform<PCollection<KV<String, Iterable<Mutation>>>, PDone> writeToMultipleTables(CloudBigtableConfiguration config) {
        CloudBigtableIO.validateConfig(config);
        CloudBigtableMultiTableWriteFn writeFn = new CloudBigtableMultiTableWriteFn(config);
        return new CloudBigtableWriteTransform<KV<String, Iterable<Mutation>>>(writeFn);
    }

    /**
     * Creates a bounded source that emits one {@link Result} per scanned row, covering all keys.
     *
     * @param config the scan to run
     * @return the source
     */
    public static BoundedSource<Result> read(CloudBigtableScanConfiguration config) {
        ByteKeyRange everything = ByteKeyRange.ALL_KEYS;
        return new Source<Result>(config, CoderType.RESULT, RESULT_ADVANCER, everything);
    }

    /**
     * Creates a bounded source that emits rows in arrays, covering all keys.
     *
     * @param config the scan to run
     * @param resultCount the batch size requested from the scanner for each emitted array
     * @return the source
     */
    public static BoundedSource<Result[]> readBulk(CloudBigtableScanConfiguration config, int resultCount) {
        ResultArrayIterator batchIterator = new ResultArrayIterator(resultCount);
        return new Source<Result[]>(config, CoderType.RESULT_ARRAY, batchIterator, ByteKeyRange.ALL_KEYS);
    }

    /**
     * Rejects a missing configuration value.
     *
     * @param value the setting to check
     * @param type human-readable name of the setting, used in the error message
     * @throws IllegalArgumentException if {@code value} is null or empty
     */
    private static void checkNotNullOrEmpty(String value, String type) {
        boolean present = !Strings.isNullOrEmpty(value);
        Object message = "A " + type + " must be set to configure Bigtable properly.";
        Preconditions.checkArgument(present, message);
    }

    /**
     * Validates the connection-level settings and, in addition, that a table id is set.
     *
     * @param configuration the table configuration to check
     * @throws IllegalArgumentException if the project id, instance id or table id is missing
     */
    private static void validateTableConfig(CloudBigtableTableConfiguration configuration) {
        CloudBigtableIO.validateConfig(configuration);
        String tableId = configuration.getTableId();
        CloudBigtableIO.checkNotNullOrEmpty(tableId, "tableid");
    }

    /**
     * Validates that both a project id and an instance id are set; the project id is checked first.
     *
     * @param configuration the connection configuration to check
     * @throws IllegalArgumentException if either id is null or empty
     */
    private static void validateConfig(CloudBigtableConfiguration configuration) {
        String projectId = configuration.getProjectId();
        CloudBigtableIO.checkNotNullOrEmpty(projectId, "projectId");
        String instanceId = configuration.getInstanceId();
        CloudBigtableIO.checkNotNullOrEmpty(instanceId, "instanceId");
    }

    /**
     * Terminal transform that runs a write {@link DoFn} over its input via {@link ParDo}
     * and yields {@link PDone}.
     *
     * @param <T> element type accepted by the wrapped write function
     */
    public static class CloudBigtableWriteTransform<T>
    extends PTransform<PCollection<T>, PDone> {
        private static final long serialVersionUID = -2888060194257930027L;
        /** The function that performs the actual writes. */
        private final DoFn<T, Void> function;

        /**
         * @param function the write function to apply to every input element
         */
        public CloudBigtableWriteTransform(DoFn<T, Void> function) {
            this.function = function;
        }

        /**
         * Applies the write function to {@code input}; the ParDo output is discarded.
         *
         * @return a {@link PDone} bound to the input's pipeline
         */
        public PDone apply(PCollection<T> input) {
            input.apply(ParDo.of(this.function));
            Pipeline owner = input.getPipeline();
            return PDone.in(owner);
        }
    }

    /**
     * Write function for multi-table output: each element names a table (key) and carries the
     * mutations to apply to it (value). The mutations of one element are sent as a single batch.
     */
    public static class CloudBigtableMultiTableWriteFn
    extends AbstractCloudBigtableTableDoFn<KV<String, Iterable<Mutation>>, Void> {
        private static final long serialVersionUID = 2L;
        /** Counts the mutations handed to {@link Table#batch}. */
        private final Aggregator<Long, Long> mutationsCounter = this.createAggregator("mutations", (Combine.CombineFn)new Sum.SumLongFn());

        /**
         * @param config connection-level configuration used to obtain the connection
         */
        public CloudBigtableMultiTableWriteFn(CloudBigtableConfiguration config) {
            super(config);
        }

        /**
         * Opens the table named by the element key, writes all of the element's mutations in
         * one batch, and closes the table again.
         *
         * @throws Exception if the batch fails; retry-exhausted failures are logged first and
         *     then rethrown through {@code rethrowException}
         */
        public void processElement(DoFn.ProcessContext context) throws Exception {
            @SuppressWarnings("unchecked")
            KV<String, Iterable<Mutation>> element = (KV<String, Iterable<Mutation>>)context.element();
            String tableName = element.getKey();
            try (Table t = this.getConnection().getTable(TableName.valueOf(tableName))) {
                List<Mutation> mutations = Lists.newArrayList(element.getValue());
                int mutationCount = mutations.size();
                t.batch(mutations, new Object[mutationCount]);
                // The aggregator is typed <Long, Long>: widen explicitly so a Long, not an
                // Integer, is boxed and handed to the sum.
                this.mutationsCounter.addValue((long)mutationCount);
            }
            catch (RetriesExhaustedWithDetailsException exception) {
                this.logExceptions((DoFn.Context)context, exception);
                CloudBigtableMultiTableWriteFn.rethrowException(exception);
            }
        }
    }

    /**
     * Write function that applies each {@link Mutation} to a single table immediately, one
     * call per element, through a lazily opened {@link Table}.
     */
    public static class CloudBigtableSingleTableSerialWriteFn
    extends AbstractCloudBigtableTableDoFn<Mutation, Void> {
        /** Configuration key whose presence selects this serial writer in {@code writeToTable}. */
        public static final String DO_SERIAL_WRITES = "google.bigtable.dataflow.singletable.serial";
        private static final long serialVersionUID = 2L;
        /** Lazily opened in {@link #getTable()}; closed and cleared in {@link #finishBundle}. */
        private transient Table table;
        private final String tableId;
        /** Counts successfully issued mutations. */
        private final Aggregator<Long, Long> mutationsCounter;

        /**
         * @param config table-level configuration; its table id is the write target
         */
        public CloudBigtableSingleTableSerialWriteFn(CloudBigtableTableConfiguration config) {
            super(config);
            this.tableId = config.getTableId();
            this.mutationsCounter = this.createAggregator("mutations", (Combine.CombineFn)new Sum.SumLongFn());
        }

        /** Returns the cached table handle, opening it on first use. */
        private synchronized Table getTable() throws IOException {
            if (this.table == null) {
                this.table = this.getConnection().getTable(TableName.valueOf(this.tableId));
            }
            return this.table;
        }

        /**
         * Writes the element as a put or a delete.
         *
         * @throws IllegalArgumentException if the mutation is neither a {@link Put} nor a {@link Delete}
         */
        public void processElement(DoFn.ProcessContext context) throws Exception {
            Mutation mutation = (Mutation)context.element();
            if (this.DOFN_LOG.isTraceEnabled()) {
                this.DOFN_LOG.trace("Persisting {}", Bytes.toStringBinary(mutation.getRow()));
            }
            if (mutation instanceof Put) {
                this.getTable().put((Put)mutation);
            } else if (mutation instanceof Delete) {
                this.getTable().delete((Delete)mutation);
            } else {
                throw new IllegalArgumentException("Encountered unsupported mutation type: " + mutation.getClass());
            }
            this.mutationsCounter.addValue(1L);
        }

        /**
         * Closes the table opened for this bundle, then delegates to the parent.
         *
         * <p>The cached handle is cleared even when closing fails, so that a later bundle on
         * the same instance opens a fresh table instead of reusing a closed one.
         */
        public void finishBundle(DoFn.Context context) throws Exception {
            try {
                if (this.table != null) {
                    this.table.close();
                }
            }
            finally {
                this.table = null;
                super.finishBundle(context);
            }
        }
    }

    public static class CloudBigtableSingleTableBufferedWriteFn
    extends AbstractCloudBigtableTableDoFn<Mutation, Void> {
        private static final long serialVersionUID = 2L;
        private transient BufferedMutator mutator;
        private final String tableName;
        private final Aggregator<Long, Long> mutationsCounter;
        private final Aggregator<Long, Long> exceptionsCounter;

        public CloudBigtableSingleTableBufferedWriteFn(CloudBigtableTableConfiguration config) {
            super(config);
            this.tableName = config.getTableId();
            this.mutationsCounter = this.createAggregator("mutations", (Combine.CombineFn)new Sum.SumLongFn());
            this.exceptionsCounter = this.createAggregator("exceptions", (Combine.CombineFn)new Sum.SumLongFn());
        }

        private synchronized BufferedMutator getBufferedMutator(DoFn.Context context) throws IOException {
            if (this.mutator == null) {
                BufferedMutator.ExceptionListener listener = this.createExceptionListener(context);
                BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf((String)this.tableName)).writeBufferSize(BulkOptions.BIGTABLE_MAX_MEMORY_DEFAULT).listener(listener);
                this.mutator = this.getConnection().getBufferedMutator(params);
            }
            return this.mutator;
        }

        protected BufferedMutator.ExceptionListener createExceptionListener(final DoFn.Context context) {
            return new BufferedMutator.ExceptionListener(){

                public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
                    CloudBigtableSingleTableBufferedWriteFn.this.logExceptions(context, exception);
                    throw exception;
                }
            };
        }

        public void processElement(DoFn.ProcessContext context) throws Exception {
            Mutation mutation = (Mutation)context.element();
            if (this.DOFN_LOG.isTraceEnabled()) {
                this.DOFN_LOG.trace("Persisting {}", (Object)Bytes.toStringBinary((byte[])mutation.getRow()));
            }
            this.getBufferedMutator((DoFn.Context)context).mutate(mutation);
            this.mutationsCounter.addValue((Object)1L);
        }

        public void finishBundle(DoFn.Context context) throws Exception {
            try {
                if (this.mutator != null) {
                    this.mutator.close();
                }
            }
            catch (RetriesExhaustedWithDetailsException exception) {
                this.exceptionsCounter.addValue((Object)exception.getCauses().size());
                this.logExceptions(context, exception);
                CloudBigtableSingleTableBufferedWriteFn.rethrowException(exception);
            }
            finally {
                super.finishBundle(context);
            }
        }
    }

    private static class Reader<Results>
    extends BoundedSource.BoundedReader<Results> {
        private static final Logger READER_LOG = LoggerFactory.getLogger(Reader.class);

        /** Source being read; replaced by the primary part after a successful split. */
        private AbstractSource<Results> source;
        /** Scan configuration used to open the session and issue the read request. */
        private final CloudBigtableScanConfiguration config;
        /** Strategy for pulling the next output value from the scanner. */
        private final ScanIterator<Results> scanIterator;
        /** Tracks progress over the source's key range. */
        private final ByteKeyRangeTracker rangeTracker;
        /** Number of rows pulled from the scanner so far. */
        private final AtomicLong rowsRead = new AtomicLong();
        /** Wall-clock millis recorded once the scanner has been created in {@code start()}. */
        protected long workStart;
        private volatile BigtableSession session;
        private volatile ResultScanner<Row> scanner;
        /** Value most recently produced by {@code advance()}. */
        private volatile Results current;

        /**
         * @param source the source this reader belongs to; its range seeds the range tracker
         * @param config scan configuration to read with
         * @param scanIterator strategy for pulling values from the scanner
         */
        private Reader(AbstractSource<Results> source, CloudBigtableScanConfiguration config, ScanIterator<Results> scanIterator) {
            this.rangeTracker = ByteKeyRangeTracker.of(source.getRange());
            this.scanIterator = scanIterator;
            this.config = config;
            this.source = source;
        }

        /**
         * Pulls the next value from the scanner, adds its row count to the running total and,
         * unless it is the completion marker, reports its latest key to the range tracker.
         *
         * @return {@code true} if a value is available through {@code getCurrent()},
         *     {@code false} once the scan iterator signals completion
         */
        public boolean advance() throws IOException {
            Results fetched = this.scanIterator.next(this.scanner);
            this.current = fetched;
            this.rowsRead.addAndGet(this.scanIterator.getRowCount(fetched));
            if (this.scanIterator.isCompletionMarker(fetched)) {
                return false;
            }
            // NOTE(review): the boolean returned by tryReturnRecordAt is discarded, so a
            // record the tracker rejects (e.g. after a split) is still reported as available.
            // Confirm whether the reader should stop here instead.
            this.rangeTracker.tryReturnRecordAt(true, ByteKey.of(this.scanIterator.getLatestKey(fetched)));
            return true;
        }

        /**
         * Opens a Bigtable session limited to one gRPC channel, starts the row scan for the
         * configured request, logs how long that setup took, and reads the first value.
         *
         * @return the result of the first {@code advance()} call
         */
        public boolean start() throws IOException {
            long connectionStart = System.currentTimeMillis();
            Configuration hbaseConfig = this.config.toHBaseConfig();
            hbaseConfig.set("google.bigtable.grpc.channel.count", "1");
            BigtableSession opened = new BigtableSession(BigtableOptionsFactory.fromConfiguration(hbaseConfig));
            this.session = opened;
            this.scanner = opened.getDataClient().readRows(this.config.getRequest());
            this.workStart = System.currentTimeMillis();
            Long setupMillis = Long.valueOf(this.workStart - connectionStart);
            READER_LOG.info("{} Starting work. Creating Scanner took: {} ms.", this, setupMillis);
            return this.advance();
        }

        /**
         * Releases the scanner and the session, then logs throughput statistics.
         *
         * <p>Either resource may be unset if {@code start()} failed early, so both are
         * null-checked. The session is closed even when closing the scanner throws.
         *
         * @throws IOException if closing the scanner or the session fails
         */
        public void close() throws IOException {
            try {
                if (this.scanner != null) {
                    this.scanner.close();
                }
            }
            finally {
                if (this.session != null) {
                    this.session.close();
                }
            }
            long totalOps = this.rowsRead.get();
            long elapsedTimeMs = System.currentTimeMillis() - this.workStart;
            // A read that finishes within the same millisecond must not divide by zero.
            long operationsPerSecond = totalOps * 1000L / Math.max(elapsedTimeMs, 1L);
            READER_LOG.info("{} Complete: {} operations in {} ms. That's {} operations/sec", new Object[]{this, totalOps, elapsedTimeMs, operationsPerSecond});
        }

        /**
         * Returns the value produced by the most recent {@code advance()} call. No check is
         * made that a value is actually available.
         */
        public final Results getCurrent() throws NoSuchElementException {
            Results latest = this.current;
            return latest;
        }

        /**
         * Returns the source this reader currently covers. Synchronized because
         * {@code splitAtFraction} replaces the source under the same lock.
         */
        public final synchronized BoundedSource<Results> getCurrentSource() {
            AbstractSource<Results> active = this.source;
            return active;
        }

        /** Returns the progress reported by the range tracker. */
        public final Double getFractionConsumed() {
            Double consumed = this.rangeTracker.getFractionConsumed();
            return consumed;
        }

        /**
         * Tries to split the remaining work at {@code fraction} of the source's key range.
         *
         * <p>On success this reader keeps the primary part {@code [start, splitKey)} and the
         * residual part {@code [splitKey, end)} is returned. The split is declined (returns
         * {@code null}, reader unchanged) when the key cannot be interpolated, the size
         * cannot be estimated, either sub-source fails its own precondition checks, or the
         * range tracker rejects the split position.
         *
         * @param fraction fraction of the key range at which to split
         * @return the residual source, or {@code null} if the split was declined
         */
        public final synchronized BoundedSource<Results> splitAtFraction(double fraction) {
            ByteKeyRange originalRange = this.source.getRange();
            ByteKey splitKey;
            try {
                splitKey = originalRange.interpolateKey(fraction);
            }
            catch (IllegalArgumentException e) {
                // SLF4J substitutes "{}" placeholders, not printf-style "%s".
                READER_LOG.info("{}: Failed to interpolate key for fraction {}.", originalRange, fraction);
                return null;
            }
            READER_LOG.debug("Proposing to split {} at fraction {} (key {})", new Object[]{this.rangeTracker, fraction, splitKey});
            long estimatedSizeBytes;
            try {
                estimatedSizeBytes = this.source.getEstimatedSizeBytes(null);
            }
            catch (IOException e) {
                READER_LOG.info("Failed to estimate the source size; declining to split " + originalRange, (Throwable)e);
                return null;
            }
            byte[] originalStart = originalRange.getStartKey().getBytes();
            byte[] originalEnd = originalRange.getEndKey().getBytes();
            SourceWithKeys<Results> primary;
            SourceWithKeys<Results> residual;
            try {
                primary = this.source.createSourceWithKeys(originalStart, splitKey.getBytes(), (long)((double)estimatedSizeBytes * fraction));
                residual = this.source.createSourceWithKeys(splitKey.getBytes(), originalEnd, (long)((double)estimatedSizeBytes * (1.0 - fraction)));
            }
            catch (IllegalArgumentException | IllegalStateException e) {
                // The sub-source constructor rejects unordered keys and non-positive sizes
                // (e.g. a scaled estimate that truncates to 0); treat that as "cannot split".
                READER_LOG.info("{}: Cannot create sub-sources for fraction {}.", originalRange, fraction);
                return null;
            }
            if (!this.rangeTracker.trySplitAtPosition(splitKey)) {
                return null;
            }
            this.source = primary;
            return residual;
        }

        /** Describes the reader by its configured start/stop rows and its range tracker. */
        @Override
        public String toString() {
            String startRow = Bytes.toStringBinary(this.config.getStartRow());
            String stopRow = Bytes.toStringBinary(this.config.getStopRow());
            return String.format("Reader for: ['%s' - '%s'], range: %s", startRow, stopRow, this.rangeTracker);
        }
    }

    /**
     * A source restricted to one key range, carrying a fixed size estimate supplied at
     * construction time.
     *
     * @param <ResultOutputType> type of the values the source emits
     */
    protected static class SourceWithKeys<ResultOutputType>
    extends AbstractSource<ResultOutputType> {
        private static final long serialVersionUID = 1L;
        /** Size estimate in bytes, as given to the constructor. */
        private final long estimatedSize;

        /**
         * @param configuration scan configuration already narrowed to this source's keys
         * @param coderType output shape, used to pick the coder
         * @param scanIterator strategy for pulling values from the scanner
         * @param estimatedSize size estimate in bytes
         * @param range key range of this source
         * @throws IllegalArgumentException if a stop row is set and the start row is not strictly before it
         * @throws IllegalStateException if a stop row is set and {@code estimatedSize} is not positive
         */
        protected SourceWithKeys(CloudBigtableScanConfiguration configuration, CoderType coderType, ScanIterator<ResultOutputType> scanIterator, long estimatedSize, ByteKeyRange range) {
            super(configuration, coderType, scanIterator, range);
            byte[] startRow = configuration.getStartRow();
            byte[] stopRow = configuration.getStopRow();
            // An empty stop row means "to the end of the table"; nothing to verify then.
            boolean bounded = stopRow.length > 0;
            if (bounded) {
                boolean ordered = Bytes.compareTo(startRow, stopRow) < 0;
                if (!ordered) {
                    String startText = Bytes.toStringBinary(startRow);
                    String stopText = Bytes.toStringBinary(stopRow);
                    throw new IllegalArgumentException(String.format("Source keys not in order: [%s, %s]", startText, stopText));
                }
                com.google.api.client.util.Preconditions.checkState(estimatedSize > 0L, "Source size must be positive", new Object[]{estimatedSize});
            }
            this.estimatedSize = estimatedSize;
            SOURCE_LOG.debug("Source with split: {}.", this);
        }

        /** Returns the fixed estimate; {@code options} is not consulted. */
        @Override
        public long getEstimatedSizeBytes(PipelineOptions options) {
            return this.estimatedSize;
        }

        /** Returns the fixed size estimate in bytes. */
        public long getEstimatedSize() {
            return this.estimatedSize;
        }

        /**
         * Splits this source's key range again, using the fixed estimate as the region size.
         *
         * @param desiredBundleSizeBytes target size of each resulting bundle
         * @param options not used here
         * @return the sub-sources produced by {@code split}
         */
        public List<? extends BoundedSource<ResultOutputType>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
            byte[] from = this.configuration.getStartRow();
            byte[] to = this.configuration.getStopRow();
            List<SourceWithKeys<ResultOutputType>> newSplits = this.split(this.estimatedSize, desiredBundleSizeBytes, from, to);
            SOURCE_LOG.trace("Splitting split {} into {}", this, newSplits);
            return newSplits;
        }

        /** Returns the configured start row. */
        public byte[] getStartRow() {
            return this.configuration.getStartRow();
        }

        /** Returns the configured stop row. */
        public byte[] getStopRow() {
            return this.configuration.getStopRow();
        }

        /** Describes the split by its start/stop rows, size estimate and key range. */
        @Override
        public String toString() {
            String startText = Bytes.toStringBinary(this.configuration.getStartRow());
            String stopText = Bytes.toStringBinary(this.configuration.getStopRow());
            return String.format("Split start: '%s', end: '%s', size: %d, range: %s", startText, stopText, this.estimatedSize, this.range);
        }
    }

    /**
     * Top-level source for a whole scan. Splitting is driven by the table's sampled row keys.
     *
     * @param <ResultOutputType> type of the values the source emits
     */
    public static class Source<ResultOutputType>
    extends AbstractSource<ResultOutputType> {
        private static final long serialVersionUID = -5580115943635114126L;

        /**
         * @param configuration the scan to run
         * @param coderType output shape, used to pick the coder
         * @param scanIterator strategy for pulling values from the scanner
         * @param range key range covered by this source
         */
        Source(CloudBigtableScanConfiguration configuration, CoderType coderType, ScanIterator<ResultOutputType> scanIterator, ByteKeyRange range) {
            super(configuration, coderType, scanIterator, range);
        }

        /**
         * Splits the scan into sub-sources based on the sampled row keys.
         *
         * @param desiredBundleSizeBytes target size of each bundle
         * @param options not used here
         * @return the sub-sources computed by {@code getSplits}
         */
        public List<? extends BoundedSource<ResultOutputType>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
            List<SourceWithKeys<ResultOutputType>> splits = this.getSplits(desiredBundleSizeBytes);
            SOURCE_LOG.info("Creating {} splits.", splits.size());
            SOURCE_LOG.debug("Created splits {}.", splits);
            return splits;
        }

        /**
         * Estimates the scan size from the sampled row keys.
         *
         * <p>Delegates to the inherited implementation, which performs exactly the
         * computation this override used to duplicate line for line.
         */
        @Override
        public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
            return super.getEstimatedSizeBytes(options);
        }
    }

    static abstract class AbstractSource<ResultOutputType>
    extends BoundedSource<ResultOutputType> {
        protected static final Logger SOURCE_LOG = LoggerFactory.getLogger(AbstractSource.class);
        /** Divisor applied to the estimated size to get a lower bound on the bundle size. */
        protected static final long SIZED_BASED_MAX_SPLIT_COUNT = 4000L;
        /** Split count at or above which adjacent splits are merged. */
        static final long COUNT_MAX_SPLIT_COUNT = 15360L;

        /** Scan configuration this source reads with. */
        protected final CloudBigtableScanConfiguration configuration;
        /** Ordinal of the {@link CoderType}; resolved back through {@code CoderType.values()}. */
        protected final int coderTypeOrdinal;
        /** Strategy for pulling values from the scanner. */
        protected final ScanIterator<ResultOutputType> scanIterator;
        /** Key range covered by this source. */
        protected final ByteKeyRange range;
        /** Cached result of the sample-row-keys call; transient, so it is not serialized. */
        private transient List<SampleRowKeysResponse> sampleRowKeys;

        /**
         * @param configuration the scan to run
         * @param coderType output shape; only its ordinal is stored
         * @param scanIterator strategy for pulling values from the scanner
         * @param range key range covered by this source
         */
        AbstractSource(CloudBigtableScanConfiguration configuration, CoderType coderType, ScanIterator<ResultOutputType> scanIterator, ByteKeyRange range) {
            this.range = range;
            this.scanIterator = scanIterator;
            this.coderTypeOrdinal = coderType.ordinal();
            this.configuration = configuration;
        }

        /** Resolves the stored coder-type ordinal back to its constant and returns that type's coder. */
        public Coder<ResultOutputType> getDefaultOutputCoder() {
            CoderType coderType = CoderType.values()[this.coderTypeOrdinal];
            return CloudBigtableIO.getCoder(coderType);
        }

        /**
         * Computes the sub-sources for this scan from the table's sampled row keys.
         *
         * <p>Each pair of consecutive sample keys delimits a region. Regions overlapping the
         * scan range are clipped to it and subdivided by {@code split}. If the scan has no
         * stop row, one more source of size 0 is added from the last sample key to the end
         * of the table. The result is finally passed through {@code reduceSplits}.
         *
         * @param desiredBundleSizeBytes requested bundle size; raised to at least
         *     (estimated size / {@code SIZED_BASED_MAX_SPLIT_COUNT})
         * @return the list of sub-sources
         */
        protected List<SourceWithKeys<ResultOutputType>> getSplits(long desiredBundleSizeBytes) throws Exception {
            long minimumBundleSize = this.getEstimatedSizeBytes(null) / SIZED_BASED_MAX_SPLIT_COUNT;
            long bundleSize = Math.max(minimumBundleSize, desiredBundleSizeBytes);
            byte[] scanStartKey = this.configuration.getStartRow();
            byte[] scanEndKey = this.configuration.getStopRow();
            List<SourceWithKeys<ResultOutputType>> splits = new ArrayList<SourceWithKeys<ResultOutputType>>();
            byte[] regionStart = HConstants.EMPTY_START_ROW;
            long previousOffset = 0L;
            for (SampleRowKeysResponse sample : this.getSampleRowKeys()) {
                byte[] regionEnd = sample.getRowKey().toByteArray();
                // A sample that repeats the previous (non-empty) key delimits no region.
                if (regionStart.length > 0 && Bytes.equals(regionStart, regionEnd)) {
                    continue;
                }
                long offset = sample.getOffsetBytes();
                if (AbstractSource.isWithinRange(scanStartKey, scanEndKey, regionStart, regionEnd)) {
                    // Clip the region start to the scan start.
                    byte[] splitStart;
                    if (scanStartKey.length == 0 || Bytes.compareTo(regionStart, scanStartKey) >= 0) {
                        splitStart = regionStart;
                    } else {
                        splitStart = scanStartKey;
                    }
                    // Clip the region end to the scan end; an empty region end also yields the scan end.
                    boolean endInsideScan = scanEndKey.length == 0 || Bytes.compareTo(regionEnd, scanEndKey) <= 0;
                    byte[] splitStop;
                    if (endInsideScan && regionEnd.length > 0) {
                        splitStop = regionEnd;
                    } else {
                        splitStop = scanEndKey;
                    }
                    splits.addAll(this.split(offset - previousOffset, bundleSize, splitStart, splitStop));
                }
                previousOffset = offset;
                regionStart = regionEnd;
            }
            byte[] tableEnd = HConstants.EMPTY_END_ROW;
            boolean openEndedScan = scanEndKey.length == 0;
            if (!Bytes.equals(regionStart, tableEnd) && openEndedScan) {
                splits.add(this.createSourceWithKeys(regionStart, tableEnd, 0L));
            }
            return this.reduceSplits(splits);
        }

        /**
         * Caps the number of splits by merging runs of adjacent splits.
         *
         * <p>Lists shorter than {@code COUNT_MAX_SPLIT_COUNT} are returned as is. Otherwise
         * consecutive groups of ceil(size / {@code COUNT_MAX_SPLIT_COUNT}) splits are each
         * replaced by one source that spans from the group's first start row to its last
         * stop row and carries the sum of the group's size estimates.
         *
         * @param splits splits in key order
         * @return the original list, or a new list of merged sources
         */
        private List<SourceWithKeys<ResultOutputType>> reduceSplits(List<SourceWithKeys<ResultOutputType>> splits) {
            int total = splits.size();
            if ((long)total < COUNT_MAX_SPLIT_COUNT) {
                return splits;
            }
            int groupSize = (int)(((long)total + COUNT_MAX_SPLIT_COUNT - 1L) / COUNT_MAX_SPLIT_COUNT);
            List<SourceWithKeys<ResultOutputType>> merged = new ArrayList<SourceWithKeys<ResultOutputType>>();
            for (int from = 0; from < total; from += groupSize) {
                int to = Math.min(from + groupSize, total);
                List<SourceWithKeys<ResultOutputType>> group = splits.subList(from, to);
                long groupBytes = 0L;
                for (SourceWithKeys<ResultOutputType> member : group) {
                    groupBytes += member.getEstimatedSize();
                }
                SourceWithKeys<ResultOutputType> first = group.get(0);
                SourceWithKeys<ResultOutputType> last = group.get(group.size() - 1);
                merged.add(this.createSourceWithKeys(first.getStartRow(), last.getStopRow(), groupBytes));
            }
            return merged;
        }

        /**
         * Tells whether the region {@code [startKey, endKey)} overlaps the scan range.
         * An empty scan key or an empty region end key is treated as unbounded on that side.
         *
         * @param scanStartKey start of the scan, empty for "from the beginning"
         * @param scanEndKey end of the scan, empty for "to the end"
         * @param startKey start of the region
         * @param endKey end of the region, empty for "to the end"
         * @return {@code false} if the scan starts at or after the region's end, or ends at or
         *     before the region's start; {@code true} otherwise
         */
        protected static boolean isWithinRange(byte[] scanStartKey, byte[] scanEndKey, byte[] startKey, byte[] endKey) {
            boolean scanStartsAfterRegion = scanStartKey.length != 0
                    && endKey.length != 0
                    && Bytes.compareTo(scanStartKey, endKey) >= 0;
            if (scanStartsAfterRegion) {
                return false;
            }
            boolean scanEndsBeforeRegion = scanEndKey.length != 0
                    && Bytes.compareTo(scanEndKey, startKey) <= 0;
            return !scanEndsBeforeRegion;
        }

        /**
         * Returns the table's sampled row keys, fetching them on first use through a
         * short-lived session and caching the result on this instance.
         *
         * @return the sample-row-keys responses
         * @throws IOException if the session or the request fails
         */
        public synchronized List<SampleRowKeysResponse> getSampleRowKeys() throws IOException {
            if (this.sampleRowKeys != null) {
                return this.sampleRowKeys;
            }
            BigtableOptions bigtableOptions = this.configuration.toBigtableOptions();
            try (BigtableSession session = new BigtableSession(bigtableOptions)) {
                String tableId = this.configuration.getTableId();
                BigtableTableName tableName = bigtableOptions.getInstanceName().toTableName(tableId);
                SampleRowKeysRequest request = SampleRowKeysRequest.newBuilder()
                        .setTableName(tableName.toString())
                        .build();
                this.sampleRowKeys = session.getDataClient().sampleRowKeys(request);
            }
            return this.sampleRowKeys;
        }

        /**
         * Test hook: pre-populates the sample-row-keys cache so that {@code getSampleRowKeys()}
         * returns the given list without contacting the service.
         */
        @VisibleForTesting
        void setSampleRowKeys(List<SampleRowKeysResponse> sampleRowKeys) {
            List<SampleRowKeysResponse> injected = sampleRowKeys;
            this.sampleRowKeys = injected;
        }

        /**
         * Checks that this source's configuration names a project, an instance and a table.
         *
         * @throws IllegalArgumentException if any of the three is missing
         */
        public void validate() {
            CloudBigtableScanConfiguration toCheck = this.configuration;
            CloudBigtableIO.validateTableConfig(toCheck);
        }

        /**
         * Estimates the scan size by summing, over the sampled row keys, the byte-offset
         * deltas of every region that overlaps the scan range.
         *
         * @param options not consulted
         * @return the estimated size in bytes
         * @throws IOException if the sampled row keys cannot be fetched
         */
        public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
            byte[] scanStartKey = this.configuration.getStartRow();
            byte[] scanEndKey = this.configuration.getStopRow();
            long estimate = 0L;
            long previousOffset = 0L;
            byte[] regionStart = HConstants.EMPTY_START_ROW;
            for (SampleRowKeysResponse sample : this.getSampleRowKeys()) {
                byte[] regionEnd = sample.getRowKey().toByteArray();
                // A sample that repeats the previous (non-empty) key delimits no region.
                boolean repeatedKey = regionStart.length != 0 && Bytes.equals(regionStart, regionEnd);
                if (repeatedKey) {
                    continue;
                }
                long offset = sample.getOffsetBytes();
                if (AbstractSource.isWithinRange(scanStartKey, scanEndKey, regionStart, regionEnd)) {
                    estimate += offset - previousOffset;
                }
                previousOffset = offset;
                regionStart = regionEnd;
            }
            SOURCE_LOG.info("Estimated size in bytes: " + estimate);
            return estimate;
        }

        /** Always reports {@code true}; {@code options} is not consulted. */
        public boolean producesSortedKeys(PipelineOptions options) throws Exception {
            final boolean sorted = true;
            return sorted;
        }

        /**
         * Subdivides one key region into ceil(regionSize / desiredBundleSizeBytes) sources
         * with boundaries computed by {@code Bytes.split}.
         *
         * <p>The region is returned as a single source when fewer than two pieces are needed
         * or when {@code stopKey} is empty (open-ended regions are not subdivided).
         *
         * <p>NOTE(review): every sub-source is created with the full {@code regionSize} as
         * its estimate rather than a share of it - confirm whether that is intended.
         *
         * @param regionSize estimated size of the region in bytes
         * @param desiredBundleSizeBytes target size per piece; must be positive
         * @param startKey inclusive start of the region
         * @param stopKey exclusive end of the region, empty for "to the end"
         * @return the sources covering the region
         * @throws IllegalStateException if a precondition fails (non-positive bundle size or
         *     region size, keys out of order, unexpected number of split keys)
         */
        protected List<SourceWithKeys<ResultOutputType>> split(long regionSize, long desiredBundleSizeBytes, byte[] startKey, byte[] stopKey) throws IOException {
            com.google.api.client.util.Preconditions.checkState(desiredBundleSizeBytes > 0L);
            int splitCount = (int)Math.ceil((double)regionSize / (double)desiredBundleSizeBytes);
            boolean divisible = splitCount >= 2 && stopKey.length != 0;
            if (!divisible) {
                return Collections.singletonList(this.createSourceWithKeys(startKey, stopKey, regionSize));
            }
            // stopKey is known to be non-empty from here on.
            boolean ordered = Bytes.compareTo(startKey, stopKey) <= 0;
            com.google.api.client.util.Preconditions.checkState(ordered, "Source keys not in order: [%s, %s]", new Object[]{Bytes.toStringBinary(startKey), Bytes.toStringBinary(stopKey)});
            com.google.api.client.util.Preconditions.checkState(regionSize > 0L, "Source size must be positive", new Object[]{regionSize});
            byte[][] boundaries = Bytes.split(startKey, stopKey, splitCount - 1);
            com.google.api.client.util.Preconditions.checkState(splitCount + 1 == boundaries.length);
            List<SourceWithKeys<ResultOutputType>> pieces = new ArrayList<SourceWithKeys<ResultOutputType>>();
            for (int piece = 0; piece < splitCount; piece++) {
                byte[] pieceStart = boundaries[piece];
                byte[] pieceStop = boundaries[piece + 1];
                pieces.add(this.createSourceWithKeys(pieceStart, pieceStop, regionSize));
            }
            return pieces;
        }

        /**
         * Creates a sub-source for {@code [startKey, stopKey)}, sharing this source's coder
         * type and scan iterator and using a configuration narrowed to the same keys.
         *
         * @param startKey inclusive start key
         * @param stopKey exclusive stop key
         * @param size size estimate in bytes for the new source
         * @return the new sub-source
         */
        @VisibleForTesting
        SourceWithKeys<ResultOutputType> createSourceWithKeys(byte[] startKey, byte[] stopKey, long size) {
            CloudBigtableScanConfiguration narrowed = CloudBigtableIO.augmentConfiguration(this.configuration, startKey, stopKey);
            ByteKey rangeStart = ByteKey.copyFrom(startKey);
            ByteKey rangeStop = ByteKey.copyFrom(stopKey);
            ByteKeyRange subRange = ByteKeyRange.of(rangeStart, rangeStop);
            CoderType coderType = CoderType.values()[this.coderTypeOrdinal];
            return new SourceWithKeys<ResultOutputType>(narrowed, coderType, this.scanIterator, size, subRange);
        }

        /**
         * Creates a reader over this source using its configuration and scan iterator.
         *
         * @param options not consulted
         */
        public BoundedSource.BoundedReader<ResultOutputType> createReader(PipelineOptions options) throws IOException {
            Reader<ResultOutputType> reader = new Reader<ResultOutputType>(this, this.configuration, this.scanIterator);
            return reader;
        }

        /** Returns the key range this source was constructed with. */
        public ByteKeyRange getRange() {
            ByteKeyRange covered = this.range;
            return covered;
        }
    }

    /**
     * Batch {@link ScanIterator}: requests up to a fixed number of rows per call and emits
     * them as one {@code Result[]}. A null or empty array marks the end of the scan.
     */
    static final class ResultArrayIterator
    implements ScanIterator<Result[]> {
        private static final long serialVersionUID = 1L;
        /** Number of rows requested from the scanner per call. */
        private final int arraySize;

        /**
         * @param arraySize number of rows to request per batch
         */
        public ResultArrayIterator(int arraySize) {
            this.arraySize = arraySize;
        }

        /** Fetches the next batch of rows and adapts each row to a {@link Result}. */
        @Override
        public Result[] next(ResultScanner<Row> resultScanner) throws IOException {
            Row[] rows = resultScanner.next(this.arraySize);
            Result[] adapted = new Result[rows.length];
            int slot = 0;
            for (Row row : rows) {
                adapted[slot] = Adapters.ROW_ADAPTER.adaptResponse(row);
                slot++;
            }
            return adapted;
        }

        /** A null or zero-length batch means the scanner is exhausted. */
        @Override
        public boolean isCompletionMarker(Result[] result) {
            if (result == null) {
                return true;
            }
            return result.length == 0;
        }

        /** The row count of a batch is its length; a null batch counts as zero. */
        @Override
        public long getRowCount(Result[] result) {
            if (result == null) {
                return 0L;
            }
            return (long)result.length;
        }

        /** Returns a copy of the last row's key, or {@code null} for a completion marker. */
        @Override
        public ByteString getLatestKey(Result[] result) {
            if (this.isCompletionMarker(result)) {
                return null;
            }
            Result last = result[result.length - 1];
            return ByteString.copyFrom(last.getRow());
        }
    }

    private static interface ScanIterator<ResultOutputType>
    extends Serializable {
        public ResultOutputType next(ResultScanner<Row> var1) throws IOException;

        public boolean isCompletionMarker(ResultOutputType var1);

        public long getRowCount(ResultOutputType var1);

        public ByteString getLatestKey(ResultOutputType var1);
    }

    /**
     * Output shapes a source can produce. Sources store the constant's ordinal and resolve
     * it back through {@code values()}, so the declaration order must stay as it is.
     */
    enum CoderType {
        /** One {@link Result} per emitted value. */
        RESULT,
        /** An array of {@link Result}s per emitted value. */
        RESULT_ARRAY
    }
}

