/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.integration.test.support;

import com.azure.spring.integration.core.api.CheckpointConfig;
import com.azure.spring.integration.core.api.CheckpointMode;
import com.azure.spring.integration.core.api.Checkpointer;
import com.azure.spring.integration.core.api.SendOperation;
import com.azure.spring.integration.test.support.pojo.User;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

/**
 * Base class for tests that send messages through a {@link SendOperation} and
 * consume them through a subscription supplied by the concrete subclass.
 *
 * <p>Subclasses provide the operation under test in {@link #setUp()}, wire the
 * subscription in {@link #subscribe(String, Consumer, Class)}, and implement the
 * {@code verifyCheckpoint*Called} hooks used to check checkpoint interactions.
 *
 * @param <T> the concrete {@link SendOperation} type under test
 */
public abstract class SendSubscribeOperationTest<T extends SendOperation> {
    /** Header key under which the handler expects to find the {@link Checkpointer}. */
    private static final String CHECKPOINTER_HEADER = "azure_checkpointer";

    /** Delay used by {@link #waitMillis(long)} when a non-positive value is requested. */
    private static final long DEFAULT_WAIT_MILLIS = 30L;

    // NOTE: field declaration order matters below; later initializers read earlier fields.
    protected T sendSubscribeOperation;
    protected String partitionId = "1";
    protected String destination = "test";
    protected String payload = "payload";
    protected User user = new User(this.payload);
    protected Map<String, Object> headers = new HashMap<>();
    protected Message<User> userMessage = new GenericMessage<>(this.user, this.headers);

    /** Four user messages whose users are built from the strings "1" through "4". */
    protected List<Message<User>> messages = IntStream.range(1, 5)
        .mapToObj(String::valueOf)
        .map(User::new)
        // Explicit type witness keeps the element type as Message<User> rather than GenericMessage<User>.
        .<Message<User>>map(u -> new GenericMessage<>(u, this.headers))
        .collect(Collectors.toList());

    private final Message<String> stringMessage = new GenericMessage<>(this.payload, this.headers);
    private final Message<byte[]> byteMessage =
        new GenericMessage<>(this.payload.getBytes(StandardCharsets.UTF_8), this.headers);

    /**
     * Prepares the fixture before each test; implemented by the concrete subclass.
     *
     * @throws Exception if the subclass fails to set up its fixture
     */
    @BeforeEach
    public abstract void setUp() throws Exception;

    /** Subscribes with a {@code String} payload type and sends the string message. */
    @Test
    public void testSendString() {
        this.subscribe(this.destination, this::stringHandler, String.class);
        this.sendSubscribeOperation.sendAsync(this.destination, this.stringMessage);
    }

    /** Subscribes with a {@code byte[]} payload type and sends the byte-array message. */
    @Test
    public void testSendByte() {
        this.subscribe(this.destination, this::byteHandler, byte[].class);
        this.sendSubscribeOperation.sendAsync(this.destination, this.byteMessage);
    }

    /** Subscribes with a {@link User} payload type and sends the user message. */
    @Test
    public void testSendUser() {
        this.subscribe(this.destination, this::userHandler, User.class);
        this.sendSubscribeOperation.sendAsync(this.destination, this.userMessage);
    }

    /**
     * Configures {@link CheckpointMode#MANUAL}, subscribes with
     * {@link #manualCheckpointHandler(Message)} and sends the user message.
     */
    @Test
    public void testSendReceiveWithManualCheckpointMode() {
        this.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
        this.subscribe(this.destination, this::manualCheckpointHandler, User.class);
        this.sendSubscribeOperation.sendAsync(this.destination, this.userMessage);
    }

