/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigtable.beam;

import com.google.bigtable.repackaged.com.google.api.gax.rpc.WatchdogTimeoutException;
import com.google.bigtable.repackaged.com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.repackaged.com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.repackaged.com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.repackaged.com.google.bigtable.v2.RowRange;
import com.google.bigtable.repackaged.com.google.bigtable.v2.RowSet;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.internal.ByteStringComparator;
import com.google.bigtable.repackaged.com.google.common.collect.ImmutableList;
import com.google.bigtable.repackaged.com.google.protobuf.ByteString;
import com.google.bigtable.repackaged.com.google.protobuf.BytesValue;
import com.google.bigtable.repackaged.com.google.protobuf.StringValue;
import com.google.bigtable.repackaged.io.grpc.BindableService;
import com.google.bigtable.repackaged.io.grpc.Server;
import com.google.bigtable.repackaged.io.grpc.ServerBuilder;
import com.google.bigtable.repackaged.io.grpc.stub.StreamObserver;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import com.google.cloud.bigtable.hbase.adapters.read.RowCell;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

public class CloudBigtableIOReaderTest {
    @Rule
    public final MockitoRule mockitoRule = MockitoJUnit.rule();
    @Mock
    Connection mockConnection;
    @Mock
    ResultScanner mockScanner;
    @Mock
    CloudBigtableIO.AbstractSource mockSource;
    private Server server;
    private FakeService fakeService;
    int port;

    @Before
    public void setup() throws IOException {
        try (ServerSocket ss = new ServerSocket(0);){
            this.port = ss.getLocalPort();
        }
        this.fakeService = new FakeService();
        this.server = ServerBuilder.forPort((int)this.port).addService((BindableService)this.fakeService).build().start();
    }

    @After
    public void close() {
        if (this.server != null) {
            this.server.shutdown();
        }
    }

    private CloudBigtableScanConfiguration.Builder createDefaultConfig() {
        return new CloudBigtableScanConfiguration.Builder().withProjectId("test").withInstanceId("test").withTableId("test").withRequest(ReadRowsRequest.getDefaultInstance()).withKeys(new byte[0], new byte[0]);
    }

    @Test
    public void testBasic() throws IOException {
        CloudBigtableIO.Reader underTest = this.initializeReader(this.createDefaultConfig().build());
        this.setRowKey("a");
        Assert.assertTrue((boolean)underTest.start());
        Assert.assertEquals((long)1L, (long)underTest.getRowsReadCount());
        Mockito.when((Object)this.mockScanner.next()).thenReturn(null);
        Assert.assertFalse((boolean)underTest.advance());
        Assert.assertEquals((long)1L, (long)underTest.getRowsReadCount());
        underTest.close();
    }

    private void setRowKey(String rowKey) throws IOException {
        ByteString rowKeyByteString = ByteString.copyFrom((byte[])Bytes.toBytes((String)rowKey));
        Result row = Result.create((List)ImmutableList.of((Object)new RowCell(Bytes.toBytes((String)rowKey), Bytes.toBytes((String)"cf"), Bytes.toBytes((String)"q"), 10L, Bytes.toBytes((String)"value"), (List)ImmutableList.of((Object)"label"))));
        Mockito.when((Object)this.mockScanner.next()).thenReturn((Object)row);
    }

    private CloudBigtableIO.Reader initializeReader(CloudBigtableScanConfiguration config) {
        Mockito.when((Object)this.mockSource.getConfiguration()).thenReturn((Object)config);
        return new CloudBigtableIO.Reader(this.mockSource){

            void initializeScanner() throws IOException {
                this.setConnection(CloudBigtableIOReaderTest.this.mockConnection);
                this.setScanner(CloudBigtableIOReaderTest.this.mockScanner);
            }
        };
    }

