/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io;

import com.datatorrent.common.util.PubSubMessageCodec;
import com.datatorrent.lib.io.WebSocketInputOperator;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * WebSocket input operator that subscribes to a single pub/sub topic.
 *
 * <p>When {@link #run()} is invoked, a subscribe message for the configured
 * topic is sent over the inherited connection. Each incoming message is
 * parsed as a JSON object and converted to the value stored under its
 * {@code "data"} key.
 *
 * @param <T> type the {@code "data"} payload is expected to have; this is not
 *            checked at runtime (see {@link #convertMessage(String)})
 */
public class PubSubWebSocketInputOperator<T>
extends WebSocketInputOperator<T> {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketInputOperator.class);

    /** Topic to subscribe to; {@code null} until {@link #setTopic(String)} is called. */
    private String topic = null;

    /**
     * Codec used to build the subscribe message. It is transient and built from
     * the inherited {@code mapper} in the field initializer.
     */
    private transient PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(this.mapper);

    /**
     * Sets the topic this operator subscribes to.
     *
     * @param topic the pub/sub topic name
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * Returns the topic this operator subscribes to.
     *
     * <p>The getter carries {@code @NotNull}, so bean validation of this operator
     * is expected to reject an unset topic; the field itself defaults to
     * {@code null}.
     *
     * @return the configured topic
     */
    @NotNull
    public String getTopic() {
        return this.topic;
    }

    /**
     * Parses the message as a JSON object and returns the value of its
     * {@code "data"} entry.
     *
     * @param message raw text received from the WebSocket
     * @return the value mapped to {@code "data"}, or {@code null} if the object
     *         has no such entry
     * @throws IOException propagated from the mapper when the message cannot be read
     */
    @Override
    @SuppressWarnings("unchecked")
    protected T convertMessage(String message) throws IOException {
        Map<?, ?> payload = this.mapper.readValue(message, HashMap.class);
        // Unchecked: T is erased, so a payload of the wrong type is only
        // detected later, wherever the caller uses the value as a T.
        return (T)payload.get("data");
    }

    /**
     * Runs the superclass logic, then sends the subscribe message for the
     * configured topic over the inherited connection.
     *
     * <p>An {@link IOException} raised while sending is logged and not rethrown.
     */
    @Override
    public void run() {
        super.run();
        try {
            this.connection.sendMessage(PubSubMessageCodec.constructSubscribeMessage(this.topic, this.codec));
        }
        catch (IOException ex) {
            LOG.error("Exception caught", ex);
        }
    }
}

