El término Big Data está cada día más presente en nuestras vidas, pero, ¿qué es realmente y cómo se trata?
Big Data, como su nombre indica, es el tratamiento de un gran volumen de datos para sacar una información concreta y útil en función de unas reglas definidas.
En este blogpost vamos a mostrar cómo hacer un análisis de BigData en la plataforma de FIWARE. Lo haremos mediante:
- Cosmos y base de datos HDFS.
- Una instancia de Orion Context Broker.
- Un conector de Orion a Cosmos, llamado Cygnus basado en Apache Flume.
- Una fuente de datos.
Supongamos que existe un sensor que cada vez que cambia su temperatura manda dicho cambio a Orion. Orion recibe este dato y mediante Cygnus se lo envía a Cosmos, el cual va almacenando los datos para su posterior tratamiento.
Lo primero que vamos a hacer es configurar Orion para que esté suscrito a dicho cambio. Para ello podemos hacer uso de la nube de FIWARE con una cuenta de FIWARE LAB, ya que Orion está preconfigurado en dicha nube. Si accedemos a https://cloud.lab.fiware.org/ con una cuenta válida, y nos vamos al apartado Images podemos ver todas las imágenes preconfiguradas que podemos lanzar. En nuestro caso utilizaremos la que vemos en la imagen:
Seguimos los pasos de configuración, tales como asignarle un grupo de seguridad, donde el puerto 22 debe estar abierto para acceder posteriormente por ssh, asignarle nuestro keypair (el cual podemos crear o importar en el apartado Security) y por último le asignamos una IP pública.
Una vez la instancia está iniciada nos conectaremos por ssh con nuestra clave privada a la IP pública. En este caso yo lo estoy haciendo en dos pasos: Me conecto a la máquina que tiene IP pública, la cual es un módulo de seguridad dentro de la misma subred donde está Orion y luego me conecto por ssh a Orion con su IP privada:
Una vez estamos dentro de la máquina, vamos a crear una entidad de tipo habitación, que tiene los atributos temperatura y presión. Para ello le hacemos la siguiente petición:
(curl localhost:1026/v1/updateContext -s -S --header 'Content-Type: application/json' --header 'Accept: application/json' -d @- | python -mjson.tool) <<EOF { "contextElements": [fusion_builder_container hundred_percent="yes" overflow="visible"][fusion_builder_row][fusion_builder_column type="1_1" background_position="left top" background_color="" border_size="" border_color="" border_style="solid" spacing="yes" background_image="" background_repeat="no-repeat" padding="" margin_top="0px" margin_bottom="0px" class="" id="" animation_type="" animation_speed="0.3" animation_direction="left" hide_on_mobile="no" center_content="no" min_height="none"][ { "type": "Room", "isPattern": "false", "id": "Room1", "attributes": [ { "name": "temperature", "type": "float", "value": "23" }, { "name": "pressure", "type": "integer", "value": "720" } ] } ], "updateAction": "APPEND" } EOF
Seguidamente suscribiremos Orion a los cambios de temperatura de la habitación que hemos creado. Para ello le mandaremos una petición de suscripción para que cuando haya un cambio en la temperatura de Room1 (la entidad recién creada) lo notifique a localhost en el puerto 5050, que es donde estará escuchando Cygnus. Aquí vemos la petición:
(curl localhost:1026/v1/subscribeContext -s -S --header 'Content-Type: application/json' --header 'Accept: application/json' -d @- | python -mjson.tool) <<EOF { "entities": [ { "type": "Room", "isPattern": "false", "id": "Room1" } ], "attributes": [ "temperature" ], "reference": "https://localhost:5050/notify", "duration": "P1M", "notifyConditions": [ { "type": "ONCHANGE", "condValues": [ "temperature" ] } ], "throttling": "PT1S" } EOF
Y aquí la respuesta, donde nos da el número de la suscripción:
Ahora vamos a configurar Cygnus, para que cuando le llegue un valor lo mande a Cosmos, para lo que vamos al archivo de configuración de Cygnus, ubicado en /usr/cygnus/conf/cygnus.conf
Aquí definiremos las bases de datos que vamos a comunicar, en nuestro caso será hdfs, así que habrá que quitar las relativas a mysql y ckan. Finalmente el archivo tendrá la siguiente forma:
cygnusagent.sources = http-source cygnusagent.sinks = hdfs-sink cygnusagent.channels = hdfs-channel #============================================= # source configuration # channel name where to write the notification events cygnusagent.sources.http-source.channels = hdfs-channel # source class, must not be changed cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource # listening port the Flume source will use for receiving incoming notifications cygnusagent.sources.http-source.port = 5050 # Flume handler that will parse the notifications, must not be changed cygnusagent.sources.http-source.handler = es.tid.fiware.fiwareconnectors.cygnus.handlers.OrionRestHandler # URL target cygnusagent.sources.http-source.handler.notification_target = /notify # Default organization (organization semantic depend on the persistence sink) cygnusagent.sources.http-source.handler.default_organization = org42 # Number of channel re-injection retries before a Flume event is definitely discarded cygnusagent.sources.http-source.handler.events_ttl = 10 # Management interface port (FIXME: temporal location for this parameter) cygnusagent.sources.http-source.handler.management_port = 8089 # Source interceptors, do not change cygnusagent.sources.http-source.interceptors = ts # Timestamp interceptor, do not change cygnusagent.sources.http-source.interceptors.ts.type = timestamp # Destination extractor interceptor, do not change cygnusagent.sources.http-source.interceptors.de.type = es.tid.fiware.fiwreconnectors.cygnus.interceptors.DestinationExtractor$Builder # Matching table for the destination extractor interceptor, do not change cygnusagent.sources.http-source.interceptors.de.matching_table = matching_table.conf # ============================================ # OrionHDFSSink configuration # channel name from where to read notification events cygnusagent.sinks.hdfs-sink.channel = hdfs-channel # sink class, must not be changed cygnusagent.sinks.hdfs-sink.type = es.tid.fiware.fiwareconnectors.cygnus.sinks.OrionHDFSSink # Comma-separated list of FQDN/IP address regarding the Cosmos Namenode endpoints cygnusagent.sinks.hdfs-sink.cosmos_host = 130.206.80.46 # port of the Cosmos service listening for persistence operations; 14000 for httpfs, 50070 for webhdfs and free choice for inifinty cygnusagent.sinks.hdfs-sink.cosmos_port = 14000 # default username allowed to write in HDFS cygnusagent.sinks.hdfs-sink.cosmos_default_username = *****USER****** # default password for the default username cygnusagent.sinks.hdfs-sink.cosmos_default_password = ****PASSWORD***** # HDFS backend type (webhdfs, httpfs or infinity) cygnusagent.sinks.hdfs-sink.hdfs_api = httpfs # how the attributes are stored, either per row either per column (row, column) cygnusagent.sinks.hdfs-sink.attr_persistence = column # prefix for the database and table names, empty if no prefix is desired cygnusagent.sinks.hdfs-sink.naming_prefix = # Hive FQDN/IP address of the Hive server cygnusagent.sinks.hdfs-sink.hive_host = 130.206.80.46 # Hive port for Hive external table provisioning cygnusagent.sinks.hdfs-sink.hive_port = 10000 #============================================= # hdfs-channel configuration # channel type (must not be changed) cygnusagent.channels.hdfs-channel.type = memory # capacity of the channel cygnusagent.channels.hdfs-channel.capacity = 1000 # amount of bytes that can be sent per transaction cygnusagent.channels.hdfs-channel.transactionCapacity = 100
Una vez configurados Orion y Cygnus, y suscrito Orion al cambio de un atributo de una entidad, vamos a actualizar dicho atributo para registrarlo en Cosmos. Esto lo haremos mediante una petición similar a las anteriores:
(curl localhost:1026/v1/updateContext -s -S --header 'Content-Type: application/json' --header 'Accept: application/json' -d @- | python -mjson.tool) <<EOF { "contextElements": [ { "type": "Room", "isPattern": "false", "id": "Room1", "attributes": [ { "name": "temperature", "type": "float", "value": "27" }, { "name": "pressure", "type": "integer", "value": "720" } ] } ], "updateAction": "UPDATE" } EOF
Vemos la respuesta:
Veamos si Cygnus ha registrado el cambio y lo ha mandado a Cosmos. Podemos verlo en el log ejecutando
cat /var/log/cygnus/flume.log
Como podemos ver, las transacciones están siendo mandadas a Cosmos. Ahora vamos a acceder a Cosmos con nuestro usuario y contraseña mediante ssh para, por fin, hacer un análisis de nuestros datos. Para ello ejecutaremos desde un terminal
ssh username@cosmos.lab.fi-ware.org
E introduciremos nuestra contraseña:
Ahora tendremos que hacer una query a hadoop donde hayamos definado que se guarden nuestros datos cuando configuramos cygnus. Podemos examinar nuestros archivos hasta llegar al que queremos mediante el comando
hadoop fs -ls /user/USERNAME/
Hasta llegar al archivo donde se han ido registrando los cambios de temperatura. Una vez localizado podemos guardarlo en nuestro directorio mediante el comando
hadoop get -get /user/USERNAME/pathToFile/file.txt /home/temperaturas.txt
Cuando lo descargamos, podemos ver su contenido. En nuestro caso:
Ahora que tenemos nuestro archivo, estamos en condiciones de procesarlo. Utilizaremos uno de los ejemplos que tiene Cosmos incluídos, y es el de contar palabras dentro de un archivo, el cual ahora no es muy útil, pero nos sirve para comprender el funcionamiento. Para hacer este análisis haremos lo siguiente:
hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar wordcount /user/usuario/org/tabla/tabla.txt /user/usuario/cuentapalabras
Una vez termine el resultado, se almacena en la base de datos, y está disponible para su descarga o consulta:
hadoop fs -getmerge /user/usuario/cuentapalabras /home/usuario/cuentapalabras
Por último, este sería el resultado de este pequeño análisis de BigData:
Con este post pretendemos haceros llegar cuáles son los pasos para iniciar nuestran andadura con el análisis de Big Data en FIWARE. Como vemos no es demasiado complejo, aunque el análisis realizado es sólo uno de los ejemplos disponibles. ¡No dudéis en hacernos llegar vuestras dudas y estaremos encantados de resolverlas!
[/fusion_builder_column][/fusion_builder_row][/fusion_builder_container]
0 comentarios