    @Test
    public void testPercent() throws IOException {
        byte[] start = "aa".getBytes();
        byte[] end = "zz".getBytes();
        CloudBigtableScanConfiguration config = this.createDefaultConfig().withKeys(start, end).build();
        CloudBigtableIO.Reader underTest = this.initializeReader(config);
        ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of((ByteKeyRange)ByteKeyRange.of((ByteKey)ByteKey.copyFrom((byte[])start), (ByteKey)ByteKey.copyFrom((byte[])end)));
        this.testTrackerAtKey(underTest, tracker, "dd", 1);
        this.testTrackerAtKey(underTest, tracker, "qq", 2);
        double splitAtFraction = (1.0 - tracker.getFractionConsumed()) * 0.5 + tracker.getFractionConsumed();
        ByteKey newSplitEnd = tracker.getRange().interpolateKey(splitAtFraction);
        underTest.splitAtFraction(splitAtFraction);
        tracker.trySplitAtPosition(newSplitEnd);
        Assert.assertEquals((double)tracker.getFractionConsumed(), (double)underTest.getFractionConsumed(), (double)1.0E-4);
    }

    private void testTrackerAtKey(CloudBigtableIO.Reader underTest, ByteKeyRangeTracker tracker, String key, int count) throws IOException {
        this.setRowKey(key);
        tracker.tryReturnRecordAt(true, ByteKey.copyFrom((byte[])key.getBytes()));
        Assert.assertTrue((boolean)underTest.start());
        Assert.assertEquals((long)count, (long)underTest.getRowsReadCount());
        Assert.assertEquals((double)tracker.getFractionConsumed(), (double)underTest.getFractionConsumed(), (double)0.001);
    }

    @Test
    public void testSplits() throws IOException {
        byte[] startKey = "AAAAAAA".getBytes();
        byte[] stopKey = "ZZZZZZZ".getBytes();
        CloudBigtableScanConfiguration config = this.createDefaultConfig().withKeys(startKey, stopKey).build();
        CloudBigtableIO.Source source = (CloudBigtableIO.Source)CloudBigtableIO.read((CloudBigtableScanConfiguration)config);
        CloudBigtableIO.SourceWithKeys sourceWithKeys = source.createSourceWithKeys(startKey, stopKey, 10L);
        CloudBigtableIO.Reader reader = (CloudBigtableIO.Reader)sourceWithKeys.createReader(null);
        ByteKey startByteKey = ByteKey.copyFrom((byte[])startKey);
        ByteKey stopByteKey = ByteKey.copyFrom((byte[])stopKey);
        ByteKeyRangeTracker baseRangeTracker = ByteKeyRangeTracker.of((ByteKeyRange)ByteKeyRange.of((ByteKey)startByteKey, (ByteKey)stopByteKey));
        CloudBigtableIOReaderTest.setKey(reader, baseRangeTracker, ByteKey.copyFrom((byte[])"B".getBytes()));
        for (int i = 0; i < 20; ++i) {
            CloudBigtableIOReaderTest.compare(reader, baseRangeTracker);
            CloudBigtableIOReaderTest.bisect(reader, baseRangeTracker);
            this.split(reader, baseRangeTracker);
        }
    }

    @Test
    public void testRetryIdleTimeoutWithScan() throws Exception {
        byte[] startKey = "A".getBytes();
        byte[] endKey = "B".getBytes();
        List<ReadRowsResponse> responses = this.generateResponses(this.fakeService, "A", "B", true, 10);
        responses.remove(responses.size() - 1);
        Scan scan = new Scan().withStartRow(startKey).withStopRow(endKey);
        CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder().withProjectId("project").withInstanceId("instsance").withTableId("table").withScan(scan).withConfiguration("google.bigtable.emulator.endpoint.host", "localhost:" + this.port).withConfiguration("google.bigtable.idle.timeout.ms", "15000").build();
        CloudBigtableIO.Source source = (CloudBigtableIO.Source)CloudBigtableIO.read((CloudBigtableScanConfiguration)config);
        CloudBigtableIO.Reader reader = (CloudBigtableIO.Reader)source.createReader(null);
        ArrayList<Result> actual = new ArrayList<Result>();
        reader.start();
        actual.add(reader.getCurrent());
        int count = 1;
        boolean sleep = false;
        while (reader.advance()) {
            ++count;
            actual.add(reader.getCurrent());
            if (sleep) continue;
            Thread.sleep(20000L);
            sleep = true;
        }
        Assert.assertTrue((this.fakeService.count.get() > 1 ? 1 : 0) != 0);
        Assert.assertEquals((long)responses.size(), (long)count);
        Assert.assertEquals(responses.stream().map(response -> response.getChunks(0).getRowKey().toStringUtf8()).collect(Collectors.toList()), actual.stream().map(result -> ByteString.copyFrom((byte[])result.getRow()).toStringUtf8()).collect(Collectors.toList()));
    }

