/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigtable.data.v2.stub.changestream;

import com.google.api.client.util.Lists;
import com.google.api.gax.rpc.ServerStream;
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.StreamContinuationToken;
import com.google.bigtable.v2.StreamPartition;
import com.google.bigtable.v2.TimestampRange;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecordAdapter;
import com.google.cloud.bigtable.data.v2.models.CloseStream;
import com.google.cloud.bigtable.data.v2.models.DefaultChangeStreamRecordAdapter;
import com.google.cloud.bigtable.data.v2.models.DeleteCells;
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Entry;
import com.google.cloud.bigtable.data.v2.models.Heartbeat;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMergingCallable;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi;
import com.google.cloud.conformance.bigtable.v2.ChangeStreamTestDefinition;
import com.google.common.base.CaseFormat;
import com.google.common.truth.Truth;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.JsonFormat;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ReadChangeStreamMergingAcceptanceTest {
    private static final String TEST_DATA_JSON_RESOURCE = "changestream.json";
    private final ChangeStreamTestDefinition.ReadChangeStreamTest testCase;

    public ReadChangeStreamMergingAcceptanceTest(ChangeStreamTestDefinition.ReadChangeStreamTest testData, String junitName) {
        this.testCase = testData;
    }

    @Parameterized.Parameters(name="{1}")
    public static Collection<Object[]> data() throws IOException {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        InputStream dataJson = cl.getResourceAsStream(TEST_DATA_JSON_RESOURCE);
        Truth.assertWithMessage((String)"Unable to load test definition: %s", (Object[])new Object[]{TEST_DATA_JSON_RESOURCE}).that((Object)dataJson).isNotNull();
        InputStreamReader reader = new InputStreamReader(dataJson);
        ChangeStreamTestDefinition.ChangeStreamTestFile.Builder testBuilder = ChangeStreamTestDefinition.ChangeStreamTestFile.newBuilder();
        JsonFormat.parser().merge((Reader)reader, (Message.Builder)testBuilder);
        ChangeStreamTestDefinition.ChangeStreamTestFile testDefinition = testBuilder.build();
        List<ChangeStreamTestDefinition.ReadChangeStreamTest> tests = testDefinition.getReadChangeStreamTestsList();
        ArrayList<Object[]> data = new ArrayList<Object[]>(tests.size());
        for (ChangeStreamTestDefinition.ReadChangeStreamTest test : tests) {
            String junitName = CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, test.getDescription().replace(" ", "-"));
            data.add(new Object[]{test, junitName});
        }
        return data;
    }

    @Test
    public void test() throws Exception {
        List<ReadChangeStreamResponse> responses = this.testCase.getApiResponsesList();
        FakeStreamingApi.ServerStreamingStashCallable source = new FakeStreamingApi.ServerStreamingStashCallable(responses);
        ChangeStreamRecordMergingCallable mergingCallable = new ChangeStreamRecordMergingCallable(source, (ChangeStreamRecordAdapter)new DefaultChangeStreamRecordAdapter());
        ServerStream stream = mergingCallable.call((Object)ReadChangeStreamRequest.getDefaultInstance());
        ArrayList actualResults = Lists.newArrayList();
        Exception error = null;
        try {
            for (ChangeStreamRecord record : stream) {
                ChangeStreamTestDefinition.ReadChangeStreamTest.TestChangeStreamMutation.Builder builder;
                if (record instanceof Heartbeat) {
                    Heartbeat heartbeat = (Heartbeat)record;
                    ChangeStreamContinuationToken token = heartbeat.getChangeStreamContinuationToken();
                    ReadChangeStreamResponse.Heartbeat heartbeatProto = ReadChangeStreamResponse.Heartbeat.newBuilder().setContinuationToken(StreamContinuationToken.newBuilder().setPartition(StreamPartition.newBuilder().setRowRange(RowRange.newBuilder().setStartKeyClosed((ByteString)token.getPartition().getStart()).setEndKeyOpen((ByteString)token.getPartition().getEnd()).build()).build()).setToken(heartbeat.getChangeStreamContinuationToken().getToken()).build()).setEstimatedLowWatermark(Timestamp.newBuilder().setSeconds(heartbeat.getEstimatedLowWatermark().getEpochSecond()).setNanos(heartbeat.getEstimatedLowWatermark().getNano()).build()).build();
                    actualResults.add(ChangeStreamTestDefinition.ReadChangeStreamTest.Result.newBuilder().setRecord(ChangeStreamTestDefinition.ReadChangeStreamTest.TestChangeStreamRecord.newBuilder().setHeartbeat(heartbeatProto).build()).build());
                    continue;
                }
                if (record instanceof CloseStream) {
                    CloseStream closeStream = (CloseStream)record;
                    builder = ReadChangeStreamResponse.CloseStream.newBuilder().setStatus(closeStream.getStatus().toProto());
                    for (ChangeStreamContinuationToken token : closeStream.getChangeStreamContinuationTokens()) {
                        builder.addContinuationTokens(StreamContinuationToken.newBuilder().setPartition(StreamPartition.newBuilder().setRowRange(RowRange.newBuilder().setStartKeyClosed((ByteString)token.getPartition().getStart()).setEndKeyOpen((ByteString)token.getPartition().getEnd()).build())).setToken(token.getToken()).build());
                    }
                    for (Range.ByteStringRange newPartition : closeStream.getNewPartitions()) {
                        builder.addNewPartitions(StreamPartition.newBuilder().setRowRange(RowRange.newBuilder().setStartKeyClosed((ByteString)newPartition.getStart()).setEndKeyOpen((ByteString)newPartition.getEnd())));
                    }
                    ReadChangeStreamResponse.CloseStream closeStreamProto = builder.build();
                    actualResults.add(ChangeStreamTestDefinition.ReadChangeStreamTest.Result.newBuilder().setRecord(ChangeStreamTestDefinition.ReadChangeStreamTest.TestChangeStreamRecord.newBuilder().setCloseStream(closeStreamProto).build()).build());
                    continue;
                }
                if (record instanceof ChangeStreamMutation) {
                    ChangeStreamMutation changeStreamMutation = (ChangeStreamMutation)record;
                    builder = ChangeStreamTestDefinition.ReadChangeStreamTest.TestChangeStreamMutation.newBuilder();
                    builder.setRowKey(changeStreamMutation.getRowKey());
                    ReadChangeStreamResponse.DataChange.Type type = ReadChangeStreamResponse.DataChange.Type.UNRECOGNIZED;
                    if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.USER) {
                        type = ReadChangeStreamResponse.DataChange.Type.USER;
                    } else if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
                        type = ReadChangeStreamResponse.DataChange.Type.GARBAGE_COLLECTION;
                    }
                    builder.setType(type);
                    if (changeStreamMutation.getSourceClusterId() != null) {
                        builder.setSourceClusterId(changeStreamMutation.getSourceClusterId());
                    }
                    builder.setCommitTimestamp(Timestamp.newBuilder().setSeconds(changeStreamMutation.getCommitTimestamp().getEpochSecond()).setNanos(changeStreamMutation.getCommitTimestamp().getNano()).build());
                    builder.setTiebreaker(changeStreamMutation.getTieBreaker());
                    builder.setToken(changeStreamMutation.getToken());
                    builder.setEstimatedLowWatermark(Timestamp.newBuilder().setSeconds(changeStreamMutation.getEstimatedLowWatermark().getEpochSecond()).setNanos(changeStreamMutation.getEstimatedLowWatermark().getNano()).build());
                    for (Entry entry : changeStreamMutation.getEntries()) {
                        if (entry instanceof DeleteFamily) {
                            DeleteFamily deleteFamily = (DeleteFamily)entry;
                            builder.addMutations(Mutation.newBuilder().setDeleteFromFamily(Mutation.DeleteFromFamily.newBuilder().setFamilyName(deleteFamily.getFamilyName()).build()));
                            continue;
                        }
                        if (entry instanceof DeleteCells) {
                            DeleteCells deleteCells = (DeleteCells)entry;
                            builder.addMutations(Mutation.newBuilder().setDeleteFromColumn(Mutation.DeleteFromColumn.newBuilder().setFamilyName(deleteCells.getFamilyName()).setColumnQualifier(deleteCells.getQualifier()).setTimeRange(TimestampRange.newBuilder().setStartTimestampMicros(((Long)deleteCells.getTimestampRange().getStart()).longValue()).setEndTimestampMicros(((Long)deleteCells.getTimestampRange().getEnd()).longValue()).build()).build()));
                            continue;
                        }
                        if (entry instanceof SetCell) {
                            SetCell setCell = (SetCell)entry;
                            builder.addMutations(Mutation.newBuilder().setSetCell(Mutation.SetCell.newBuilder().setFamilyName(setCell.getFamilyName()).setColumnQualifier(setCell.getQualifier()).setTimestampMicros(setCell.getTimestamp()).setValue(setCell.getValue())));
                            continue;
                        }
                        throw new IllegalStateException("Unexpected Entry type");
                    }
                    actualResults.add(ChangeStreamTestDefinition.ReadChangeStreamTest.Result.newBuilder().setRecord(ChangeStreamTestDefinition.ReadChangeStreamTest.TestChangeStreamRecord.newBuilder().setChangeStreamMutation(builder)).build());
                    continue;
                }
                throw new IllegalStateException("Unexpected ChangeStreamRecord type");
            }
        }
        catch (Exception e) {
            error = e;
        }
        if (ReadChangeStreamMergingAcceptanceTest.expectsError(this.testCase)) {
            Truth.assertThat((Throwable)error).isNotNull();
        } else if (error != null) {
            throw error;
        }
        Truth.assertThat(ReadChangeStreamMergingAcceptanceTest.getNonExceptionResults(this.testCase)).isEqualTo((Object)actualResults);
    }

    private static boolean expectsError(ChangeStreamTestDefinition.ReadChangeStreamTest testCase) {
        List<ChangeStreamTestDefinition.ReadChangeStreamTest.Result> results = testCase.getResultsList();
        return results != null && !results.isEmpty() && results.get(results.size() - 1).getError();
    }

    private static List<ChangeStreamTestDefinition.ReadChangeStreamTest.Result> getNonExceptionResults(ChangeStreamTestDefinition.ReadChangeStreamTest testCase) {
        List<ChangeStreamTestDefinition.ReadChangeStreamTest.Result> results = testCase.getResultsList();
        ArrayList<ChangeStreamTestDefinition.ReadChangeStreamTest.Result> response = new ArrayList<ChangeStreamTestDefinition.ReadChangeStreamTest.Result>();
        if (results != null) {
            for (ChangeStreamTestDefinition.ReadChangeStreamTest.Result result : results) {
                if (result.getError()) continue;
                response.add(result);
            }
        }
        return response;
    }
}

