Pasamos a la tercera parte de la serie, en la que ya vamos a ver un ejemplo concreto de lo que puede ser un sencillo interfaz para la gestión de logs usando Solandra. Como siempre, todo el código y los ficheros de configuración más importantes están en Github.

Recordemos los objetivos que nos hemos marcado:

  • Ser capaces de gestionar un volumen muy importante de logs, con la máxima escalabilidad y disponibilidad.
  • Poder añadir los logs en el sistema de una forma sencilla.
  • Tener un interfaz web desde el que poder visualizar los datos.

Ya hemos hablado sobre la escalabilidad de Cassandra en los posts anteriores, así que no vamos a volver a entrar en este punto. Veamos los otros dos:

Inserción de datos en el sistema

Solandra es básicamente una adaptación de Solr, por lo que en realidad vamos a tratar conceptos propios de esta aplicación en lo que queda de post. Cuando veamos algo específico de Solandra, lo señalaré.

Uno de los elementos más importantes de la configuración del sistema es el schema; que viene a ser el lugar en el que se definen los atributos que forman cada "documento" (correo en este caso) que se quiere indexar. Como veremos más adelante, una vez creada esta estructura de datos, usaremos el comando curl para insertarla en el cluster a través de una url determinada. Además, Solandra permite trabajar con varios schemas diferentes de manera simultanea.

Simplicando un poco, el schema está compuesto por dos grandes bloques: La definición de los tipos de datos y la lista de atributos que forman cada documento.

Solr ofrece muchos tipos de datos ya creados de antemano: numéricos, texto, fechas, .... Por si esto fuera poco, la aplicación permite definir nuevos tipos siempre que se considere necesario. Para la gestión de logs de correo, por ejemplo, nos podría venir bien un tipo específico para las direcciones email:

  ...
  <fieldType name="email" class="solr.TextField" >
     <analyzer>
        <tokenizer class="solr.PatternTokenizerFactory" pattern="@" />
        <filter class="solr.LowerCaseFilterFactory" />
        <filter class="solr.TrimFilterFactory" />
     </analyzer>
  </fieldType>
  ...

Este tipo se basa en el TextField clásico, pero forma los tokens alrededor de la "@". De esta manera, facilitaremos las búsquedas tanto en base a la parte local de las direcciones como al dominio. Además, elimina los espacios y convierte las mayúsculas en minúsculas.

El siguiente paso es la definición de los atributos que componen cada documento, y que por supuesto van a ser de alguno de los tipos disponibles en el schema.

  <fields>
      <field name="id" type="uuid" indexed="true" stored="true" required="true" />
      <field name="in_from" type="email" indexed="true" stored="true" />
      <field name="in_size" type="tint" indexed="false" stored="true" />
      <field name="in_fentrada" type="tdate" indexed="true" stored="true" />
      <field name="in_to" type="email" indexed="true" stored="true" />
      <field name="in_to_adicional" type="email" indexed="true" stored="true" multiValued="true" />
      <field name="in_fsalida" type="tdate" indexed="true" stored="true" multiValued="true" />
      <field name="in_estado" type="string" indexed="false" stored="true" multiValued="true" />

      <dynamicField name="*" type="ignored" multiValued="true" />
  </fields>

Casi no hace falta explicar nada. En este sencillo ejemplo tenemos un identificador, un origen, un tamaño, una fecha de entrada y una fecha de entrega. Además, con cada mensaje vamos a guardar el estado de entrega para cada destino (pueden haberse realizado varios intentos, por ejemplo a causa del greylisting), y la lista de destinatarios adicionales para los que iba dirigido.

Obviamente, es una estructura limitada. En un entorno real se debería guardar mucha más información (antivirus, antispam, expansión de aliases, ...).

Y con esto ya lo tenemos. Sólo queda volcar el schema en el cluster:

  curl http://ip_cluster:8983/solandra/schema/correo --data-binary @/root/schema_correo.xml -H 'Content-type:text/xml; charset=utf-8'

Como hemos dicho, al igual que un "/schema/correo", se podría definir un "/schema/web", por ejemplo, y usarlos simultáneamente.

El volcado de datos

El volcado de datos se puede hacer de varias formas, pero vamos a limitarnos al uso de ficheros xml. Lo mejor es ver un ejemplo:

 <add allowDups="false">
    <doc>
       <field name="in_from">origenspam@example.com</field>
       <field name="id">b309842a-1cf7-11e0-9759-f896edbeae14</field>
       <field name="in_fentrada">2011-04-30T00:17:24Z</field>
       <field name="in_size">941</field>
       <field name="in_to">destino1@target.example.net</field>
       <field name="in_fsalida">2011-04-30T00:17:25Z</field>
       <field name="in_estado">0 - 192.168.10.24_accepted_message./Remote_host_said:_250_2.0.0_Ok:_queued_as_DF2BD74CA71/</field>
       <field name="in_to_adicional">destino1@target.example.net</field>
       <field name="in_to_adicional">destino2@target.example.net</field>
       <field name="in_to_adicional">destino3@target.example.net</field>
       <field name="in_to_adicional">destino4@target.example.net</field>
    </doc>
    <doc>
       <field name="in_from">origenspam@example.com</field>
       <field name="id">b30991b7-1cf7-11e0-9759-e819bb7f7b58</field>
       <field name="in_fentrada">2011-04-30T00:17:24Z</field>
       <field name="in_size">941</field>
       <field name="in_to">destino2@target.example.net</field>
       <field name="in_fsalida">2011-04-30T00:17:24Z</field>
       <field name="in_estado">0 - 192.168.10.24_accepted_message./Remote_host_said:_250_2.0.0_Ok:_queued_as_A945A21CD6B/</field>
       <field name="in_to_adicional">destino1@target.example.net</field>
       <field name="in_to_adicional">destino2@target.example.net</field>
       <field name="in_to_adicional">destino3@target.example.net</field>
       <field name="in_to_adicional">destino4@target.example.net</field>
    </doc>
    <doc>
       <field name="in_from">cliente1@example.net</field>
       <field name="id">b43de232-1cf7-11e0-9759-d71ac36a357f</field>
       <field name="in_fentrada">2011-04-30T00:04:41Z</field>
       <field name="in_size">6077</field>
       <field name="in_to">greylister@example.com</field>
       <field name="in_fsalida">2011-04-30T00:07:27Z</field>
       <field name="in_estado">0 - 172.16.0.14_does_not_like_recipient./Remote_host_said:_450_4.7.1_<cliente1@example.net>:
          _Sender_address_rejected:_Message_delayed_now.Retry_later,_please./Giving_up_on_172.16.0.14./</field>
       <field name="in_fsalida">2011-04-30T00:12:38Z</field>
       <field name="in_estado">1 - 172.16.0.14_does_not_like_recipient./Remote_host_said:_450_4.7.1_<cliente1@example.net>:
          _Sender_address_rejected:_Message_delayed_now._Retry_later,_please./Giving_up_on_172.16.0.14./</field>
       <field name="in_fsalida">2011-04-30T00:31:53Z</field>
       <field name="in_estado">2 - 172.16.0.14_does_not_like_recipient./Remote_host_said:_451_4.3.0_<greylister@example.com>:
          _Temporary_lookup_failure/Giving_up_on_172.16.0.14./</field>
       <field name="in_fsalida">2011-04-30T01:04:43Z</field>
       <field name="in_estado">3 - 172.16.0.14_accepted_message./Remote_host_said:_250_2.0.0_Ok:_queued_as_CB6A3D4A217/</field>
       <field name="in_to_adicional">greylister@example.com</field>
    </doc>
 </add>

