/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.hbase.HBaseResultCoder;
import org.apache.beam.sdk.io.hbase.SerializableScan;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdks.java.io.hbase.repackaged.com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class HBaseIO {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseIO.class);

    /**
     * Private constructor: this class is only used through its static {@link #read()} and
     * {@link #write()} factory methods and is never instantiated.
     */
    private HBaseIO() {
        // Intentionally empty.
    }

    /**
     * Creates an unconfigured {@link Read}: no configuration, an empty table id and a freshly
     * constructed default {@link Scan}.
     *
     * @return a new {@link Read} to be refined through its {@code with*} methods
     */
    @Experimental
    public static Read read() {
        SerializableScan defaultScan = new SerializableScan(new Scan());
        return new Read(null, "", defaultScan);
    }

    /**
     * Creates an unconfigured {@link Write}: no configuration and an empty table id.
     *
     * @return a new {@link Write} to be refined through its {@code with*} methods
     */
    public static Write write() {
        final SerializableConfiguration noConfiguration = null;
        final String emptyTableId = "";
        return new Write(noConfiguration, emptyTableId);
    }

    public static class Write
    extends PTransform<PCollection<Mutation>, PDone> {
        private final String tableId;
        private final SerializableConfiguration serializableConfiguration;

        public Write withConfiguration(Configuration configuration) {
            Preconditions.checkArgument(configuration != null, "configuration can not be null");
            return new Write(new SerializableConfiguration(configuration), this.tableId);
        }

        public Write withTableId(String tableId) {
            Preconditions.checkArgument(tableId != null, "tableIdcan not be null");
            return new Write(this.serializableConfiguration, tableId);
        }

        private Write(SerializableConfiguration serializableConfiguration, String tableId) {
            this.serializableConfiguration = serializableConfiguration;
            this.tableId = tableId;
        }

        public PDone expand(PCollection<Mutation> input) {
            Preconditions.checkArgument(this.serializableConfiguration != null, "withConfiguration() is required");
            Preconditions.checkArgument(this.tableId != null && !this.tableId.isEmpty(), "withTableId() is required");
            try (Connection connection = ConnectionFactory.createConnection((Configuration)this.serializableConfiguration.get());){
                Admin admin = connection.getAdmin();
                Preconditions.checkArgument(admin.tableExists(TableName.valueOf((String)this.tableId)), "Table %s does not exist", (Object)this.tableId);
            }
            catch (IOException e) {
                LOG.warn("Error checking whether table {} exists; proceeding.", (Object)this.tableId, (Object)e);
            }
            input.apply((PTransform)ParDo.of((DoFn)new HBaseWriterFn(this.tableId, this.serializableConfiguration)));
            return PDone.in((Pipeline)input.getPipeline());
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item((String)"configuration", (String)this.serializableConfiguration.get().toString()));
            builder.add(DisplayData.item((String)"tableId", (String)this.tableId));
        }

        public String getTableId() {
            return this.tableId;
        }

        public Configuration getConfiguration() {
            return this.serializableConfiguration.get();
        }

        private class HBaseWriterFn
        extends DoFn<Mutation, Void> {
            private final String tableId;
            private final SerializableConfiguration serializableConfiguration;
            private Connection connection;
            private BufferedMutator mutator;
            private long recordsWritten;

            public HBaseWriterFn(String tableId, SerializableConfiguration serializableConfiguration) {
                this.tableId = Preconditions.checkNotNull(tableId, "tableId");
                this.serializableConfiguration = Preconditions.checkNotNull(serializableConfiguration, "serializableConfiguration");
            }

            @DoFn.Setup
            public void setup() throws Exception {
                this.connection = ConnectionFactory.createConnection((Configuration)this.serializableConfiguration.get());
            }

            @DoFn.StartBundle
            public void startBundle(DoFn.StartBundleContext c) throws IOException {
                BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf((String)this.tableId));
                this.mutator = this.connection.getBufferedMutator(params);
                this.recordsWritten = 0L;
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) throws Exception {
                this.mutator.mutate((Mutation)c.element());
                ++this.recordsWritten;
            }

            @DoFn.FinishBundle
            public void finishBundle() throws Exception {
                this.mutator.flush();
                LOG.debug("Wrote {} records", (Object)this.recordsWritten);
            }

            @DoFn.Teardown
            public void tearDown() throws Exception {
                if (this.mutator != null) {
                    this.mutator.close();
                    this.mutator = null;
                }
                if (this.connection != null) {
                    this.connection.close();
                    this.connection = null;
                }
            }

            public void populateDisplayData(DisplayData.Builder builder) {
                builder.delegate((HasDisplayData)Write.this);
            }
        }
    }

    private static class HBaseReader
    extends BoundedSource.BoundedReader<Result> {
        private HBaseSource source;
        private Connection connection;
        private ResultScanner scanner;
        private Iterator<Result> iter;
        private Result current;
        private final ByteKeyRangeTracker rangeTracker;
        private long recordsReturned;

        HBaseReader(HBaseSource source) {
            this.source = source;
            Scan scan = source.read.serializableScan.get();
            ByteKeyRange range = ByteKeyRange.of((ByteKey)ByteKey.copyFrom((byte[])scan.getStartRow()), (ByteKey)ByteKey.copyFrom((byte[])scan.getStopRow()));
            this.rangeTracker = ByteKeyRangeTracker.of((ByteKeyRange)range);
        }

        public boolean start() throws IOException {
            HBaseSource source = this.getCurrentSource();
            Configuration configuration = source.read.serializableConfiguration.get();
            String tableId = source.read.tableId;
            this.connection = ConnectionFactory.createConnection((Configuration)configuration);
            TableName tableName = TableName.valueOf((String)tableId);
            Table table = this.connection.getTable(tableName);
            Scan scanClone = new Scan(source.read.serializableScan.get());
            this.scanner = table.getScanner(scanClone);
            this.iter = this.scanner.iterator();
            return this.advance();
        }

        public Result getCurrent() throws NoSuchElementException {
            return this.current;
        }

        public boolean advance() throws IOException {
            boolean hasRecord;
            if (!this.iter.hasNext()) {
                return this.rangeTracker.markDone();
            }
            Result next = this.iter.next();
            boolean bl = hasRecord = this.rangeTracker.tryReturnRecordAt(true, ByteKey.copyFrom((byte[])next.getRow())) || this.rangeTracker.markDone();
            if (hasRecord) {
                this.current = next;
                ++this.recordsReturned;
            }
            return hasRecord;
        }

        public void close() throws IOException {
            LOG.debug("Closing reader after reading {} records.", (Object)this.recordsReturned);
            if (this.scanner != null) {
                this.scanner.close();
                this.scanner = null;
            }
            if (this.connection != null) {
                this.connection.close();
                this.connection = null;
            }
        }

        public synchronized HBaseSource getCurrentSource() {
            return this.source;
        }

        public final Double getFractionConsumed() {
            return this.rangeTracker.getFractionConsumed();
        }

        public final long getSplitPointsConsumed() {
            return this.rangeTracker.getSplitPointsConsumed();
        }

        @Nullable
        public final synchronized HBaseSource splitAtFraction(double fraction) {
            HBaseSource residual;
            HBaseSource primary;
            ByteKey splitKey;
            try {
                splitKey = this.rangeTracker.getRange().interpolateKey(fraction);
            }
            catch (RuntimeException e) {
                LOG.info("{}: Failed to interpolate key for fraction {}.", new Object[]{this.rangeTracker.getRange(), fraction, e});
                return null;
            }
            LOG.info("Proposing to split {} at fraction {} (key {})", new Object[]{this.rangeTracker, fraction, splitKey});
            try {
                primary = this.source.withEndKey(splitKey);
                residual = this.source.withStartKey(splitKey);
            }
            catch (Exception e) {
                LOG.info("{}: Interpolating for fraction {} yielded invalid split key {}.", new Object[]{this.rangeTracker.getRange(), fraction, splitKey, e});
                return null;
            }
            if (!this.rangeTracker.trySplitAtPosition(splitKey)) {
                return null;
            }
            this.source = primary;
            return residual;
        }
    }

    static class HBaseSource
    extends BoundedSource<Result> {
        private final Read read;
        @Nullable
        private Long estimatedSizeBytes;

        HBaseSource(Read read, @Nullable Long estimatedSizeBytes) {
            this.read = read;
            this.estimatedSizeBytes = estimatedSizeBytes;
        }

        HBaseSource withStartKey(ByteKey startKey) throws IOException {
            Preconditions.checkNotNull(startKey, "startKey");
            Read newRead = new Read(this.read.serializableConfiguration, this.read.tableId, new SerializableScan(new Scan(this.read.serializableScan.get()).setStartRow(startKey.getBytes())));
            return new HBaseSource(newRead, this.estimatedSizeBytes);
        }

        HBaseSource withEndKey(ByteKey endKey) throws IOException {
            Preconditions.checkNotNull(endKey, "endKey");
            Read newRead = new Read(this.read.serializableConfiguration, this.read.tableId, new SerializableScan(new Scan(this.read.serializableScan.get()).setStopRow(endKey.getBytes())));
            return new HBaseSource(newRead, this.estimatedSizeBytes);
        }

        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
            if (this.estimatedSizeBytes == null) {
                this.estimatedSizeBytes = this.estimateSizeBytes();
                LOG.debug("Estimated size {} bytes for table {} and scan {}", new Object[]{this.estimatedSizeBytes, this.read.tableId, this.read.serializableScan.get()});
            }
            return this.estimatedSizeBytes;
        }

        private long estimateSizeBytes() throws Exception {
            long estimatedSizeBytes = 0L;
            Configuration configuration = this.read.serializableConfiguration.get();
            try (Connection connection = ConnectionFactory.createConnection((Configuration)configuration);){
                List<HRegionLocation> regionLocations = this.getRegionLocations(connection);
                TreeSet<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
                for (HRegionLocation regionLocation : regionLocations) {
                    tableRegions.add(regionLocation.getRegionInfo().getRegionName());
                }
                Admin admin = connection.getAdmin();
                ClusterStatus clusterStatus = admin.getClusterStatus();
                Collection servers = clusterStatus.getServers();
                for (ServerName serverName : servers) {
                    ServerLoad serverLoad = clusterStatus.getLoad(serverName);
                    for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
                        byte[] regionId = regionLoad.getName();
                        if (!tableRegions.contains(regionId)) continue;
                        long regionSizeBytes = (long)regionLoad.getStorefileSizeMB() * 0x100000L;
                        estimatedSizeBytes += regionSizeBytes;
                    }
                }
            }
            return estimatedSizeBytes;
        }

        private List<HRegionLocation> getRegionLocations(Connection connection) throws Exception {
            Scan scan = this.read.serializableScan.get();
            byte[] startRow = scan.getStartRow();
            byte[] stopRow = scan.getStopRow();
            ArrayList<HRegionLocation> regionLocations = new ArrayList<HRegionLocation>();
            boolean scanWithNoLowerBound = startRow.length == 0;
            boolean scanWithNoUpperBound = stopRow.length == 0;
            TableName tableName = TableName.valueOf((String)this.read.tableId);
            RegionLocator regionLocator = connection.getRegionLocator(tableName);
            List tableRegionInfos = regionLocator.getAllRegionLocations();
            for (HRegionLocation regionLocation : tableRegionInfos) {
                boolean isLastRegion;
                byte[] startKey = regionLocation.getRegionInfo().getStartKey();
                byte[] endKey = regionLocation.getRegionInfo().getEndKey();
                boolean bl = isLastRegion = endKey.length == 0;
                if (!scanWithNoLowerBound && !isLastRegion && Bytes.compareTo((byte[])startRow, (byte[])endKey) >= 0 || !scanWithNoUpperBound && Bytes.compareTo((byte[])stopRow, (byte[])startKey) <= 0) continue;
                regionLocations.add(regionLocation);
            }
            return regionLocations;
        }

        private List<HBaseSource> splitBasedOnRegions(List<HRegionLocation> regionLocations, int numSplits) throws Exception {
            Scan scan = this.read.serializableScan.get();
            byte[] startRow = scan.getStartRow();
            byte[] stopRow = scan.getStopRow();
            ArrayList<HBaseSource> sources = new ArrayList<HBaseSource>(numSplits);
            boolean scanWithNoLowerBound = startRow.length == 0;
            boolean scanWithNoUpperBound = stopRow.length == 0;
            for (HRegionLocation regionLocation : regionLocations) {
                byte[] startKey = regionLocation.getRegionInfo().getStartKey();
                byte[] endKey = regionLocation.getRegionInfo().getEndKey();
                boolean isLastRegion = endKey.length == 0;
                String host = regionLocation.getHostnamePort();
                byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo((byte[])startKey, (byte[])startRow) >= 0 ? startKey : startRow;
                byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo((byte[])endKey, (byte[])stopRow) <= 0) && !isLastRegion ? endKey : stopRow;
                LOG.debug("{} {} {} {} {}", new Object[]{sources.size(), host, this.read.tableId, Bytes.toString((byte[])splitStart), Bytes.toString((byte[])splitStop)});
                Scan newScan = new Scan(scan).setStartRow(splitStart).setStopRow(splitStop);
                Read newRead = new Read(this.read.serializableConfiguration, this.read.tableId, new SerializableScan(newScan));
                sources.add(new HBaseSource(newRead, this.estimatedSizeBytes));
            }
            return sources;
        }

        public List<? extends BoundedSource<Result>> split(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
            LOG.debug("desiredBundleSize {} bytes", (Object)desiredBundleSizeBytes);
            long estimatedSizeBytes = this.getEstimatedSizeBytes(options);
            int numSplits = 1;
            if (estimatedSizeBytes > 0L && desiredBundleSizeBytes > 0L) {
                numSplits = (int)Math.ceil((double)estimatedSizeBytes / (double)desiredBundleSizeBytes);
            }
            try (Connection connection = ConnectionFactory.createConnection((Configuration)this.read.getConfiguration());){
                List<HRegionLocation> regionLocations = this.getRegionLocations(connection);
                int realNumSplits = numSplits < regionLocations.size() ? regionLocations.size() : numSplits;
                LOG.debug("Suggested {} bundle(s) based on size", (Object)numSplits);
                LOG.debug("Suggested {} bundle(s) based on number of regions", (Object)regionLocations.size());
                List<HBaseSource> sources = this.splitBasedOnRegions(regionLocations, realNumSplits);
                LOG.debug("Split into {} bundle(s)", (Object)sources.size());
                if (numSplits >= 1) {
                    List<HBaseSource> list = sources;
                    return list;
                }
                List<HBaseSource> list = Collections.singletonList(this);
                return list;
            }
        }

        public BoundedSource.BoundedReader<Result> createReader(PipelineOptions pipelineOptions) throws IOException {
            return new HBaseReader(this);
        }

        public void validate() {
            this.read.validate(null);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            this.read.populateDisplayData(builder);
        }

        public Coder<Result> getOutputCoder() {
            return HBaseResultCoder.of();
        }
    }

    public static class Read
    extends PTransform<PBegin, PCollection<Result>> {
        private final SerializableConfiguration serializableConfiguration;
        private final String tableId;
        private final SerializableScan serializableScan;

        public Read withConfiguration(Configuration configuration) {
            Preconditions.checkArgument(configuration != null, "configuration can not be null");
            return new Read(new SerializableConfiguration(configuration), this.tableId, this.serializableScan);
        }

        public Read withTableId(String tableId) {
            Preconditions.checkArgument(tableId != null, "tableIdcan not be null");
            return new Read(this.serializableConfiguration, tableId, this.serializableScan);
        }

        public Read withScan(Scan scan) {
            Preconditions.checkArgument(scan != null, "scancan not be null");
            return new Read(this.serializableConfiguration, this.tableId, new SerializableScan(scan));
        }

        public Read withFilter(Filter filter) {
            Preconditions.checkArgument(filter != null, "filtercan not be null");
            return this.withScan(this.serializableScan.get().setFilter(filter));
        }

        public Read withKeyRange(ByteKeyRange keyRange) {
            Preconditions.checkArgument(keyRange != null, "keyRangecan not be null");
            byte[] startRow = keyRange.getStartKey().getBytes();
            byte[] stopRow = keyRange.getEndKey().getBytes();
            return this.withScan(this.serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
        }

        public Read withKeyRange(byte[] startRow, byte[] stopRow) {
            Preconditions.checkArgument(startRow != null, "startRowcan not be null");
            Preconditions.checkArgument(stopRow != null, "stopRowcan not be null");
            ByteKeyRange keyRange = ByteKeyRange.of((ByteKey)ByteKey.copyFrom((byte[])startRow), (ByteKey)ByteKey.copyFrom((byte[])stopRow));
            return this.withKeyRange(keyRange);
        }

        private Read(SerializableConfiguration serializableConfiguration, String tableId, SerializableScan serializableScan) {
            this.serializableConfiguration = serializableConfiguration;
            this.tableId = tableId;
            this.serializableScan = serializableScan;
        }

        public PCollection<Result> expand(PBegin input) {
            Preconditions.checkArgument(this.serializableConfiguration != null, "withConfiguration() is required");
            Preconditions.checkArgument(!this.tableId.isEmpty(), "withTableId() is required");
            try (Connection connection = ConnectionFactory.createConnection((Configuration)this.serializableConfiguration.get());){
                Admin admin = connection.getAdmin();
                Preconditions.checkArgument(admin.tableExists(TableName.valueOf((String)this.tableId)), "Table %s does not exist", (Object)this.tableId);
            }
            catch (IOException e) {
                LOG.warn("Error checking whether table {} exists; proceeding.", (Object)this.tableId, (Object)e);
            }
            HBaseSource source = new HBaseSource(this, null);
            return (PCollection)input.getPipeline().apply((PTransform)org.apache.beam.sdk.io.Read.from((BoundedSource)source));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item((String)"configuration", (String)this.serializableConfiguration.get().toString()));
            builder.add(DisplayData.item((String)"tableId", (String)this.tableId));
            builder.addIfNotNull(DisplayData.item((String)"scan", (String)this.serializableScan.get().toString()));
        }

        public String getTableId() {
            return this.tableId;
        }

        public Configuration getConfiguration() {
            return this.serializableConfiguration.get();
        }

        public ByteKeyRange getKeyRange() {
            byte[] startRow = this.serializableScan.get().getStartRow();
            byte[] stopRow = this.serializableScan.get().getStopRow();
            return ByteKeyRange.of((ByteKey)ByteKey.copyFrom((byte[])startRow), (ByteKey)ByteKey.copyFrom((byte[])stopRow));
        }
    }
}

