/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.storm.hook;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.storm.hook.StormTopologyUtil;
import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.HdfsNameServiceResolver;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.storm.ISubmitterHook;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StormAtlasHook
extends AtlasHook
implements ISubmitterHook {
    /** Logger used by this hook. */
    public static final Logger LOG = LoggerFactory.getLogger(StormAtlasHook.class);
    /** Owner value recorded when the topology reports an empty owner. */
    public static final String ANONYMOUS_OWNER = "anonymous";
    /** Namespace passed when building HBase table qualified names in this hook. */
    public static final String HBASE_NAMESPACE_DEFAULT = "default";
    /** Name of the relationship attribute that links a Hive table entity to its database entity. */
    public static final String ATTRIBUTE_DB = "db";
    /** Relationship type passed when linking a topology to its spout/bolt node entities. */
    public static final String RELATIONSHIP_STORM_TOPOLOGY_NODES = "storm_topology_nodes";
    /** Relationship type passed when linking a topology to its input data sets. */
    public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs";
    /** Relationship type passed when linking a topology to its output data sets. */
    public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs";
    /** Format of an HBase table qualified name; filled as namespace, table name, metadata namespace. */
    public static final String HBASE_TABLE_QUALIFIED_NAME_FORMAT = "%s:%s@%s";

    /**
     * Storm submitter-hook entry point: collects metadata for a newly submitted topology and
     * sends it to Atlas as a single entity-create notification.
     *
     * <p>The notification carries the topology entity, the data-set entities found for its
     * spouts and terminal bolts, and one node entity per spout and bolt.
     *
     * @param topologyInfo  summary of the submitted topology (name, id, owner)
     * @param stormConf     the Storm configuration supplied with the submission
     * @param stormTopology the topology structure (spouts and bolts)
     * @throws RuntimeException wrapping any exception raised while collecting or sending metadata
     */
    public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology stormTopology) {
        LOG.info("Collecting metadata for a new storm topology: {}", topologyInfo.get_name());
        try {
            String owner = topologyInfo.get_owner();
            String user = getUser(owner, null);
            AtlasEntity topology = createTopologyInstance(topologyInfo, stormConf);
            AtlasEntity.AtlasEntitiesWithExtInfo entity = new AtlasEntity.AtlasEntitiesWithExtInfo(topology);

            addTopologyDataSets(stormTopology, owner, stormConf, topology, entity);

            // Build the spout/bolt graph and attach it to the topology entity.
            List<AtlasEntity> graphNodes = createTopologyGraph(stormTopology, stormTopology.get_spouts(), stormTopology.get_bolts());
            if (CollectionUtils.isNotEmpty(graphNodes)) {
                topology.setRelationshipAttribute("nodes", AtlasTypeUtil.getAtlasRelatedObjectIds(graphNodes, RELATIONSHIP_STORM_TOPOLOGY_NODES));
                for (AtlasEntity graphNode : graphNodes) {
                    entity.addReferredEntity(graphNode);
                }
            }

            // Typed as List<HookNotification>: generic lists are invariant, so a
            // List<EntityCreateRequestV2> cannot be handed to a List<HookNotification> parameter.
            List<HookNotification> hookNotifications =
                    Collections.<HookNotification>singletonList(new HookNotification.EntityCreateRequestV2(user, entity));
            notifyEntities(hookNotifications, null);
        }
        catch (Exception e) {
            throw new RuntimeException("Atlas hook is unable to process the topology.", e);
        }
    }

    /**
     * Builds the topology entity (typed by {@code StormDataTypes.STORM_TOPOLOGY}) from the
     * submission summary.
     *
     * <p>The topology name doubles as its qualified name, an empty owner is replaced by
     * {@link #ANONYMOUS_OWNER}, and the start time is the moment this method runs.
     *
     * @param topologyInfo summary of the submitted topology
     * @param stormConf    the Storm configuration (not read by this method)
     * @return the populated topology entity
     */
    private AtlasEntity createTopologyInstance(TopologyInfo topologyInfo, Map stormConf) {
        String topologyName = topologyInfo.get_name();
        String reportedOwner = topologyInfo.get_owner();
        String effectiveOwner = StringUtils.isEmpty(reportedOwner) ? ANONYMOUS_OWNER : reportedOwner;

        AtlasEntity entity = new AtlasEntity(StormDataTypes.STORM_TOPOLOGY.getName());
        entity.setAttribute("id", topologyInfo.get_id());
        entity.setAttribute("name", topologyName);
        entity.setAttribute("qualifiedName", topologyName);
        entity.setAttribute("owner", effectiveOwner);
        entity.setAttribute("startTime", new Date());
        entity.setAttribute("clusterName", getMetadataNamespace());
        return entity;
    }

    /**
     * Attaches the topology's data sets: inputs derived from its spouts, then outputs derived
     * from its terminal bolts.
     *
     * @param stormTopology the topology structure
     * @param topologyOwner owner reported for the topology
     * @param stormConf     the Storm configuration
     * @param topology      the topology entity to attach the relationships to
     * @param entityExtInfo collector for the referred data-set entities
     */
    private void addTopologyDataSets(StormTopology stormTopology, String topologyOwner, Map stormConf, AtlasEntity topology, AtlasEntity.AtlasEntityExtInfo entityExtInfo) {
        Map<String, SpoutSpec> spouts = stormTopology.get_spouts();

        addTopologyInputs(spouts, stormConf, topologyOwner, topology, entityExtInfo);
        addTopologyOutputs(stormTopology, topologyOwner, stormConf, topology, entityExtInfo);
    }

    /**
     * Deserializes every spout, asks {@code addDataSet} for a matching data-set entity, and
     * records the resulting entities as the topology's {@code inputs} relationship.
     *
     * <p>Spouts for which no data set is produced are skipped; the relationship is set even
     * when the resulting list is empty.
     *
     * @param spouts        spout specifications keyed by spout name
     * @param stormConf     the Storm configuration
     * @param topologyOwner owner reported for the topology
     * @param topology      the topology entity to attach the relationship to
     * @param entityExtInfo collector for the referred data-set entities
     */
    private void addTopologyInputs(Map<String, SpoutSpec> spouts, Map stormConf, String topologyOwner, AtlasEntity topology, AtlasEntity.AtlasEntityExtInfo entityExtInfo) {
        List<AtlasEntity> sources = new ArrayList<>();

        for (SpoutSpec spoutSpec : spouts.values()) {
            byte[] serialized = spoutSpec.get_spout_object().get_serialized_java();
            Serializable spout = Utils.javaDeserialize(serialized, Serializable.class);
            // The simple class name selects which kind of data set to create.
            String spoutType = spout.getClass().getSimpleName();
            AtlasEntity source = addDataSet(spoutType, topologyOwner, spout, stormConf, entityExtInfo);

            if (source != null) {
                sources.add(source);
            }
        }

        topology.setRelationshipAttribute("inputs", AtlasTypeUtil.getAtlasRelatedObjectIds(sources, RELATIONSHIP_DATASET_PROCESS_INPUTS));
    }

    /**
     * Deserializes each bolt named by {@code StormTopologyUtil.getTerminalUserBoltNames}, asks
     * {@code addDataSet} for a matching data-set entity, and records the resulting entities as
     * the topology's {@code outputs} relationship.
     *
     * <p>Bolts for which no data set is produced are skipped; the relationship is set even
     * when the resulting list is empty.
     *
     * @param stormTopology the topology structure
     * @param topologyOwner owner reported for the topology
     * @param stormConf     the Storm configuration
     * @param topology      the topology entity to attach the relationship to
     * @param entityExtInfo collector for the referred data-set entities
     */
    private void addTopologyOutputs(StormTopology stormTopology, String topologyOwner, Map stormConf, AtlasEntity topology, AtlasEntity.AtlasEntityExtInfo entityExtInfo) {
        List<AtlasEntity> sinks = new ArrayList<>();
        Map<String, Bolt> boltsByName = stormTopology.get_bolts();

        for (String terminalBoltName : StormTopologyUtil.getTerminalUserBoltNames(stormTopology)) {
            byte[] serialized = boltsByName.get(terminalBoltName).get_bolt_object().get_serialized_java();
            Serializable bolt = Utils.javaDeserialize(serialized, Serializable.class);
            // The simple class name selects which kind of data set to create.
            String boltType = bolt.getClass().getSimpleName();
            AtlasEntity sink = addDataSet(boltType, topologyOwner, bolt, stormConf, entityExtInfo);

            if (sink != null) {
                sinks.add(sink);
            }
        }

        topology.setRelationshipAttribute("outputs", AtlasTypeUtil.getAtlasRelatedObjectIds(sinks, RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
    }

    /**
     * Creates the data-set entity that corresponds to a spout or bolt, when its class is one of
     * the recognised types ({@code KafkaSpout}, {@code HBaseBolt}, {@code HdfsBolt},
     * {@code HiveBolt}), and registers it as a referred entity.
     *
     * @param dataSetType   simple class name of the spout/bolt
     * @param topologyOwner owner reported for the topology
     * @param instance      the deserialized spout/bolt whose field values describe the data set
     * @param stormConf     the Storm configuration
     * @param entityExtInfo collector that receives the created entity (and, for Hive, its database)
     * @return the created entity, or {@code null} when the type is not recognised or a required
     *         setting is missing
     */
    private AtlasEntity addDataSet(String dataSetType, String topologyOwner, Serializable instance, Map stormConf, AtlasEntity.AtlasEntityExtInfo entityExtInfo) {
        Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null);
        AtlasEntity ret;
        switch (dataSetType) {
            case "KafkaSpout": {
                ret = createKafkaTopicEntity(config, topologyOwner);
                break;
            }
            case "HBaseBolt": {
                ret = createHBaseTableEntity(config, stormConf);
                break;
            }
            case "HdfsBolt": {
                ret = createHdfsPathEntity(config, stormConf);
                break;
            }
            case "HiveBolt": {
                ret = createHiveTableEntity(config, stormConf, entityExtInfo);
                break;
            }
            default: {
                return null;
            }
        }
        if (ret != null) {
            entityExtInfo.addReferredEntity(ret);
        }
        return ret;
    }

    /** Builds the Kafka topic entity for a KafkaSpout, or returns null when no topic name is configured. */
    private AtlasEntity createKafkaTopicEntity(Map<String, String> config, String topologyOwner) {
        String topicName = config.get("KafkaSpout.kafkaSpoutConfig.translator.topic");
        String uri = config.get("KafkaSpout.kafkaSpoutConfig.kafkaProps.bootstrap.servers");
        // Fall back to the alternate "_spoutConfig" keys when the first set yields nothing.
        if (StringUtils.isEmpty(topicName)) {
            topicName = config.get("KafkaSpout._spoutConfig.topic");
        }
        if (StringUtils.isEmpty(uri)) {
            uri = config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr");
        }
        if (topicName == null) {
            LOG.error("Kafka topic name not found");
            return null;
        }
        String owner = StringUtils.isEmpty(topologyOwner) ? ANONYMOUS_OWNER : topologyOwner;

        AtlasEntity ret = new AtlasEntity(StormDataTypes.KAFKA_TOPIC.getName());
        ret.setAttribute("topic", topicName);
        ret.setAttribute("uri", uri);
        ret.setAttribute("owner", owner);
        ret.setAttribute("qualifiedName", getKafkaTopicQualifiedName(getMetadataNamespace(), topicName));
        ret.setAttribute("name", topicName);
        return ret;
    }

    /** Builds the HBase table entity for an HBaseBolt, or returns null when no table name is configured. */
    private AtlasEntity createHBaseTableEntity(Map<String, String> config, Map stormConf) {
        String hbaseTableName = config.get("HBaseBolt.tableName");
        if (hbaseTableName == null) {
            LOG.error("HBase table name not found");
            return null;
        }
        // Use the table name as the URI when no root directory is available.
        String uri = config.get("hbase.rootdir");
        if (StringUtils.isEmpty(uri)) {
            uri = hbaseTableName;
        }
        String metadataNamespace = extractComponentMetadataNamespace(HBaseConfiguration.create(), stormConf);

        AtlasEntity ret = new AtlasEntity(StormDataTypes.HBASE_TABLE.getName());
        // "name" carries the table name and "uri" the location; these were previously swapped.
        ret.setAttribute("uri", uri);
        ret.setAttribute("name", hbaseTableName);
        ret.setAttribute("owner", stormConf.get("storm.kerberos.principal"));
        ret.setAttribute("qualifiedName", getHbaseTableQualifiedName(metadataNamespace, HBASE_NAMESPACE_DEFAULT, hbaseTableName));
        return ret;
    }

    /** Builds the HDFS path entity for an HdfsBolt, or returns null when the file system URL or path is missing. */
    private AtlasEntity createHdfsPathEntity(Map<String, String> config, Map stormConf) {
        String rotationActions = config.get("HdfsBolt.rotationActions");
        String hdfsUri = rotationActions != null ? rotationActions : config.get("HdfsBolt.fileNameFormat.path");
        String fsUrl = config.get("HdfsBolt.fsUrl");
        if (fsUrl == null || hdfsUri == null) {
            // Concatenating a null would publish a bogus path containing the text "null".
            LOG.error("HDFS file system URL or path not found");
            return null;
        }
        String hdfsPathStr = fsUrl + hdfsUri;
        Path hdfsPath = new Path(hdfsPathStr);
        String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(hdfsPathStr);
        String metadataNamespace = getMetadataNamespace();

        AtlasEntity ret = new AtlasEntity("hdfs_path");
        ret.setAttribute("clusterName", metadataNamespace);
        ret.setAttribute("owner", stormConf.get("hdfs.kerberos.principal"));
        ret.setAttribute("name", Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase());

        // When a name service id resolves, publish the path rewritten with that id.
        String publishedPath = hdfsPathStr;
        if (StringUtils.isNotEmpty(nameServiceID)) {
            publishedPath = HdfsNameServiceResolver.getPathWithNameServiceID(hdfsPathStr);
            ret.setAttribute("nameServiceId", nameServiceID);
        }
        ret.setAttribute("path", publishedPath);
        ret.setAttribute("qualifiedName", getHdfsPathQualifiedName(metadataNamespace, publishedPath));
        return ret;
    }

    /**
     * Builds the Hive table entity for a HiveBolt and registers its database entity as a referred
     * entity; returns null when the database or table name is missing.
     */
    private AtlasEntity createHiveTableEntity(Map<String, String> config, Map stormConf, AtlasEntity.AtlasEntityExtInfo entityExtInfo) {
        String dbName = config.get("HiveBolt.options.databaseName");
        String tblName = config.get("HiveBolt.options.tableName");
        if (dbName == null || tblName == null) {
            LOG.error("Hive database or table name not found");
            return null;
        }
        String metadataNamespace = extractComponentMetadataNamespace(new HiveConf(), stormConf);

        AtlasEntity dbEntity = new AtlasEntity("hive_db");
        dbEntity.setAttribute("name", dbName);
        dbEntity.setAttribute("qualifiedName", HiveMetaStoreBridge.getDBQualifiedName(metadataNamespace, dbName));
        dbEntity.setAttribute("clusterName", metadataNamespace);
        entityExtInfo.addReferredEntity(dbEntity);

        AtlasEntity ret = new AtlasEntity("hive_table");
        ret.setAttribute("name", tblName);
        ret.setRelationshipAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasRelatedObjectId(dbEntity, "hive_table_db"));
        ret.setAttribute("qualifiedName", HiveMetaStoreBridge.getTableQualifiedName(metadataNamespace, dbName, tblName));
        return ret;
    }

    /**
     * Creates one node entity per spout and bolt, wires their connections, and returns the nodes.
     *
     * @param stormTopology the topology structure, used to derive node connections
     * @param spouts        spout specifications keyed by spout name
     * @param bolts         bolt definitions keyed by bolt name
     * @return a new list holding every node entity
     */
    private List<AtlasEntity> createTopologyGraph(StormTopology stormTopology, Map<String, SpoutSpec> spouts, Map<String, Bolt> bolts) {
        Map<String, AtlasEntity> nodesByName = new HashMap<>();

        addSpouts(spouts, nodesByName);
        addBolts(bolts, nodesByName);
        addGraphConnections(stormTopology, nodesByName);

        List<AtlasEntity> nodes = new ArrayList<>(nodesByName.values());
        return nodes;
    }

    /**
     * Creates a node entity for every spout and stores it under the spout's name.
     *
     * @param spouts       spout specifications keyed by spout name
     * @param nodeEntities map that receives the created entities
     */
    private void addSpouts(Map<String, SpoutSpec> spouts, Map<String, AtlasEntity> nodeEntities) {
        for (Map.Entry<String, SpoutSpec> namedSpout : spouts.entrySet()) {
            String name = namedSpout.getKey();
            nodeEntities.put(name, createSpoutInstance(name, namedSpout.getValue()));
        }
    }

    /**
     * Creates a node entity for every bolt and stores it under the bolt's name.
     *
     * @param bolts        bolt definitions keyed by bolt name
     * @param nodeEntities map that receives the created entities
     */
    private void addBolts(Map<String, Bolt> bolts, Map<String, AtlasEntity> nodeEntities) {
        for (Map.Entry<String, Bolt> namedBolt : bolts.entrySet()) {
            String name = namedBolt.getKey();
            nodeEntities.put(name, createBoltInstance(name, namedBolt.getValue()));
        }
    }

    /**
     * Builds the node entity for a spout: its name, the class name of the deserialized spout
     * object, and the field values returned by {@code StormTopologyUtil.getFieldValues}.
     *
     * @param spoutName  name of the spout within the topology
     * @param stormSpout the spout specification holding the serialized spout object
     * @return the populated spout entity
     */
    private AtlasEntity createSpoutInstance(String spoutName, SpoutSpec stormSpout) {
        AtlasEntity entity = new AtlasEntity(StormDataTypes.STORM_SPOUT.getName());
        byte[] serialized = stormSpout.get_spout_object().get_serialized_java();
        Serializable spoutObject = Utils.javaDeserialize(serialized, Serializable.class);
        Map<String, String> conf = StormTopologyUtil.getFieldValues(spoutObject, true, null);

        entity.setAttribute("name", spoutName);
        entity.setAttribute("driverClass", spoutObject.getClass().getName());
        entity.setAttribute("conf", conf);
        return entity;
    }

    /**
     * Builds the node entity for a bolt: its name, the class name of the deserialized bolt
     * object, and the field values returned by {@code StormTopologyUtil.getFieldValues}.
     *
     * @param boltName  name of the bolt within the topology
     * @param stormBolt the bolt definition holding the serialized bolt object
     * @return the populated bolt entity
     */
    private AtlasEntity createBoltInstance(String boltName, Bolt stormBolt) {
        AtlasEntity entity = new AtlasEntity(StormDataTypes.STORM_BOLT.getName());
        byte[] serialized = stormBolt.get_bolt_object().get_serialized_java();
        Serializable boltObject = Utils.javaDeserialize(serialized, Serializable.class);
        Map<String, String> conf = StormTopologyUtil.getFieldValues(boltObject, true, null);

        entity.setAttribute("name", boltName);
        entity.setAttribute("driverClass", boltObject.getClass().getName());
        entity.setAttribute("conf", conf);
        return entity;
    }

    /**
     * Records the connections between topology nodes as {@code outputs} and {@code inputs}
     * attributes on the node entities.
     *
     * <p>For every node with a non-empty adjacency set, the set is copied into the node's
     * {@code outputs} attribute and the node's name is appended to the {@code inputs} attribute
     * of each adjacent node.
     *
     * @param stormTopology the topology structure the adjacency map is derived from
     * @param nodeEntities  node entities keyed by spout/bolt name; updated in place
     */
    private void addGraphConnections(StormTopology stormTopology, Map<String, AtlasEntity> nodeEntities) {
        Map<String, Set<String>> adjacencyMap = StormTopologyUtil.getAdjacencyMap(stormTopology, true);
        for (Map.Entry<String, Set<String>> entry : adjacencyMap.entrySet()) {
            // Read the value from the entry instead of looking the key up a second time.
            Set<String> adjacencyList = entry.getValue();
            if (CollectionUtils.isEmpty(adjacencyList)) {
                continue;
            }
            String nodeName = entry.getKey();

            // Outgoing links.
            AtlasEntity node = nodeEntities.get(nodeName);
            List<String> outputs = new ArrayList<>(adjacencyList);
            node.setAttribute("outputs", outputs);

            // Incoming links on each adjacent node.
            for (String adjacentNodeName : adjacencyList) {
                AtlasEntity adjacentNode = nodeEntities.get(adjacentNodeName);
                // Cast to the List interface so any list implementation stored there is accepted.
                @SuppressWarnings("unchecked")
                List<String> inputs = (List<String>) adjacentNode.getAttribute("inputs");
                if (inputs == null) {
                    inputs = new ArrayList<>();
                }
                inputs.add(nodeName);
                adjacentNode.setAttribute("inputs", inputs);
            }
        }
    }

    /**
     * Builds the qualified name of a Kafka topic as {@code <lower-cased topic>@<metadata namespace>}.
     *
     * @param metadataNamespace namespace appended after the {@code @}
     * @param topicName         topic name; lower-cased before use, must not be {@code null}
     * @return the qualified name
     */
    public static String getKafkaTopicQualifiedName(String metadataNamespace, String topicName) {
        String normalizedTopic = topicName.toLowerCase();
        return normalizedTopic + "@" + metadataNamespace;
    }

    /**
     * Builds the qualified name of an HBase table by filling
     * {@link #HBASE_TABLE_QUALIFIED_NAME_FORMAT} with the lower-cased namespace, the lower-cased
     * table name, and the metadata namespace.
     *
     * @param metadataNamespace namespace placed after the {@code @}
     * @param nameSpace         HBase namespace; must not be {@code null}
     * @param tableName         HBase table name; must not be {@code null}
     * @return the qualified name
     */
    public static String getHbaseTableQualifiedName(String metadataNamespace, String nameSpace, String tableName) {
        String namespacePart = nameSpace.toLowerCase();
        String tablePart = tableName.toLowerCase();
        return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT, namespacePart, tablePart, metadataNamespace);
    }

    /**
     * Builds the qualified name of an HDFS path as {@code <lower-cased path>@<metadata namespace>}.
     *
     * @param metadataNamespace namespace appended after the {@code @}
     * @param hdfsPath          path string; lower-cased before use, must not be {@code null}
     * @return the qualified name
     */
    public static String getHdfsPathQualifiedName(String metadataNamespace, String hdfsPath) {
        StringBuilder qualifiedName = new StringBuilder(hdfsPath.toLowerCase());
        qualifiedName.append('@').append(metadataNamespace);
        return qualifiedName.toString();
    }

    /**
     * Resolves the metadata namespace for a component: the {@code atlas.cluster.name} value from
     * the given configuration when present, otherwise this hook's own metadata namespace.
     *
     * @param configuration the component's Hadoop-style configuration
     * @param stormConf     the Storm configuration (not read by this method)
     * @return the resolved namespace
     */
    private String extractComponentMetadataNamespace(Configuration configuration, Map stormConf) {
        String configuredName = configuration.get("atlas.cluster.name", null);
        return configuredName != null ? configuredName : getMetadataNamespace();
    }

    /**
     * Returns the message-source identifier of this hook.
     *
     * @return the constant string {@code "storm"}
     */
    public String getMessageSource() {
        final String source = "storm";
        return source;
    }
}