El volcado, otra vez, es muy sencillo.

  curl http://ip_cluster:8983/solandra/correo/update -F stream.file=/tmp/volcado.xml

La conversión de logs desde el más que probable modo texto de syslog a xml, y de ahí al cluster de Solandra, queda fuera de esta serie de posts. De hecho, un servidor piensa que esto es lo realmente importante y difícil para llevar este proyecto a la práctica de una manera "seria".

El Interfaz

¿Qué mejor que un interfaz web para mostrar los datos que hemos almacenado en Solandra? ¡Sorpresa! la gente detŕas del proyecto ajax-solr ya ha hecho la mayoría del trabajo, así que sólo nos queda modificar un puñado de ficheros, algo de código JavaScript, y ya lo tendremos: Demo

Un detalle más: No queremos permitir el acceso directo a Solandra desde la web, así que necesitamos un proxy que filtre las consultas y las redirija al puerto de Solandra (tcp/8983 por defecto), y que en mi laboratorio escucha en localhost. En este caso, como casi siempre que quiero programar algún tipo de servicio para Internet sin dedicarle mucho tiempo, he usado node.js. Para este ejemplo, y por jugar un poco con GeoIP, he escrito un sencillo proxy que permite conexiones sólo si tienen como referer forondarena.net y si vienen desde Europa o América del Norte. Como no podría ser de otra forma, el código de este proxy también está disponible en Github.

(Nota: Esta demo es un Solr normal, pero el funcionamiento es idéntico al de Solandra).

Conclusiones

El "mercado" está lleno de soluciones de todo tipo que nos pueden ayudar en la gestión de logs. Hay aplicaciones comerciales, como Splunk, sistemas basados en software libre, como Solr, tecnología que nos puede permitir crecer "ilimitadamente", como Cassandra, Hadoop o Hbase, pero que requieren algo de trabajo; y también tenemos los mágnificos sistemas de bases de datos, como Mysql o Postgresql. ¿Cuál elegir?

En una primera fase, una buena base de datos con un sencillo interfaz web o un Solr estándar pueden servir perfectamente para gestionar todo tipo de logs. De hecho, tanto Mysql como Solr ofrecen alternativas para el particionado que pueden permitir este esquema en la segunda, tercera o cuarta fase.

Un buen consejo que escuche hace tiempo es el de "no arreglar lo que no está roto". Sólo deberíamos plantearnos el uso de tecnología que probablemente no conozcamos tan bien como las anteriores cuando realmente sea necesario. Llegado ese momento, adelante. Como siempre, la comunidad detrás del software libre es activa y está dispuesta a ayudar. Por si esto fuera poco, cada vez son más comunes las empresas que ofrecen servicios alrededor de este tipo de soluciones, y que pueden asesorarnos llegado el caso.


Gestión de logs con Solandra II

vie, 05 ago 2011 by Foron

Seguimos con el segundo post de la serie, en el que pasamos a dar una descripción un poco más técnica de los componentes necesarios para poner en marcha todo lo descrito en el primero. No vamos a entrar en demasiado detalle. En todo caso, una vez conocidas las aplicaciones es más fácil buscar información en la red.

Aunque Solandra puede encargarse de la instalación de Cassandra, aquí vamos a usar los componentes por separado.

Cassandra

Cassandra es un tipo de base de datos creado siguiendo los principios propuestos por Dynamo (Amazon) y por BigTable (Google). El que quiera entrar en detalle tiene bibliografía y mucha documentación disponible.

Explicar el modelo de datos, la replicación o los niveles de consistencia va más allá del objetivo de este post. Lo más interesante en nuestro contexto es dejar claro que Cassandra es una base de datos completamente distribuida y descentralizada, en la que todas las máquinas del cluster cumplen el mismo y único rol, sin distinciones entre "maestros", "esclavos", "catálogos", .... Esto significa que añadir capacidad a un cluster de Cassandra supone básicamente añadir más hierro. Nada más.

La instalación puede complicarse todo lo que queramos, pero lo básico es:

  # Cuidado con la versión
  cd /tmp/ && wget http://apache.rediris.es/cassandra/0.8.2/apache-cassandra-0.8.2-bin.tar.gz
  cd /usr/local/ && tar xvzf /tmp/apache-cassandra-0.8.2-bin.tar.gz

A partir de aquí se crean los directorios de datos y logs (por ejemplo /var/lib/cassandra y /var/log/cassandra), y se adaptan los ficheros de configuración (que están en /usr/local/apache-cassandra-0.8.2/conf). El que más nos interesa ahora es cassandra.yaml. En github hay un ejemplo de configuración de este fichero, aunque sea casi por defecto, de cada uno de los tres nodos que he usado en esta prueba.

Hay un par de opciones de configuración que pueden servir para comprender la estructura del sistema. Un cluster de Cassandra se entiende como un anillo en el que cada uno de los nodos gestiona un volumen determinado de datos (replicación aparte). Básicamente estamos hablando de una serie de claves (hashes) que se distribuyen de una forma más o menos equilibrada entre todas las máquinas. Aunque no sea estrictamente necesario, como para esta prueba he usado un número fijo de nodos (3), he asignado ya desde el comienzo un 33% de datos a cada uno. Por supuesto, en un entorno real en el que se añaden y quitan nodos dinámicamente la gestión es diferente. La configuración en mi laboratorio es la siguiente:

  # Nodo 1
  initial_token: 0
  # Nodo 2
  initial_token: 56713727820156410577229101238628035242
  # Nodo 3
  initial_token: 113427455640312821154458202477256070485

La otra opción que merece la pena comentar es "seed_provider". Cassandra usa un protocolo tipo Gossip para distribuir la información entre los nodos. Esto significa que, cuando se añade un nuevo miembro al cluster, es suficiente con indicarle un servidor activo del mismo. El protocolo se encarga de propagar esta nueva información en todos los nodos. Por lo tanto, la configuración se limita a especificar una (o varias) IPs activas:

  seed_provider:
      - class_name: org.apache.cassandra.locator.SimpleSeedProvider
        parameters:
            - seeds: 192.168.10.145

Hay mucha más opciones de configuración, por supuesto, pero lo dejamos aquí.

En este momento ya se podría ejecutar Cassandra, pero esperaremos a instalar Solandra.

Solandra

