Big Data y FIWARE

El término Big Data está cada día más presente en nuestras vidas, pero, ¿qué es realmente y cómo se trata?

Big Data, como su nombre indica, es el tratamiento de un gran volumen de datos para sacar una información concreta y útil en función de unas reglas definidas.

En este blogpost vamos a mostrar cómo hacer un análisis de BigData en la plataforma de FIWARE. Lo haremos mediante:

Supongamos que existe un sensor que cada vez que cambia su temperatura manda dicho cambio a Orion. Orion recibe este dato y mediante Cygnus se lo envía a Cosmos, el cual va almacenando los datos para su posterior tratamiento.

Comunicaciones

Lo primero que vamos a hacer es configurar Orion para que esté suscrito a dicho cambio. Para ello podemos hacer uso de la nube de FIWARE con una cuenta de FIWARE LAB, ya que Orion está preconfigurado en dicha nube. Si accedemos a https://cloud.lab.fiware.org/ con una cuenta válida, y nos vamos al apartado Images podemos ver todas las imágenes preconfiguradas que podemos lanzar. En nuestro caso utilizaremos la que vemos en la imagen:

Imagen de orion

Seguimos los pasos de configuración, tales como asignarle un grupo de seguridad, donde el puerto 22 debe estar abierto para acceder posteriormente por ssh, asignarle nuestro keypair (el cual podemos crear o importar en el apartado Security) y por último le asignamos una IP pública.

Una vez la instancia está iniciada nos conectaremos por ssh con nuestra clave privada a la IP pública. En este caso yo lo estoy haciendo en dos pasos: Me conecto a la máquina que tiene IP pública, la cual es un módulo de seguridad dentro de la misma subred donde está Orion y luego me conecto por ssh a Orion con su IP privada:

 

ssh orion

Una vez estamos dentro de la máquina, vamos a crear una entidad de tipo habitación, que tiene los atributos temperatura y presión. Para ello le hacemos la siguiente petición:

(curl localhost:1026/v1/updateContext -s -S --header 'Content-Type: application/json' --header 'Accept: application/json' -d @- | python -mjson.tool) <<EOF
{
 "contextElements": 
[ { "type": "Room", "isPattern": "false", "id": "Room1", "attributes": [ { "name": "temperature", "type": "float", "value": "23" }, { "name": "pressure", "type": "integer", "value": "720" } ] } ], "updateAction": "APPEND" } EOF

Y aquí vemos la respuesta:

respuesta creacion

 

Seguidamente suscribiremos Orion a los cambios de temperatura de la habitación que hemos creado. Para ello le mandaremos una petición de suscripción para que cuando haya un cambio en la temperatura de Room1 (la entidad recién creada) lo notifique a localhost en el puerto 5050, que es donde estará escuchando Cygnus. Aquí vemos la petición:

(curl localhost:1026/v1/subscribeContext -s -S --header 'Content-Type: application/json' --header 'Accept: application/json' -d @- | python -mjson.tool) <<EOF
{
    "entities": [
        {
            "type": "Room",
            "isPattern": "false",
            "id": "Room1"
        }
    ],
    "attributes": [
        "temperature"
    ],
    "reference": "https://localhost:5050/notify",
    "duration": "P1M",
    "notifyConditions": [
        {
            "type": "ONCHANGE",
            "condValues": [
                "temperature"
            ]
        }
    ],
    "throttling": "PT1S"
}
EOF

Y aquí la respuesta, donde nos da el número de la suscripción:

Respuesta subscripcion

Ahora vamos a configurar Cygnus, para que cuando le llegue un valor lo mande a Cosmos, para lo que vamos al archivo de configuración de Cygnus, ubicado en /usr/cygnus/conf/cygnus.conf

Aquí definiremos las bases de datos que vamos a comunicar, en nuestro caso será hdfs, así que habrá que quitar las relativas a mysql y ckan. Finalmente el archivo tendrá la siguiente forma:

 