    @Test
    public void testRetryIdleTimeoutWithReadRowsRequest() throws Exception {
        byte[] startKey = "A".getBytes();
        byte[] endKey = "B".getBytes();
        List<ReadRowsResponse> responses = this.generateResponses(this.fakeService, "A", "B", true, 10);
        responses.remove(responses.size() - 1);
        ReadRowsRequest request = ReadRowsRequest.newBuilder().setTableName("projects/project/instances/instance/tables/table").setRows(RowSet.newBuilder().addRowRanges(RowRange.newBuilder().setStartKeyClosed(ByteString.copyFrom((byte[])startKey)).setEndKeyOpen(ByteString.copyFrom((byte[])endKey)).build()).build()).build();
        CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder().withProjectId("project").withInstanceId("instsance").withTableId("table").withRequest(request).withConfiguration("google.bigtable.emulator.endpoint.host", "localhost:" + this.port).withConfiguration("google.bigtable.idle.timeout.ms", "15000").build();
        CloudBigtableIO.Source source = (CloudBigtableIO.Source)CloudBigtableIO.read((CloudBigtableScanConfiguration)config);
        CloudBigtableIO.Reader reader = (CloudBigtableIO.Reader)source.createReader(null);
        ArrayList<Result> actual = new ArrayList<Result>();
        reader.start();
        actual.add(reader.getCurrent());
        int count = 1;
        boolean sleep = false;
        while (reader.advance()) {
            ++count;
            actual.add(reader.getCurrent());
            if (sleep) continue;
            Thread.sleep(20000L);
            sleep = true;
        }
        Assert.assertTrue((this.fakeService.count.get() > 1 ? 1 : 0) != 0);
        Assert.assertEquals((long)responses.size(), (long)count);
        Assert.assertEquals(responses.stream().map(response -> response.getChunks(0).getRowKey()).collect(Collectors.toList()), actual.stream().map(result -> ByteString.copyFrom((byte[])result.getRow())).collect(Collectors.toList()));
    }

    @Test
    public void testDisableRetryIdleTimeout() throws Exception {
        byte[] startKey = "A".getBytes();
        byte[] endKey = "B".getBytes();
        List<ReadRowsResponse> responses = this.generateResponses(this.fakeService, "A", "B", true, 10);
        responses.remove(responses.size() - 1);
        ReadRowsRequest request = ReadRowsRequest.newBuilder().setTableName("projects/project/instances/instance/tables/table").setRows(RowSet.newBuilder().addRowRanges(RowRange.newBuilder().setStartKeyClosed(ByteString.copyFrom((byte[])startKey)).setEndKeyOpen(ByteString.copyFrom((byte[])endKey)).build()).build()).build();
        CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder().withProjectId("project").withInstanceId("instsance").withTableId("table").withRequest(request).withConfiguration("google.bigtable.emulator.endpoint.host", "localhost:" + this.port).withConfiguration("google.bigtable.idle.timeout.ms", "15000").withConfiguration("google.cloud.bigtable.retry.idle.timeout", "false").build();
        CloudBigtableIO.Source source = (CloudBigtableIO.Source)CloudBigtableIO.read((CloudBigtableScanConfiguration)config);
        CloudBigtableIO.Reader reader = (CloudBigtableIO.Reader)source.createReader(null);
        reader.start();
        boolean sleep = false;
        try {
            while (reader.advance()) {
                if (sleep) continue;
                Thread.sleep(20000L);
                sleep = true;
            }
            Assert.fail((String)"Should throw idle timeout exception");
        }
        catch (Throwable e) {
            Throwable throwable = CloudBigtableIO.Reader.findCause((Throwable)e, WatchdogTimeoutException.class);
            Assert.assertNotNull((Object)throwable);
            Assert.assertTrue((boolean)throwable.getMessage().contains("idle"));
        }
    }

