Aug 152011
 

Pasamos a la tercera parte de la serie, en la que ya vamos a ver un ejemplo concreto de lo que puede ser un sencillo interfaz para la gestión de logs usando Solandra. Como siempre, todo el código y los ficheros de configuración más importantes están en Github.

Recordemos los objetivos que nos hemos marcado:

  • Ser capaces de gestionar un volumen muy importante de logs, con la máxima escalabilidad y disponibilidad.
  • Poder añadir los logs en el sistema de una forma sencilla.
  • Tener un interfaz web desde el que poder visualizar los datos.

Ya hemos hablado sobre la escalabilidad de Cassandra en los posts anteriores, así que no vamos a volver a entrar en este punto. Veamos los otros dos:

Inserción de datos en el sistema

Solandra es básicamente una adaptación de Solr, por lo que en realidad vamos a tratar conceptos propios de esta aplicación en lo que queda de post. Cuando veamos algo específico de Solandra, lo señalaré.

Uno de los elementos más importantes de la configuración del sistema es el schema; que viene a ser el lugar en el que se definen los atributos que forman cada “documento” (correo en este caso) que se quiere indexar. Como veremos más adelante, una vez creada esta estructura de datos, usaremos el comando curl para insertarla en el cluster a través de una url determinada. Además, Solandra permite trabajar con varios schemas diferentes de manera simultanea.

Simplicando un poco, el schema está compuesto por dos grandes bloques: La definición de los tipos de datos y la lista de atributos que forman cada documento.

Solr ofrece muchos tipos de datos ya creados de antemano: numéricos, texto, fechas, …. Por si esto fuera poco, la aplicación permite definir nuevos tipos siempre que se considere necesario. Para la gestión de logs de correo, por ejemplo, nos podría venir bien un tipo específico para las direcciones email:

...
<fieldType name="email" class="solr.TextField" >
   <analyzer>
      <tokenizer class="solr.PatternTokenizerFactory" pattern="@" />
      <filter class="solr.LowerCaseFilterFactory" />
      <filter class="solr.TrimFilterFactory" />
   </analyzer>
</fieldType>
...

Este tipo se basa en el TextField clásico, pero forma los tokens alrededor de la “@”. De esta manera, facilitaremos las búsquedas tanto en base a la parte local de las direcciones como al dominio. Además, elimina los espacios y convierte las mayúsculas en minúsculas.

El siguiente paso es la definición de los atributos que componen cada documento, y que por supuesto van a ser de alguno de los tipos disponibles en el schema.

<fields>
    <field name="id" type="uuid" indexed="true" stored="true" required="true" />
    <field name="in_from" type="email" indexed="true" stored="true" />
    <field name="in_size" type="tint" indexed="false" stored="true" />
    <field name="in_fentrada" type="tdate" indexed="true" stored="true" />
    <field name="in_to" type="email" indexed="true" stored="true" />
    <field name="in_to_adicional" type="email" indexed="true" stored="true" multiValued="true" />
    <field name="in_fsalida" type="tdate" indexed="true" stored="true" multiValued="true" />
    <field name="in_estado" type="string" indexed="false" stored="true" multiValued="true" />

    <dynamicField name="*" type="ignored" multiValued="true" />
</fields>

Casi no hace falta explicar nada. En este sencillo ejemplo tenemos un identificador, un origen, un tamaño, una fecha de entrada y una fecha de entrega. Además, con cada mensaje vamos a guardar el estado de entrega para cada destino (pueden haberse realizado varios intentos, por ejemplo a causa del greylisting), y la lista de destinatarios adicionales para los que iba dirigido.

Obviamente, es una estructura limitada. En un entorno real se debería guardar mucha más información (antivirus, antispam, expansión de aliases, …).

Y con esto ya lo tenemos. Sólo queda volcar el schema en el cluster:

curl http://ip_cluster:8983/solandra/schema/correo --data-binary @/root/schema_correo.xml -H 'Content-type:text/xml; charset=utf-8'

Como hemos dicho, al igual que un “/schema/correo”, se podría definir un “/schema/web”, por ejemplo, y usarlos simultáneamente.

El volcado de datos

El volcado de datos se puede hacer de varias formas, pero vamos a limitarnos al uso de ficheros xml. Lo mejor es ver un ejemplo:

<add allowDups="false">
   <doc>
      <field name="in_from">origenspam@example.com</field>
      <field name="id">b309842a-1cf7-11e0-9759-f896edbeae14</field>
      <field name="in_fentrada">2011-04-30T00:17:24Z</field>
      <field name="in_size">941</field>
      <field name="in_to">destino1@target.example.net</field>
      <field name="in_fsalida">2011-04-30T00:17:25Z</field>
      <field name="in_estado">0 - 192.168.10.24_accepted_message./Remote_host_said:_250_2.0.0_Ok:_queued_as_DF2BD74CA71/</field>
      <field name="in_to_adicional">destino1@target.example.net</field>
      <field name="in_to_adicional">destino2@target.example.net</field>
      <field name="in_to_adicional">destino3@target.example.net</field>
      <field name="in_to_adicional">destino4@target.example.net</field>
   </doc>
   <doc>
      <field name="in_from">origenspam@example.com</field>
      <field name="id">b30991b7-1cf7-11e0-9759-e819bb7f7b58</field>
      <field name="in_fentrada">2011-04-30T00:17:24Z</field>
      <field name="in_size">941</field>
      <field name="in_to">destino2@target.example.net</field>
      <field name="in_fsalida">2011-04-30T00:17:24Z</field>
      <field name="in_estado">0 - 192.168.10.24_accepted_message./Remote_host_said:_250_2.0.0_Ok:_queued_as_A945A21CD6B/</field>
      <field name="in_to_adicional">destino1@target.example.net</field>
      <field name="in_to_adicional">destino2@target.example.net</field>
      <field name="in_to_adicional">destino3@target.example.net</field>
      <field name="in_to_adicional">destino4@target.example.net</field>
   </doc>
   <doc>
      <field name="in_from">cliente1@example.net</field>
      <field name="id">b43de232-1cf7-11e0-9759-d71ac36a357f</field>
      <field name="in_fentrada">2011-04-30T00:04:41Z</field>
      <field name="in_size">6077</field>
      <field name="in_to">greylister@example.com</field>
      <field name="in_fsalida">2011-04-30T00:07:27Z</field>
      <field name="in_estado">0 - 172.16.0.14_does_not_like_recipient./Remote_host_said:_450_4.7.1_<cliente1@example.net>:_Sender_address_rejected:_Message_delayed_now.Retry_later,_please./Giving_up_on_172.16.0.14./</field>
      <field name="in_fsalida">2011-04-30T00:12:38Z</field>
      <field name="in_estado">1 - 172.16.0.14_does_not_like_recipient./Remote_host_said:_450_4.7.1_<cliente1@example.net>:_Sender_address_rejected:_Message_delayed_now._Retry_later,_please./Giving_up_on_172.16.0.14./</field>
      <field name="in_fsalida">2011-04-30T00:31:53Z</field>
      <field name="in_estado">2 - 172.16.0.14_does_not_like_recipient./Remote_host_said:_451_4.3.0_<greylister@example.com>:_Temporary_lookup_failure/Giving_up_on_172.16.0.14./</field>
      <field name="in_fsalida">2011-04-30T01:04:43Z</field>
      <field name="in_estado">3 - 172.16.0.14_accepted_message./Remote_host_said:_250_2.0.0_Ok:_queued_as_CB6A3D4A217/</field>
      <field name="in_to_adicional">greylister@example.com</field>
   </doc>
</add>

El volcado, otra vez, es muy sencillo.

curl http://ip_cluster:8983/solandra/correo/update -F stream.file=/tmp/volcado.xml

La conversión de logs desde el más que probable modo texto de syslog a xml, y de ahí al cluster de Solandra, queda fuera de esta serie de posts. De hecho, un servidor piensa que esto es lo realmente importante y difícil para llevar este proyecto a la práctica de una manera “seria”.

El Interfaz

¿Qué mejor que un interfaz web para mostrar los datos que hemos almacenado en Solandra? ¡Sorpresa! la gente detŕas del proyecto ajax-solr ya ha hecho la mayoría del trabajo, así que sólo nos queda modificar un puñado de ficheros, algo de código JavaScript, y ya lo tendremos.