Hablemos antes de lo que es Solr, una vez más, muy muy por encima.

Solr es una plataforma de búsqueda construida sobre la librería Lucene. Simplificando mucho, y en el contexto de este post, ofrece un interfaz XML (es el que nos interesa aquí, pero no el único) para añadir documentos, y un API HTTP a través de cual recibir resultados en formato JSON (entre otros).

Todo se verá más claro cuando creemos el schema para la gestión de logs. Para el que quiera profundizar más en Solr, aquí tiene un libro.

Volvamos a Solandra. La instalación es sencilla. Una vez descargado el tar.gz desde githubsolandra, y con ant y los binarios de java en el path, ejecutamos lo siguiente en cada uno de los nodos:

  # El nombre del fichero cambia
  cd /usr/local && tar xvzf /tmp/tjake-Solandra-4f3eda9.tar.gz
  cd tjake-Solandra-4f3eda9/
  ant -Dcassandra=/usr/local/apache-cassandra-0.8.2 cassandra-dist

Si todo va bien, en unos minutos la salida estándar mostrará lo siguiente:

  ....
  cassandra-dist:
       [copy] Copying 36 files to /usr/local/apache-cassandra/lib
       [copy] Copying 8 files to /usr/local/apache-cassandra/conf
       [copy] Copying 1 file to /usr/local/apache-cassandra/bin
       [echo] Libraries successfully copied into cassandra distribution
       [echo] Start the cassandra server with /usr/local/apache-cassandra/bin/solandra command

  BUILD SUCCESSFUL
  Total time: 2 minutes 43 seconds

Durante la instalación deberían haberse copiado las librerías y scripts en el arbol de Cassandra. Incluyendo algunos ficheros de configuración, que en este caso dejamos por defecto, aunque lo normal sería que los adaptásemos.

Sin más, vamos a arrancar Solandra (y con ello Cassandra), en cada nodo. Como no hemos hecho ningún cambio en el logueo, vamos a ver todos los mensajes por la salida estándar:

  cd /usr/local/apache-cassandra-0.8.2/bin/
  ./solandra &

Y con esto ya debería estar todo listo. El siguiente paso es añadir el schema (parecido a como se haría en un Solr estándar), volcar los datos, y preparar el interfaz web. Pero esto es cosa de un tercer post. No pensaba escribirlo, pero bueno, este ya es demasiado largo.

read more

Gestión de logs con Solandra I

mar, 02 ago 2011 by Foron

Allá por el 2008, Rackspace(Mailtrust) publicaba algunos datos sobre la forma en la que había ido evolucionado su sistema de gestión de logs de la infraestructura de correo electrónico, y que por aquel entonces ya superaba holgadamente los 100GB de crecimiento diario. Junto a este documento, en una de las referencias bibliográficas sobre Hadoop, esta misma empresa explicaba, con algo de detalle técnico, la forma en la que habían implementado su sistema, basado sobre todo en Hadoop y Lucene+Solr.

Aunque Hadoop siga siendo una solución magnífica para la gestión de logs en el contexto del Software Libre (siempre hablando de volúmenes de datos realmente muy grandes), en esta serie de posts vamos a ver cómo podemos llevar la idea de Rackspace a la práctica usando otro tipo de tecnología, y más concretamente, Cassandra.

En realidad, como mis tres lectores no quieren posts demasiado largos, en lugar de entrar en los detalles de lo que sería una implementación más o menos "casera", vamos a usar una de las aplicaciones que Datastax (una empresa que da servicios comerciales para Cassandra) está potenciando como parte de su ecosistema alrededor de Cassandra, y que se llama Solandra.

El problema

Repasemos, simplificando un poco, la evolución de Rackspace:

  1. En una primera fase, los logs se almacenan en máquinas individuales. Cuando hay alguna incidencia, algún técnico tiene que entrar a hacer un grep. Si el negocio va bien, llegará un momento en el que el tiempo perdido haciendo estas búsquedas será, por lo menos, "crispante".
  2. En la segunda fase, los logs pasan a gestionarse a través de un syslog centralizado. En realidad, esta no es más que una versión algo mejorada de la primera evolución, pero al menos facilita el trabajo. En cualquier caso, en el fondo se sigue perdiendo mucho tiempo en la búsqueda manual en logs.
  3. La solución más natural en este punto es volcar los datos a una base de datos, y con ello a algún tipo de interfaz web. Dejaremos a un lado el desarrollo del frontend y de los scripts que cargan los datos en la bbdd (que pueden no ser en absoluto triviales, en función de la complejidad de la plataforma).

Hasta aquí, vale, todo es razonablemente sencillo. Sin embargo, cuando se gestionan digamos que 25 millones de mensajes al día, y cuando se quieren mantener 2 años de información (es un decir), nos encontramos con un problema.

¿Cómo se soluciona?

Aquí ya cada uno toma decisiones en función de su capacidad, su presupuesto, los perfiles que tiene disponibles, .... En algunos casos, mantener 30 días en base de datos (lo que genera la mayoría de incidentes), puede ser suficiente. En otros casos, se trabaja con los mecanismos que ofrecen las bases de datos para escalar (el framework Gizzard de Twitter es un estupendo ejemplo, aunque no hablemos de logs). Y por último, algunos pasan a otras soluciones, ya sean de pago o libres. En el caso de Rackspace, por ejemplo, su opción fue Hadoop y Lucene+Solr.

Una vez más, cada una de estas opciones puede ser "la mejor" en función del entorno en el que se desarrolle. Pero claro, si quiero seguir con este post tengo que optar por la tercera alternativa, obviamente :) .

Vamos por partes:

  1. Queremos almacenar un volumen de datos muy significativo. Para unos pocos GB todo esto no tiene demasiado sentido.
  2. Queremos que lo único necesario para aumentar la capacidad sea añadir nuevo hardware. Nada más. Ni cambios en la programación, ni cambios en la arquitectura.
  3. Queremos poder hacer consultas complejas sobre estos logs, en base a origen, destino, rangos de fechas, .... Por ejemplo, sería estupendo poder consultar todo el spam enviado a cuentas tipo info@ en toda la plataforma en un periodo de tiempo concreto.
  4. Aunque el tiempo real no es un requerimiento, es preferible poder hacer estas consultas y recibir los resultados al momento, en una misma sesión de navegador.

Para conseguir los puntos 1 y 2 Hadoop es una solución estupenda. Para 3 y 4 es necesario más trabajo. El acceso "casi inmediato" a los datos se conseguiría con alternativas como HBase, tan de moda ahora que Facebook ha empezado a usarlo. Además, siempre disponemos de Pig y Hive para simplificar las consultas. Ahora bien, de una u otra manera, con Hadoop es bastante probable que tengamos que programar bastante código.