cygnusagent.sources = http-source
cygnusagent.sinks = hdfs-sink
cygnusagent.channels = hdfs-channel
#=============================================                                                                                                                                     
# source configuration                                                                                                                                                             
# channel name where to write the notification events                                                                                                                              
cygnusagent.sources.http-source.channels = hdfs-channel
# source class, must not be changed                                                                                                                                                
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
# listening port the Flume source will use for receiving incoming notifications                                                                                                    
cygnusagent.sources.http-source.port = 5050
# Flume handler that will parse the notifications, must not be changed                                                                                                             
cygnusagent.sources.http-source.handler = es.tid.fiware.fiwareconnectors.cygnus.handlers.OrionRestHandler
# URL target                                                                                                                                                                       
cygnusagent.sources.http-source.handler.notification_target = /notify
# Default organization (organization semantic depend on the persistence sink)                                                                                                      
cygnusagent.sources.http-source.handler.default_organization = org42
# Number of channel re-injection retries before a Flume event is definitely discarded                                                                                              
cygnusagent.sources.http-source.handler.events_ttl = 10
# Management interface port (FIXME: temporal location for this parameter)                                                                                                          
cygnusagent.sources.http-source.handler.management_port = 8089
# Source interceptors, do not change                                                                                                                                               
cygnusagent.sources.http-source.interceptors = ts
# Timestamp interceptor, do not change                                                                                                                                             
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
# Destination extractor interceptor, do not change                                                                                                                                 
cygnusagent.sources.http-source.interceptors.de.type = es.tid.fiware.fiwreconnectors.cygnus.interceptors.DestinationExtractor$Builder
# Matching table for the destination extractor interceptor, do not change                                                                                                          
cygnusagent.sources.http-source.interceptors.de.matching_table = matching_table.conf
# ============================================                                                                                                                                     
# OrionHDFSSink configuration                                                                                                                                                      
# channel name from where to read notification events                                                                                                                              
cygnusagent.sinks.hdfs-sink.channel = hdfs-channel
# sink class, must not be changed                                                                                                                                                  
cygnusagent.sinks.hdfs-sink.type = es.tid.fiware.fiwareconnectors.cygnus.sinks.OrionHDFSSink
# Comma-separated list of FQDN/IP address regarding the Cosmos Namenode endpoints                                                                                                  
cygnusagent.sinks.hdfs-sink.cosmos_host = 130.206.80.46
# port of the Cosmos service listening for persistence operations; 14000 for httpfs, 50070 for webhdfs and free choice for inifinty                                                
cygnusagent.sinks.hdfs-sink.cosmos_port = 14000
# default username allowed to write in HDFS                                                                                                                                        
cygnusagent.sinks.hdfs-sink.cosmos_default_username = *****USER******
# default password for the default username                                                                                                                                        
cygnusagent.sinks.hdfs-sink.cosmos_default_password = ****PASSWORD*****
# HDFS backend type (webhdfs, httpfs or infinity)                                                                                                                                  
cygnusagent.sinks.hdfs-sink.hdfs_api = httpfs
# how the attributes are stored, either per row either per column (row, column)                                                                                                    
cygnusagent.sinks.hdfs-sink.attr_persistence = column
# prefix for the database and table names, empty if no prefix is desired                                                                                                           
cygnusagent.sinks.hdfs-sink.naming_prefix = 
# Hive FQDN/IP address of the Hive server                                                                                                                                          
cygnusagent.sinks.hdfs-sink.hive_host = 130.206.80.46
# Hive port for Hive external table provisioning                                                                                                                                   
cygnusagent.sinks.hdfs-sink.hive_port = 10000

#=============================================                                                                                                                                     
# hdfs-channel configuration                                                                                                                                                       
# channel type (must not be changed)                                                                                                                                               
cygnusagent.channels.hdfs-channel.type = memory
# capacity of the channel                                                                                                                                                          
cygnusagent.channels.hdfs-channel.capacity = 1000
# amount of bytes that can be sent per transaction                                                                                                                                 
cygnusagent.channels.hdfs-channel.transactionCapacity = 100

Una vez configurados Orion y Cygnus, y suscrito Orion al cambio de un atributo de una entidad, vamos a actualizar dicho atributo para registrarlo en Cosmos. Esto lo haremos mediante una petición similar a las anteriores:

(curl localhost:1026/v1/updateContext -s -S --header 'Content-Type: application/json' --header 'Accept: application/json' -d @- | python -mjson.tool) <<EOF
{
 "contextElements": [
   {
     "type": "Room",
     "isPattern": "false",
     "id": "Room1",
     "attributes": [
        {
          "name": "temperature",
          "type": "float",
          "value": "27"
        },
        {
          "name": "pressure",
          "type": "integer",
          "value": "720"
        }
      ]
    }
  ],
  "updateAction": "UPDATE"
  }
EOF

Vemos la respuesta:

Actualización temperatura

Veamos si Cygnus ha registrado el cambio y lo ha mandado a Cosmos. Podemos verlo en el log ejecutando

cat /var/log/cygnus/flume.log

Log de Cygnus

Como podemos ver, las transacciones están siendo mandadas a Cosmos. Ahora vamos a acceder a Cosmos con nuestro usuario y contraseña mediante ssh para, por fin, hacer un análisis de nuestros datos. Para ello ejecutaremos desde un terminal

ssh username@cosmos.lab.fi-ware.org

E introduciremos nuestra contraseña:

Login Cosmos

Ahora tendremos que hacer una query a hadoop donde hayamos definado que se guarden nuestros datos cuando configuramos cygnus. Podemos examinar nuestros archivos hasta llegar al que queremos mediante el comando
hadoop fs -ls /user/USERNAME/
Hasta llegar al archivo donde se han ido registrando los cambios de temperatura. Una vez localizado podemos guardarlo en nuestro directorio mediante el comando
hadoop get -get /user/USERNAME/pathToFile/file.txt /home/temperaturas.txt
Cuando lo descargamos, podemos ver su contenido. En nuestro caso:

Temperaturas

Ahora que tenemos nuestro archivo, estamos en condiciones de procesarlo. Utilizaremos uno de los ejemplos que tiene Cosmos incluídos, y es el de contar palabras dentro de un archivo, el cual ahora no es muy útil, pero nos sirve para comprender el funcionamiento. Para hacer este análisis haremos lo siguiente:

hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar wordcount /user/usuario/org/tabla/tabla.txt /user/usuario/cuentapalabras

Una vez termine el resultado, se almacena en la base de datos, y está disponible para su descarga o consulta:

hadoop fs -getmerge /user/usuario/cuentapalabras /home/usuario/cuentapalabras

Por último, este sería el resultado de este pequeño análisis de BigData:

Big Data

Con este post pretendemos haceros llegar cuáles son los pasos para iniciar nuestran andadura con el análisis de Big Data en FIWARE. Como vemos no es demasiado complejo, aunque el análisis realizado es sólo uno de los ejemplos disponibles. ¡No dudéis en hacernos llegar vuestras dudas y estaremos encantados de resolverlas!

By |2017-04-18T10:10:35+00:00junio 19th, 2015|FIWARE|

Share This Article