Un detalle más: No queremos permitir el acceso directo a Solandra desde la web, así que necesitamos un proxy que filtre las consultas y las redirija al puerto de Solandra (tcp/8983 por defecto), y que en mi laboratorio escucha en localhost. En este caso, como casi siempre que quiero programar algún tipo de servicio para Internet sin dedicarle mucho tiempo, he usado node.js. Para este ejemplo, y por jugar un poco con GeoIP, he escrito un sencillo proxy que permite conexiones sólo si tienen como referer forondarena.net y si vienen desde Europa o América del Norte. Como no podría ser de otra forma, el código de este proxy también está disponible en Github.


(Nota: Esta demo es un Solr normal, pero el funcionamiento es idéntico al de Solandra).

Conclusiones

El “mercado” está lleno de soluciones de todo tipo que nos pueden ayudar en la gestión de logs. Hay aplicaciones comerciales, como Splunk, sistemas basados en software libre, como Solr, tecnología que nos puede permitir crecer “ilimitadamente”, como Cassandra, Hadoop o Hbase, pero que requieren algo de trabajo; y también tenemos los mágnificos sistemas de bases de datos, como Mysql o Postgresql. ¿Cuál elegir?

En una primera fase, una buena base de datos con un sencillo interfaz web o un Solr estándar pueden servir perfectamente para gestionar todo tipo de logs. De hecho, tanto Mysql como Solr ofrecen alternativas para el particionado que pueden permitir este esquema en la segunda, tercera o cuarta fase.

Un buen consejo que escuche hace tiempo es el de “no arreglar lo que no está roto”. Sólo deberíamos plantearnos el uso de tecnología que probablemente no conozcamos tan bien como las anteriores cuando realmente sea necesario. Llegado ese momento, adelante. Como siempre, la comunidad detrás del software libre es activa y está dispuesta a ayudar. Por si esto fuera poco, cada vez son más comunes las empresas que ofrecen servicios alrededor de este tipo de soluciones, y que pueden asesorarnos llegado el caso.

Aug 052011
 

Seguimos con el segundo post de la serie, en el que pasamos a dar una descripción un poco más técnica de los componentes necesarios para poner en marcha todo lo descrito en el primero. No vamos a entrar en demasiado detalle. En todo caso, una vez conocidas las aplicaciones es más fácil buscar información en la red.

Aunque Solandra puede encargarse de la instalación de Cassandra, aquí vamos a usar los componentes por separado.

Cassandra
Cassandra es un tipo de base de datos creado siguiendo los principios propuestos por Dynamo (Amazon) y por BigTable (Google). El que quiera entrar en detalle tiene bibliografía y mucha documentación disponible.

Explicar el modelo de datos, la replicación o los niveles de consistencia va más allá del objetivo de este post. Lo más interesante en nuestro contexto es dejar claro que Cassandra es una base de datos completamente distribuida y descentralizada, en la que todas las máquinas del cluster cumplen el mismo y único rol, sin distinciones entre “maestros”, “esclavos”, “catálogos”, …. Esto significa que añadir capacidad a un cluster de Cassandra supone básicamente añadir más hierro. Nada más.

La instalación puede complicarse todo lo que queramos, pero lo básico es:

# Cuidado con la versión
cd /tmp/ && wget http://apache.rediris.es/cassandra/0.8.2/apache-cassandra-0.8.2-bin.tar.gz
cd /usr/local/ && tar xvzf /tmp/apache-cassandra-0.8.2-bin.tar.gz 

A partir de aquí se crean los directorios de datos y logs (por ejemplo /var/lib/cassandra y /var/log/cassandra), y se adaptan los ficheros de configuración (que están en /usr/local/apache-cassandra-0.8.2/conf). El que más nos interesa ahora es cassandra.yaml. En github hay un ejemplo de configuración de este fichero, aunque sea casi por defecto, de cada uno de los tres nodos que he usado en esta prueba.

Hay un par de opciones de configuración que pueden servir para comprender la estructura del sistema. Un cluster de Cassandra se entiende como un anillo en el que cada uno de los nodos gestiona un volumen determinado de datos (replicación aparte). Básicamente estamos hablando de una serie de claves (hashes) que se distribuyen de una forma más o menos equilibrada entre todas las máquinas. Aunque no sea estrictamente necesario, como para esta prueba he usado un número fijo de nodos (3), he asignado ya desde el comienzo un 33% de datos a cada uno. Por supuesto, en un entorno real en el que se añaden y quitan nodos dinámicamente la gestión es diferente. La configuración en mi laboratorio es la siguiente:

# Nodo 1
initial_token: 0
# Nodo 2
initial_token: 56713727820156410577229101238628035242
# Nodo 3
initial_token: 113427455640312821154458202477256070485

La otra opción que merece la pena comentar es “seed_provider”. Cassandra usa un protocolo tipo Gossip para distribuir la información entre los nodos. Esto significa que, cuando se añade un nuevo miembro al cluster, es suficiente con indicarle un servidor activo del mismo. El protocolo se encarga de propagar esta nueva información en todos los nodos. Por lo tanto, la configuración se limita a especificar una (o varias) IPs activas:

seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: 192.168.10.145

Hay mucha más opciones de configuración, por supuesto, pero lo dejamos aquí.

En este momento ya se podría ejecutar Cassandra, pero esperaremos a instalar Solandra.

Solandra
Hablemos antes de lo que es Solr, una vez más, muy muy por encima.

Solr es una plataforma de búsqueda construida sobre la librería Lucene. Simplificando mucho, y en el contexto de este post, ofrece un interfaz XML (es el que nos interesa aquí, pero no el único) para añadir documentos, y un API HTTP a través de cual recibir resultados en formato JSON (entre otros).

Todo se verá más claro cuando creemos el schema para la gestión de logs. Para el que quiera profundizar más en Solr, aquí tiene un libro.

Volvamos a Solandra. La instalación es sencilla. Una vez descargado el tar.gz desde github, y con ant y los binarios de java en el path, ejecutamos lo siguiente en cada uno de los nodos:

# El nombre del fichero cambia
cd /usr/local && tar xvzf /tmp/tjake-Solandra-4f3eda9.tar.gz 
cd tjake-Solandra-4f3eda9/
ant -Dcassandra=/usr/local/apache-cassandra-0.8.2 cassandra-dist

Si todo va bien, en unos minutos la salida estándar mostrará lo siguiente:

....
cassandra-dist:
     [copy] Copying 36 files to /usr/local/apache-cassandra/lib
     [copy] Copying 8 files to /usr/local/apache-cassandra/conf
     [copy] Copying 1 file to /usr/local/apache-cassandra/bin
     [echo] Libraries successfully copied into cassandra distribution
     [echo] Start the cassandra server with /usr/local/apache-cassandra/bin/solandra command

BUILD SUCCESSFUL
Total time: 2 minutes 43 seconds

Durante la instalación deberían haberse copiado las librerías y scripts en el arbol de Cassandra. Incluyendo algunos ficheros de configuración, que en este caso dejamos por defecto, aunque lo normal sería que los adaptásemos.

Sin más, vamos a arrancar Solandra (y con ello Cassandra), en cada nodo. Como no hemos hecho ningún cambio en el logueo, vamos a ver todos los mensajes por la salida estándar:

cd /usr/local/apache-cassandra-0.8.2/bin/
./solandra &

Y con esto ya debería estar todo listo. El siguiente paso es añadir el schema (parecido a como se haría en un Solr estándar), volcar los datos, y preparar el interfaz web. Pero esto es cosa de un tercer post. No pensaba escribirlo, pero bueno, este ya es demasiado largo.

Aug 022011
 

Allá por el 2008, Rackspace(Mailtrust) publicaba algunos datos sobre la forma en la que había ido evolucionado su sistema de gestión de logs de la infraestructura de correo electrónico, y que por aquel entonces ya superaba holgadamente los 100GB de crecimiento diario. Junto a este documento, en una de las referencias bibliográficas sobre Hadoop, esta misma empresa explicaba, con algo de detalle técnico, la forma en la que habían implementado su sistema, basado sobre todo en Hadoop y Lucene+Solr.