La otra alternativa viene de la mano de Cassandra. Una vez más, los puntos 1 y 2 son inmediatos con esta tecnología. Al igual que con Hadoop, 3 y 4 no lo son; pero gracias a la aplicación llamada Solandra, que no deja de ser un Solr que guarda sus índices en Cassandra, podemos conseguir la capacidad de búsqueda de Lucene, el interfaz tipo REST que ofrece Solr, y la escalabilidad de Cassandra. Todo en uno.

El post se ha alargado un poco. Dejamos la parte práctica para el segundo (y último) mensaje de esta serie.

read more

Computación distribuida con Hadoop IV

dom, 30 may 2010 by Foron

Antes de que me adelanten, y sobre todo por las durísimas presiones por parte de las tres personas que leen este blog :-), voy a terminar esta serie de posts, hablando sobre una de las herramientas que se han implementado para dar un mayor nivel de abstracción (o facilidad de uso, como se prefiera decir) a Hadoop.

Hadoop tiene un problema, o al menos un argumento que se usa en su contra: Desarrollar una aplicación MapReduce lleva demasiado tiempo. A pesar de que el Streaming del que hablamos en posts anteriores lo solucione en parte, es cierto que si queremos trabajar de forma "nativa" con el framework tenemos que estar preparados para escribir un buen puñado de líneas de código.

Para dar salida este problema han ido apareciendo distintas alternativas, entre las que destacan Hive (by Facebook) y Pig (by Yahoo). En este post me voy a centrar en esta última, aunque esto no significa, ni mucho menos, que sea mejor que Hive.

Gracias a Pig se pueden escribir "en 4 lineas" aplicaciones MapReduce. Dicho de otra forma, permite que nos centremos en el qué queremos hacer con nuestros datos, y no en el cómo lo hacemos. Se basa en dos componentes:

  • Un sencillo lenguaje para escribir los scripts, llamado "pig latin".
  • El ""compilador"", que trasforma los scripts en aplicaciones MapReduce para su ejecución en el cluster Hadoop.

Junto a esto, Pig tambén ofrece un shell interactivo (se llama grunt), para poder ejecutar comandos.

Bien, terminada la teoría (fácil, ¿no?), vamos a centrarnos en los datos, tal y como hemos dicho antes. (Para el que no haya leido los posts anteriores, este sería un buen momento).

Hasta ahora hemos estado preparando nuestra plataforma de logueo para poder disponer de años, y quizá terabytes, de logs en un mismo lugar, barato y redundado, del que poder sacar información útil en unos pocos minutos. Recordemos:

  1. Los logs de todos nuestros servidores se consolidan en una/varias máquinas syslog (o similar).
  2. Los logs se copian tal cual en Hadoop. A pesar de que vayan a estar replicados, no es mala idea tener una copia extra en cinta.
  3. Ejecutamos un primer script MapReduce que lee los scripts y los adapta al formato que queremos y sobre el que trabajaremos.
  4. Ya no necesitamos los ficheros originales, los podemos borrar de Hadoop. En caso de desastre, siempre podríamos recurrir a las cintas.

¿Cómo guardábamos los logs?