    private List<ReadRowsResponse> generateResponses(FakeService fakeService, String starKey, String endKey, boolean addEndKey, int numResponses) {
        ArrayList<ReadRowsResponse> responses = new ArrayList<ReadRowsResponse>();
        for (int i = 0; i < numResponses; ++i) {
            responses.add(ReadRowsResponse.newBuilder().addChunks(ReadRowsResponse.CellChunk.newBuilder().setRowKey(ByteString.copyFromUtf8((String)(starKey + i))).setFamilyName(StringValue.of((String)"cf")).setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFrom((byte[])"q".getBytes()))).setTimestampMicros(1000L).setValue(ByteString.copyFromUtf8((String)"value")).setCommitRow(true).build()).build());
        }
        if (addEndKey) {
            responses.add(ReadRowsResponse.newBuilder().addChunks(ReadRowsResponse.CellChunk.newBuilder().setRowKey(ByteString.copyFromUtf8((String)endKey)).setFamilyName(StringValue.of((String)"cf")).setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFrom((byte[])"q".getBytes()))).setTimestampMicros(1000L).setValue(ByteString.copyFromUtf8((String)"value")).setCommitRow(true).build()).build());
        }
        fakeService.addResponses(responses);
        return responses;
    }

    private void split(CloudBigtableIO.Reader reader, ByteKeyRangeTracker baseRangeTracker) {
        double halfway = CloudBigtableIOReaderTest.bisectPercentage(baseRangeTracker);
        reader.splitAtFraction(halfway);
        ByteKey bisectedKey = baseRangeTracker.getRange().interpolateKey(halfway);
        baseRangeTracker.trySplitAtPosition(bisectedKey);
        CloudBigtableIOReaderTest.compare(reader, baseRangeTracker);
    }

    private static void compare(CloudBigtableIO.Reader reader, ByteKeyRangeTracker baseRangeTracker) {
        Assert.assertEquals((double)baseRangeTracker.getFractionConsumed(), (double)reader.getFractionConsumed(), (double)0.01);
    }

    private static void bisect(CloudBigtableIO.Reader reader, ByteKeyRangeTracker baseRangeTracker) {
        double halfway = CloudBigtableIOReaderTest.bisectPercentage(baseRangeTracker);
        ByteKey bisectedKey = baseRangeTracker.getRange().interpolateKey(halfway);
        CloudBigtableIOReaderTest.setKey(reader, baseRangeTracker, bisectedKey);
        CloudBigtableIOReaderTest.compare(reader, baseRangeTracker);
    }

    private static void setKey(CloudBigtableIO.Reader reader, ByteKeyRangeTracker baseRangeTracker, ByteKey key) {
        reader.getRangeTracker().tryReturnRecordAt(true, key);
        baseRangeTracker.tryReturnRecordAt(true, key);
    }

    private static double bisectPercentage(ByteKeyRangeTracker baseRangeTracker) {
        double fractionConsumed = baseRangeTracker.getFractionConsumed();
        return (1.0 + fractionConsumed) / 2.0;
    }

    static class FakeService
    extends BigtableGrpc.BigtableImplBase {
        List<ReadRowsResponse> responses = new ArrayList<ReadRowsResponse>();
        AtomicInteger count = new AtomicInteger(0);

        FakeService() {
        }

        public void readRows(ReadRowsRequest request, StreamObserver<ReadRowsResponse> responseObserver) {
            this.count.getAndIncrement();
            ByteString startKey = request.getRows().getRowRanges(0).getStartKeyClosed();
            ByteString endKey = request.getRows().getRowRanges(0).getEndKeyOpen();
            int index = 0;
            while (index < this.responses.size()) {
                ReadRowsResponse current;
                if (ByteStringComparator.INSTANCE.compare((current = this.responses.get(index++)).getChunks(0).getRowKey(), startKey) < 0 || ByteStringComparator.INSTANCE.compare(current.getChunks(0).getRowKey(), endKey) >= 0) continue;
                responseObserver.onNext((Object)current);
            }
            responseObserver.onCompleted();
        }

        void addResponses(List<ReadRowsResponse> allResponses) {
            this.responses.addAll(allResponses);
        }
    }
}

