/*
 * Decompiled with CFR 0.152.
 */
package com.consol.citrus.websocket.endpoint;

import com.consol.citrus.context.TestContext;
import com.consol.citrus.endpoint.EndpointConfiguration;
import com.consol.citrus.exceptions.MessageTimeoutException;
import com.consol.citrus.message.Message;
import com.consol.citrus.messaging.AbstractSelectiveMessageConsumer;
import com.consol.citrus.websocket.endpoint.WebSocketEndpointConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.WebSocketMessage;

public class WebSocketConsumer
extends AbstractSelectiveMessageConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(WebSocketConsumer.class);
    private final WebSocketEndpointConfiguration endpointConfiguration;

    public WebSocketConsumer(String name, WebSocketEndpointConfiguration endpointConfiguration) {
        super(name, (EndpointConfiguration)endpointConfiguration);
        this.endpointConfiguration = endpointConfiguration;
    }

    public Message receive(String selector, TestContext context, long timeout) {
        LOG.info(String.format("Waiting %s ms for Web Socket message ...", timeout));
        WebSocketMessage<?> message = this.receive(this.endpointConfiguration, timeout);
        Message receivedMessage = this.endpointConfiguration.getMessageConverter().convertInbound(message, this.endpointConfiguration, context);
        LOG.info("Received Web Socket message");
        context.onInboundMessage(receivedMessage);
        return receivedMessage;
    }

    private WebSocketMessage<?> receive(WebSocketEndpointConfiguration config, long timeout) {
        long timeLeft = timeout;
        WebSocketMessage<?> message = config.getHandler().getMessage();
        String endpointUri = this.endpointConfiguration.getEndpointUri();
        while (message == null && timeLeft > 0L) {
            long sleep;
            long l = sleep = (timeLeft -= this.endpointConfiguration.getPollingInterval()) > 0L ? this.endpointConfiguration.getPollingInterval() : this.endpointConfiguration.getPollingInterval() + timeLeft;
            if (LOG.isDebugEnabled()) {
                String msg = "Waiting for message on '%s' - retrying in %s ms";
                LOG.debug(String.format(msg, endpointUri, sleep));
            }
            try {
                Thread.sleep(sleep);
            }
            catch (InterruptedException e) {
                LOG.warn(String.format("Thread interrupted while waiting for message on '%s'", endpointUri), (Throwable)e);
            }
            message = config.getHandler().getMessage();
        }
        if (message == null) {
            throw new MessageTimeoutException(timeout, endpointUri);
        }
        return message;
    }
}

