/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigquery.storage.v1beta2;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.test.Test;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1beta2.FakeBigQueryWrite;
import com.google.cloud.bigquery.storage.v1beta2.FakeScheduledExecutorService;
import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
import com.google.cloud.bigquery.storage.v1beta2.ProtoSchema;
import com.google.cloud.bigquery.storage.v1beta2.StreamWriterV2;
import com.google.common.base.Strings;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class StreamWriterV2Test {
    private static final Logger log = Logger.getLogger(StreamWriterV2Test.class.getName());
    private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
    private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
    private FakeScheduledExecutorService fakeExecutor;
    private FakeBigQueryWrite testBigQueryWrite;
    private static MockServiceHelper serviceHelper;
    private BigQueryWriteClient client;

    @Before
    public void setUp() throws Exception {
        this.testBigQueryWrite = new FakeBigQueryWrite();
        serviceHelper = new MockServiceHelper(UUID.randomUUID().toString(), Arrays.asList(this.testBigQueryWrite));
        serviceHelper.start();
        this.fakeExecutor = new FakeScheduledExecutorService();
        this.testBigQueryWrite.setExecutor(this.fakeExecutor);
        this.client = BigQueryWriteClient.create((BigQueryWriteSettings)((BigQueryWriteSettings.Builder)((BigQueryWriteSettings.Builder)BigQueryWriteSettings.newBuilder().setCredentialsProvider((CredentialsProvider)NoCredentialsProvider.create())).setTransportChannelProvider((TransportChannelProvider)serviceHelper.createChannelProvider())).build());
    }

    @After
    public void tearDown() throws Exception {
        log.info("tearDown called");
        this.client.close();
        serviceHelper.stop();
    }

    private StreamWriterV2 getTestStreamWriterV2() throws IOException {
        return StreamWriterV2.newBuilder((String)TEST_STREAM, (BigQueryWriteClient)this.client).setWriterSchema(this.createProtoSchema()).setTraceId(TEST_TRACE_ID).build();
    }

    private ProtoSchema createProtoSchema() {
        return ProtoSchema.newBuilder().setProtoDescriptor(DescriptorProtos.DescriptorProto.newBuilder().setName("Message").addField(DescriptorProtos.FieldDescriptorProto.newBuilder().setName("foo").setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING).setNumber(1).build()).build()).build();
    }

    private ProtoRows createProtoRows(String[] messages) {
        ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
        for (String message : messages) {
            Test.FooType foo = Test.FooType.newBuilder().setFoo(message).build();
            rowsBuilder.addSerializedRows(foo.toByteString());
        }
        return rowsBuilder.build();
    }

    private AppendRowsResponse createAppendResponse(long offset) {
        return AppendRowsResponse.newBuilder().setAppendResult(AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of((long)offset)).build()).build();
    }

    private AppendRowsResponse createAppendResponseWithError(Status.Code code, String message) {
        return AppendRowsResponse.newBuilder().setError(com.google.rpc.Status.newBuilder().setCode(code.value()).setMessage(message)).build();
    }

    private ApiFuture<AppendRowsResponse> sendTestMessage(StreamWriterV2 writer, String[] messages) {
        return writer.append(this.createProtoRows(messages), -1L);
    }

    private static <T extends Throwable> T assertFutureException(Class<T> expectedThrowable, final Future<?> future) {
        return (T)Assert.assertThrows(expectedThrowable, (ThrowingRunnable)new ThrowingRunnable(){

            public void run() throws Throwable {
                try {
                    future.get();
                }
                catch (ExecutionException ex) {
                    throw ex.getCause();
                }
            }
        });
    }

    private void verifyAppendIsBlocked(final StreamWriterV2 writer) throws Exception {
        Thread appendThread = new Thread(new Runnable(){

            @Override
            public void run() {
                StreamWriterV2Test.this.sendTestMessage(writer, new String[]{"A"});
            }
        });
        appendThread.start();
        TimeUnit.SECONDS.sleep(2L);
        Assert.assertTrue((boolean)appendThread.isAlive());
        appendThread.interrupt();
    }

    private void verifyAppendRequests(long appendCount) {
        Assert.assertEquals((long)appendCount, (long)this.testBigQueryWrite.getAppendRequests().size());
        int i = 0;
        while ((long)i < appendCount) {
            AppendRowsRequest serverRequest = this.testBigQueryWrite.getAppendRequests().get(i);
            Assert.assertTrue((serverRequest.getProtoRows().getRows().getSerializedRowsCount() > 0 ? 1 : 0) != 0);
            Assert.assertEquals((long)i, (long)serverRequest.getOffset().getValue());
            if (i == 0) {
                Assert.assertTrue((boolean)serverRequest.getProtoRows().hasWriterSchema());
                Assert.assertEquals((Object)serverRequest.getWriteStream(), (Object)TEST_STREAM);
                Assert.assertEquals((Object)serverRequest.getTraceId(), (Object)TEST_TRACE_ID);
            } else {
                Assert.assertFalse((boolean)serverRequest.getProtoRows().hasWriterSchema());
                Assert.assertEquals((Object)serverRequest.getWriteStream(), (Object)"");
                Assert.assertEquals((Object)serverRequest.getTraceId(), (Object)"");
            }
            ++i;
        }
    }

    @Test
    public void testBuildBigQueryWriteClientInWriter() throws Exception {
        StreamWriterV2 writer = StreamWriterV2.newBuilder((String)TEST_STREAM).setCredentialsProvider((CredentialsProvider)NoCredentialsProvider.create()).setChannelProvider((TransportChannelProvider)serviceHelper.createChannelProvider()).setWriterSchema(this.createProtoSchema()).build();
        this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponse(0L));
        ApiFuture<AppendRowsResponse> appendFuture1 = this.sendTestMessage(writer, new String[]{"A"});
        Assert.assertEquals((long)0L, (long)((AppendRowsResponse)appendFuture1.get()).getAppendResult().getOffset().getValue());
        writer.close();
    }

    @Test
    public void testAppendSuccess() throws Exception {
        StreamWriterV2 writer = this.getTestStreamWriterV2();
        long appendCount = 100L;
        int i = 0;
        while ((long)i < appendCount) {
            this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponse(i));
            ++i;
        }
        ArrayList<ApiFuture> futures = new ArrayList<ApiFuture>();
        int i2 = 0;
        while ((long)i2 < appendCount) {
            futures.add(writer.append(this.createProtoRows(new String[]{String.valueOf(i2)}), (long)i2));
            ++i2;
        }
        i2 = 0;
        while ((long)i2 < appendCount) {
            Assert.assertEquals((long)i2, (long)((AppendRowsResponse)((ApiFuture)futures.get(i2)).get()).getAppendResult().getOffset().getValue());
            ++i2;
        }
        this.verifyAppendRequests(appendCount);
        writer.close();
    }

    @Test
    public void testNoSchema() throws Exception {
        StatusRuntimeException ex = (StatusRuntimeException)Assert.assertThrows(StatusRuntimeException.class, (ThrowingRunnable)new ThrowingRunnable(){

            public void run() throws Throwable {
                StreamWriterV2.newBuilder((String)StreamWriterV2Test.TEST_STREAM, (BigQueryWriteClient)StreamWriterV2Test.this.client).build();
            }
        });
        Assert.assertEquals((Object)ex.getStatus().getCode(), (Object)Status.INVALID_ARGUMENT.getCode());
        Assert.assertTrue((boolean)ex.getStatus().getDescription().contains("Writer schema must be provided"));
    }

    @Test
    public void testInvalidTraceId() throws Exception {
        Assert.assertThrows(IllegalArgumentException.class, (ThrowingRunnable)new ThrowingRunnable(){

            public void run() throws Throwable {
                StreamWriterV2.newBuilder((String)StreamWriterV2Test.TEST_STREAM).setTraceId("abc");
            }
        });
        Assert.assertThrows(IllegalArgumentException.class, (ThrowingRunnable)new ThrowingRunnable(){

            public void run() throws Throwable {
                StreamWriterV2.newBuilder((String)StreamWriterV2Test.TEST_STREAM).setTraceId("abc:");
            }
        });
        Assert.assertThrows(IllegalArgumentException.class, (ThrowingRunnable)new ThrowingRunnable(){

            public void run() throws Throwable {
                StreamWriterV2.newBuilder((String)StreamWriterV2Test.TEST_STREAM).setTraceId(":abc");
            }
        });
    }

    @Test
    public void testAppendSuccessAndConnectionError() throws Exception {
        StreamWriterV2 writer = this.getTestStreamWriterV2();
        this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponse(0L));
        this.testBigQueryWrite.addException((Exception)Status.INTERNAL.asException());
        ApiFuture<AppendRowsResponse> appendFuture1 = this.sendTestMessage(writer, new String[]{"A"});
        ApiFuture<AppendRowsResponse> appendFuture2 = this.sendTestMessage(writer, new String[]{"B"});
        Assert.assertEquals((long)0L, (long)((AppendRowsResponse)appendFuture1.get()).getAppendResult().getOffset().getValue());
        ApiException actualError = StreamWriterV2Test.assertFutureException(ApiException.class, appendFuture2);
        Assert.assertEquals((Object)StatusCode.Code.INTERNAL, (Object)actualError.getStatusCode().getCode());
        writer.close();
    }

    @Test
    public void testAppendSuccessAndInStreamError() throws Exception {
        StreamWriterV2 writer = this.getTestStreamWriterV2();
        this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponse(0L));
        this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponseWithError(Status.INVALID_ARGUMENT.getCode(), "test message"));
        this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponse(1L));
        ApiFuture<AppendRowsResponse> appendFuture1 = this.sendTestMessage(writer, new String[]{"A"});
        ApiFuture<AppendRowsResponse> appendFuture2 = this.sendTestMessage(writer, new String[]{"B"});
        ApiFuture<AppendRowsResponse> appendFuture3 = this.sendTestMessage(writer, new String[]{"C"});
        Assert.assertEquals((long)0L, (long)((AppendRowsResponse)appendFuture1.get()).getAppendResult().getOffset().getValue());
        StatusRuntimeException actualError = StreamWriterV2Test.assertFutureException(StatusRuntimeException.class, appendFuture2);
        Assert.assertEquals((Object)Status.Code.INVALID_ARGUMENT, (Object)actualError.getStatus().getCode());
        Assert.assertEquals((Object)"test message", (Object)actualError.getStatus().getDescription());
        Assert.assertEquals((long)1L, (long)((AppendRowsResponse)appendFuture3.get()).getAppendResult().getOffset().getValue());
        writer.close();
    }

    @Test
    public void longIdleBetweenAppends() throws Exception {
        StreamWriterV2 writer = this.getTestStreamWriterV2();
        this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponse(0L));
        this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponse(1L));
        ApiFuture<AppendRowsResponse> appendFuture1 = this.sendTestMessage(writer, new String[]{"A"});
        Assert.assertEquals((long)0L, (long)((AppendRowsResponse)appendFuture1.get()).getAppendResult().getOffset().getValue());
        TimeUnit.SECONDS.sleep(3L);
        ApiFuture<AppendRowsResponse> appendFuture2 = this.sendTestMessage(writer, new String[]{"B"});
        Assert.assertEquals((long)1L, (long)((AppendRowsResponse)appendFuture2.get()).getAppendResult().getOffset().getValue());
        writer.close();
    }

    @Test
    public void testAppendAfterUserClose() throws Exception {
        StreamWriterV2 writer = this.getTestStreamWriterV2();
        this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponse(0L));
        ApiFuture<AppendRowsResponse> appendFuture1 = this.sendTestMessage(writer, new String[]{"A"});
        writer.close();
        ApiFuture<AppendRowsResponse> appendFuture2 = this.sendTestMessage(writer, new String[]{"B"});
        Assert.assertEquals((long)0L, (long)((AppendRowsResponse)appendFuture1.get()).getAppendResult().getOffset().getValue());
        Assert.assertTrue((boolean)appendFuture2.isDone());
        StatusRuntimeException actualError = StreamWriterV2Test.assertFutureException(StatusRuntimeException.class, appendFuture2);
        Assert.assertEquals((Object)Status.Code.FAILED_PRECONDITION, (Object)actualError.getStatus().getCode());
    }

    @Test
    public void testAppendAfterServerClose() throws Exception {
        StreamWriterV2 writer = this.getTestStreamWriterV2();
        this.testBigQueryWrite.addException((Exception)Status.INTERNAL.asException());
        ApiFuture<AppendRowsResponse> appendFuture1 = this.sendTestMessage(writer, new String[]{"A"});
        ApiException error1 = StreamWriterV2Test.assertFutureException(ApiException.class, appendFuture1);
        Assert.assertEquals((Object)StatusCode.Code.INTERNAL, (Object)error1.getStatusCode().getCode());
        ApiFuture<AppendRowsResponse> appendFuture2 = this.sendTestMessage(writer, new String[]{"B"});
        Assert.assertTrue((boolean)appendFuture2.isDone());
        StatusRuntimeException error2 = StreamWriterV2Test.assertFutureException(StatusRuntimeException.class, appendFuture2);
        Assert.assertEquals((Object)Status.Code.FAILED_PRECONDITION, (Object)error2.getStatus().getCode());
        writer.close();
    }

    @Test
    public void userCloseWhileRequestInflight() throws Exception {
        final StreamWriterV2 writer = this.getTestStreamWriterV2();
        this.testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2L));
        this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponse(0L));
        final ApiFuture<AppendRowsResponse> appendFuture1 = this.sendTestMessage(writer, new String[]{"A"});
        Thread closeThread = new Thread(new Runnable(){

            @Override
            public void run() {
                writer.close();
            }
        });
        closeThread.start();
        Assert.assertThrows(TimeoutException.class, (ThrowingRunnable)new ThrowingRunnable(){

            public void run() throws Throwable {
                appendFuture1.get(1L, TimeUnit.SECONDS);
            }
        });
        closeThread.join(2000L);
        Assert.assertTrue((boolean)appendFuture1.isDone());
        Assert.assertEquals((long)0L, (long)((AppendRowsResponse)appendFuture1.get()).getAppendResult().getOffset().getValue());
    }

    @Test
    public void serverCloseWhileRequestsInflight() throws Exception {
        int i;
        StreamWriterV2 writer = this.getTestStreamWriterV2();
        this.testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2L));
        this.testBigQueryWrite.addException((Exception)Status.INTERNAL.asException());
        int appendCount = 10;
        ArrayList<ApiFuture<AppendRowsResponse>> futures = new ArrayList<ApiFuture<AppendRowsResponse>>();
        for (i = 0; i < appendCount; ++i) {
            futures.add(this.sendTestMessage(writer, new String[]{String.valueOf(i)}));
        }
        for (i = 0; i < appendCount; ++i) {
            ApiException actualError = StreamWriterV2Test.assertFutureException(ApiException.class, (Future)futures.get(i));
            Assert.assertEquals((Object)StatusCode.Code.INTERNAL, (Object)actualError.getStatusCode().getCode());
        }
        writer.close();
    }

    @Test
    public void testZeroMaxInflightRequests() throws Exception {
        StreamWriterV2 writer = StreamWriterV2.newBuilder((String)TEST_STREAM, (BigQueryWriteClient)this.client).setWriterSchema(this.createProtoSchema()).setMaxInflightRequests(0L).build();
        this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponse(0L));
        this.verifyAppendIsBlocked(writer);
        writer.close();
    }

    @Test
    public void testZeroMaxInflightBytes() throws Exception {
        StreamWriterV2 writer = StreamWriterV2.newBuilder((String)TEST_STREAM, (BigQueryWriteClient)this.client).setWriterSchema(this.createProtoSchema()).setMaxInflightBytes(0L).build();
        this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponse(0L));
        this.verifyAppendIsBlocked(writer);
        writer.close();
    }

    @Test
    public void testOneMaxInflightRequests() throws Exception {
        StreamWriterV2 writer = StreamWriterV2.newBuilder((String)TEST_STREAM, (BigQueryWriteClient)this.client).setWriterSchema(this.createProtoSchema()).setMaxInflightRequests(1L).build();
        this.testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1L));
        this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponse(0L));
        long appendStartTimeMs = System.currentTimeMillis();
        ApiFuture<AppendRowsResponse> appendFuture1 = this.sendTestMessage(writer, new String[]{"A"});
        long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs;
        Assert.assertTrue((appendElapsedMs >= 1000L ? 1 : 0) != 0);
        Assert.assertEquals((long)0L, (long)((AppendRowsResponse)appendFuture1.get()).getAppendResult().getOffset().getValue());
        writer.close();
    }

    @Test
    public void testAppendsWithTinyMaxInflightBytes() throws Exception {
        StreamWriterV2 writer = StreamWriterV2.newBuilder((String)TEST_STREAM, (BigQueryWriteClient)this.client).setWriterSchema(this.createProtoSchema()).setMaxInflightBytes(1L).build();
        this.testBigQueryWrite.setResponseSleep(Duration.ofMillis(100L));
        long appendCount = 10L;
        int i = 0;
        while ((long)i < appendCount) {
            this.testBigQueryWrite.addResponse((AbstractMessage)this.createAppendResponse(i));
            ++i;
        }
        ArrayList<ApiFuture> futures = new ArrayList<ApiFuture>();
        long appendStartTimeMs = System.currentTimeMillis();
        int i2 = 0;
        while ((long)i2 < appendCount) {
            futures.add(writer.append(this.createProtoRows(new String[]{String.valueOf(i2)}), (long)i2));
            ++i2;
        }
        long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs;
        Assert.assertTrue((appendElapsedMs >= 1000L ? 1 : 0) != 0);
        int i3 = 0;
        while ((long)i3 < appendCount) {
            Assert.assertEquals((long)i3, (long)((AppendRowsResponse)((ApiFuture)futures.get(i3)).get()).getAppendResult().getOffset().getValue());
            ++i3;
        }
        Assert.assertEquals((long)appendCount, (long)this.testBigQueryWrite.getAppendRequests().size());
        i3 = 0;
        while ((long)i3 < appendCount) {
            Assert.assertEquals((long)i3, (long)this.testBigQueryWrite.getAppendRequests().get(i3).getOffset().getValue());
            ++i3;
        }
        writer.close();
    }

    @Test
    public void testMessageTooLarge() throws Exception {
        StreamWriterV2 writer = this.getTestStreamWriterV2();
        String oversized = Strings.repeat((String)"a", (int)((int)(StreamWriterV2.getApiMaxRequestBytes() + 1L)));
        ApiFuture<AppendRowsResponse> appendFuture1 = this.sendTestMessage(writer, new String[]{oversized});
        Assert.assertTrue((boolean)appendFuture1.isDone());
        StatusRuntimeException actualError = StreamWriterV2Test.assertFutureException(StatusRuntimeException.class, appendFuture1);
        Assert.assertEquals((Object)Status.Code.INVALID_ARGUMENT, (Object)actualError.getStatus().getCode());
        Assert.assertTrue((boolean)actualError.getStatus().getDescription().contains("MessageSize is too large"));
        writer.close();
    }
}