Aunque Hadoop siga siendo una solución magnífica para la gestión de logs en el contexto del Software Libre (siempre hablando de volúmenes de datos realmente muy grandes), en esta serie de posts vamos a ver cómo podemos llevar la idea de Rackspace a la práctica usando otro tipo de tecnología, y más concretamente, Cassandra.

En realidad, como mis tres lectores no quieren posts demasiado largos, en lugar de entrar en los detalles de lo que sería una implementación más o menos “casera”, vamos a usar una de las aplicaciones que Datastax (una empresa que da servicios comerciales para Cassandra) está potenciando como parte de su ecosistema alrededor de Cassandra, y que se llama Solandra.

El problema
Repasemos, simplificando un poco, la evolución de Rackspace:

  1. En una primera fase, los logs se almacenan en máquinas individuales. Cuando hay alguna incidencia, algún técnico tiene que entrar a hacer un grep. Si el negocio va bien, llegará un momento en el que el tiempo perdido haciendo estas búsquedas será, por lo menos, “crispante”.
  2. En la segunda fase, los logs pasan a gestionarse a través de un syslog centralizado. En realidad, esta no es más que una versión algo mejorada de la primera evolución, pero al menos facilita el trabajo. En cualquier caso, en el fondo se sigue perdiendo mucho tiempo en la búsqueda manual en logs.
  3. La solución más natural en este punto es volcar los datos a una base de datos, y con ello a algún tipo de interfaz web. Dejaremos a un lado el desarrollo del frontend y de los scripts que cargan los datos en la bbdd (que pueden no ser en absoluto triviales, en función de la complejidad de la plataforma).

Hasta aquí, vale, todo es razonablemente sencillo. Sin embargo, cuando se gestionan digamos que 25 millones de mensajes al día, y cuando se quieren mantener 2 años de información (es un decir), nos encontramos con un problema.

¿Cómo se soluciona?

Aquí ya cada uno toma decisiones en función de su capacidad, su presupuesto, los perfiles que tiene disponibles, …. En algunos casos, mantener 30 días en base de datos (lo que genera la mayoría de incidentes), puede ser suficiente. En otros casos, se trabaja con los mecanismos que ofrecen las bases de datos para escalar (el framework Gizzard de Twitter es un estupendo ejemplo, aunque no hablemos de logs). Y por último, algunos pasan a otras soluciones, ya sean de pago o libres. En el caso de Rackspace, por ejemplo, su opción fue Hadoop y Lucene+Solr.

Una vez más, cada una de estas opciones puede ser “la mejor” en función del entorno en el que se desarrolle. Pero claro, si quiero seguir con este post tengo que optar por la tercera alternativa, obviamente :) .

Vamos por partes:

  1. Queremos almacenar un volumen de datos muy significativo. Para unos pocos GB todo esto no tiene demasiado sentido.
  2. Queremos que lo único necesario para aumentar la capacidad sea añadir nuevo hardware. Nada más. Ni cambios en la programación, ni cambios en la arquitectura.
  3. Queremos poder hacer consultas complejas sobre estos logs, en base a origen, destino, rangos de fechas, …. Por ejemplo, sería estupendo poder consultar todo el spam enviado a cuentas tipo info@ en toda la plataforma en un periodo de tiempo concreto.
  4. Aunque el tiempo real no es un requerimiento, es preferible poder hacer estas consultas y recibir los resultados al momento, en una misma sesión de navegador.

Para conseguir los puntos 1 y 2 Hadoop es una solución estupenda. Para 3 y 4 es necesario más trabajo. El acceso “casi inmediato” a los datos se conseguiría con alternativas como HBase, tan de moda ahora que Facebook ha empezado a usarlo. Además, siempre disponemos de Pig y Hive para simplificar las consultas. Ahora bien, de una u otra manera, con Hadoop es bastante probable que tengamos que programar bastante código.

La otra alternativa viene de la mano de Cassandra. Una vez más, los puntos 1 y 2 son inmediatos con esta tecnología. Al igual que con Hadoop, 3 y 4 no lo son; pero gracias a la aplicación llamada Solandra, que no deja de ser un Solr que guarda sus índices en Cassandra, podemos conseguir la capacidad de búsqueda de Lucene, el interfaz tipo REST que ofrece Solr, y la escalabilidad de Cassandra. Todo en uno.

El post se ha alargado un poco. Dejamos la parte práctica para el segundo (y último) mensaje de esta serie.

May 302010
 

Antes de que me adelanten, y sobre todo por las durísimas presiones por parte de las tres personas que leen este blog :-) , voy a terminar esta serie de posts, hablando sobre una de las herramientas que se han implementado para dar un mayor nivel de abstracción (o facilidad de uso, como se prefiera decir) a Hadoop.

Hadoop tiene un problema, o al menos un argumento que se usa en su contra: Desarrollar una aplicación MapReduce lleva demasiado tiempo. A pesar de que el Streaming del que hablamos en posts anteriores lo solucione en parte, es cierto que si queremos trabajar de forma “nativa” con el framework tenemos que estar preparados para escribir un buen puñado de líneas de código.

Para dar salida este problema han ido apareciendo distintas alternativas, entre las que destacan Hive (by Facebook) y Pig (by Yahoo). En este post me voy a centrar en esta última, aunque esto no significa, ni mucho menos, que sea mejor que Hive.

Gracias a Pig se pueden escribir “en 4 lineas” aplicaciones MapReduce. Dicho de otra forma, permite que nos centremos en el qué queremos hacer con nuestros datos, y no en el cómo lo hacemos. Se basa en dos componentes:

  • Un sencillo lenguaje para escribir los scripts, llamado “pig latin”.
  • El “”compilador”", que trasforma los scripts en aplicaciones MapReduce para su ejecución en el cluster Hadoop.

Junto a esto, Pig tambén ofrece un shell interactivo (se llama grunt), para poder ejecutar comandos.

Bien, terminada la teoría (fácil, ¿no?), vamos a centrarnos en los datos, tal y como hemos dicho antes. (Para el que no haya leido los posts anteriores, este sería un buen momento).

Hasta ahora hemos estado preparando nuestra plataforma de logueo para poder disponer de años, y quizá terabytes, de logs en un mismo lugar, barato y redundado, del que poder sacar información útil en unos pocos minutos. Recordemos:

  1. Los logs de todos nuestros servidores se consolidan en una/varias máquinas syslog (o similar).
  2. Los logs se copian tal cual en Hadoop. A pesar de que vayan a estar replicados, no es mala idea tener una copia extra en cinta.
  3. Ejecutamos un primer script MapReduce que lee los scripts y los adapta al formato que queremos y sobre el que trabajaremos.
  4. Ya no necesitamos los ficheros originales, los podemos borrar de Hadoop. En caso de desastre, siempre podríamos recurrir a las cintas.

¿Cómo guardábamos los logs?

El resultado del reducer.pl son líneas como esta:

010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com mail.example.org[10.0.2.34],15853,1268223532,5A4FE29B3B6,43A9B28B357,Mar 10 12:18:52 destinovarios@example.com destinovarios@example.com Antispam sent (250 OK sent queued as 43A9B28B357

Recordad que el script guarda mucha información formateada en base64, por lo que en realidad tenemos algo similar a esto:

2010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com texto_en_base64

¿Por qué?

Porque todas las consultas que se suelen hacer son del tipo:

  • No he recibido un mensaje enviado entre las x y las z del martes, enviado desde esta cuenta, y con destino a esta otra.
  • Quiero todos los mensajes que se recibieron en tal buzón.

La estructura de logs que hemos preparado va a permitir que los scripts sean muy sencillos, sin ningún tipo de join o similar (ahora lo vemos). Dejaremos al interfaz web desde el que se van a ver los resultados el peso de formatear las entradas y hacerlas “visuales”.

¿Interfaz web?

Nada mejor que un sencillo interfaz web en el que escribir los parámetros de búsqueda, preparar el script .pig, ejecutarlo, y mostrar los resultados una vez termine. Evidentemente, no voy a escribir aquí el formulario, pero por dar un ejemplo, imaginemos que queremos saber los correos que han llegado desde infojobs entre las 8 a.m. del 11 de marzo, y las 12 a.m. del mismo día.

Empezamos pasando las fechas a timestamp. Muy fácil:

$ date -u -d 'Thu Mar 11 08:00:00 2010' '+%s'
1268294400

$ date -u -d 'Thu Mar 11 12:00:00 2010' '+%s'
1268308800

Con esto y con la dirección origen ya tenemos todo lo necesario. Ahora lo normal sería escribir un fichero .pig, pero aquí vamos a usar el intérprete de comandos grunt para ver los resultados paso a paso:

$ pig
10/05/25 21:57:29 INFO pig.Main: Logging error messages to: /usr/local/pig-0.7.0/pig_1274817449369.log
2010-05-25 21:57:29,688 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://fn140
2010-05-25 21:57:30,149 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: fn140:8021
grunt>

Empezamos cargando los datos:

grunt> REGISTROS = LOAD '/user/hadoop/logs_formateados' USING PigStorage('\t') AS (msgid:chararray, fecha:long, origen:chararray, destinoreal:chararray, destinoentrada:chararray, resto:chararray);

Con este comando hemos cargado los datos desde el directorio /user/hadoop/logs_formateados en la variable REGISTROS. Los ficheros de entrada se separan con un tabulador [1], con el schema que hemos venido usando. Veamos, antes de seguir, si Pig ha entendido lo que realmente queremos:

grunt> ILLUSTRATE REGISTROS
2010-05-25 23:58:36,627 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://fn140
2010-05-25 23:58:36,681 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: fn140:8021
2010-05-25 23:58:36,791 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2010-05-25 23:58:36,792 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| REGISTROS     | msgid: chararray                                                    | fecha: long | origen: chararray   | destinoreal: chararray | destinoentrada: chararray | resto: chararray |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|               | 6B160A8CF29E064XA067E116FA2A458CAD8A66@GLOBL0310V01.exampleserv.com | 1268304751  | pr1user@example.com | mga@example.net        | mga@example.net           | texto_base64     |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Ahora que tenemos todos los registros, vamos a filtrarlos en base a los criterios que hemos definido:

grunt> FILTRADOS = FILTER REGISTROS BY (origen matches '.+infojobs.+') AND (fecha > 1268294400) AND (fecha < 1268301600);

Hecho esto sólo nos queda agruparlos por msgid. Recordad, tenemos dos entradas por mensaje: una para el nivel antispam, y otra para la entrega. En realidad, en base al msgid podríamos seguir un mensaje por tantos saltos como tuviéramos (algo parecido a lo que Mailtrust ha documentado que hace):

grunt> AGRUPADOS = GROUP FILTRADOS BY msgid;

Y con esto ya tenemos el resultado. Lo podríamos confirmar con el comando DUMP, que muestra los datos por salida estándar, pero lo vamos a guardar directamente en un fichero:

grunt> STORE AGRUPADOS INTO '/tmp/fichero_resultados.txt' USING PigStorage();
2010-05-26 22:17:03,942 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_LONG 2 time(s).
2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.impl.logicalLayer.optimizer.PruneColumns - No column pruned for REGISTROS
2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.impl.logicalLayer.optimizer.PruneColumns - No map keys pruned for REGISTROS
2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.ResourceSchema - Insert two-level access to Resource Schema
2010-05-26 22:17:03,983 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_LONG 2 time(s).
2010-05-26 22:17:03,985 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - (Name: Store(hdfs://fn140/tmp/fichero_resultados.txt:PigStorage) - 1-1007 Operator Key: 1-1007)
2010-05-26 22:17:03,986 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2010-05-26 22:17:03,986 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2010-05-26 22:17:04,019 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2010-05-26 22:17:05,180 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2010-05-26 22:17:05,188 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2010-05-26 22:17:05,191 [Thread-120] WARN  org.apache.hadoop.mapred.JobClient - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2010-05-26 22:17:05,545 [Thread-120] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2010-05-26 22:17:05,546 [Thread-120] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
2010-05-26 22:17:06,382 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201005262134_0008
2010-05-26 22:17:06,382 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://fn140:50030/jobdetails.jsp?jobid=job_201005262134_0008
2010-05-26 22:17:06,384 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2010-05-26 22:17:19,214 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 10% complete
2010-05-26 22:17:20,719 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 20% complete
...............
2010-05-26 22:17:28,241 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 60% complete
2010-05-26 22:17:41,788 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2010-05-26 22:17:41,788 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Successfully stored result in: "hdfs://fn140/tmp/fichero_resultados.txt"
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Records written : 72
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Bytes written : 44727
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Spillable Memory Manager spill count : 0
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Proactive spill count : 0
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

Vaya, parece que tenemos 72 usuarios suscritos a infojobs. Veamos un ejemplo de lo que se ha escrito en "/tmp/fichero_resultados.txt" (es un path de HDFS, no el /tmp local). Los saltos de línea los he puesto yo para que sea más visual, y he escrito la versión sin codificar en base64:

1268304853778.112.POCFQELSVS@[hellboy07]
{
    (1268304853778.112.POCFQELSVS@[hellboy07],1268308417,aviso_incripciones@push.infojobs.net,usuario@example.com,usuario@example.com,localhost[127.0.0.1],11854,1268308418,9ABC12FACB4,-1,Mar 11 11:53:38 usuario@example.com usuario@example.com Maildrop sent (delivered via maildrop service))
,
    (1268304853778.112.POCFQELSVS@[hellboy07],1268308417,aviso_incripciones@push.infojobs.net,usuario@example.com,usuario@example.com,push3.infojobs.net[79.171.25.71],11107,1268308418,3B0213FAECD,9ABC12FACB4,Mar 11 11:53:38 usuario@example.com usuario@example.com Antispam sent (250 OK queued as 9ABC12FACB4)
}

Ahora sólo necesitamos que nuestro interfaz web formatee el resultado y lo saque por pantalla. Fácil y sencillo.

Resumiendo, con estas cuatro líneas podemos hacer cualquier búsqueda en los logs, tengan el tamaño que tengan:

REGISTROS = LOAD '/user/hadoop/logs_formateados' USING PigStorage('\t') AS (msgid:chararray, fecha:long, origen:chararray, destinoreal:chararray, destinoentrada:chararray, resto:chararray);
FILTRADOS = FILTER REGISTROS BY (origen matches '.+infojobs.+') AND (fecha > 1268294400) AND (fecha < 1268301600);
AGRUPADOS = GROUP FILTRADOS BY msgid;
STORE AGRUPADOS INTO '/tmp/fichero_resultados.txt' USING PigStorage();

Referencias

Hay muchos artículos y documentación disponible en la red sobre Hadoop. En cuanto a bibliografía, que yo sepa hay al menos tres libros disponibles:

  • Pro Hadoop, de la editorial Apress.
  • Hadoop: The Definitive Guide, de la editorial O'Reilly.
  • Hadoop in Action, de la editorial Manning.

[1]: La verdad es que he hecho trampa. El reducer original separaba el id de mensaje del resto de datos con un tabulador, pero luego usaba un espacio simple. Esto es porque al principio tenía pensado trabajar otros aspectos de Hadoop y Pig, como el Chaining y las "user defined functions".
Al final, para no tener que escribir más posts, he cambiado el reducer para que separe todos los campos con tabuladores.

Mar 212010
 

En este tercer post de la serie dejamos la teoría para ver un caso práctico de aplicación de Hadoop, que completaré en un cuarto post.

Como sabemos, gracias a Hadoop conseguimos un sistema con mucha capacidad de almacenamiento, a un coste por mega reducido, redundado, de rendimiento prácticamente lineal (más máquinas = más rendimiento) y sobre el que ejecutar de forma pararela todo tipo de aplicaciones que se puedan implementar como MapReduce.

Con estos datos, hay una utilidad práctica que viene inmediatamente a la mente: tratamiento de logs. Veamos:

  • Me gustaría decir lo contrario, pero personalmente no conozco a nadie que maneje el dinero destinado a proyectos que no ponga pegas a una inversión fuerte para el tratamiento de logs. El coste por mega de Hadoop es difícil de superar.
  • Si combinamos Hadoop, por ejemplo para datos con hasta un año de antiguedad, con un soporte en cinta para el resto, lo tendremos fácil para cumplir cualquier legislación en lo relacionado a retención de datos.
  • Sin llegar a los niveles de Facebook o Rackspace, es probable que el volumen de logs que guardemos durante un año sea lo suficientemente grande como para hacer difícil su tratamiento si un juez (por ejemplo) pide todos los correos desde la cuenta x a la cuenta z de los últimos siete meses, o todos los acceso al foro web de fulanito. MapReduce y Hadoop permiten ejecutar este tipo de tareas sin mayores problemas, y con un rendimiento prácticamente lineal.

En este entorno, planteo un ejemplo de tratamiento de logs de Postfix de lo más simple, con un filtro de contenido en modo post-queue (Amavis, por ejemplo, en su uso más habitual). Vamos, nada nuevo.

El objetivo sería, además de los puntos mencionados anteriormente, poder realizar cualquier tipo de consulta sobre los logs de meses (o años), en un tiempo razonable. Y digo tiempo razonable porque en este tipo de consultas no suele hacer falta tener el resultado al momento. Se acepta esperar durante 1,5 o 10 minutos hasta que se genere el resultado.

Resumiendo, tenemos una infraestructura parecida a esta:

Postfix Amavis con Hadoop

Todos los servidores envían los logs de manera centralizada a un syslog remoto, que además de hacer una copia diaria a cinta, almacena la información en Hadoop, usando alguno de los muchos interfaces que se ofrecen, como pueden ser FUSE o FTP.

Junto a esto tendremos una máquina, preferiblemente un interfaz web, para generar trabajos MapReduce. Básicamente necesitaremos un formulario (cuenta origen, destino, fechas, …), y algo de control sobre los trabajos que ya se han realizado para mostrar los resultados.

Streaming

Se pueden crear trabajos MapReduce para Hadoop de varias formas, pero en este ejemplo me voy a limitar a lo que se llama “Streaming”. Dicho de otra forma (aunque no sea del todo cierto), a programar aplicaciones MapReduce en perl o python (o en cualquier lenguaje que pueda leer y escribir en la entrada y salida estándar).

La gran ventaja que tiene el Streaming es que permite desarrollar prototipos de una forma rápida, aunque no sea lo más eficiente. En cualquier caso, sobra para este ejemplo.

La mejor forma de explicar lo que es el Streaming en Hadoop es ver el equivalente, en versión linux:

cat datos_entrada.txt | mapper.pl | sort | reducer.pl

El mapper.pl recibirá los datos desde los ficheros de log, línea por línea, y los tratará, escribiendo por salida estándar con el formato clave “tabulador” valor.

Esta salida se ordena, y pasa a un reducer. Hadoop garantiza que todas las claves serán tratadas por el mismo reducer, aunque éste puede, y normalmente así lo hará, procesar varias claves (este es un buen momento para recordar que los ficheros de entrada pueden ser, por decir algo, 200GB de datos, sin que suponga ningún problema para Hadoop).

Volviendo al ejemplo, veamos unas cuantas líneas de log de postfix:

Feb 20 14:27:03 server postfix/smtpd[17875]: E78D13073B2: client=mail.example.com[10.0.0.6]
Feb 20 14:27:03 server postfix/cleanup[17822]: E78D13073B2: message-id=<F0E59D47X9B343A7DFRAB1F0E45@EUW0600375>
Feb 20 14:27:04 server postfix/qmgr[5370]: E78D13073B2: from=<prueba@example.com>, size=20827, nrcpt=1 (queue active)
Feb 20 14:27:04 server postfix/smtp[17980]: E78D13073B2: to=<destino@example.org>, relay=127.0.0.1[127.0.0.1]:10025, delay=0.6, delays=0.08/0/0/0.51, dsn=2.0.0, status=sent (250 Ok: queued as 6A737307316)
Feb 20 14:27:04 server postfix/qmgr[5370]: E78D13073B2: removed


Feb 20 14:27:04 server postfix/smtpd[17882]: 6A737307316: client=localhost[127.0.0.1]
Feb 20 14:27:04 server postfix/cleanup[17817]: 6A737307316: message-id=<F0E59D47X9B343A7DFRAB1F0E45@EUW0600375>
Feb 20 14:27:04 server postfix/qmgr[5370]: 6A737307316: from=<prueba@example.com>, size=21719, nrcpt=1 (queue active)
Feb 20 14:27:04 server postfix/pipe[16491]: 6A737307316: to=<destino@example.org>, relay=maildrop, delay=0.16, delays=0.11/0.01/0/0.04, dsn=2.0.0, status=sent (delivered via maildrop service)
Feb 20 14:27:04 server postfix/qmgr[5370]: 6A737307316: removed

Esta es la información que postfix loguea al recibir y entregar un mensaje usando un filtro after-queue. He ordenado los logs para que sea más claro, pero en la realidad no es así.

En este ejemplo se recibe un correo desde mail.example.com, y se le asigna el queue-id E78D13073B2. También se loguea el message-id, el origen y el tamaño.

Una vez se ha encolado el mensaje, se envía a Amavis, a través de 127.0.0.1:10025. Como se puede ver en el ejemplo, en este punto sabemos el destino del mensaje, y también si fue aceptado o no. Lo sabemos porque el log hace referencia al queue id que amavis reinjecta en postfix. De no haberlo aceptado no tendríamos un queue id en el log.

Este nuevo mensaje mantiene el message-id, el tamaño es algo mayor, y termina entregandose, en este caso en la máquina local, usando la aplicación courier-maildrop.

Bien, esto no es más que un ejemplo. En la realidad hay que tratar muchos casos: aliases, reenvíos a máquinas remotas, bounces, mensajes que pasan a deferred por no haber podido entregarse, ….

Además, e insistiendo en que esto no es más que un ejemplo, voy a asumir dos cosas, que aunque se cumplen a menudo, no son ciertas:

  • Que el queue-id es único.
  • Que el message-id es único.

Mapper.pl

El proceso comienza con Hadoop enviando los ficheros de entrada (logs) a la entrada estándar de los mapper.pl que estemos ejecutando en el cluster, que tratarán cada línea y la enviarán por salida estándar con el formato:

queueid<tabulador>informacion_separada_por_comas

Para separar la clave del valor se usa un tabulador. Normalmente, los campos del valor se separan con espacios, pero en este ejemplo he preferido las “comas”.

Veamos un trozo del código:

..............
while (<>)
..............
if ($linea =~ /^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) .+ postfix\/smtpd\[\d+\]: (\w+): client=(\S+)$/) {
                $estructura{fecha} = $1; $estructura{qid} = $2; $estructura{host} = $3;
                print $estructura{qid} . "\t" . "Conexion," . join (",",$estructura{fecha},$estructura{host}) . "\n";
        }
         elsif ($linea =~ /^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) .+ postfix\/smtp\[\d+\]: (\w+): to\=\<([^ ]+)\>, relay=127.0.0.1.*status=(.*) (\w+)\)$/) {

                $estructura{fecha} = $1; $estructura{qid} = $2; $estructura{to} = $3; $estructura{origto} = $3; $estructura{status} = $4; $estructura{relay} = "Antispam"; $estructura{rqid} = $5; $estructura{status} =~ s/,//g;
                print $estructura{qid} . "\t" . "Antispam," . join (",",$estructura{fecha},$estructura{to},$estructura{to},$estructura{relay},$estructura{status},$estructura{rqid}) . "\n";
        }
...........
}

