/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigtable.beam;

import com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.beam.CloudBigtableConfiguration;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.util.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.values.KV;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class CloudBigtableIOIntegrationTest {
    private static final String BIGTABLE_PROJECT_KEY = "google.bigtable.project.id";
    private static final String BIGTABLE_INSTANCE_KEY = "google.bigtable.instance.id";
    public static final byte[] COLUMN_FAMILY = Bytes.toBytes((String)"test_family");
    public static final byte[] QUALIFIER1 = Bytes.toBytes((String)"qualifier1");
    private static final Logger LOG = new Logger(CloudBigtableIOIntegrationTest.class);
    private static final String projectId = System.getProperty("google.bigtable.project.id");
    private static final String instanceId = System.getProperty("google.bigtable.instance.id");
    private static final int LARGE_VALUE_SIZE = 201326;
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    private static Connection connection;
    private static CloudBigtableConfiguration config;

    public static TableName newTestTableName() {
        return TableName.valueOf((String)("test-dataflow-" + UUID.randomUUID()));
    }

    private static TableName createNewTable(Admin admin) throws IOException {
        TableName tableName = CloudBigtableIOIntegrationTest.newTestTableName();
        admin.createTable((TableDescriptor)new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(COLUMN_FAMILY)));
        return tableName;
    }

    @BeforeClass
    public static void setup() throws IOException {
        config = new CloudBigtableConfiguration.Builder().withProjectId(projectId).withInstanceId(instanceId).build();
        connection = BigtableConfiguration.connect((String)projectId, (String)instanceId);
    }

    @AfterClass
    public static void shutdown() throws IOException {
        if (connection != null) {
            connection.close();
        }
    }

    private static CloudBigtableTableConfiguration createTableConfig(TableName tableName) {
        CloudBigtableTableConfiguration.Builder builder = new CloudBigtableTableConfiguration.Builder();
        config.copyConfig((CloudBigtableConfiguration.Builder)builder);
        return builder.withTableId(tableName.getNameAsString()).build();
    }

    private static CloudBigtableScanConfiguration createScanConfig(TableName tableName) {
        CloudBigtableScanConfiguration.Builder builder = new CloudBigtableScanConfiguration.Builder();
        config.copyConfig((CloudBigtableConfiguration.Builder)builder);
        return builder.withTableId(tableName.getNameAsString()).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Ignore
    public void testWriteToTable_dataWrittenBuffered() throws Exception {
        int INSERT_COUNT = 50;
        try (Admin admin = connection.getAdmin();){
            TableName tableName = CloudBigtableIOIntegrationTest.createNewTable(admin);
            CloudBigtableIO.CloudBigtableSingleTableBufferedWriteFn writer = new CloudBigtableIO.CloudBigtableSingleTableBufferedWriteFn(CloudBigtableIOIntegrationTest.createTableConfig(tableName));
            try {
                this.writeThroughDataflow((DoFn<Mutation, Void>)writer, 50);
                this.checkTableRowCount(tableName, 50);
            }
            finally {
                admin.deleteTable(tableName);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Ignore
    public void testWriteToTable_multiTablewrites() throws Exception {
        int INSERT_COUNT_PER_TABLE = 50;
        int BATCH_SIZE = 25;
        try (Admin admin = connection.getAdmin();){
            TableName tableName1 = CloudBigtableIOIntegrationTest.createNewTable(admin);
            TableName tableName2 = CloudBigtableIOIntegrationTest.createNewTable(admin);
            CloudBigtableIO.CloudBigtableMultiTableWriteFn writer = new CloudBigtableIO.CloudBigtableMultiTableWriteFn(config);
            try {
                DoFnTester tester = DoFnTester.of((DoFn)writer);
                tester.processBundle((Object[])new KV[]{this.createKV(tableName1, 0, BATCH_SIZE), this.createKV(tableName2, 0, BATCH_SIZE), this.createKV(tableName1, BATCH_SIZE, BATCH_SIZE), this.createKV(tableName2, BATCH_SIZE, BATCH_SIZE)});
                this.checkTableRowCount(tableName1, 50);
                this.checkTableRowCount(tableName2, 50);
            }
            finally {
                admin.deleteTable(tableName1);
                admin.deleteTable(tableName2);
            }
        }
    }

    protected KV<String, Iterable<Mutation>> createKV(TableName tableName, int start_count, int insertCount) {
        ArrayList<Put> mutations = new ArrayList<Put>();
        for (int i = 0; i < insertCount; ++i) {
            byte[] row = Bytes.toBytes((String)("row_" + (i + start_count)));
            mutations.add(new Put(row).addColumn(COLUMN_FAMILY, QUALIFIER1, Bytes.toBytes((String)RandomStringUtils.randomAlphanumeric((int)8))));
        }
        return KV.of((Object)tableName.getNameAsString(), mutations);
    }

    private void writeThroughDataflow(DoFn<Mutation, Void> writer, int insertCount) throws Exception {
        DoFnTester fnTester = DoFnTester.of(writer);
        for (int i = 0; i < insertCount; ++i) {
            byte[] row = Bytes.toBytes((String)("row_" + i));
            Put mutation = new Put(row).addColumn(COLUMN_FAMILY, QUALIFIER1, Bytes.toBytes((String)RandomStringUtils.randomAlphanumeric((int)8)));
            fnTester.processBundle((Object[])new Mutation[]{mutation});
        }
    }

    private void checkTableRowCount(TableName tableName, int rowCount) throws IOException {
        int readCount = 0;
        try (Table table = connection.getTable(tableName);
             ResultScanner scanner = table.getScanner(new Scan());){
            while (scanner.next() != null) {
                ++readCount;
            }
        }
        Assert.assertEquals((long)rowCount, (long)readCount);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Ignore
    public void testReadFromTable_singleResultDataflowReader() throws Exception {
        int INSERT_COUNT = 50;
        try (Admin admin = connection.getAdmin();){
            TableName tableName = CloudBigtableIOIntegrationTest.createNewTable(admin);
            try {
                this.writeViaTable(tableName, 50);
                this.checkTableRowCountViaDataflowResultReader(tableName, 50);
            }
            finally {
                admin.deleteTable(tableName);
            }
        }
    }

    private void writeViaTable(TableName tableName, int rowCount) throws IOException {
        ArrayList<Put> puts = new ArrayList<Put>();
        for (int i = 0; i < rowCount; ++i) {
            byte[] row = Bytes.toBytes((String)("row_" + i));
            puts.add(new Put(row).addColumn(COLUMN_FAMILY, QUALIFIER1, Bytes.toBytes((String)RandomStringUtils.randomAlphanumeric((int)8))));
        }
        try (Table t = connection.getTable(tableName);){
            t.put(puts);
        }
    }

    private void checkTableRowCountViaDataflowResultReader(TableName tableName, int rowCount) throws Exception {
        BoundedSource source = CloudBigtableIO.read((CloudBigtableScanConfiguration)CloudBigtableIOIntegrationTest.createScanConfig(tableName));
        List splits = source.split(0x100000L, null);
        int count = 0;
        for (BoundedSource sourceWithKeys : splits) {
            BoundedSource.BoundedReader reader = sourceWithKeys.createReader(null);
            Throwable throwable = null;
            try {
                reader.start();
                while (reader.getCurrent() != null) {
                    ++count;
                    reader.advance();
                }
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (reader == null) continue;
                if (throwable != null) {
                    try {
                        reader.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                reader.close();
            }
        }
        Assert.assertEquals((long)rowCount, (long)count);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Ignore
    public void testEstimatedAndSplitForSmallTable() throws Exception {
        try (Admin admin = connection.getAdmin();){
            LOG.info("Creating table in testEstimatedAndSplitForSmallTable()", new Object[0]);
            TableName tableName = CloudBigtableIOIntegrationTest.createNewTable(admin);
            try (Table table = connection.getTable(tableName);){
                table.put(Arrays.asList(new Put(Bytes.toBytes((String)"row1")).addColumn(COLUMN_FAMILY, QUALIFIER1, Bytes.toBytes((String)"1")), new Put(Bytes.toBytes((String)"row2")).addColumn(COLUMN_FAMILY, QUALIFIER1, Bytes.toBytes((String)"2"))));
            }
            LOG.info("getSampleKeys() in testEstimatedAndSplitForSmallTable()", new Object[0]);
            try {
                CloudBigtableIO.Source source = (CloudBigtableIO.Source)CloudBigtableIO.read((CloudBigtableScanConfiguration)CloudBigtableIOIntegrationTest.createScanConfig(tableName));
                List sampleRowKeys = source.getSampleRowKeys();
                LOG.info("Creating BoundedSource in testEstimatedAndSplitForSmallTable()", new Object[0]);
                long estimatedSizeBytes = source.getEstimatedSizeBytes(null);
                KeyOffset lastSample = (KeyOffset)sampleRowKeys.get(sampleRowKeys.size() - 1);
                Assert.assertEquals((long)lastSample.getOffsetBytes(), (long)estimatedSizeBytes);
                LOG.info("Creating Bundles in testEstimatedAndSplitForSmallTable()", new Object[0]);
                List bundles = source.split(estimatedSizeBytes / 2L + 1L, null);
                LOG.info("Created Bundles in testEstimatedAndSplitForSmallTable()", new Object[0]);
                Assert.assertEquals((long)(sampleRowKeys.size() * 2 - 1), (long)bundles.size());
                Assert.assertSame((Object)sampleRowKeys, (Object)source.getSampleRowKeys());
            }
            finally {
                LOG.info("Deleting table in testEstimatedAndSplitForSmallTable()", new Object[0]);
                admin.deleteTable(tableName);
                LOG.info("Deleted table in testEstimatedAndSplitForSmallTable()", new Object[0]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEstimatedAndSplitForLargeTable() throws Exception {
        try (Admin admin = connection.getAdmin();){
            LOG.info("Creating table in testEstimatedAndSplitForLargeTable()", new Object[0]);
            TableName tableName = CloudBigtableIOIntegrationTest.createNewTable(admin);
            int rowCount = 1000;
            LOG.info("Adding %d rows in testEstimatedAndSplitForLargeTable()", new Object[]{1000});
            try (BufferedMutator mutator = connection.getBufferedMutator(tableName);){
                for (int i = 0; i < 1000; ++i) {
                    byte[] largeValue = Bytes.toBytes((String)RandomStringUtils.randomAlphanumeric((int)201326));
                    mutator.mutate((Mutation)new Put(Bytes.toBytes((String)("row" + i))).addColumn(COLUMN_FAMILY, QUALIFIER1, largeValue));
                }
            }
            try {
                LOG.info("Getting Source in testEstimatedAndSplitForLargeTable()", new Object[0]);
                CloudBigtableIO.Source source = (CloudBigtableIO.Source)CloudBigtableIO.read((CloudBigtableScanConfiguration)CloudBigtableIOIntegrationTest.createScanConfig(tableName));
                List sampleRowKeys = source.getSampleRowKeys();
                LOG.info("Getting estimated size in testEstimatedAndSplitForLargeTable()", new Object[0]);
                long estimatedSizeBytes = source.getEstimatedSizeBytes(null);
                KeyOffset lastSample = (KeyOffset)sampleRowKeys.get(sampleRowKeys.size() - 1);
                Assert.assertEquals((long)lastSample.getOffsetBytes(), (long)estimatedSizeBytes);
                LOG.info("Getting Bundles in testEstimatedAndSplitForLargeTable()", new Object[0]);
                List bundles = source.split(((KeyOffset)sampleRowKeys.get(0)).getOffsetBytes() / 2L, null);
                Assert.assertEquals((long)(sampleRowKeys.size() * 2 - 1), (long)bundles.size());
                AtomicInteger count = new AtomicInteger();
                LOG.info("Reading Bundles in testEstimatedAndSplitForLargeTable()", new Object[0]);
                ExecutorService es = Executors.newCachedThreadPool();
                try {
                    for (BoundedSource bundle : bundles) {
                        es.submit(() -> {
                            try (BoundedSource.BoundedReader reader = bundle.createReader(null);){
                                reader.start();
                                while (reader.getCurrent() != null) {
                                    count.incrementAndGet();
                                    reader.advance();
                                }
                            }
                            catch (IOException e) {
                                LOG.warn("Could not read bundle: %s", (Throwable)e, new Object[]{bundle});
                            }
                        });
                    }
                }
                finally {
                    LOG.info("Shutting down executor in testEstimatedAndSplitForLargeTable()", new Object[0]);
                    es.shutdown();
                    while (!es.isTerminated()) {
                        es.awaitTermination(1L, TimeUnit.SECONDS);
                    }
                }
                Assert.assertSame((Object)sampleRowKeys, (Object)source.getSampleRowKeys());
                Assert.assertEquals((long)1000L, (long)count.intValue());
            }
            finally {
                LOG.info("Deleting table in testEstimatedAndSplitForLargeTable()", new Object[0]);
                admin.deleteTable(tableName);
            }
        }
    }
}

