/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigquery.storage.v1;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.test.Test;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool;
import com.google.cloud.bigquery.storage.v1.FakeBigQueryWrite;
import com.google.cloud.bigquery.storage.v1.FakeBigQueryWriteImpl;
import com.google.cloud.bigquery.storage.v1.FakeScheduledExecutorService;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.truth.Truth;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

/**
 * Tests for {@link ConnectionWorkerPool}, run against a {@link FakeBigQueryWrite} service that is
 * started through a {@link MockServiceHelper} before every test.
 *
 * <p>Each test queues canned responses on the fake service, appends rows through one or more
 * {@link StreamWriter}s and then checks the responses and the pool's connection counters.
 */
@RunWith(JUnit4.class)
public class ConnectionWorkerPoolTest {
    private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
    private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/_default";
    private static final String TEST_STREAM_2 = "projects/p1/datasets/d1/tables/t2/streams/_default";
    /** Default-stream name template; the single placeholder is the table index. */
    private static final String TEST_STREAM_FORMAT = "projects/p1/datasets/d1/tables/t%s/streams/_default";
    private static final int MAX_RETRY_NUM_ATTEMPTS = 3;
    private static final long INITIAL_RETRY_MILLIS = 500L;
    private static final double RETRY_MULTIPLIER = 1.3;
    private static final int MAX_RETRY_DELAY_MINUTES = 5;
    /** Retry settings handed to every pool and to the writers built on an external client. */
    private static final RetrySettings retrySettings = RetrySettings.newBuilder()
            .setInitialRetryDelayDuration(java.time.Duration.ofMillis(INITIAL_RETRY_MILLIS))
            .setRetryDelayMultiplier(RETRY_MULTIPLIER)
            .setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS)
            .setMaxRetryDelay(Duration.ofMinutes(MAX_RETRY_DELAY_MINUTES))
            .build();

    /** Replaced with a freshly started helper by {@link #setUp()} before each test. */
    private static MockServiceHelper serviceHelper;

    private FakeBigQueryWrite testBigQueryWrite;
    private FakeScheduledExecutorService fakeExecutor;
    private BigQueryWriteSettings clientSettings;

    /**
     * Starts a new mock service backed by a new {@link FakeBigQueryWrite}, builds client settings
     * that target it without credentials, and sets the static overwhelmed thresholds on
     * {@link ConnectionWorker.Load} (0.5 for counts, 0.6 for bytes).
     */
    @Before
    public void setUp() throws Exception {
        this.testBigQueryWrite = new FakeBigQueryWrite();
        serviceHelper = new MockServiceHelper(UUID.randomUUID().toString(), Arrays.asList(this.testBigQueryWrite));
        serviceHelper.start();
        this.fakeExecutor = new FakeScheduledExecutorService();
        this.testBigQueryWrite.setExecutor(this.fakeExecutor);
        this.clientSettings = BigQueryWriteSettings.newBuilder()
                .setCredentialsProvider(NoCredentialsProvider.create())
                .setTransportChannelProvider(serviceHelper.createChannelProvider())
                .build();
        ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.5);
        ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.6);
    }

    @Test
    public void testSingleTableConnection_noOverwhelmedConnection() throws Exception {
        this.testSendRequestsToMultiTable(
                /* requestToSend= */ 100,
                /* maxRequests= */ 100000,
                /* maxConnections= */ 8,
                /* expectedConnectionCount= */ 1,
                /* tableCount= */ 1);
    }

    @Test
    public void testSingleTableConnections_overwhelmed() throws Exception {
        this.testSendRequestsToMultiTable(
                /* requestToSend= */ 100,
                /* maxRequests= */ 10,
                /* maxConnections= */ 8,
                /* expectedConnectionCount= */ 8,
                /* tableCount= */ 1);
    }

    @Test
    public void testMultiTableConnection_noOverwhelmedConnection() throws Exception {
        this.testSendRequestsToMultiTable(
                /* requestToSend= */ 100,
                /* maxRequests= */ 100000,
                /* maxConnections= */ 8,
                /* expectedConnectionCount= */ 2,
                /* tableCount= */ 4);
    }

    @Test
    public void testMultiTableConnections_overwhelmed_reachingMaximum() throws Exception {
        this.testSendRequestsToMultiTable(
                /* requestToSend= */ 100,
                /* maxRequests= */ 10,
                /* maxConnections= */ 8,
                /* expectedConnectionCount= */ 8,
                /* tableCount= */ 4);
    }

    @Test
    public void testMultiTableConnections_overwhelmed_overTotalLimit() throws Exception {
        this.testSendRequestsToMultiTable(
                /* requestToSend= */ 200,
                /* maxRequests= */ 10,
                /* maxConnections= */ 8,
                /* expectedConnectionCount= */ 8,
                /* tableCount= */ 10);
    }

    @Test
    public void testMultiTableConnections_overwhelmed_notReachingMaximum() throws Exception {
        this.testSendRequestsToMultiTable(
                /* requestToSend= */ 20,
                /* maxRequests= */ 10,
                /* maxConnections= */ 8,
                /* expectedConnectionCount= */ 4,
                /* tableCount= */ 4);
    }

    /**
     * Shared scenario: appends {@code requestToSend} single-row requests, round-robin across
     * {@code tableCount} writers, and verifies the responses, the number of connections the pool
     * created and the requests the fake service received.
     *
     * @param requestToSend number of appends to issue; offsets run from 0 to requestToSend - 1
     * @param maxRequests first argument given to the pool under test (see
     *     {@link #createConnectionWorkerPool})
     * @param maxConnections value used for {@code setMaxConnectionsPerRegion}; the minimum is fixed
     *     at 2
     * @param expectedConnectionCount expected value of {@code getCreateConnectionCount()} once all
     *     appends have completed
     * @param tableCount number of writers to create, on tables t0 .. t(tableCount - 1); must be
     *     positive because it is used as a modulus
     */
    private void testSendRequestsToMultiTable(int requestToSend, int maxRequests, int maxConnections, int expectedConnectionCount, int tableCount) throws IOException, ExecutionException, InterruptedException {
        setPoolConnectionLimits(2, maxConnections);
        ConnectionWorkerPool connectionWorkerPool = this.createConnectionWorkerPool(maxRequests, 100000L, java.time.Duration.ofSeconds(5L));
        this.testBigQueryWrite.setResponseSleep(java.time.Duration.ofMillis(50L));
        long appendCount = requestToSend;
        this.queueSequentialResponses(appendCount);

        ArrayList<StreamWriter> streamWriterList = new ArrayList<>();
        for (int table = 0; table < tableCount; ++table) {
            streamWriterList.add(this.getTestStreamWriter(String.format(TEST_STREAM_FORMAT, table)));
        }

        ArrayList<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
        for (long offset = 0L; offset < appendCount; ++offset) {
            StreamWriter writer = streamWriterList.get((int) (offset % streamWriterList.size()));
            futures.add(sendFooStringTestMessage(writer, connectionWorkerPool, new String[]{String.valueOf(offset)}, offset));
        }

        // The i-th future must resolve to the i-th queued response.
        for (int i = 0; i < requestToSend; ++i) {
            AppendRowsResponse response = futures.get(i).get();
            Truth.assertThat(response.getAppendResult().getOffset().getValue()).isEqualTo(i);
        }
        Truth.assertThat(connectionWorkerPool.getCreateConnectionCount()).isEqualTo(expectedConnectionCount);
        Truth.assertThat(this.testBigQueryWrite.getAppendRequests().size()).isEqualTo(appendCount);

        // Every received request carries rows, and no offset was sent twice.
        HashSet<Long> offsets = new HashSet<>();
        for (AppendRowsRequest serverRequest : this.testBigQueryWrite.getAppendRequests()) {
            Truth.assertThat(serverRequest.getProtoRows().getRows().getSerializedRowsCount()).isGreaterThan(0);
            offsets.add(serverRequest.getOffset().getValue());
        }
        Truth.assertThat(offsets.size()).isEqualTo(appendCount);
    }

    /**
     * Appends through two writers until the pool reports 10 connections, then closes the writers
     * one at a time and checks the total connection count after each close (8, then 0).
     */
    @Test
    public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
        setPoolConnectionLimits(5, 10);
        ConnectionWorkerPool connectionWorkerPool = this.createConnectionWorkerPool(3L, 1000L, java.time.Duration.ofSeconds(5L));
        this.testBigQueryWrite.setResponseSleep(java.time.Duration.ofMillis(50L));
        StreamWriter writeStream1 = this.getTestStreamWriter(TEST_STREAM_1);
        StreamWriter writeStream2 = this.getTestStreamWriter(TEST_STREAM_2);
        int appendCount = 20;
        this.queueSequentialResponses(appendCount);

        awaitAll(this.appendAlternating(connectionWorkerPool, writeStream1, writeStream2, appendCount));

        Truth.assertThat(connectionWorkerPool.getCreateConnectionCount()).isEqualTo(10);
        Truth.assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(10);
        connectionWorkerPool.close(writeStream1);
        Truth.assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(8);
        connectionWorkerPool.close(writeStream2);
        Truth.assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(0);
    }

    /**
     * Closes one writer after a first round of appends, appends a second round through both
     * writers (including the closed one), and checks the total connection count at each step.
     */
    @Test
    public void testMultiStreamAppend_appendWhileClosing() throws Exception {
        setPoolConnectionLimits(5, 10);
        ConnectionWorkerPool connectionWorkerPool = this.createConnectionWorkerPool(3L, 100000L, java.time.Duration.ofSeconds(5L));
        this.testBigQueryWrite.setResponseSleep(java.time.Duration.ofMillis(50L));
        StreamWriter writeStream1 = this.getTestStreamWriter(TEST_STREAM_1);
        StreamWriter writeStream2 = this.getTestStreamWriter(TEST_STREAM_2);
        int appendCount = 10;
        this.queueSequentialResponses(appendCount);

        // First round; the futures are deliberately not awaited before the counters are checked.
        ArrayList<ApiFuture<AppendRowsResponse>> futures =
                this.appendAlternating(connectionWorkerPool, writeStream1, writeStream2, appendCount);
        Truth.assertThat(connectionWorkerPool.getCreateConnectionCount()).isEqualTo(5);
        Truth.assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(5);
        connectionWorkerPool.close(writeStream1);
        Truth.assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(3);
        Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);

        // Second round re-uses both writers with the same offsets as the first round.
        futures.addAll(this.appendAlternating(connectionWorkerPool, writeStream1, writeStream2, appendCount));
        Truth.assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(5);
        awaitAll(futures);

        connectionWorkerPool.close(writeStream1);
        Truth.assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(3);
        connectionWorkerPool.close(writeStream2);
        Truth.assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(0);
    }

    /**
     * Submits many appends from a thread pool, closes the writer while they are being submitted,
     * and requires every submitted task to return. Only the submission tasks are awaited, not the
     * append futures they produce.
     */
    @Test
    public void testCloseWhileAppending_noDeadlockHappen() throws Exception {
        final int concurrentAppends = 500;
        setPoolConnectionLimits(5, 10);
        ConnectionWorkerPool connectionWorkerPool = this.createConnectionWorkerPool(1500L, 100000L, java.time.Duration.ofSeconds(5L));
        this.testBigQueryWrite.setResponseSleep(java.time.Duration.ofMillis(20L));
        StreamWriter writeStream1 = this.getTestStreamWriter(TEST_STREAM_1);
        ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(
                Executors.newCachedThreadPool(
                        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AsyncStreamReadThread").build()));
        try {
            this.queueSequentialResponses(10L);
            ArrayList<Future<?>> submissions = new ArrayList<>();
            for (int i = 0; i < concurrentAppends; ++i) {
                submissions.add(threadPool.submit(() -> sendFooStringTestMessage(writeStream1, connectionWorkerPool, new String[]{String.valueOf(0)}, 0L)));
            }
            connectionWorkerPool.close(writeStream1);
            awaitAll(submissions);
        } finally {
            // Release the worker threads instead of leaving them to idle out after the test.
            threadPool.shutdown();
        }
    }

    /**
     * Queues two responses that report RESOURCE_EXHAUSTED a few times before succeeding, followed
     * by a plain success, and requires all three appends to complete without an exception.
     */
    @Test
    public void testAppendWithRetry() throws Exception {
        ConnectionWorkerPool connectionWorkerPool = this.createConnectionWorkerPool(1500L, 100000L, java.time.Duration.ofSeconds(5L));
        StreamWriter writeStream1 = this.getTestStreamWriter(TEST_STREAM_1);
        this.testBigQueryWrite.addResponse(new DummySupplierWillFailNTimesThenSucceed(3, Status.RESOURCE_EXHAUSTED.getCode(), "test quota error A", 0));
        this.testBigQueryWrite.addResponse(new DummySupplierWillFailNTimesThenSucceed(2, Status.RESOURCE_EXHAUSTED.getCode(), "test quota error B", 1));
        this.testBigQueryWrite.addResponse(createAppendResponse(2L));

        ArrayList<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
        for (int i = 0; i < 3; ++i) {
            futures.add(sendFooStringTestMessage(writeStream1, connectionWorkerPool, new String[]{String.valueOf(i)}, i));
        }
        awaitAll(futures);
        connectionWorkerPool.close(writeStream1);
    }

    /** A full stream name maps to its table prefix; a name that is too short is rejected. */
    @Test
    public void testToTableName() {
        Truth.assertThat(ConnectionWorkerPool.toTableName("projects/p/datasets/d/tables/t/streams/s")).isEqualTo("projects/p/datasets/d/tables/t");
        Assert.assertThrows(IllegalArgumentException.class, () -> ConnectionWorkerPool.toTableName("projects/p/"));
    }

    /**
     * Builds pooled writers on top of a caller-owned client, closes that client half way through,
     * and requires appends issued both before and after the close to succeed.
     */
    @Test
    public void testCloseExternalClient() throws IOException, InterruptedException, ExecutionException {
        StreamWriter.clearConnectionPool();
        long appendCount = 100L;
        long totalAppends = appendCount * 2L;
        this.queueSequentialResponses(totalAppends);
        this.testBigQueryWrite.addResponse(WriteStream.newBuilder().setLocation("us").build());

        BigQueryWriteClient externalClient = BigQueryWriteClient.create(
                BigQueryWriteSettings.newBuilder()
                        .setCredentialsProvider(NoCredentialsProvider.create())
                        .setTransportChannelProvider(serviceHelper.createChannelProvider())
                        .build());
        ArrayList<StreamWriter> streamWriterList = new ArrayList<>();
        for (int table = 0; table < 4; ++table) {
            streamWriterList.add(
                    StreamWriter.newBuilder(String.format(TEST_STREAM_FORMAT, table), externalClient)
                            .setEnableConnectionPool(true)
                            .setWriterSchema(createProtoSchema())
                            .setTraceId(TEST_TRACE_ID)
                            .setLocation("us")
                            .setRetrySettings(retrySettings)
                            .build());
        }

        ArrayList<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
        // First half of the appends, issued while the external client is still open.
        for (long offset = 0L; offset < appendCount; ++offset) {
            StreamWriter writer = streamWriterList.get((int) (offset % streamWriterList.size()));
            futures.add(writer.append(createProtoRows(new String[]{String.valueOf(offset)}), offset));
        }
        externalClient.close();
        externalClient.awaitTermination(1L, TimeUnit.MINUTES);
        // Second half, issued after the external client has been closed.
        for (long offset = appendCount; offset < totalAppends; ++offset) {
            StreamWriter writer = streamWriterList.get((int) (offset % streamWriterList.size()));
            futures.add(writer.append(createProtoRows(new String[]{String.valueOf(offset)}), offset));
        }

        for (int i = 0; i < totalAppends; ++i) {
            AppendRowsResponse response = futures.get(i).get();
            Truth.assertThat(response.getAppendResult().getOffset().getValue()).isEqualTo(i);
        }
        Truth.assertThat(this.testBigQueryWrite.getAppendRequests().size()).isEqualTo(totalAppends);
        for (StreamWriter writer : streamWriterList) {
            writer.close();
        }
        StreamWriter.clearConnectionPool();
    }

    /** Sets the static pool options to the given per-region connection bounds. */
    private static void setPoolConnectionLimits(int minConnections, int maxConnections) {
        ConnectionWorkerPool.setOptions(
                ConnectionWorkerPool.Settings.builder()
                        .setMinConnectionsPerRegion(minConnections)
                        .setMaxConnectionsPerRegion(maxConnections)
                        .build());
    }

    /** Queues {@code count} successful responses on the fake service, with offsets 0 .. count - 1. */
    private void queueSequentialResponses(long count) {
        for (long offset = 0L; offset < count; ++offset) {
            this.testBigQueryWrite.addResponse(createAppendResponse(offset));
        }
    }

    /**
     * Sends {@code appendCount} single-row appends through the pool, using {@code evenWriter} for
     * even offsets and {@code oddWriter} for odd ones.
     *
     * @return the append futures, in the order the appends were issued
     */
    private ArrayList<ApiFuture<AppendRowsResponse>> appendAlternating(ConnectionWorkerPool connectionWorkerPool, StreamWriter evenWriter, StreamWriter oddWriter, int appendCount) {
        ArrayList<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
        for (int i = 0; i < appendCount; ++i) {
            StreamWriter writer = i % 2 == 0 ? evenWriter : oddWriter;
            futures.add(sendFooStringTestMessage(writer, connectionWorkerPool, new String[]{String.valueOf(i)}, i));
        }
        return futures;
    }

    /** Blocks on every future in iteration order, propagating the first failure. */
    private static void awaitAll(Iterable<? extends Future<?>> futures) throws ExecutionException, InterruptedException {
        for (Future<?> future : futures) {
            future.get();
        }
    }

    /** Builds a successful append response whose append result carries {@code offset}. */
    private static AppendRowsResponse createAppendResponse(long offset) {
        return AppendRowsResponse.newBuilder()
                .setAppendResult(AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build())
                .build();
    }

    /** Builds an append response whose error field carries the given status code and message. */
    private static AppendRowsResponse createAppendResponseWithError(Status.Code code, String message) {
        return AppendRowsResponse.newBuilder()
                .setError(com.google.rpc.Status.newBuilder().setCode(code.value()).setMessage(message))
                .build();
    }

    /**
     * Builds a writer for {@code streamName} that uses the test schema, trace id and location
     * "us", and talks to the mock service started in {@link #setUp()} without credentials.
     */
    private StreamWriter getTestStreamWriter(String streamName) throws IOException {
        return StreamWriter.newBuilder(streamName)
                .setWriterSchema(createProtoSchema())
                .setTraceId(TEST_TRACE_ID)
                .setLocation("us")
                .setCredentialsProvider(NoCredentialsProvider.create())
                .setChannelProvider(serviceHelper.createChannelProvider())
                .build();
    }

    /** Schema for a message named "Message" with one string field {@code foo} (field number 1). */
    private static ProtoSchema createProtoSchema() {
        DescriptorProtos.FieldDescriptorProto fooField = DescriptorProtos.FieldDescriptorProto.newBuilder()
                .setName("foo")
                .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
                .setNumber(1)
                .build();
        DescriptorProtos.DescriptorProto descriptor = DescriptorProtos.DescriptorProto.newBuilder()
                .setName("Message")
                .addField(fooField)
                .build();
        return ProtoSchema.newBuilder().setProtoDescriptor(descriptor).build();
    }

    /**
     * Appends {@code messages} as rows through the pool on behalf of {@code writeStream}, at the
     * given offset, with the request id {@code "request_" + offset}.
     */
    private static ApiFuture<AppendRowsResponse> sendFooStringTestMessage(StreamWriter writeStream, ConnectionWorkerPool connectionWorkerPool, String[] messages, long offset) {
        return connectionWorkerPool.append(writeStream, createProtoRows(messages), offset, "request_" + offset);
    }

    /**
     * Serializes each message into a {@code FooType} row.
     *
     * <p>NOTE(review): {@code Test} below has to resolve to the generated proto class
     * {@code com.google.cloud.bigquery.storage.test.Test}, but this file also imports
     * {@code org.junit.Test}; the import list needs to disambiguate the two.
     */
    private static ProtoRows createProtoRows(String[] messages) {
        ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
        for (String message : messages) {
            Test.FooType foo = Test.FooType.newBuilder().setFoo(message).build();
            rowsBuilder.addSerializedRows(foo.toByteString());
        }
        return rowsBuilder.build();
    }

    /**
     * Enables the pool's testing logic and creates a pool that blocks when a limit is exceeded and
     * uses this test's client settings and retry settings. The remaining positional constructor
     * arguments ({@code null}, {@code false}, {@code false}) are fixed here; see the
     * {@link ConnectionWorkerPool} constructor for their meaning.
     */
    ConnectionWorkerPool createConnectionWorkerPool(long maxRequests, long maxBytes, java.time.Duration maxRetryDuration) {
        ConnectionWorkerPool.enableTestingLogic();
        return new ConnectionWorkerPool(maxRequests, maxBytes, maxRetryDuration, FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, null, this.clientSettings, retrySettings, false, false);
    }

    /**
     * Response supplier that yields an error response for its first {@code failCount} calls and a
     * successful response with {@code successOffset} on every call after that.
     */
    private static final class DummySupplierWillFailNTimesThenSucceed
    implements Supplier<FakeBigQueryWriteImpl.Response> {
        private int remainingFailures;
        private final Status.Code errorCode;
        private final String errorMessage;
        private final int successOffset;

        DummySupplierWillFailNTimesThenSucceed(int failCount, Status.Code errorCode, String errorMessage, int successOffset) {
            this.remainingFailures = failCount;
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
            this.successOffset = successOffset;
        }

        @Override
        public FakeBigQueryWriteImpl.Response get() {
            if (this.remainingFailures > 0) {
                --this.remainingFailures;
                return new FakeBigQueryWriteImpl.Response(createAppendResponseWithError(this.errorCode, this.errorMessage));
            }
            return new FakeBigQueryWriteImpl.Response(createAppendResponse(this.successOffset));
        }
    }
}