Tenemos varios “if/elsif”, que adaptan la línea del log de entrada al formato que quiero. Además, he añadido un texto con el tipo de entrada de log que estamos tratando, y que sabemos por el “if” en el que estamos (Antispam en este ejemplo), para luego usarlo en el reducer.

El código completo incluye también la detección de los mensajes que Amavis considera que son spam, en los que pone el rqid como 0. Además, en aquellos casos donde “no proceda” aplicar el concepto de rqid pongo -1 como valor. Todo esto servirá en el reducer.

Volviendo a los logs de ejemplo, el resultado de ejecutar el mapper sobre ellos es:

E78D13073B2     Conexion,Feb 20 14:27:03,mail.example.com[10.0.0.6]
E78D13073B2     Msgid,Jan 20 06:27:03,F0E59D47X9B343A7DFRAB1F0E45@EUW0600375
E78D13073B2     Origen,Feb 20 14:27:04,prueba@example.com,20827
E78D13073B2     Antispam,Feb 20 14:27:04,destino@example.org,destino@example.org,Antispam,sent (250 Ok: queued as,6A737307316)
E78D13073B2     Cierre,Feb 20 14:27:04
6A737307316     Conexion,Feb 20 14:27:04,localhost[127.0.0.1]
6A737307316     Msgid,Feb 20 14:27:04,F0E59D47X9B343A7DFRAB1F0E45@EUW0600375
6A737307316     Origen,Feb 20 14:27:04,prueba@example.com,21719
6A737307316     Maildrop,Feb 20 14:27:04,destino@example.org,destino@example.org,Maildrop,sent (delivered via maildrop service),-1
6A737307316     Cierre,Feb 20 14:27:04