El resultado del reducer.pl son líneas como esta:

  010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com mail.example.org[10.0.2.34],15853,1268223532,5A4FE29B3B6,43A9B28B357,Mar 10 12:18:52 destinovarios@example.com destinovarios@example.com Antispam sent (250 OK sent queued as 43A9B28B357

Recordad que el script guarda mucha información formateada en base64, por lo que en realidad tenemos algo similar a esto:

  2010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com texto_en_base64

¿Por qué?

Porque todas las consultas que se suelen hacer son del tipo:

  • No he recibido un mensaje enviado entre las x y las z del martes, enviado desde esta cuenta, y con destino a esta otra.
  • Quiero todos los mensajes que se recibieron en tal buzón.

La estructura de logs que hemos preparado va a permitir que los scripts sean muy sencillos, sin ningún tipo de join o similar (ahora lo vemos). Dejaremos al interfaz web desde el que se van a ver los resultados el peso de formatear las entradas y hacerlas "visuales".

¿Interfaz web?

Nada mejor que un sencillo interfaz web en el que escribir los parámetros de búsqueda, preparar el script .pig, ejecutarlo, y mostrar los resultados una vez termine. Evidentemente, no voy a escribir aquí el formulario, pero por dar un ejemplo, imaginemos que queremos saber los correos que han llegado desde infojobs entre las 8 a.m. del 11 de marzo, y las 12 a.m. del mismo día.

Empezamos pasando las fechas a timestamp. Muy fácil:

  $ date -u -d 'Thu Mar 11 08:00:00 2010' '+%s'
  1268294400

  $ date -u -d 'Thu Mar 11 12:00:00 2010' '+%s'
  1268308800

Con esto y con la dirección origen ya tenemos todo lo necesario. Ahora lo normal sería escribir un fichero .pig, pero aquí vamos a usar el intérprete de comandos grunt para ver los resultados paso a paso:

  $ pig
  10/05/25 21:57:29 INFO pig.Main: Logging error messages to: /usr/local/pig-0.7.0/pig_1274817449369.log
  2010-05-25 21:57:29,688 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://fn140
  2010-05-25 21:57:30,149 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: fn140:8021
  grunt>

Empezamos cargando los datos:

  grunt> REGISTROS = LOAD '/user/hadoop/logs_formateados' USING PigStorage('\t') AS (msgid:chararray, fecha:long, origen:chararray, destinoreal:chararray, destinoentrada:chararray, resto:chararray);

Con este comando hemos cargado los datos desde el directorio /user/hadoop/logs_formateados en la variable REGISTROS. Los ficheros de entrada se separan con un tabulador [1], con el schema que hemos venido usando. Veamos, antes de seguir, si Pig ha entendido lo que realmente queremos:

  grunt> ILLUSTRATE REGISTROS
  2010-05-25 23:58:36,627 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://fn140
  2010-05-25 23:58:36,681 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: fn140:8021
  2010-05-25 23:58:36,791 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
  2010-05-25 23:58:36,792 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
  --------------------------------------------------------------------------------------------------------------------
  | REGISTROS | msgid: chararray | fecha: long | origen: chararray   | destinoreal: chararray | destinoentrada: chararray | resto: chararray |
  --------------------------------------------------------------------------------------------------------------------
  |           | 6B160A8CF29E064XA067E116FA2A458CAD8A66@GLOBL0310V01.exampleserv.com | 1268304751  | pr1user@example.com | mga@example.net        | mga@example.net           | texto_base64     |
  --------------------------------------------------------------------------------------------------------------------

Ahora que tenemos todos los registros, vamos a filtrarlos en base a los criterios que hemos definido:

  grunt> FILTRADOS = FILTER REGISTROS BY (origen matches '.+infojobs.+') AND (fecha > 1268294400) AND (fecha < 1268301600);

Hecho esto sólo nos queda agruparlos por msgid. Recordad, tenemos dos entradas por mensaje: una para el nivel antispam, y otra para la entrega. En realidad, en base al msgid podríamos seguir un mensaje por tantos saltos como tuviéramos (algo parecido a lo que Mailtrust ha documentado que hace):

  grunt> AGRUPADOS = GROUP FILTRADOS BY msgid;

Y con esto ya tenemos el resultado. Lo podríamos confirmar con el comando DUMP, que muestra los datos por salida estándar, pero lo vamos a guardar directamente en un fichero:

  grunt> STORE AGRUPADOS INTO '/tmp/fichero_resultados.txt' USING PigStorage();
  2010-05-26 22:17:03,942 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_LONG 2 time(s).
  2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.impl.logicalLayer.optimizer.PruneColumns - No column pruned for REGISTROS
  2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.impl.logicalLayer.optimizer.PruneColumns - No map keys pruned for REGISTROS
  2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.ResourceSchema - Insert two-level access to Resource Schema
  2010-05-26 22:17:03,983 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_LONG 2 time(s).
  2010-05-26 22:17:03,985 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - (Name: Store(hdfs://fn140/tmp/fichero_resultados.txt:PigStorage) - 1-1007 Operator Key: 1-1007)
  2010-05-26 22:17:03,986 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
  2010-05-26 22:17:03,986 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
  2010-05-26 22:17:04,019 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
  2010-05-26 22:17:05,180 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
  2010-05-26 22:17:05,188 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
  2010-05-26 22:17:05,191 [Thread-120] WARN  org.apache.hadoop.mapred.JobClient - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
  2010-05-26 22:17:05,545 [Thread-120] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
  2010-05-26 22:17:05,546 [Thread-120] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
  2010-05-26 22:17:06,382 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201005262134_0008
  2010-05-26 22:17:06,382 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://fn140:50030/jobdetails.jsp?jobid=job_201005262134_0008
  2010-05-26 22:17:06,384 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
  2010-05-26 22:17:19,214 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 10% complete
  2010-05-26 22:17:20,719 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 20% complete
  ...............
  2010-05-26 22:17:28,241 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 60% complete
  2010-05-26 22:17:41,788 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
  2010-05-26 22:17:41,788 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Successfully stored result in: "hdfs://fn140/tmp/fichero_resultados.txt"
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Records written : 72
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Bytes written : 44727
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Spillable Memory Manager spill count : 0
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Proactive spill count : 0
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

Vaya, parece que tenemos 72 usuarios suscritos a infojobs. Veamos un ejemplo de lo que se ha escrito en "/tmp/fichero_resultados.txt" (es un path de HDFS, no el /tmp local). Los saltos de línea los he puesto yo para que sea más visual, y he escrito la versión sin codificar en base64:

  1268304853778.112.POCFQELSVS@[hellboy07]
  {
      (1268304853778.112.POCFQELSVS@[hellboy07],1268308417,aviso_incripciones@push.infojobs.net,usuario@example.com,
        usuario@example.com,localhost[127.0.0.1],11854,1268308418,9ABC12FACB4,-1,
        Mar 11 11:53:38 usuario@example.com usuario@example.com Maildrop sent (delivered via maildrop service))
  ,
      (1268304853778.112.POCFQELSVS@[hellboy07],1268308417,aviso_incripciones@push.infojobs.net,usuario@example.com,
        usuario@example.com,push3.infojobs.net[79.171.25.71],11107,1268308418,3B0213FAECD,9ABC12FACB4,
        Mar 11 11:53:38 usuario@example.com usuario@example.com Antispam sent (250 OK queued as 9ABC12FACB4)
  }

Ahora sólo necesitamos que nuestro interfaz web formatee el resultado y lo saque por pantalla. Fácil y sencillo.

Resumiendo, con estas cuatro líneas podemos hacer cualquier búsqueda en los logs, tengan el tamaño que tengan:

  REGISTROS = LOAD '/user/hadoop/logs_formateados' USING PigStorage('\t') AS (msgid:chararray, fecha:long, origen:chararray, destinoreal:chararray, destinoentrada:chararray, resto:chararray);
  FILTRADOS = FILTER REGISTROS BY (origen matches '.+infojobs.+') AND (fecha > 1268294400) AND (fecha < 1268301600);
  AGRUPADOS = GROUP FILTRADOS BY msgid;
  STORE AGRUPADOS INTO '/tmp/fichero_resultados.txt' USING PigStorage();

Referencias

Hay muchos artículos y documentación disponible en la red sobre Hadoop. En cuanto a bibliografía, que yo sepa hay al menos tres libros disponibles:

  • Pro Hadoop, de la editorial Apress.
  • Hadoop: The Definitive Guide, de la editorial O'Reilly.
  • Hadoop in Action, de la editorial Manning.

Notas

[1]La verdad es que he hecho trampa. El reducer original separaba el id de mensaje del resto de datos con un tabulador, pero luego usaba un espacio simple. Esto es porque al principio tenía pensado trabajar otros aspectos de Hadoop y Pig, como el Chaining y las "user defined functions". Al final, para no tener que escribir más posts, he cambiado el reducer para que separe todos los campos con tabuladores.
read more

Computación distribuida con Hadoop III

dom, 21 mar 2010 by Foron

En este tercer post de la serie dejamos la teoría para ver un caso práctico de aplicación de Hadoop, que completaré en un cuarto post.

Como sabemos, gracias a Hadoop conseguimos un sistema con mucha capacidad de almacenamiento, a un coste por mega reducido, redundado, de rendimiento prácticamente lineal (más máquinas = más rendimiento) y sobre el que ejecutar de forma pararela todo tipo de aplicaciones que se puedan implementar como MapReduce.

Con estos datos, hay una utilidad práctica que viene inmediatamente a la mente: tratamiento de logs. Veamos:

  • Me gustaría decir lo contrario, pero personalmente no conozco a nadie que maneje el dinero destinado a proyectos que no ponga pegas a una inversión fuerte para el tratamiento de logs. El coste por mega de Hadoop es difícil de superar.
  • Si combinamos Hadoop, por ejemplo para datos con hasta un año de antiguedad, con un soporte en cinta para el resto, lo tendremos fácil para cumplir cualquier legislación en lo relacionado a retención de datos.
  • Sin llegar a los niveles de Facebook o Rackspace, es probable que el volumen de logs que guardemos durante un año sea lo suficientemente grande como para hacer difícil su tratamiento si un juez (por ejemplo) pide todos los correos desde la cuenta x a la cuenta z de los últimos siete meses, o todos los acceso al foro web de fulanito. MapReduce y Hadoop permiten ejecutar este tipo de tareas sin mayores problemas, y con un rendimiento prácticamente lineal.

En este entorno, planteo un ejemplo de tratamiento de logs de Postfix de lo más simple, con un filtro de contenido en modo post-queue (Amavis, por ejemplo, en su uso más habitual). Vamos, nada nuevo.

El objetivo sería, además de los puntos mencionados anteriormente, poder realizar cualquier tipo de consulta sobre los logs de meses (o años), en un tiempo razonable. Y digo tiempo razonable porque en este tipo de consultas no suele hacer falta tener el resultado al momento. Se acepta esperar durante 1,5 o 10 minutos hasta que se genere el resultado.

Resumiendo, tenemos una infraestructura parecida a esta:

Infraestructura Hadoop y Amavis

Todos los servidores envían los logs de manera centralizada a un syslog remoto, que además de hacer una copia diaria a cinta, almacena la información en Hadoop, usando alguno de los muchos interfaces que se ofrecen, como pueden ser FUSE o FTP.

Junto a esto tendremos una máquina, preferiblemente un interfaz web, para generar trabajos MapReduce. Básicamente necesitaremos un formulario (cuenta origen, destino, fechas, ...), y algo de control sobre los trabajos que ya se han realizado para mostrar los resultados.

Streaming

Se pueden crear trabajos MapReduce para Hadoop de varias formas, pero en este ejemplo me voy a limitar a lo que se llama "Streaming". Dicho de otra forma (aunque no sea del todo cierto), a programar aplicaciones MapReduce en perl o python (o en cualquier lenguaje que pueda leer y escribir en la entrada y salida estándar).

La gran ventaja que tiene el Streaming es que permite desarrollar prototipos de una forma rápida, aunque no sea lo más eficiente. En cualquier caso, sobra para este ejemplo.

La mejor forma de explicar lo que es el Streaming en Hadoop es ver el equivalente, en versión linux:

  cat datos_entrada.txt | mapper.pl | sort | reducer.pl

El mapper.pl recibirá los datos desde los ficheros de log, línea por línea, y los tratará, escribiendo por salida estándar con el formato clave "tabulador" valor.

Esta salida se ordena, y pasa a un reducer. Hadoop garantiza que todas las claves serán tratadas por el mismo reducer, aunque éste puede, y normalmente así lo hará, procesar varias claves (este es un buen momento para recordar que los ficheros de entrada pueden ser, por decir algo, 200GB de datos, sin que suponga ningún problema para Hadoop).

Volviendo al ejemplo, veamos unas cuantas líneas de log de postfix:

  Feb 20 14:27:03 server postfix/smtpd[17875]: E78D13073B2: client=mail.example.com[10.0.0.6]
  Feb 20 14:27:03 server postfix/cleanup[17822]: E78D13073B2: message-id=
  Feb 20 14:27:04 server postfix/qmgr[5370]: E78D13073B2: from=, size=20827, nrcpt=1 (queue active)
  Feb 20 14:27:04 server postfix/smtp[17980]: E78D13073B2: to=, relay=127.0.0.1[127.0.0.1]:10025, delay=0.6, delays=0.08/0/0/0.51, dsn=2.0.0, status=sent (250 Ok: queued as 6A737307316)
  Feb 20 14:27:04 server postfix/qmgr[5370]: E78D13073B2: removed

  Feb 20 14:27:04 server postfix/smtpd[17882]: 6A737307316: client=localhost[127.0.0.1]
  Feb 20 14:27:04 server postfix/cleanup[17817]: 6A737307316: message-id=
  Feb 20 14:27:04 server postfix/qmgr[5370]: 6A737307316: from=, size=21719, nrcpt=1 (queue active)
  Feb 20 14:27:04 server postfix/pipe[16491]: 6A737307316: to=, relay=maildrop, delay=0.16, delays=0.11/0.01/0/0.04, dsn=2.0.0, status=sent (delivered via maildrop service)
  Feb 20 14:27:04 server postfix/qmgr[5370]: 6A737307316: removed

Esta es la información que postfix loguea al recibir y entregar un mensaje usando un filtro after-queue. He ordenado los logs para que sea más claro, pero en la realidad no es así.

En este ejemplo se recibe un correo desde mail.example.com, y se le asigna el queue-id E78D13073B2. También se loguea el message-id, el origen y el tamaño.

Una vez se ha encolado el mensaje, se envía a Amavis, a través de 127.0.0.1:10025. Como se puede ver en el ejemplo, en este punto sabemos el destino del mensaje, y también si fue aceptado o no. Lo sabemos porque el log hace referencia al queue id que amavis reinjecta en postfix. De no haberlo aceptado no tendríamos un queue id en el log.

Este nuevo mensaje mantiene el message-id, el tamaño es algo mayor, y termina entregandose, en este caso en la máquina local, usando la aplicación courier-maildrop.

Bien, esto no es más que un ejemplo. En la realidad hay que tratar muchos casos: aliases, reenvíos a máquinas remotas, bounces, mensajes que pasan a deferred por no haber podido entregarse, ....

Además, e insistiendo en que esto no es más que un ejemplo, voy a asumir dos cosas, que aunque se cumplen a menudo, no son ciertas:

  • Que el queue-id es único.
  • Que el message-id es único.

Mapper.pl

El proceso comienza con Hadoop enviando los ficheros de entrada (logs) a la entrada estándar de los mapper.pl que estemos ejecutando en el cluster, que tratarán cada línea y la enviarán por salida estándar con el formato:

  queueidinformacion_separada_por_comas

Para separar la clave del valor se usa un tabulador. Normalmente, los campos del valor se separan con espacios, pero en este ejemplo he preferido las "comas".

Veamos un trozo del código:

 ..............
 while (<>)
 ..............
 if ($linea =~ /^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) .+ postfix\/smtpd\[\d+\]: (\w+): client=(\S+)$/) {
                 $estructura{fecha} = $1; $estructura{qid} = $2; $estructura{host} = $3;
                 print $estructura{qid} . "\t" . "Conexion," . join (",",$estructura{fecha},$estructura{host}) . "\n";
         }
          elsif ($linea =~ /^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) .+ postfix\/smtp\[\d+\]: (\w+): to\=\<([^ ]+)\>, relay=127.0.0.1.*status=(.*) (\w+)\)$/) {

                 $estructura{fecha} = $1; $estructura{qid} = $2; $estructura{to} = $3; $estructura{origto} = $3; $estructura{status} = $4; $estructura{relay} = "Antispam"; $estructura{rqid} = $5; $estructura{status} =~ s/,//g;
                 print $estructura{qid} . "\t" . "Antispam," . join (",",$estructura{fecha},$estructura{to},$estructura{to},$estructura{relay},$estructura{status},$estructura{rqid}) . "\n";
         }
 ...........
 }

