/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigquery.storage.v1;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.test.Test;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker;
import com.google.cloud.bigquery.storage.v1.FakeBigQueryWrite;
import com.google.cloud.bigquery.storage.v1.FakeScheduledExecutorService;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.common.truth.Truth;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.StatusRuntimeException;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

/**
 * Unit tests for {@link ConnectionWorker}, run against an in-process fake BigQuery Write service.
 *
 * <p>Each test gets a fresh {@link FakeBigQueryWrite} and {@link BigQueryWriteClient} from
 * {@link #setUp()}. The static wait-time limits on {@link ConnectionWorker} are reset there as
 * well, because several tests tighten them.
 *
 * <p>NOTE(review): most tests do not close the workers / stream writers they create and the mock
 * server is never stopped; confirm whether an explicit tear-down is desirable.
 */
@RunWith(JUnit4.class)
public class ConnectionWorkerTest {
    private static final Logger log = Logger.getLogger(ConnectionWorkerTest.class.getName());
    private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/s1";
    private static final String TEST_STREAM_2 = "projects/p2/datasets/d2/tables/t2/streams/s2";
    private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
    private static final RetrySettings retrySettings =
        RetrySettings.newBuilder()
            .setInitialRetryDelay(Duration.ofMillis(500L))
            .setRetryDelayMultiplier(1.1)
            .setMaxAttempts(3)
            .setMaxRetryDelay(Duration.ofMinutes(5L))
            .build();
    private FakeBigQueryWrite testBigQueryWrite;
    private FakeScheduledExecutorService fakeExecutor;
    private static MockServiceHelper serviceHelper;
    private BigQueryWriteClient client;