    /**
     * Configures {@link CheckpointMode#RECORD}, sends every message in {@link #messages}
     * and then asks the subclass to verify one successful checkpoint per message sent.
     */
    @Test
    public void testSendReceiveWithRecordCheckpointMode() {
        this.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.RECORD).build());
        this.subscribe(this.destination, this::recordCheckpointHandler, User.class);
        this.messages.forEach(m -> this.sendSubscribeOperation.sendAsync(this.destination, m));
        this.verifyCheckpointSuccessCalled(this.messages.size());
    }

    /**
     * Handler for manual checkpoint mode: asserts that the message carries a
     * {@link Checkpointer} header, then exercises both its success and failure paths.
     *
     * @param message the received message
     */
    protected void manualCheckpointHandler(Message<?> message) {
        Assertions.assertTrue(message.getHeaders().containsKey(CHECKPOINTER_HEADER));
        Checkpointer checkpointer = message.getHeaders().get(CHECKPOINTER_HEADER, Checkpointer.class);
        Assertions.assertNotNull(checkpointer);
        this.verifyCheckpointSuccess(checkpointer);
        this.verifyCheckpointFailure(checkpointer);
    }

    /** Handler for record checkpoint mode; intentionally does nothing with the message. */
    private void recordCheckpointHandler(Message<?> message) {
    }

    /** Asserts that the received payload equals {@link #payload}. */
    private void stringHandler(Message<?> message) {
        Assertions.assertEquals(this.payload, message.getPayload());
    }

    /** Asserts that the received bytes, decoded as UTF-8, equal {@link #payload}. */
    private void byteHandler(Message<?> message) {
        Assertions.assertEquals(this.payload, new String((byte[]) message.getPayload(), StandardCharsets.UTF_8));
    }

    /** Asserts that the received payload equals {@link #user}. */
    private void userHandler(Message<?> message) {
        Assertions.assertEquals(this.user, message.getPayload());
    }

    /**
     * Verification hook for successful checkpoints.
     *
     * @param times presumably the expected number of successful checkpoint calls;
     *              the exact semantics are defined by the subclass
     */
    protected abstract void verifyCheckpointSuccessCalled(int times);

    /**
     * Verification hook for successful batch checkpoints.
     *
     * @param times presumably the expected number of successful batch checkpoint calls;
     *              the exact semantics are defined by the subclass
     */
    protected abstract void verifyCheckpointBatchSuccessCalled(int times);

    /**
     * Verification hook for failed checkpoints.
     *
     * @param times presumably the expected number of failed checkpoint calls;
     *              the exact semantics are defined by the subclass
     */
    protected abstract void verifyCheckpointFailureCalled(int times);

    /**
     * Registers a consumer for messages on the given destination.
     *
     * @param destination the destination name to subscribe to
     * @param consumer    the handler invoked with received messages
     * @param payloadType the payload type requested for received messages
     */
    protected abstract void subscribe(String destination, Consumer<Message<?>> consumer, Class<?> payloadType);

    /**
     * Applies the given checkpoint configuration to the operation under test.
     *
     * @param checkpointConfig the checkpoint configuration to use
     */
    protected abstract void setCheckpointConfig(CheckpointConfig checkpointConfig);

    /**
     * Calls {@link Checkpointer#success()} and verifies via
     * {@link #verifyCheckpointSuccessCalled(int)} with an argument of {@code 1}.
     *
     * @param checkpointer the checkpointer taken from the received message
     */
    protected void verifyCheckpointSuccess(Checkpointer checkpointer) {
        checkpointer.success();
        this.verifyCheckpointSuccessCalled(1);
    }

    /**
     * Calls {@link Checkpointer#failure()} and verifies via
     * {@link #verifyCheckpointFailureCalled(int)} with an argument of {@code 1}.
     *
     * @param checkpointer the checkpointer taken from the received message
     */
    protected void verifyCheckpointFailure(Checkpointer checkpointer) {
        checkpointer.failure();
        this.verifyCheckpointFailureCalled(1);
    }

    /** @return the partition id used by the tests */
    public String getPartitionId() {
        return this.partitionId;
    }

    /** @param partitionId the partition id to use */
    public void setPartitionId(String partitionId) {
        this.partitionId = partitionId;
    }

    /** @return the operation under test */
    public T getSendSubscribeOperation() {
        return this.sendSubscribeOperation;
    }

    /** @param sendSubscribeOperation the operation under test */
    public void setSendSubscribeOperation(T sendSubscribeOperation) {
        this.sendSubscribeOperation = sendSubscribeOperation;
    }

    /**
     * Sleeps the current thread for the given time. A non-positive value falls back
     * to a 30 ms delay. If the thread is interrupted while sleeping, the method
     * returns early and restores the interrupt flag so callers can observe it.
     *
     * @param millis the time to sleep in milliseconds
     */
    protected void waitMillis(long millis) {
        long delay = millis <= 0L ? DEFAULT_WAIT_MILLIS : millis;
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of silently swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}