Tenemos varios "if/elsif", que adaptan la línea del log de entrada al formato que quiero. Además, he añadido un texto con el tipo de entrada de log que estamos tratando, y que sabemos por el "if" en el que estamos (Antispam en este ejemplo), para luego usarlo en el reducer.

El código completo incluye también la detección de los mensajes que Amavis considera que son spam, en los que pone el rqid como 0. Además, en aquellos casos donde "no proceda" aplicar el concepto de rqid pongo -1 como valor. Todo esto servirá en el reducer.

Volviendo a los logs de ejemplo, el resultado de ejecutar el mapper sobre ellos es:

  E78D13073B2     Conexion,Feb 20 14:27:03,mail.example.com[10.0.0.6]
  E78D13073B2     Msgid,Jan 20 06:27:03,F0E59D47X9B343A7DFRAB1F0E45@EUW0600375
  E78D13073B2     Origen,Feb 20 14:27:04,prueba@example.com,20827
  E78D13073B2     Antispam,Feb 20 14:27:04,destino@example.org,destino@example.org,Antispam,sent (250 Ok: queued as,6A737307316)
  E78D13073B2     Cierre,Feb 20 14:27:04
  6A737307316     Conexion,Feb 20 14:27:04,localhost[127.0.0.1]
  6A737307316     Msgid,Feb 20 14:27:04,F0E59D47X9B343A7DFRAB1F0E45@EUW0600375
  6A737307316     Origen,Feb 20 14:27:04,prueba@example.com,21719
  6A737307316     Maildrop,Feb 20 14:27:04,destino@example.org,destino@example.org,Maildrop,sent (delivered via maildrop service),-1
  6A737307316     Cierre,Feb 20 14:27:04

