/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigtable.data.v2.stub.changestream;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.StreamContinuationToken;
import com.google.bigtable.v2.StreamContinuationTokens;
import com.google.bigtable.v2.StreamPartition;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.CloseStream;
import com.google.cloud.bigtable.data.v2.models.Heartbeat;
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.truth.Truth;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.grpc.BindableService;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcServerRule;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Instant;

@RunWith(value=JUnit4.class)
public class ReadChangeStreamRetryTest {
    // Identifiers of the fake project/instance/table. PROJECT_ID and INSTANCE_ID configure the
    // client in setUp(), TABLE_ID is queried in getResults(); RpcExpectation expects the same
    // three values in the table name of every request.
    private static final String PROJECT_ID = "fake-project";
    private static final String INSTANCE_ID = "fake-instance";
    private static final String TABLE_ID = "fake-table";
    // Row-range bounds of the single stream partition used by every request in this test.
    private static final String START_KEY_CLOSED = "a";
    private static final String END_KEY_OPEN = "b";
    // Token strings, one per kind of server response that carries a token in these tests.
    private static final String HEARTBEAT_TOKEN = "heartbeat-token";
    private static final String CLOSE_STREAM_TOKEN = "close-stream-token";
    private static final String DATA_CHANGE_TOKEN = "data-change-token";
    // Start time placed on every query: 0 seconds plus a 1000 nanosecond adjustment.
    private static final Instant REQUEST_START_TIME = Instant.ofEpochSecond((long)0L, (long)1000L);
    // Supplies the gRPC server (service registry, port) and channel that setUp() wires together.
    @Rule
    public GrpcServerRule serverRule = new GrpcServerRule();
    // Fake Bigtable service; each test queues the RPCs it expects on service.expectations.
    private TestBigtableService service;
    // Client under test; created in setUp() and closed in tearDown().
    private BigtableDataClient client;

    /**
     * Registers a fresh fake service on the rule's server and builds a client that talks to it
     * over the rule's channel.
     *
     * @throws IOException if the data client cannot be created
     */
    @Before
    public void setUp() throws IOException {
        service = new TestBigtableService();
        serverRule.getServiceRegistry().addService((BindableService) service);

        int port = serverRule.getServer().getPort();
        BigtableDataSettings.Builder settings = BigtableDataSettings.newBuilderForEmulator(port)
                .setProjectId(PROJECT_ID)
                .setInstanceId(INSTANCE_ID)
                .setCredentialsProvider((CredentialsProvider) NoCredentialsProvider.create());

        // Route all client traffic through the channel owned by the server rule.
        TransportChannel channel = GrpcTransportChannel.create((ManagedChannel) serverRule.getChannel());
        TransportChannelProvider channelProvider = FixedTransportChannelProvider.create(channel);
        // NOTE(review): the result of build() is discarded; the channel provider is presumably
        // retained by the nested stub-settings builder that settings.build() uses - confirm.
        ((EnhancedBigtableStubSettings.Builder) settings.stubSettings().setTransportChannelProvider(channelProvider)).build();

        client = BigtableDataClient.create((BigtableDataSettings) settings.build());
    }

    /** Closes the client created in {@code setUp()}, if one was created. */
    @After
    public void tearDown() {
        if (client == null) {
            // setUp() did not get as far as creating the client; nothing to release.
            return;
        }
        client.close();
    }

    /**
     * Builds a continuation token for the test partition
     * [{@code START_KEY_CLOSED}, {@code END_KEY_OPEN}).
     *
     * @param token the token string to embed
     * @return a continuation token carrying the test partition and {@code token}
     */
    private StreamContinuationToken createStreamContinuationToken(@Nonnull String token) {
        RowRange keyRange = RowRange.newBuilder()
                .setStartKeyClosed(ByteString.copyFromUtf8(START_KEY_CLOSED))
                .setEndKeyOpen(ByteString.copyFromUtf8(END_KEY_OPEN))
                .build();
        StreamPartition partition = StreamPartition.newBuilder().setRowRange(keyRange).build();
        return StreamContinuationToken.newBuilder()
                .setPartition(partition)
                .setToken(token)
                .build();
    }