    /** Starts a fresh fake service and client, and restores the static wait-time limits. */
    @Before
    public void setUp() throws Exception {
        testBigQueryWrite = new FakeBigQueryWrite();
        // These limits are static on ConnectionWorker and individual tests override them.
        ConnectionWorker.setMaxInflightQueueWaitTime(300000L);
        ConnectionWorker.setMaxInflightRequestWaitTime(java.time.Duration.ofMinutes(10L));
        serviceHelper = new MockServiceHelper(UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite));
        serviceHelper.start();
        fakeExecutor = new FakeScheduledExecutorService();
        testBigQueryWrite.setExecutor(fakeExecutor);
        BigQueryWriteSettings settings =
            BigQueryWriteSettings.newBuilder()
                .setCredentialsProvider(NoCredentialsProvider.create())
                .setTransportChannelProvider(serviceHelper.createChannelProvider())
                .build();
        client = BigQueryWriteClient.create(settings);
    }

    /**
     * Interleaves appends from two stream writers (different streams and schemas) on one worker and
     * checks which requests carry the stream name / writer schema.
     */
    @Test
    public void testMultiplexedAppendSuccess() throws Exception {
        try (ConnectionWorker connectionWorker = createConnectionWorker()) {
            long appendCount = 20L;
            enqueueAppendResponses(appendCount);

            ArrayList<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
            StreamWriter sw1 =
                StreamWriter.newBuilder(TEST_STREAM_1, client)
                    .setWriterSchema(createProtoSchema("foo"))
                    .setLocation("us")
                    .build();
            StreamWriter sw2 =
                StreamWriter.newBuilder(TEST_STREAM_2, client)
                    .setWriterSchema(createProtoSchema("complicate"))
                    .setLocation("us")
                    .build();

            // Pattern per group of four: sw1, sw1, sw2, sw2.
            for (long i = 0L; i < appendCount; i++) {
                String payload = String.valueOf(i);
                if (i % 4 < 2) {
                    futures.add(sendTestMessage(connectionWorker, sw1, createFooProtoRows(payload), i));
                } else {
                    futures.add(
                        sendTestMessage(connectionWorker, sw2, createComplicateTypeProtoRows(payload), i));
                }
            }

            for (int i = 0; i < appendCount; i++) {
                Int64Value offset = futures.get(i).get().getAppendResult().getOffset();
                Truth.assertThat(offset).isEqualTo(Int64Value.of(i));
            }

            Truth.assertThat(testBigQueryWrite.getAppendRequests().size()).isEqualTo(appendCount);
            for (int i = 0; i < appendCount; i++) {
                AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i);
                Truth.assertThat(serverRequest.getProtoRows().getRows().getSerializedRowsCount()).isGreaterThan(0);
                Truth.assertThat(serverRequest.getOffset().getValue()).isEqualTo(i);
                switch (i % 4) {
                    case 0:
                        // First request after switching back to stream 1 carries stream + schema.
                        Truth.assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
                        Truth.assertThat(serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
                            .isEqualTo("foo");
                        break;
                    case 1:
                        if (i == 1) {
                            Truth.assertThat(serverRequest.getWriteStream()).isEmpty();
                        } else {
                            Truth.assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
                        }
                        Truth.assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
                        break;
                    case 2:
                        Truth.assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_2);
                        Truth.assertThat(serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
                            .isEqualTo("complicate");
                        break;
                    case 3:
                        Truth.assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
                        Truth.assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_2);
                        break;
                    default:
                        break;
                }
            }

            Truth.assertThat(connectionWorker.getLoad().destinationCount()).isEqualTo(2);
            Truth.assertThat(connectionWorker.getLoad().inFlightRequestsBytes()).isEqualTo(0);
        }
    }

    /**
     * Appends to a single stream through three writers (two with schema "foo", one with "bar") and
     * checks that the writer schema is only re-sent when it changes.
     */
    @Test
    public void testAppendInSameStream_switchSchema() throws Exception {
        try (ConnectionWorker connectionWorker = createConnectionWorker()) {
            long appendCount = 20L;
            enqueueAppendResponses(appendCount);

            ArrayList<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
            ProtoSchema schema1 = createProtoSchema("foo");
            ProtoSchema schema2 = createProtoSchema("foo");
            ProtoSchema schema3 = createProtoSchema("bar");
            StreamWriter sw1 =
                StreamWriter.newBuilder(TEST_STREAM_1, client).setLocation("us").setWriterSchema(schema1).build();
            StreamWriter sw2 =
                StreamWriter.newBuilder(TEST_STREAM_1, client).setLocation("us").setWriterSchema(schema2).build();
            StreamWriter sw3 =
                StreamWriter.newBuilder(TEST_STREAM_1, client).setLocation("us").setWriterSchema(schema3).build();

            // Pattern per group of four: sw1, sw2, sw3, sw3.
            for (long i = 0L; i < appendCount; i++) {
                long slot = i % 4;
                StreamWriter writer;
                if (slot == 0L) {
                    writer = sw1;
                } else if (slot == 1L) {
                    writer = sw2;
                } else {
                    writer = sw3;
                }
                futures.add(sendTestMessage(connectionWorker, writer, createFooProtoRows(String.valueOf(i)), i));
            }

            for (int i = 0; i < appendCount; i++) {
                Int64Value offset = futures.get(i).get().getAppendResult().getOffset();
                Truth.assertThat(offset).isEqualTo(Int64Value.of(i));
            }

            Truth.assertThat(testBigQueryWrite.getAppendRequests().size()).isEqualTo(appendCount);
            for (int i = 0; i < appendCount; i++) {
                AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i);
                Truth.assertThat(serverRequest.getProtoRows().getRows().getSerializedRowsCount()).isGreaterThan(0);
                Truth.assertThat(serverRequest.getOffset().getValue()).isEqualTo(i);
                switch (i % 4) {
                    case 0:
                        if (i == 0) {
                            Truth.assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
                        }
                        Truth.assertThat(serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
                            .isEqualTo("foo");
                        break;
                    case 1:
                        if (i == 1) {
                            Truth.assertThat(serverRequest.getWriteStream()).isEmpty();
                        } else {
                            Truth.assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
                        }
                        // sw2 uses an equal schema to sw1, so nothing is re-sent.
                        Truth.assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
                        break;
                    case 2:
                        Truth.assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
                        Truth.assertThat(serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
                            .isEqualTo("bar");
                        break;
                    case 3:
                        Truth.assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
                        Truth.assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
                        break;
                    default:
                        break;
                }
            }

            Truth.assertThat(connectionWorker.getLoad().destinationCount()).isEqualTo(1);
            Truth.assertThat(connectionWorker.getLoad().inFlightRequestsBytes()).isEqualTo(0);
        }
    }

    /**
     * Sends five appends to a slow server, then expects the sixth to be rejected with a
     * {@link StatusRuntimeException} after waiting longer than the 500 ms queue wait limit.
     */
    @Test
    public void testAppendButInflightQueueFull() throws Exception {
        ProtoSchema schema1 = createProtoSchema("foo");
        StreamWriter sw1 =
            StreamWriter.newBuilder(TEST_STREAM_1, client).setLocation("us").setWriterSchema(schema1).build();
        ConnectionWorker connectionWorker =
            createConnectionWorker(
                TEST_STREAM_1, "us", TEST_TRACE_ID, 6L, 100000L, java.time.Duration.ofSeconds(100L));
        testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1L));
        ConnectionWorker.setMaxInflightQueueWaitTime(500L);

        long appendCount = 6L;
        enqueueAppendResponses(appendCount);

        // The last append is the one expected to be rejected.
        final long rejectedOffset = appendCount - 1L;
        ArrayList<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
        for (int i = 0; i < rejectedOffset; i++) {
            futures.add(sendTestMessage(connectionWorker, sw1, createFooProtoRows(String.valueOf(i)), i));
            Assert.assertEquals(i + 1, inFlightRequestCount(connectionWorker));
        }

        long startTime = System.currentTimeMillis();
        Assert.assertThrows(
            StatusRuntimeException.class,
            () ->
                sendTestMessage(
                    connectionWorker, sw1, createFooProtoRows(String.valueOf(rejectedOffset)), rejectedOffset));
        long timeDiff = System.currentTimeMillis() - startTime;
        Assert.assertEquals(rejectedOffset, inFlightRequestCount(connectionWorker));
        Assert.assertTrue(timeDiff > 500L);

        for (int i = 0; i < rejectedOffset; i++) {
            Assert.assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
        }
    }

    /**
     * Injects a runtime exception into the append loop and checks that the first pending request
     * fails with it, the rest are aborted, and later appends fail with the same exception.
     */
    @Test
    public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
        ProtoSchema schema1 = createProtoSchema("foo");
        StreamWriter sw1 =
            StreamWriter.newBuilder(TEST_STREAM_1, client).setLocation("us").setWriterSchema(schema1).build();
        ConnectionWorker connectionWorker = createLargeConnectionWorker(TEST_STREAM_1, "us", TEST_TRACE_ID);
        testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1L));
        ConnectionWorker.setMaxInflightQueueWaitTime(500L);

        long appendCount = 10L;
        enqueueAppendResponses(appendCount);
        connectionWorker.setTestOnlyRunTimeExceptionInAppendLoop(new RuntimeException("Any exception can happen."));
        connectionWorker.setTestOnlyAppendLoopSleepTime(1000L);

        ArrayList<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
        for (int i = 0; i < appendCount; i++) {
            futures.add(sendTestMessage(connectionWorker, sw1, createFooProtoRows(String.valueOf(i)), i));
            Assert.assertEquals(i + 1, inFlightRequestCount(connectionWorker));
        }

        for (int i = 0; i < appendCount; i++) {
            String expectedMessage = (i == 0) ? "Any exception can happen." : "Connection is aborted due to ";
            assertFailedWithCauseMessage(futures.get(i), expectedMessage);
        }

        // The worker stays broken: a fresh append reports the original failure.
        ExecutionException ex =
            Assert.assertThrows(
                ExecutionException.class,
                () -> sendTestMessage(connectionWorker, sw1, createFooProtoRows(String.valueOf(100)), 100L).get());
        Truth.assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen.");
    }

    /** A writer configured for "eu" must be rejected by a worker created for "us". */
    @Test
    public void testLocationMismatch() throws Exception {
        ProtoSchema schema1 = createProtoSchema("foo");
        StreamWriter sw1 =
            StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).setLocation("eu").build();
        ConnectionWorker connectionWorker = createLargeConnectionWorker(TEST_STREAM_1, "us", TEST_TRACE_ID);
        StatusRuntimeException ex =
            Assert.assertThrows(
                StatusRuntimeException.class,
                () -> sendTestMessage(connectionWorker, sw1, createFooProtoRows(String.valueOf(0)), 0L));
        Assert.assertEquals(
            "INVALID_ARGUMENT: StreamWriter with location eu is scheduled to use a connection with location us",
            ex.getMessage());
    }

    /** With no location set, a writer for stream 1 must be rejected by a worker bound to stream 2. */
    @Test
    public void testStreamNameMismatch() throws Exception {
        ProtoSchema schema1 = createProtoSchema("foo");
        StreamWriter sw1 = StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
        ConnectionWorker connectionWorker = createLargeConnectionWorker(TEST_STREAM_2, null, TEST_TRACE_ID);
        StatusRuntimeException ex =
            Assert.assertThrows(
                StatusRuntimeException.class,
                () -> sendTestMessage(connectionWorker, sw1, createFooProtoRows(String.valueOf(0)), 0L));
        Assert.assertEquals(
            "INVALID_ARGUMENT: StreamWriter with stream name projects/p1/datasets/d1/tables/t1/streams/s1 is scheduled to use a connection with stream name projects/p2/datasets/d2/tables/t2/streams/s2",
            ex.getMessage());
    }

    /** Pins three sample points of {@code ConnectionWorker.calculateSleepTimeMilli}. */
    @Test
    public void testExponentialBackoff() throws Exception {
        Truth.assertThat(ConnectionWorker.calculateSleepTimeMilli(0L)).isEqualTo(50L);
        Truth.assertThat(ConnectionWorker.calculateSleepTimeMilli(5L)).isEqualTo(1600L);
        Truth.assertThat(ConnectionWorker.calculateSleepTimeMilli(100L)).isEqualTo(60000L);
    }

    /** Checks the sign of {@code Load.LOAD_COMPARATOR} for three pairs of loads. */
    @Test
    public void testLoadCompare_compareLoad() {
        ConnectionWorker.Load load1 = ConnectionWorker.Load.create(1000L, 2000L, 100L, 1000L, 10L);
        ConnectionWorker.Load load2 = ConnectionWorker.Load.create(2000L, 1000L, 10L, 1000L, 10L);
        Truth.assertThat(ConnectionWorker.Load.LOAD_COMPARATOR.compare(load1, load2)).isLessThan(0);

        ConnectionWorker.Load load3 = ConnectionWorker.Load.create(1L, 300L, 10L, 0L, 10L);
        ConnectionWorker.Load load4 = ConnectionWorker.Load.create(10L, 1L, 10L, 0L, 10L);
        Truth.assertThat(ConnectionWorker.Load.LOAD_COMPARATOR.compare(load3, load4)).isGreaterThan(0);

        ConnectionWorker.Load load5 = ConnectionWorker.Load.create(200L, 1L, 10L, 1000L, 10L);
        ConnectionWorker.Load load6 = ConnectionWorker.Load.create(100L, 10L, 10L, 1000L, 10L);
        Truth.assertThat(ConnectionWorker.Load.LOAD_COMPARATOR.compare(load5, load6)).isEqualTo(0);
    }

    /** Checks {@code Load.isOverwhelmed()} for one overwhelmed and one non-overwhelmed load. */
    @Test
    public void testLoadIsOverWhelmed() {
        ConnectionWorker.Load load1 = ConnectionWorker.Load.create(60L, 10L, 100L, 90L, 100L);
        Truth.assertThat(load1.isOverwhelmed()).isTrue();

        ConnectionWorker.Load load2 = ConnectionWorker.Load.create(1L, 1L, 100L, 100L, 100L);
        Truth.assertThat(load2.isOverwhelmed()).isFalse();
    }

    /**
     * With a 1 s in-flight wait limit and a 2 s server delay, pending requests must fail with the
     * wait-time error, and closing the worker afterwards must not take longer than the total
     * server delay.
     */
    @Test
    public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
        ProtoSchema schema1 = createProtoSchema("foo");
        ConnectionWorker.setMaxInflightRequestWaitTime(java.time.Duration.ofSeconds(1L));
        StreamWriter sw1 = StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
        ConnectionWorker connectionWorker = createLargeConnectionWorker(TEST_STREAM_1, null, TEST_TRACE_ID);
        Duration durationSleep = Duration.ofSeconds(2L);
        testBigQueryWrite.setResponseSleep(durationSleep);

        long appendCount = 2L;
        enqueueAppendResponses(appendCount);

        ArrayList<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
        for (int i = 0; i < appendCount; i++) {
            futures.add(sendTestMessage(connectionWorker, sw1, createFooProtoRows(String.valueOf(i)), i));
            Assert.assertEquals(i + 1, inFlightRequestCount(connectionWorker));
        }

        for (int i = 0; i < appendCount; i++) {
            String expectedMessage =
                (i == 0) ? "Request has waited in inflight queue" : "Connection is aborted due to ";
            assertFailedWithCauseMessage(futures.get(i), expectedMessage);
        }

        ExecutionException ex =
            Assert.assertThrows(
                ExecutionException.class,
                () -> sendTestMessage(connectionWorker, sw1, createFooProtoRows(String.valueOf(100)), 100L).get());
        Truth.assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");

        long startCloseTime = System.currentTimeMillis();
        connectionWorker.close();
        long timeDiff = System.currentTimeMillis() - startCloseTime;
        long maxCloseMillis = appendCount * durationSleep.toMillis();
        Assert.assertTrue(
            "timeDiff: " + timeDiff + " is more than total durationSleep: " + maxCloseMillis,
            timeDiff <= maxCloseMillis);
        Assert.assertTrue(connectionWorker.isUserClosed());
    }

    /**
     * Sends a batch, idles for longer than the 1 s in-flight wait limit, then sends a second batch;
     * every append must still succeed with the expected offset.
     */
    @Test
    public void testLongTimeIdleWontFail() throws Exception {
        ProtoSchema schema1 = createProtoSchema("foo");
        ConnectionWorker.setMaxInflightRequestWaitTime(java.time.Duration.ofSeconds(1L));
        StreamWriter sw1 = StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
        ConnectionWorker connectionWorker = createLargeConnectionWorker(TEST_STREAM_1, null, TEST_TRACE_ID);

        long appendCount = 10L;
        enqueueAppendResponses(appendCount * 2L);

        ArrayList<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
        for (int i = 0; i < appendCount; i++) {
            futures.add(sendTestMessage(connectionWorker, sw1, createFooProtoRows(String.valueOf(i)), i));
        }

        // Idle past the in-flight wait limit; the first batch is expected to have drained by then.
        Thread.sleep(2000L);
        Assert.assertEquals(0L, inFlightRequestCount(connectionWorker));

        for (int i = 0; i < appendCount; i++) {
            futures.add(
                sendTestMessage(connectionWorker, sw1, createFooProtoRows(String.valueOf(i)), i + appendCount));
        }

        for (int i = 0; i < appendCount * 2L; i++) {
            Assert.assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
        }
    }

    /** Checks the table-id telemetry attribute derived from various stream names. */
    @Test
    public void testOpenTelemetryAttributesWithStreamNames() throws Exception {
        exerciseOpenTelemetryAttributesWithStreamNames(
            "projects/my_project/datasets/my_dataset/tables/my_table/streams/my_stream",
            "projects/my_project/datasets/my_dataset/tables/my_table");
        exerciseOpenTelemetryAttributesWithStreamNames(
            "projects/my_project/datasets/my_dataset/tables/my_table/",
            "projects/my_project/datasets/my_dataset/tables/my_table");
        exerciseOpenTelemetryAttributesWithStreamNames("projects/my_project/datasets/my_dataset/tables/", null);
    }

    /** Checks the three trace-id telemetry attributes derived from various trace id strings. */
    @Test
    public void testOpenTelemetryAttributesWithTraceId() throws Exception {
        exerciseOpenTelemetryAttributesWithTraceId(null, null, null, null);
        exerciseOpenTelemetryAttributesWithTraceId("a:b:c", null, null, null);
        exerciseOpenTelemetryAttributesWithTraceId(
            "java-streamwriter:HEAD+20240508-1544 Dataflow:monorail-c-multi:2024-05-08_11_44_34-6968230696879535523:1972585693681960752",
            "monorail-c-multi",
            "2024-05-08_11_44_34-6968230696879535523",
            "1972585693681960752");
        exerciseOpenTelemetryAttributesWithTraceId(
            "Dataflow:2024-04-26_23_19_08-12221961051154168466",
            "2024-04-26_23_19_08-12221961051154168466",
            null,
            null);
        exerciseOpenTelemetryAttributesWithTraceId(
            "Dataflow:writeapi3:2024-04-03_03_49_33-845412829237675723:63737042897365355",
            "writeapi3",
            "2024-04-03_03_49_33-845412829237675723",
            "63737042897365355");
        exerciseOpenTelemetryAttributesWithTraceId(
            "java-streamwriter Dataflow:pubsub-to-bq-staging-tongruil-1024-static:2024-05-14_15_13_14-5530509399715326669:4531186922674871499",
            "pubsub-to-bq-staging-tongruil-1024-static",
            "2024-05-14_15_13_14-5530509399715326669",
            "4531186922674871499");
        exerciseOpenTelemetryAttributesWithTraceId("a:b dataflow :c", null, null, null);
        exerciseOpenTelemetryAttributesWithTraceId("a:b dataflow:c:d", "c", "d", null);
    }

    /** Checks the routing header built from a stream name and a location. */
    @Test
    public void testLocationName() throws Exception {
        Assert.assertEquals("projects/p1/locations/us", ConnectionWorker.getRoutingHeader(TEST_STREAM_1, "us"));
    }

    /** Asserts that the trace-id attribute at {@code index} equals {@code expected} (may be null). */
    void checkOpenTelemetryTraceIdAttribute(Attributes attributes, int index, String expected) {
        // Raw key type kept deliberately: the element type of telemetryKeysTraceId is not visible here.
        String attributesTraceId = (String)attributes.get((AttributeKey)ConnectionWorker.telemetryKeysTraceId.get(index));
        Assert.assertEquals(expected, attributesTraceId);
    }

    /** Creates a worker with {@code traceId} and checks its three trace-id telemetry attributes. */
    void exerciseOpenTelemetryAttributesWithTraceId(String traceId, String expectedField1, String expectedField2, String expectedField3) throws Exception {
        ConnectionWorker connectionWorker = createLargeConnectionWorker(TEST_STREAM_1, null, traceId);
        Attributes attributes = connectionWorker.getTelemetryAttributes();
        checkOpenTelemetryTraceIdAttribute(attributes, 0, expectedField1);
        checkOpenTelemetryTraceIdAttribute(attributes, 1, expectedField2);
        checkOpenTelemetryTraceIdAttribute(attributes, 2, expectedField3);
    }

    /** Creates a worker for {@code streamName} and checks its table-id telemetry attribute. */
    private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, String expected) throws Exception {
        ConnectionWorker connectionWorker = createLargeConnectionWorker(streamName, null, null);
        Attributes attributes = connectionWorker.getTelemetryAttributes();
        String attributesTableId = (String)attributes.get(ConnectionWorker.telemetryKeyTableId);
        Assert.assertEquals(expected, attributesTableId);
    }

    /** Queues {@code count} canned append responses on the fake server, with offsets 0..count-1. */
    private void enqueueAppendResponses(long count) {
        for (long offset = 0L; offset < count; offset++) {
            testBigQueryWrite.addResponse((AbstractMessage)createAppendResponse(offset));
        }
    }

    /** Returns the worker's current in-flight request count as a {@code long}. */
    private static long inFlightRequestCount(ConnectionWorker connectionWorker) {
        return connectionWorker.getLoad().inFlightRequestsCount();
    }

    /**
     * Asserts that {@code future} completes exceptionally and that the cause's message contains
     * {@code expectedMessage}.
     */
    private static void assertFailedWithCauseMessage(ApiFuture<AppendRowsResponse> future, String expectedMessage) {
        ExecutionException ex = Assert.assertThrows(ExecutionException.class, () -> future.get());
        Truth.assertThat(ex.getCause()).hasMessageThat().contains(expectedMessage);
    }

    /** Builds an append response whose result carries the given offset. */
    private AppendRowsResponse createAppendResponse(long offset) {
        AppendRowsResponse.AppendResult result =
            AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build();
        return AppendRowsResponse.newBuilder().setAppendResult(result).build();
    }

    /** Creates the default worker: stream 1, location "us", 100 requests / 1000 bytes, 5 s retry. */
    private ConnectionWorker createConnectionWorker() throws IOException {
        return createConnectionWorker(TEST_STREAM_1, TEST_TRACE_ID, 100L, 1000L, java.time.Duration.ofSeconds(5L));
    }

    /** Creates a worker in location "us" with the given stream, trace id and limits. */
    private ConnectionWorker createConnectionWorker(String streamName, String traceId, long maxRequests, long maxBytes, java.time.Duration maxRetryDuration) throws IOException {
        // The traceId argument is now honoured; it used to be silently replaced by TEST_TRACE_ID.
        return createConnectionWorker(streamName, "us", traceId, maxRequests, maxBytes, maxRetryDuration);
    }

    /** Creates a worker with 100000-request / 100000-byte limits and a 100 s retry duration. */
    private ConnectionWorker createLargeConnectionWorker(String streamName, String location, String traceId) throws IOException {
        return createConnectionWorker(streamName, location, traceId, 100000L, 100000L, java.time.Duration.ofSeconds(100L));
    }

    /**
     * Single place where a {@link ConnectionWorker} is constructed: schema "foo", blocking
     * flow control, no compressor, the shared retry settings and both trailing flags off.
     */
    private ConnectionWorker createConnectionWorker(String streamName, String location, String traceId, long maxRequests, long maxBytes, java.time.Duration maxRetryDuration) throws IOException {
        return new ConnectionWorker(streamName, location, createProtoSchema("foo"), maxRequests, maxBytes, maxRetryDuration, FlowController.LimitExceededBehavior.Block, traceId, null, client.getSettings(), retrySettings, false, false);
    }

    /** Builds a schema named {@code protoName} with a single string field "foo" (number 1). */
    private ProtoSchema createProtoSchema(String protoName) {
        DescriptorProtos.FieldDescriptorProto fooField =
            DescriptorProtos.FieldDescriptorProto.newBuilder()
                .setName("foo")
                .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
                .setNumber(1)
                .build();
        DescriptorProtos.DescriptorProto descriptor =
            DescriptorProtos.DescriptorProto.newBuilder().setName(protoName).addField(fooField).build();
        return ProtoSchema.newBuilder().setProtoDescriptor(descriptor).build();
    }

    /** Appends {@code protoRows} at {@code offset} through the worker, tagged "request_<offset>". */
    private ApiFuture<AppendRowsResponse> sendTestMessage(ConnectionWorker connectionWorker, StreamWriter streamWriter, ProtoRows protoRows, long offset) {
        return connectionWorker.append(streamWriter, protoRows, offset, "request_" + offset);
    }

    /** Serializes one {@code FooType} row per message. */
    private ProtoRows createFooProtoRows(String... messages) {
        ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
        for (String message : messages) {
            Test.FooType foo = Test.FooType.newBuilder().setFoo(message).build();
            rowsBuilder.addSerializedRows(foo.toByteString());
        }
        return rowsBuilder.build();
    }

    /** Serializes one {@code ComplicateType} row per message, with the message as its inner value. */
    private ProtoRows createComplicateTypeProtoRows(String... messages) {
        ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
        for (String message : messages) {
            Test.ComplicateType complicateType =
                Test.ComplicateType.newBuilder().setInnerType(Test.InnerType.newBuilder().addValue(message)).build();
            rowsBuilder.addSerializedRows(complicateType.toByteString());
        }
        return rowsBuilder.build();
    }
}