Después de esto Hadoop ejecutaría un sort (en el ejemplo ya están ordenados por queue-id), y volvería a enviar el resultado a los reducer.pl del cluster. Por lo tanto, sabemos que un reducer va a recibir todas las líneas de log correspondientes a un mismo queue-id.

Reducer.pl

Pasamos directamente a ver algo del código del reducer.pl:

my %mensaje = (qid => "",
            fecha => "",
            servidor => "",
            msgid => "",
            origen => "",
            size => "",
            to => "",
            origto => "",
            rqid => "",
            relay => "",
            status => "",
            cierre => ""
            );

..............
while () {
    chomp;
    ($qid, $valor) = split /\t/;

        if ($qid ne $qidanterior) {
                # tenemos un queueid nuevo
                if ($qidanterior ne "") {
                        &mostrar_datos();

                        %mensaje = (qid => "",
                                fecha => "",
                                desdeip => "",
                                msgid => "",
                                origen => "",
                                size => "",
                                to => "",
                                origto => "",
                                rqid => "",
                                status => "",
                                cierre => ""
                        );
                }
                $qidanterior = $qid;
        }

        # seguimos procesando los datos del mismo qid
        $mensaje{qid} = $qid;
        @datos = split(",",$valor);

        switch ($datos[0]) {
                case "Conexion" {
                        $mensaje{fecha} = $datos[1];
                        $mensaje{desdeip} = $datos[2];
                }
                case "Msgid" {
                        $mensaje{msgid} = $datos[2];
                }
                case "Origen" {
                        $mensaje{origen} = $datos[2];
                        $mensaje{size} = $datos[3];
                }
                case "Antispam" {
                        $mensaje{to} .= $datos[2] . "-separador-";
                        $mensaje{origto} .= $datos[3] . "-separador-";
                        $mensaje{status} .= $datos[1] . " " .$datos[2] . " " . $datos[3] . " " . $datos[4] . " " . $datos[5] . "-separador-";
                        $mensaje{rqid} = $datos[6];
                }

................
sub mostrar_datos {
     ................
     if ( ($mensaje{fecha} ne "") && ($mensaje{cierre} ne "") ) {
          # mensaje completo: Salida estandar
          # hay que revisar que se haya pasado por todas las fases
          # y realizar comprobaciones, que aqui no se hacen
          $fechaentrada = `date -u -d '$mensaje{fecha}' '+%s'`; chomp $fechaentrada;
          $fechasalida = `date -u -d '$mensaje{cierre}' '+%s'`; chomp $fechasalida;

          $datoscodificables = $mensaje{desdeip} .",".$mensaje{size}.",".$fechasalida.",".$mensaje{qid}.",".$mensaje{rqid}.",".$mensaje{status};
          $resto = encode_base64($datoscodificables,"");

          print $mensaje{msgid} . "\t" . $fechaentrada . " " . $mensaje{origen} . " " . $mensaje{to} . " " . $mensaje{origto} . $resto . "\n";

    } else {
          # mensaje incompleto. Guardar en otro fichero para poder seguir el tracking
          # Falta por hacer
    }

}

Por lo tanto, cogemos cada línea de entrada, y ayudándonos de la descripción del tipo de línea que hemos añadido en el mapper, vamos completando la estructura de datos que después será el resultado (por fin) que queremos.

Un ejemplo de la ejecución de este trabajo en mi cluster de prueba:

$ hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-0.20.2-streaming.jar -input /user/hadoop/mail_logs -output /user/hadoop/logs_formateados/domingo_14_03 -file /home/hadoop/mapper.pl -file /home/hadoop/reducer.pl -mapper "/usr/bin/perl -w mapper.pl" -reducer "/usr/bin/perl -w reducer.pl"

packageJobJar: [/home/hadoop/mapper.pl, /home/hadoop/reducer.pl, /home/hadoop/hadoop-unjar4902963819550178151/] [] /tmp/streamjob534817239215339193.jar tmpDir=null
10/03/18 22:20:02 INFO mapred.FileInputFormat: Total input paths to process : 4
10/03/18 22:20:02 INFO streaming.StreamJob: getLocalDirs(): [/home/hadoop/mapred/local]
10/03/18 22:20:02 INFO streaming.StreamJob: Running job: job_201003182146_0001
10/03/18 22:20:02 INFO streaming.StreamJob: To kill this job, run:
10/03/18 22:20:02 INFO streaming.StreamJob: /usr/local/hadoop/bin/hadoop job  -Dmapred.job.tracker=fn140:8021 -kill job_201003182146_0001
10/03/18 22:20:02 INFO streaming.StreamJob: Tracking URL: http://fn140.forondarena.net:50030/jobdetails.jsp?jobid=job_201003182146_0001
10/03/18 22:20:03 INFO streaming.StreamJob:  map 0%  reduce 0%
10/03/18 22:20:17 INFO streaming.StreamJob:  map 8%  reduce 0%
10/03/18 22:20:18 INFO streaming.StreamJob:  map 11%  reduce 0%
10/03/18 22:20:20 INFO streaming.StreamJob:  map 15%  reduce 0%
.....
10/03/18 22:21:56 INFO streaming.StreamJob:  map 92%  reduce 25%
10/03/18 22:21:59 INFO streaming.StreamJob:  map 100%  reduce 25%
....
10/03/18 23:14:13 INFO streaming.StreamJob:  map 100%  reduce 98%
10/03/18 23:15:55 INFO streaming.StreamJob:  map 100%  reduce 99%
10/03/18 23:17:31 INFO streaming.StreamJob:  map 100%  reduce 100%
10/03/18 23:18:25 INFO streaming.StreamJob: Job complete: job_201003182146_0001
10/03/18 23:18:25 INFO streaming.StreamJob: Output: /user/hadoop/logs_formateados/domingo_14_03

El resultado es un fichero en la ruta “/user/hadoop/logs_formateados/domingo_14_03″, con líneas formadas por un message-id, seguido por un tabulador, seguido del origen, el destino, el destino antes de la expansión de aliases, y del resto de la información, codificada en base64, todo separado por espacios.

No hace falta decir que el formato de salida en un entorno real se debería adaptar a lo que se necesitase. De hecho, Rackspace usa una estructura binaria, donde entre otras cosas se guarda la información de los diferentes saltos que hace un correo dentro de su sistema. Este ejemplo, por supuesto, es mucho más sencillo.

Al final del proceso hemos obtenido un fichero, que es el que de verdad queremos en nuestro cluster, con toda la información de cada mensaje que ha entrado en nuestra red en un periodo de tiempo concreto (digamos que cada 4 horas volcamos los logs de syslog a Hadoop y ejecutamos la tarea), como se ve en los siguientes ejemplos (sin la codificación en base64, para que sean más fáciles de seguir):

Mensaje que se entrega en un buzón local:

ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org 1268225082 origen@example.org destino@example.com destino@example.com mail.example.org[10.0.2.34],2430,1268225085,B6A6A296E55,158AB296F24,Mar 10 12:44:45 destino@example.com destino@example.com Antispam sent (250 OK sent queued as 158AB296F24

ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org 1268225085 origen@example.org destino@example.com destino@example.com localhost[127.0.0.1],3081,1268225085,158AB296F24,-1,Mar 10 12:44:45 destino@example.com destino@example.com Maildrop sent (delivered via maildrop service)

Por lo tanto:

  • El mensaje con id ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org
  • Entró en el sistema a las 1268225082
  • Desde el servidor mail.example.org[10.0.2.34]
  • El origen era origen@example.org
  • Y el destino destino@example.com (no es un alias)
  • Una vez pasado por el sistema antispam se reencola con queueid 158AB296F24
  • El mensaje de la entrega al servidor antispam es: sent (250 OK sent queued as 158AB296F24)
  • Una vez ha pasado el filtro antispam el mensaje se reencola a las 1268225085
  • Por supuesto desde el servidor localhost[127.0.0.1]
  • Y el mensaje de la entrega local es Mar 10 12:44:45 Maildrop sent (delivered via maildrop service), sólo al destino destino@example.com

Mensaje que se entrega a un servidor remoto:

1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3 1268219047 origen@example.net destino@example.com destino@example.com mail.example.net[10.5.3.2],1901,1268219051,CA6782FABFB,B6C7A2FAACA,Mar 10 11:04:11 destino@example.com destino@example.com Antispam sent (250 OK sent queued as B6C7A2FAACA

1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3 1268219049 origen@example.net destinoreal@remoto.example.org destino@example.com localhost[127.0.0.1],2686,1268219051,B6C7A2FAACA,-1,Mar 10 11:04:11 destinoreal@remoto.example.org destino@example.com mail.example.org[10.0.2.34]:25 sent (250 2.0.0 Ok: queued as 423872AB862)
    En este caso tenemos un alias que se reescribe para entrega remota:

  • Su messageid es 1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3
  • Entro en el sistema a las 1268219047
  • Desde el servidor mail.example.net[10.5.3.2]
  • Desde el origen origen@example.net
  • Y al destino destino@example.com, que en la fase antispam todavía no se expande a ningún alias
  • El mensaje se relaciona con B6C7A2FAACA
  • Y el mensaje de la entrega es Mar 10 11:04:11 destino@example.com destino@example.com Antispam sent (250 OK sent queued as B6C7A2FAACA)
  • Se reencola, a través de localhost, a las 1268219049
  • El mensaje pasa a ser la dirección real destinoreal@remoto.example.org,
    proveniente del alias destino@example.com
  • El mensaje se entrega en mail.example.org[10.0.2.34]:25
  • Con el mensaje sent (250 2.0.0 Ok: queued as 423A72AB262)

Mensaje que es spam:

9545850218.UWD8KHA190@spammer.example.info 1268238332 spammer@example.net destino@example.com destino@example.com mail.example.info[10.8.2.12],2141,1268238333,F355429B3A5,0,Mar 10 16:25:33 destino@example.com destino@example.com Antispam sent (250 OK sent mensaje_rechazado por spam

En este caso el mensaje es spam, y no se relaciona con ningún otro queueid.

Mensaje que se entrega a varias cuentas:

2010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com mail.example.org[10.0.2.34],15853,1268223532,5A4FE29B3B6,43A9B28B357,Mar 10 12:18:52 destinovarios@example.com destinovarios@example.com Antispam sent (250 OK sent queued as 43A9B28B357

2010031011.AB263566.pc1@mailer.example.org 1268223532 usuario@example.org destinolocal@example.com-separador-destinoremoto@example.info destinovarios@example.com-separador-destinovarios@example.com localhost[127.0.0.1],16525,1268223532,43A9B28B357,-1,Mar 10 12:18:52 destinolocal@example.com destinovarios@example.com Maildrop sent (delivered via maildrop service)-separador-Mar 10 12:18:52 destinoremoto@example.info destinovarios@example.com mail.example.info[10.8.2.12]:25 sent (250 2.0.0 Ok: queued as 452362CA51A)
  • Ahora tenemos un mensaje con id 2010031011.AB263566.pc1@mailer.example.org
  • Que se envía desde usuario@example.org
  • Al destino destinovarios@example.com
  • Que a su vez es un alias a dos cuentas destinolocal@example.com y destinoremoto@example.info
  • Los mensajes de entrega a cada una de ellas son:
    • Mar 10 12:18:52 destinolocal@example.com Maildrop sent (delivered via maildrop service)
    • Mar 10 12:18:52 destinoremoto@example.info mail.example.info[10.8.2.12]:25 sent (250 2.0.0 Ok: queued as 452362CA51A)

Como se puede ver, ya tenemos toda la información necesaria, debidamente preformateada, por lo que ya no necesitamos los ficheros mail.log, que podríamos borrar de Hadoop (que no de las cintas).

La información contenida en estos ficheros nos servirá para preparar consultas, bien directamente con otros trabajos MapReduce, bien con otras herramientas, tal y como mostraré en el siguiente post de la serie, en el que nos aprovecharemos del formato de los ficheros para hacer consultas por message-id, cuenta origen, fecha o cuenta destino (según entró en el sistema o una vez expandidos los aliases).

En este punto hay que volver a recordar que Hadoop está pensado para trabajar con cantidades muy grandes de información, en los que hacer un simple “grep” o usar una base de datos relacional estándar resulta impracticable.

Por último, tened en cuenta que el código de este post, además de ser sólo un ejemplo, es completamente Alpha. Aparte de no funcionar del todo bien (tiene varios fallos que he detectado pero que no he corregido todavía), necesitaría tener en cuenta muchos más casos, como mensajes cuya entrega se retrasa durante días (que no deja de ser algo normal en los sistemas de correo reales).

En el cuarto y último post de la serie: acceso a Hadoop a través de una herramienta de apoyo, como pig.

Feb 052010
 

Continúo la serie de posts sobre Hadoop describiendo, muy por encima, los componentes y la estructura que suelen tener este tipo de sistemas. Como ya he dicho en otros posts, este blog pretende ser sobre todo práctico, así que no me voy a extender demasiado.

Gráficamente un cluster Hadoop se puede parecer mucho a esto:

Estructura

JobTracker:
En un cluster hay un único JobTracker. Su labor principal es la gestión de los TaskTrackers, entre los que distribuye los trabajos MapReduce que recibe. Es, por lo tanto, el interfaz principal que tienen los usuarios para acceder al cluster.

TaskTracker:
Un cluster tiene n TaskTrackers que se encargan de la ejecución de las tareas map y reduce en los nodos. Cada uno de los TaskTrackers gestiona la ejecución de estas tareas en una máquina del cluster. El JobTracker se encarga, a su vez, de controlarlos.

NameNode:
Las funciones principales del NameNode son el almacenamiento y la gestión de los metadatos del sistema de ficheros distribuido y ofrecer el interfaz principal que tiene el usuario para acceder al contenido HDFS. En un cluster hay un único proceso NameNode.
Sin los metadatos que mantiene el NameNode no se sabría en qué nodo está cada bloque, además de perderse la información sobre la estructura de directorios. Es, por lo tanto, el componente más importante del cluster, y debe estar redundado. Hadoop no ofrece de manera nativa ningún mecanismo de alta disponibilidad, pero sí dispone de herramientas que permiten replicar la información. Entre ellas está el NameNode secundario, que permite guardar una copia de los metadatos (en realidad hace más cosas) en una máquina diferente al NameNode. Eso sí, no es una copia en tiempo real, y no ofrece failover automático en caso de fallo del primario. Para esto necesitamos usar otro tipo de herramientas, entre las que destacan DRBD y Heartbeat.

DataNode:
Estos procesos ofrecen el servicio de almacenamiento de bloques para el sistema de ficheros distribuido. Son coordinados por el NameNode.

Estos son los procesos principales del sistema, pero hay otros, como el Balancer, que se encarga de equilibrar la distribución de los bloques entre los DataNodes, por ejemplo cuando se añade un nuevo servidor.

Mi laboratorio se limita a cuatro servidores: un NameNode+JobTracker y tres DataNode+TaskTracker, en cuatro máquinas virtuales de un core y 1G de memoria, para un total de 5G de almacenamiento.

hadoop@fn140:/usr/local/hadoop/conf$ hadoop dfsadmin -report
Configured Capacity: 8695013376 (8.1 GB)
Present Capacity: 5455364096 (5.08 GB)
DFS Remaining: 5455290368 (5.08 GB)
DFS Used: 73728 (72 KB)
DFS Used%: 0%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0

-------------------------------------------------
Datanodes available: 3 (3 total, 0 dead)

Name: 192.168.10.142:50010
Decommission Status : Normal
Configured Capacity: 2898337792 (2.7 GB)
DFS Used: 24576 (24 KB)
Non DFS Used: 1079918592 (1.01 GB)
DFS Remaining: 1818394624(1.69 GB)
DFS Used%: 0%
DFS Remaining%: 62.74%
Last contact: Sun Jan 31 16:10:58 CET 2010


Name: 192.168.10.143:50010
Decommission Status : Normal
Configured Capacity: 2898337792 (2.7 GB)
DFS Used: 24576 (24 KB)
Non DFS Used: 1080020992 (1.01 GB)
DFS Remaining: 1818292224(1.69 GB)
DFS Used%: 0%
DFS Remaining%: 62.74%
Last contact: Sun Jan 31 16:10:58 CET 2010


Name: 192.168.10.141:50010
Decommission Status : Normal
Configured Capacity: 2898337792 (2.7 GB)
DFS Used: 24576 (24 KB)
Non DFS Used: 1079709696 (1.01 GB)
DFS Remaining: 1818603520(1.69 GB)
DFS Used%: 0%
DFS Remaining%: 62.75%
Last contact: Sun Jan 31 16:10:56 CET 2010

En los próximos dos posts voy a dejar la “teoría” para pasar a la práctica, empezando por una de las aplicaciones más obvias de Hadoop, como es el tratamiento de logs; para seguir por una de las utilidades que existen para facilitar el desarrollo de aplicaciones MapReduce: pig.

Jan 172010
 

Allá por el año 2003, cuando Google ya dominaba el mundo de los buscadores, muchos administradores de sistemas nos preguntábamos por la tecnología que usarían para indexar páginas, calcular Pageranks, gestionar el almacenamiento ….

En ese momento Google publicó varios documentos al respecto, como este sobre MapReduce y este otro sobre su sistema de ficheros GFS (Google File System), que daban algunas pistas, y que a la larga han sido la base de varios proyectos, como Hadoop.

Pretendo que este blog sea sobre todo práctico, así que no voy a escribir demasiado ni sobre la teoría que hay detrás de MapReduce (programación funcional, map/reduce, bla bla bla) ni sobre Google y GFS. Para el que quiera leer algo más sobre la tecnología de Google recomiendo este excelente blog en el que se pueden encontrar varios posts sobre el buscador.

El problema
El problema es sencillo y se resume en el volumen de datos, cada vez mayor, que generan las aplicaciones; y que por supuesto hay que tratar. Por dar un ejemplo, en el 2008 Yahoo! gestionaba unos 5PB de almacenamiento para generar su “webmap”[1], que es uno de los componentes que usa su buscador.
Otro ejemplo es Rackspace, y su división de correo, Mailtrust, que genera más de 150GB de logs de correo al día. Por supuesto, además de ser mucho volumen acomulado, es fundamental que se pueda tratar de una forma ágil para dar una respuesta rápida a clientes que piden cosas como “el correo que me mandó fulanito hace 3 meses no me ha llegado” o “quiero todos los correos recibidos en mi dominio en los últimos 5 meses”. Esto es razonablemente sencillo cuando se controlan unos pocos cientos de GB, pero se hace mucho más difícil cuando la escala pasa al Tera.

El que haya nombrado estos dos ejemplos no es casual, ya que Yahoo! y Rackspace son dos de los usuarios más notables (junto a Facebook, LastFM, …) del software que voy a describir en los siguientes posts: Hadoop.

¿Qué es Hadoop?

Resumiendo mucho, Hadoop es un framework software que intenta implementar los conceptos de MapReduce y GFS que presentó Google. Está pensado, por lo tanto, para el tratamiento distribuido de grandes cantidades de datos en máquinas de bajo coste y con poca fiabilidad.

Partiendo de esta base, parece claro que hace falta un sistema de ficheros pensado para almacenar mucha información y con tolerancia ante fallos. Aquí es donde aparece HDFS (Hadoop Distributed File System). Algunas de sus características:

  • El tamaño de bloque por defecto en este sistema de ficheros es de 64MB, pero muchas veces se aumenta a 128MB.
  • Ofrece la mencionada disponibilidad al dejar varias copias de cada bloque en máquinas diferentes.
  • Usa el concepto de “rack awareness”, para saber dónde está cada bloque y qué proceso de cálculo puede acceder más rápido a él.
  • Su diseño facilita la lectura secuencial de datos (razonable en discos estándar de bajo coste), pero lo hace sacrificando otras características propias de los sistemas de ficheros POSIX. Por ejemplo, que nadie piense editar un fichero en HDFS y borrar la línea número 2000.

Por otro lado, el framework implementa el concepto MapReduce, que como ya he escrito consiste en dividir el trabajo entre n servidores para presentar el resultado después de forma coherente. Veamos un ejemplo:

Digamos[2] que tenemos un cluster de 4 máquinas Hadoop que guardan logs de apache de 40 servidores balanceados, para un total de 100GB al día. Nuestro objetivo es diseñar un trabajo MapReduce para sumar los accesos que ha hecho cada IP en todos los frontales.

Los logs van a ser sencillos y se van a limitar a “FECHA IP URL”.

Servidor 1:

FECHA 172.17.2.34 /index.html
FECHA 172.17.7.13 /contenido/futbol.html
FECHA 172.17.2.34 /img/banner.jpg
FECHA 172.17.2.42 /index.html

Servidor 2:

FECHA 172.17.2.34 /contenido/peliculas.html
FECHA 172.17.7.13 /img/futbol.jpg

Servidor 3:

FECHA 172.17.7.13 /index.html
FECHA 172.17.2.34 /peliculas/bladerunner.html
FECHA 172.17.2.42 /img/banner.jpg

Servidor 4:

FECHA 172.17.2.42 /contenido/series.html
FECHA 172.17.2.42 /img/series.jpg

A partir de estos datos Hadoop empezará la fase Map, que se ejecuta paralelamente en las máquinas del cluster, y que consiste en adaptar las líneas de log al formato clave< tabulador >valor.

Volviendo al ejemplo, la clave será la IP, y el valor la URL (no necesitamos la fecha).
El resultado de la fase Map es el siguiente[3]:

(172.17.2.34 [/index.html /img/banner.jpg /contenido/peliculas.html /peliculas/bladerunner.html])
(172.17.2.42 [/index.html /img/banner.jpg /contenido/series.html /img/series.jpg])
(172.17.7.13 [/contenido/futbol.html /img/futbol.jpg /index.html])

Hemos generado un listado ordenado por IP, en el que se identifican todos los accesos que ha habido.

Una vez hecho esto Hadoop entrega trozos de esta salida a n procesos Reduce, que en este sencillo ejemplo sólo tienen que contar las URL que aparecen en el valor, para generar algo como lo siguiente:

(172.17.2.34    4) 
(172.17.2.42    4)
(172.17.7.13    3)

Esto no es más que un ejemplo. En la realidad, Hadoop se ha usado para cosas tan diversas como convertir 4TB de artículos de periódico escaneados en formato tiff en 11 millones de pdfs; todo usando unas 100 instancias de amazon y en menos de 24 horas, por un total de 240$ sin contar ancho de banda, como se puede ver aquí.

El próximo post describirá una instalación básica del sistema, y el siguiente será una prueba algo más interesante de uso con logs de postfix+amavis, muy al estilo Mailtrust, y usando perl y quizá alguno de los proyectos de apoyo que han surgido alrededor de Hadoop; como Hive, que fue desarrollado por Facebook para crear trabajos MapReduce desde una consola con sintaxis muy similar a SQL.


[1] En el 2008 el webmap de Yahoo! era un grafo de un trillón de vértices(enlaces web) y 100 billones de nodos (URL únicas).
[2] Los tres o cuatro lectores de este blog se enfadan si no uso al menos un “digamos” o “supongamos” en cada post.
[3] En realidad hay más fases, como sort-suffle.