/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.integration.test.support.rx;

import com.azure.spring.integration.core.api.CheckpointConfig;
import com.azure.spring.integration.core.api.CheckpointMode;
import com.azure.spring.integration.core.api.RxSendOperation;
import com.azure.spring.integration.test.support.pojo.User;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import rx.Observable;
import rx.observers.AssertableSubscriber;

/**
 * Abstract base test for {@link RxSendOperation} implementations that also
 * support subscribing to a destination.
 *
 * <p>Each test subscribes to {@link #destination} through
 * {@link #subscribe(String, Class)}, sends one or more messages through
 * {@link #sendSubscribeOperation}, and asserts on what the subscriber received.
 * Subclasses supply the concrete operation, the subscription mechanism, and the
 * checkpoint verification hooks.
 *
 * @param <T> the concrete send operation under test
 */
public abstract class RxSendSubscribeOperationTest<T extends RxSendOperation> {
    /** Destination name that every test sends to and subscribes on. */
    protected String destination = "test";

    /** Partition id exposed to subclasses; not used by the tests in this class. */
    protected String partitionId = "1";

    /** Operation under test; expected to be assigned by the subclass (see {@link #setUp()}). */
    protected T sendSubscribeOperation;

    /** Empty header map shared by all fixture messages. */
    private final Map<String, Object> headers = new HashMap<>();

    /** Four messages whose payloads are {@link User}s built from "1" through "4". */
    // Creating a generic array is inherently unchecked; every element is a
    // GenericMessage<User>, so the conversion is safe.
    @SuppressWarnings("unchecked")
    protected Message<User>[] messages = IntStream.range(1, 5)
            .mapToObj(String::valueOf)
            .map(User::new)
            .map(u -> new GenericMessage<>(u, headers))
            .toArray(Message[]::new);

    /** Text used as the payload of the String, byte[] and User fixtures. */
    private final String payload = "payload";

    /** {@link #payload} encoded as UTF-8 bytes. */
    private final Message<byte[]> byteMessage =
            new GenericMessage<>(payload.getBytes(StandardCharsets.UTF_8), headers);

    /** {@link #payload} as a plain String message. */
    private final Message<String> stringMessage = new GenericMessage<>(payload, headers);

    /** User fixture built from {@link #payload}. */
    private final User user = new User(payload);

    /** Message wrapping {@link #user}. */
    protected Message<User> userMessage = new GenericMessage<>(user, headers);

    /**
     * Applies the given checkpoint configuration to the operation under test.
     *
     * @param checkpointConfig the configuration to apply
     */
    protected abstract void setCheckpointConfig(CheckpointConfig checkpointConfig);

    /** Per-test initialisation implemented by the subclass. */
    @BeforeEach
    public abstract void setUp();

    /**
     * Subscribes to a destination.
     *
     * @param destination the destination to subscribe to
     * @param payloadType the payload type requested for received messages
     * @return an observable of the received messages
     */
    protected abstract Observable<Message<?>> subscribe(String destination, Class<?> payloadType);

    /** A byte[] message is received and decodes back to the original text. */
    @Test
    public void testSendByte() {
        AssertableSubscriber<String> subscriber = subscribe(destination, byte[].class)
                .map(Message::getPayload)
                .cast(byte[].class)
                // Decode with the same charset used to encode the fixture; the
                // no-charset String constructor uses the platform default.
                .map(bytes -> new String(bytes, StandardCharsets.UTF_8))
                .test();

        sendSubscribeOperation.sendRx(destination, byteMessage);

        subscriber.assertValue(payload).assertNoErrors();
    }

    /** With MANUAL checkpoint mode, the message is received and no checkpoint success is expected. */
    @Test
    public void testSendReceiveWithManualCheckpointMode() {
        setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());

        Observable<Message<?>> observable = subscribe(destination, User.class);
        AssertableSubscriber<User> userSubscriber =
                observable.map(Message::getPayload).cast(User.class).test();

        sendSubscribeOperation.sendRx(destination, userMessage);

        userSubscriber.assertValue(user).assertNoErrors();
        verifyCheckpointSuccessCalled(0);
    }

    /** With RECORD checkpoint mode, one checkpoint success is expected per message sent. */
    @Test
    public void testSendReceiveWithRecordCheckpointMode() {
        setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.RECORD).build());

        AssertableSubscriber<User> subscriber = subscribe(destination, User.class)
                .map(Message::getPayload)
                .cast(User.class)
                .test();

        for (Message<User> message : messages) {
            sendSubscribeOperation.sendRx(destination, message);
        }

        subscriber.assertValueCount(messages.length).assertNoErrors();
        verifyCheckpointSuccessCalled(messages.length);
    }

    /** A String message is received unchanged. */
    @Test
    public void testSendString() {
        AssertableSubscriber<String> subscriber = subscribe(destination, String.class)
                .map(Message::getPayload)
                .cast(String.class)
                .test();

        sendSubscribeOperation.sendRx(destination, stringMessage);

        subscriber.assertValue(payload).assertNoErrors();
    }

    /** A {@link User} message is received with a payload equal to the one sent. */
    @Test
    public void testSendUser() {
        AssertableSubscriber<User> subscriber = subscribe(destination, User.class)
                .map(Message::getPayload)
                .cast(User.class)
                .test();

        sendSubscribeOperation.sendRx(destination, userMessage);

        subscriber.assertValue(user).assertNoErrors();
    }

    /**
     * Verification hook for batch checkpointing. Not invoked by the tests in
     * this class; presumably used by subclasses (confirm against implementations).
     *
     * @param times the expected number of calls
     */
    protected abstract void verifyCheckpointBatchSuccessCalled(int times);

    /**
     * Verifies how many times a checkpoint success occurred.
     *
     * @param times the expected number of calls
     */
    protected abstract void verifyCheckpointSuccessCalled(int times);

    /**
     * @return the partition id
     */
    public String getPartitionId() {
        return partitionId;
    }

    /**
     * @param partitionId the partition id to use
     */
    public void setPartitionId(String partitionId) {
        this.partitionId = partitionId;
    }

    /**
     * @return the operation under test
     */
    public T getSendSubscribeOperation() {
        return sendSubscribeOperation;
    }

    /**
     * @param sendSubscribeOperation the operation under test
     */
    public void setSendSubscribeOperation(T sendSubscribeOperation) {
        this.sendSubscribeOperation = sendSubscribeOperation;
    }
}