    /**
     * Builds the partition listed as a "new partition" in a non-OK close-stream response.
     *
     * @return a partition covering [{@code START_KEY_CLOSED}, {@code END_KEY_OPEN})
     */
    private StreamPartition createNewPartitionForCloseStream() {
        RowRange.Builder keyRange = RowRange.newBuilder();
        keyRange.setStartKeyClosed(ByteString.copyFromUtf8(START_KEY_CLOSED));
        keyRange.setEndKeyOpen(ByteString.copyFromUtf8(END_KEY_OPEN));
        return StreamPartition.newBuilder().setRowRange(keyRange).build();
    }

    /**
     * Builds a heartbeat proto around the given continuation token.
     *
     * @param streamContinuationToken the token the heartbeat should carry
     * @return a heartbeat with that token and an estimated low watermark of 1000 seconds
     */
    private ReadChangeStreamResponse.Heartbeat createHeartbeat(StreamContinuationToken streamContinuationToken) {
        Timestamp lowWatermark = Timestamp.newBuilder().setSeconds(1000L).build();
        return ReadChangeStreamResponse.Heartbeat.newBuilder()
                .setContinuationToken(streamContinuationToken)
                .setEstimatedLowWatermark(lowWatermark)
                .build();
    }

    /**
     * Builds a close-stream proto.
     *
     * @param isOk when {@code true}, the result carries only status code 0; otherwise it carries
     *     status code 11 plus one continuation token ({@code CLOSE_STREAM_TOKEN}) and one new
     *     partition
     * @return the close-stream message
     */
    private ReadChangeStreamResponse.CloseStream createCloseStream(boolean isOk) {
        ReadChangeStreamResponse.CloseStream.Builder closeStream = ReadChangeStreamResponse.CloseStream.newBuilder();
        if (isOk) {
            closeStream.setStatus(com.google.rpc.Status.newBuilder().setCode(0));
            return closeStream.build();
        }
        // 11 is OUT_OF_RANGE in the google.rpc.Code numbering.
        closeStream.setStatus(com.google.rpc.Status.newBuilder().setCode(11));
        closeStream.addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN));
        closeStream.addNewPartitions(createNewPartitionForCloseStream());
        return closeStream.build();
    }

    /**
     * Builds a USER data change for row {@code "key"} with a single delete-from-family chunk.
     *
     * @param done when {@code true}, the change is additionally marked done and given an
     *     estimated low watermark and {@code DATA_CHANGE_TOKEN}; when {@code false}, none of
     *     those three fields is set
     * @return the data-change message
     */
    private ReadChangeStreamResponse.DataChange createDataChange(boolean done) {
        Mutation.DeleteFromFamily familyDelete = Mutation.DeleteFromFamily.newBuilder()
                .setFamilyName("fake-family")
                .build();
        Mutation mutation = Mutation.newBuilder().setDeleteFromFamily(familyDelete).build();

        ReadChangeStreamResponse.DataChange.Builder change = ReadChangeStreamResponse.DataChange.newBuilder()
                .setType(ReadChangeStreamResponse.DataChange.Type.USER)
                .setSourceClusterId("fake-source-cluster-id")
                .setRowKey(ByteString.copyFromUtf8("key"))
                .setCommitTimestamp(Timestamp.newBuilder().setSeconds(100L).build())
                .setTiebreaker(100)
                .addChunks(ReadChangeStreamResponse.MutationChunk.newBuilder().setMutation(mutation));
        if (!done) {
            return change.build();
        }
        change.setDone(true);
        change.setEstimatedLowWatermark(Timestamp.newBuilder().setSeconds(1L).build());
        change.setToken(DATA_CHANGE_TOKEN);
        return change.build();
    }

    /**
     * A single initial request answered with one heartbeat and an OK status must surface exactly
     * one {@link Heartbeat} record.
     */
    @Test
    public void happyPathHeartbeatTest() {
        StreamContinuationToken token = createStreamContinuationToken(HEARTBEAT_TOKEN);
        ReadChangeStreamResponse response = ReadChangeStreamResponse.newBuilder()
                .setHeartbeat(createHeartbeat(token))
                .build();
        RpcExpectation onlyRpc = RpcExpectation.create().expectInitialRequest().respondWith(response);
        service.expectations.add(onlyRpc);

        List<ChangeStreamRecord> records = getResults();

        Truth.assertThat(records.size()).isEqualTo(1);
        Assert.assertTrue(records.get(0) instanceof Heartbeat);
    }

    /**
     * A single initial request answered with an OK close-stream message must surface exactly one
     * {@link CloseStream} record.
     */
    @Test
    public void happyPathCloseStreamTest() {
        ReadChangeStreamResponse response = ReadChangeStreamResponse.newBuilder()
                .setCloseStream(createCloseStream(true))
                .build();
        RpcExpectation onlyRpc = RpcExpectation.create().expectInitialRequest().respondWith(response);
        service.expectations.add(onlyRpc);

        List<ChangeStreamRecord> records = getResults();

        Truth.assertThat(records.size()).isEqualTo(1);
        Assert.assertTrue(records.get(0) instanceof CloseStream);
    }

    /**
     * A single initial request answered with one data change marked done must surface exactly
     * one {@link ChangeStreamMutation} record.
     */
    @Test
    public void happyPathCompleteDataChangeTest() {
        ReadChangeStreamResponse response = ReadChangeStreamResponse.newBuilder()
                .setDataChange(createDataChange(true))
                .build();
        RpcExpectation onlyRpc = RpcExpectation.create().expectInitialRequest().respondWith(response);
        service.expectations.add(onlyRpc);

        List<ChangeStreamRecord> records = getResults();

        Truth.assertThat(records.size()).isEqualTo(1);
        Assert.assertTrue(records.get(0) instanceof ChangeStreamMutation);
    }

    /**
     * The first call fails with UNAVAILABLE before any response is sent. The fake service then
     * requires a second initial request and answers it with one heartbeat, so exactly one
     * {@link Heartbeat} record must be surfaced.
     */
    @Test
    public void singleHeartbeatImmediateRetryTest() {
        StreamContinuationToken token = createStreamContinuationToken(HEARTBEAT_TOKEN);
        ReadChangeStreamResponse response = ReadChangeStreamResponse.newBuilder()
                .setHeartbeat(createHeartbeat(token))
                .build();
        RpcExpectation failedAttempt = RpcExpectation.create()
                .expectInitialRequest()
                .respondWithStatus(Status.Code.UNAVAILABLE);
        RpcExpectation retryAttempt = RpcExpectation.create().expectInitialRequest().respondWith(response);
        service.expectations.add(failedAttempt);
        service.expectations.add(retryAttempt);

        List<ChangeStreamRecord> records = getResults();

        Truth.assertThat(records.size()).isEqualTo(1);
        Assert.assertTrue(records.get(0) instanceof Heartbeat);
    }

    /**
     * The first call fails with UNAVAILABLE before any response is sent. The fake service then
     * requires a second initial request and answers it with a non-OK close-stream message, so
     * exactly one {@link CloseStream} record must be surfaced.
     */
    @Test
    public void singleCloseStreamImmediateRetryTest() {
        ReadChangeStreamResponse response = ReadChangeStreamResponse.newBuilder()
                .setCloseStream(createCloseStream(false))
                .build();
        RpcExpectation failedAttempt = RpcExpectation.create()
                .expectInitialRequest()
                .respondWithStatus(Status.Code.UNAVAILABLE);
        RpcExpectation retryAttempt = RpcExpectation.create().expectInitialRequest().respondWith(response);
        service.expectations.add(failedAttempt);
        service.expectations.add(retryAttempt);

        List<ChangeStreamRecord> records = getResults();

        Truth.assertThat(records.size()).isEqualTo(1);
        Assert.assertTrue(records.get(0) instanceof CloseStream);
    }

    /**
     * The first call fails with UNAVAILABLE before any response is sent. The fake service then
     * requires a second initial request and answers it with a data change marked done, so
     * exactly one {@link ChangeStreamMutation} record must be surfaced.
     */
    @Test
    public void singleCompleteDataChangeImmediateRetryTest() {
        ReadChangeStreamResponse response = ReadChangeStreamResponse.newBuilder()
                .setDataChange(createDataChange(true))
                .build();
        RpcExpectation failedAttempt = RpcExpectation.create()
                .expectInitialRequest()
                .respondWithStatus(Status.Code.UNAVAILABLE);
        RpcExpectation retryAttempt = RpcExpectation.create().expectInitialRequest().respondWith(response);
        service.expectations.add(failedAttempt);
        service.expectations.add(retryAttempt);

        List<ChangeStreamRecord> records = getResults();

        Truth.assertThat(records.size()).isEqualTo(1);
        Assert.assertTrue(records.get(0) instanceof ChangeStreamMutation);
    }

    /**
     * The first call delivers one heartbeat and then fails with UNAVAILABLE. The fake service
     * requires the follow-up request to carry the heartbeat's continuation token; that second
     * call returns nothing, so exactly one {@link Heartbeat} record must be surfaced.
     */
    @Test
    public void errorAfterHeartbeatShouldResumeWithTokenTest() {
        StreamContinuationToken heartbeatToken = createStreamContinuationToken(HEARTBEAT_TOKEN);
        ReadChangeStreamResponse response = ReadChangeStreamResponse.newBuilder()
                .setHeartbeat(createHeartbeat(heartbeatToken))
                .build();
        RpcExpectation interruptedAttempt = RpcExpectation.create()
                .expectInitialRequest()
                .respondWith(response)
                .respondWithStatus(Status.Code.UNAVAILABLE);
        StreamContinuationTokens resumeTokens = StreamContinuationTokens.newBuilder()
                .addTokens(heartbeatToken)
                .build();
        RpcExpectation resumedAttempt = RpcExpectation.create().expectRequest(resumeTokens);
        service.expectations.add(interruptedAttempt);
        service.expectations.add(resumedAttempt);

        List<ChangeStreamRecord> records = getResults();

        Truth.assertThat(records.size()).isEqualTo(1);
        Assert.assertTrue(records.get(0) instanceof Heartbeat);
    }

    /**
     * The first call delivers one data change marked done and then fails with UNAVAILABLE. The
     * fake service requires the follow-up request to carry a continuation token built from
     * {@code DATA_CHANGE_TOKEN} and the test partition; that second call returns nothing, so
     * exactly one {@link ChangeStreamMutation} record must be surfaced.
     */
    @Test
    public void errorAfterDataChangeWithDoneShouldResumeWithTokenTest() {
        ReadChangeStreamResponse response = ReadChangeStreamResponse.newBuilder()
                .setDataChange(createDataChange(true))
                .build();
        RpcExpectation interruptedAttempt = RpcExpectation.create()
                .expectInitialRequest()
                .respondWith(response)
                .respondWithStatus(Status.Code.UNAVAILABLE);
        StreamContinuationTokens resumeTokens = StreamContinuationTokens.newBuilder()
                .addTokens(createStreamContinuationToken(DATA_CHANGE_TOKEN))
                .build();
        RpcExpectation resumedAttempt = RpcExpectation.create().expectRequest(resumeTokens);
        service.expectations.add(interruptedAttempt);
        service.expectations.add(resumedAttempt);

        List<ChangeStreamRecord> records = getResults();

        Truth.assertThat(records.size()).isEqualTo(1);
        Assert.assertTrue(records.get(0) instanceof ChangeStreamMutation);
    }

    /**
     * The first call delivers one data change that is NOT marked done (and therefore carries no
     * token) and then fails with UNAVAILABLE.
     *
     * <p>Despite the method name, the follow-up request expected here is another initial
     * request, not a token-based one. That second call returns nothing, and the test asserts
     * that no record at all is surfaced.
     */
    @Test
    public void errorAfterDataChangeWithoutDoneShouldResumeWithTokenTest() {
        ReadChangeStreamResponse partialChange = ReadChangeStreamResponse.newBuilder()
                .setDataChange(createDataChange(false))
                .build();
        RpcExpectation interruptedAttempt = RpcExpectation.create()
                .expectInitialRequest()
                .respondWith(partialChange)
                .respondWithStatus(Status.Code.UNAVAILABLE);
        RpcExpectation restartedAttempt = RpcExpectation.create().expectInitialRequest();
        service.expectations.add(interruptedAttempt);
        service.expectations.add(restartedAttempt);

        List<ChangeStreamRecord> records = getResults();

        Truth.assertThat(records).isEmpty();
    }

    /**
     * The first call delivers a done data change followed by a heartbeat and then fails with
     * UNAVAILABLE. The fake service requires the follow-up request to carry the token of the
     * later response (the heartbeat), and both records must be surfaced in order.
     */
    @Test
    public void shouldResumeWithLastTokenTest() {
        ReadChangeStreamResponse changeResponse = ReadChangeStreamResponse.newBuilder()
                .setDataChange(createDataChange(true))
                .build();
        ReadChangeStreamResponse heartbeatResponse = ReadChangeStreamResponse.newBuilder()
                .setHeartbeat(createHeartbeat(createStreamContinuationToken(HEARTBEAT_TOKEN)))
                .build();
        RpcExpectation interruptedAttempt = RpcExpectation.create()
                .expectInitialRequest()
                .respondWith(changeResponse)
                .respondWith(heartbeatResponse)
                .respondWithStatus(Status.Code.UNAVAILABLE);
        StreamContinuationTokens resumeTokens = StreamContinuationTokens.newBuilder()
                .addTokens(createStreamContinuationToken(HEARTBEAT_TOKEN))
                .build();
        RpcExpectation resumedAttempt = RpcExpectation.create().expectRequest(resumeTokens);
        service.expectations.add(interruptedAttempt);
        service.expectations.add(resumedAttempt);

        List<ChangeStreamRecord> records = getResults();

        Truth.assertThat(records.size()).isEqualTo(2);
        Assert.assertTrue(records.get(0) instanceof ChangeStreamMutation);
        Assert.assertTrue(records.get(1) instanceof Heartbeat);
    }

    /**
     * The first call is failed by the fake service with an {@link InternalException} wrapping an
     * INTERNAL status whose description mentions "Received Rst Stream". The fake service then
     * requires a second initial request and answers it with one heartbeat, so exactly one
     * {@link Heartbeat} record must be surfaced.
     */
    @Test
    public void retryRstStreamExceptionTest() {
        Status rstStreamStatus = Status.INTERNAL.withDescription("INTERNAL: HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream");
        StatusRuntimeException cause = new StatusRuntimeException(rstStreamStatus);
        StatusCode internalCode = GrpcStatusCode.of(Status.Code.INTERNAL);
        // The exception itself is constructed as non-retryable (last argument).
        ApiException rstStreamError = new InternalException((Throwable) cause, internalCode, false);

        ReadChangeStreamResponse response = ReadChangeStreamResponse.newBuilder()
                .setHeartbeat(createHeartbeat(createStreamContinuationToken(HEARTBEAT_TOKEN)))
                .build();
        RpcExpectation failedAttempt = RpcExpectation.create()
                .expectInitialRequest()
                .respondWithException(Status.Code.INTERNAL, rstStreamError);
        RpcExpectation retryAttempt = RpcExpectation.create().expectInitialRequest().respondWith(response);
        service.expectations.add(failedAttempt);
        service.expectations.add(retryAttempt);

        List<ChangeStreamRecord> records = getResults();

        Truth.assertThat(records.size()).isEqualTo(1);
        Assert.assertTrue(records.get(0) instanceof Heartbeat);
    }

    /**
     * Reads the change stream of {@code TABLE_ID} for the test partition, starting at
     * {@code REQUEST_START_TIME}, and drains it into a list.
     *
     * <p>The stream and the list are fully parameterized: iterating a raw {@code ServerStream}
     * yields {@code Object} elements, which cannot be bound to a {@code ChangeStreamRecord} loop
     * variable, and returning a raw list is an unchecked conversion.
     *
     * @return every record the client surfaced, in iteration order
     */
    private List<ChangeStreamRecord> getResults() {
        ReadChangeStreamQuery query = ReadChangeStreamQuery.create(TABLE_ID).startTime(REQUEST_START_TIME);
        // Every test reads the same partition; the expected requests are built for it as well.
        ServerStream<ChangeStreamRecord> actualRecords =
                this.client.readChangeStream(query.streamPartition(START_KEY_CLOSED, END_KEY_OPEN));
        List<ChangeStreamRecord> actualValues = new ArrayList<>();
        for (ChangeStreamRecord record : actualRecords) {
            actualValues.add(record);
        }
        return actualValues;
    }

    private static class TestBigtableService
    extends BigtableGrpc.BigtableImplBase {
        Queue<RpcExpectation> expectations = Queues.newArrayDeque();
        int i = -1;

        private TestBigtableService() {
        }

        public void readChangeStream(ReadChangeStreamRequest request, StreamObserver<ReadChangeStreamResponse> responseObserver) {
            RpcExpectation expectedRpc = this.expectations.poll();
            ++this.i;
            Truth.assertWithMessage((String)("Unexpected request#" + this.i + ":" + request.toString())).that((Object)expectedRpc).isNotNull();
            Truth.assertWithMessage((String)("Unexpected request#" + this.i)).that((Object)request).isEqualTo((Object)expectedRpc.getExpectedRequest());
            for (ReadChangeStreamResponse response : expectedRpc.responses) {
                responseObserver.onNext((Object)response);
            }
            if (expectedRpc.statusCode.toStatus().isOk()) {
                responseObserver.onCompleted();
            } else if (expectedRpc.exception != null) {
                responseObserver.onError((Throwable)expectedRpc.exception);
            } else {
                responseObserver.onError((Throwable)expectedRpc.statusCode.toStatus().asRuntimeException());
            }
        }
    }

    /**
     * Script for one {@code readChangeStream} call: the request the fake service should receive
     * and what it should send back.
     *
     * <p>All mutators return {@code this} so an expectation can be written as one chain.
     */
    private static class RpcExpectation {
        /** Expected request under construction; pre-filled with the fake table name and the a-b partition. */
        ReadChangeStreamRequest.Builder requestBuilder;
        /** Status the call ends with; OK unless overridden. */
        Status.Code statusCode = Status.Code.OK;
        /** Exception to fail the call with; {@code null} unless set via respondWithException. */
        ApiException exception;
        /** Responses to replay, in order, before the call is terminated. */
        List<ReadChangeStreamResponse> responses = Lists.newArrayList();

        private RpcExpectation() {
            RowRange keyRange = RowRange.newBuilder()
                    .setStartKeyClosed(ByteString.copyFromUtf8("a"))
                    .setEndKeyOpen(ByteString.copyFromUtf8("b"))
                    .build();
            StreamPartition partition = StreamPartition.newBuilder().setRowRange(keyRange).build();
            String tableName = NameUtil.formatTableName("fake-project", "fake-instance", "fake-table");
            requestBuilder = ReadChangeStreamRequest.newBuilder()
                    .setTableName(tableName)
                    .setPartition(partition);
        }

        /** Creates an expectation that, until customized, ends with OK and sends no responses. */
        static RpcExpectation create() {
            return new RpcExpectation();
        }

        /** Expects the request to carry {@code REQUEST_START_TIME} as its start time. */
        RpcExpectation expectInitialRequest() {
            Timestamp startTime = Timestamp.newBuilder()
                    .setSeconds(REQUEST_START_TIME.getEpochSecond())
                    .setNanos(REQUEST_START_TIME.getNano())
                    .build();
            requestBuilder.setStartTime(startTime);
            return this;
        }

        /** Expects the request to carry the given continuation tokens. */
        RpcExpectation expectRequest(StreamContinuationTokens continuationTokens) {
            requestBuilder.setContinuationTokens(continuationTokens);
            return this;
        }

        /** Ends the call with the given status code. */
        RpcExpectation respondWithStatus(Status.Code code) {
            statusCode = code;
            return this;
        }

        /** Ends the call with the given status code and records the exception to fail it with. */
        RpcExpectation respondWithException(Status.Code code, ApiException exception) {
            statusCode = code;
            this.exception = exception;
            return this;
        }

        /** Appends the given responses to those already scripted. */
        RpcExpectation respondWith(ReadChangeStreamResponse ... responses) {
            for (ReadChangeStreamResponse response : responses) {
                this.responses.add(response);
            }
            return this;
        }

        /** Builds the request the fake service compares the incoming request against. */
        ReadChangeStreamRequest getExpectedRequest() {
            return requestBuilder.build();
        }
    }
}