Después de esto Hadoop ejecutaría un sort (en el ejemplo ya están ordenados por queue-id), y volvería a enviar el resultado a los reducer.pl del cluster. Por lo tanto, sabemos que un reducer va a recibir todas las líneas de log correspondientes a un mismo queue-id.

Reducer.pl

Pasamos directamente a ver algo del código del reducer.pl:

 my %mensaje = (qid => "",
             fecha => "",
             servidor => "",
             msgid => "",
             origen => "",
             size => "",
             to => "",
             origto => "",
             rqid => "",
             relay => "",
             status => "",
             cierre => ""
             );

 ..............
 while () {
     chomp;
     ($qid, $valor) = split /\t/;

         if ($qid ne $qidanterior) {
                 # tenemos un queueid nuevo
                 if ($qidanterior ne "") {
                         &mostrar_datos();

                         %mensaje = (qid => "",
                                 fecha => "",
                                 desdeip => "",
                                 msgid => "",
                                 origen => "",
                                 size => "",
                                 to => "",
                                 origto => "",
                                 rqid => "",
                                 status => "",
                                 cierre => ""
                         );
                 }
                 $qidanterior = $qid;
         }

         # seguimos procesando los datos del mismo qid
         $mensaje{qid} = $qid;
         @datos = split(",",$valor);

         switch ($datos[0]) {
                 case "Conexion" {
                         $mensaje{fecha} = $datos[1];
                         $mensaje{desdeip} = $datos[2];
                 }
                 case "Msgid" {
                         $mensaje{msgid} = $datos[2];
                 }
                 case "Origen" {
                         $mensaje{origen} = $datos[2];
                         $mensaje{size} = $datos[3];
                 }
                 case "Antispam" {
                         $mensaje{to} .= $datos[2] . "-separador-";
                         $mensaje{origto} .= $datos[3] . "-separador-";
                         $mensaje{status} .= $datos[1] . " " .$datos[2] . " " . $datos[3] . " " . $datos[4] . " " . $datos[5] . "-separador-";
                         $mensaje{rqid} = $datos[6];
                 }

 ................
 sub mostrar_datos {
      ................
      if ( ($mensaje{fecha} ne "") && ($mensaje{cierre} ne "") ) {
           # mensaje completo: Salida estandar
           # hay que revisar que se haya pasado por todas las fases
           # y realizar comprobaciones, que aqui no se hacen
           $fechaentrada = `date -u -d '$mensaje{fecha}' '+%s'`; chomp $fechaentrada;
           $fechasalida = `date -u -d '$mensaje{cierre}' '+%s'`; chomp $fechasalida;

           $datoscodificables = $mensaje{desdeip} .",".$mensaje{size}.",".$fechasalida.",".$mensaje{qid}.",".$mensaje{rqid}.",".$mensaje{status};
           $resto = encode_base64($datoscodificables,"");

           print $mensaje{msgid} . "\t" . $fechaentrada . " " . $mensaje{origen} . " " . $mensaje{to} . " " . $mensaje{origto} . $resto . "\n";

     } else {
           # mensaje incompleto. Guardar en otro fichero para poder seguir el tracking
           # Falta por hacer
     }

 }

Por lo tanto, cogemos cada línea de entrada, y ayudándonos de la descripción del tipo de línea que hemos añadido en el mapper, vamos completando la estructura de datos que después será el resultado (por fin) que queremos.

Un ejemplo de la ejecución de este trabajo en mi cluster de prueba:

  $ hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-0.20.2-streaming.jar -input /user/hadoop/mail_logs -output /user/hadoop/logs_formateados/domingo_14_03 -file /home/hadoop/mapper.pl -file /home/hadoop/reducer.pl -mapper "/usr/bin/perl -w mapper.pl" -reducer "/usr/bin/perl -w reducer.pl"

  packageJobJar: [/home/hadoop/mapper.pl, /home/hadoop/reducer.pl, /home/hadoop/hadoop-unjar4902963819550178151/] [] /tmp/streamjob534817239215339193.jar tmpDir=null
  10/03/18 22:20:02 INFO mapred.FileInputFormat: Total input paths to process : 4
  10/03/18 22:20:02 INFO streaming.StreamJob: getLocalDirs(): [/home/hadoop/mapred/local]
  10/03/18 22:20:02 INFO streaming.StreamJob: Running job: job_201003182146_0001
  10/03/18 22:20:02 INFO streaming.StreamJob: To kill this job, run:
  10/03/18 22:20:02 INFO streaming.StreamJob: /usr/local/hadoop/bin/hadoop job  -Dmapred.job.tracker=fn140:8021 -kill job_201003182146_0001
  10/03/18 22:20:02 INFO streaming.StreamJob: Tracking URL: http://fn140.forondarena.net:50030/jobdetails.jsp?jobid=job_201003182146_0001
  10/03/18 22:20:03 INFO streaming.StreamJob:  map 0%  reduce 0%
  10/03/18 22:20:17 INFO streaming.StreamJob:  map 8%  reduce 0%
  10/03/18 22:20:18 INFO streaming.StreamJob:  map 11%  reduce 0%
  10/03/18 22:20:20 INFO streaming.StreamJob:  map 15%  reduce 0%
  .....
  10/03/18 22:21:56 INFO streaming.StreamJob:  map 92%  reduce 25%
  10/03/18 22:21:59 INFO streaming.StreamJob:  map 100%  reduce 25%
  ....
  10/03/18 23:14:13 INFO streaming.StreamJob:  map 100%  reduce 98%
  10/03/18 23:15:55 INFO streaming.StreamJob:  map 100%  reduce 99%
  10/03/18 23:17:31 INFO streaming.StreamJob:  map 100%  reduce 100%
  10/03/18 23:18:25 INFO streaming.StreamJob: Job complete: job_201003182146_0001
  10/03/18 23:18:25 INFO streaming.StreamJob: Output: /user/hadoop/logs_formateados/domingo_14_03

El resultado es un fichero en la ruta "/user/hadoop/logs_formateados/domingo_14_03", con líneas formadas por un message-id, seguido por un tabulador, seguido del origen, el destino, el destino antes de la expansión de aliases, y del resto de la información, codificada en base64, todo separado por espacios.

No hace falta decir que el formato de salida en un entorno real se debería adaptar a lo que se necesitase. De hecho, Rackspace usa una estructura binaria, donde entre otras cosas se guarda la información de los diferentes saltos que hace un correo dentro de su sistema. Este ejemplo, por supuesto, es mucho más sencillo.

Al final del proceso hemos obtenido un fichero, que es el que de verdad queremos en nuestro cluster, con toda la información de cada mensaje que ha entrado en nuestra red en un periodo de tiempo concreto (digamos que cada 4 horas volcamos los logs de syslog a Hadoop y ejecutamos la tarea), como se ve en los siguientes ejemplos (sin la codificación en base64, para que sean más fáciles de seguir):

Mensaje que se entrega en un buzón local:

  ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org 1268225082 origen@example.org destino@example.com destino@example.com mail.example.org[10.0.2.34],2430,1268225085,B6A6A296E55,158AB296F24,Mar 10 12:44:45 destino@example.com destino@example.com Antispam sent (250 OK sent queued as 158AB296F24

  ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org 1268225085 origen@example.org destino@example.com destino@example.com localhost[127.0.0.1],3081,1268225085,158AB296F24,-1,Mar 10 12:44:45 destino@example.com destino@example.com Maildrop sent (delivered via maildrop service)

Por lo tanto:

  • El mensaje con id ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org
  • Entró en el sistema a las 1268225082
  • Desde el servidor mail.example.org[10.0.2.34]
  • El origen era origen@example.org
  • Y el destino destino@example.com (no es un alias)
  • Una vez pasado por el sistema antispam se reencola con queueid 158AB296F24
  • El mensaje de la entrega al servidor antispam es: sent (250 OK sent queued as 158AB296F24)
  • Una vez ha pasado el filtro antispam el mensaje se reencola a las 1268225085
  • Por supuesto desde el servidor localhost[127.0.0.1]
  • Y el mensaje de la entrega local es Mar 10 12:44:45 Maildrop sent (delivered via maildrop service), sólo al destino destino@example.com

Mensaje que se entrega a un servidor remoto:

  1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3 1268219047 origen@example.net destino@example.com destino@example.com mail.example.net[10.5.3.2],1901,1268219051,CA6782FABFB,B6C7A2FAACA,Mar 10 11:04:11 destino@example.com destino@example.com Antispam sent (250 OK sent queued as B6C7A2FAACA

  1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3 1268219049 origen@example.net destinoreal@remoto.example.org destino@example.com localhost[127.0.0.1],2686,1268219051,B6C7A2FAACA,-1,Mar 10 11:04:11 destinoreal@remoto.example.org destino@example.com mail.example.org[10.0.2.34]:25 sent (250 2.0.0 Ok: queued as 423872AB862)
  • Su messageid es 1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3
  • Entro en el sistema a las 1268219047
  • Desde el servidor mail.example.net[10.5.3.2]
  • Desde el origen origen@example.net
  • Y al destino destino@example.com, que en la fase antispam todavía no se expande a ningún alias
  • El mensaje se relaciona con B6C7A2FAACA
  • Y el mensaje de la entrega es Mar 10 11:04:11 destino@example.com destino @example.com Antispam sent (250 OK sent queued as B6C7A2FAACA)
  • Se reencola, a través de localhost, a las 1268219049
  • El mensaje pasa a ser la dirección real destinoreal@remoto.example.org, proveniente del alias destino@example.com
  • El mensaje se entrega en mail.example.org[10.0.2.34]:25
  • Con el mensaje sent (250 2.0.0 Ok: queued as 423A72AB262)

Mensaje que es spam:

  9545850218.UWD8KHA190@spammer.example.info 1268238332 spammer@example.net destino@example.com destino@example.com mail.example.info[10.8.2.12],2141,1268238333,F355429B3A5,0,Mar 10 16:25:33 destino@example.com destino@example.com Antispam sent (250 OK sent mensaje_rechazado por spam

En este caso el mensaje es spam, y no se relaciona con ningún otro queueid.

Mensaje que se entrega a varias cuentas:

  2010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com mail.example.org[10.0.2.34],15853,1268223532,5A4FE29B3B6,43A9B28B357,Mar 10 12:18:52 destinovarios@example.com destinovarios@example.com Antispam sent (250 OK sent queued as 43A9B28B357

  2010031011.AB263566.pc1@mailer.example.org 1268223532 usuario@example.org destinolocal@example.com-separador-destinoremoto@example.info destinovarios@example.com-separador-destinovarios@example.com localhost[127.0.0.1],16525,1268223532,43A9B28B357,-1,Mar 10 12:18:52 destinolocal@example.com destinovarios@example.com Maildrop sent (delivered via maildrop service)-separador-Mar 10 12:18:52 destinoremoto@example.info destinovarios@example.com mail.example.info[10.8.2.12]:25 sent (250 2.0.0 Ok: queued as 452362CA51A)
  • Ahora tenemos un mensaje con id 2010031011.AB263566.pc1@mailer.example.org
  • Que se envía desde usuario@example.org
  • Al destino destinovarios@example.com
  • Que a su vez es un alias a dos cuentas destinolocal@example.com y destinoremoto@example.info
  • Los mensajes de entrega a cada una de ellas son:
    • Mar 10 12:18:52 destinolocal@example.com Maildrop sent (delivered via maildrop service)
    • Mar 10 12:18:52 destinoremoto@example.info mail.example.info[10.8.2.12]:25 sent (250 2.0.0 Ok: queued as 452362CA51A)

Como se puede ver, ya tenemos toda la información necesaria, debidamente preformateada, por lo que ya no necesitamos los ficheros mail.log, que podríamos borrar de Hadoop (que no de las cintas).

La información contenida en estos ficheros nos servirá para preparar consultas, bien directamente con otros trabajos MapReduce, bien con otras herramientas, tal y como mostraré en el siguiente post de la serie, en el que nos aprovecharemos del formato de los ficheros para hacer consultas por message-id, cuenta origen, fecha o cuenta destino (según entró en el sistema o una vez expandidos los aliases).

En este punto hay que volver a recordar que Hadoop está pensado para trabajar con cantidades muy grandes de información, en los que hacer un simple "grep" o usar una base de datos relacional estándar resulta impracticable.

Por último, tened en cuenta que el código de este post, además de ser sólo un ejemplo, es completamente Alpha. Aparte de no funcionar del todo bien (tiene varios fallos que he detectado pero que no he corregido todavía), necesitaría tener en cuenta muchos más casos, como mensajes cuya entrega se retrasa durante días (que no deja de ser algo normal en los sistemas de correo reales).

En el cuarto y último post de la serie: acceso a Hadoop a través de una herramienta de apoyo, como pig.

read more