Ahora que ando reestructurando mi laboratorio, voy a aprovechar para documentar un par de aplicaciones que estoy moviendo al nuevo hierro.

En realidad, esto se sale un poco del objetivo de este blog, sobre todo teniendo en cuenta que ya hay kilos de documentación sobre, en este caso, Dovecot; pero bueno, a mí me va a servir como referencia rápida, y quizá os sea de utilidad a alguno de los cuatro que pasáis por aquí.

Empezamos con los buzones compartidos en Dovecot. Como siempre, pretendo ser lo más práctico posible, así que lo mejor que se me ha ocurrido es describir exactamente la forma en la que yo mismo tengo montado "el invento".

El problema

Antes de nada, hablemos sobre las aplicaciones que debemos tener funcionando antes de empezar:

Necesitamos un servidor de correo propio (probablemente Postfix) en el que recibir los mensajes, ya sea porque el MX apunta a él o porque usamos software tipo Fetchmail, por ejemplo. Además, tenemos varios usuarios en nuestro sistema que acceden a su correo a través de Dovecot.

Entre el correo que reciben, además del privado para cada uno, es muy probable que los usuarios estén suscritos a varias listas: de seguridad, de usuarios, anuncios de nuevas versiones de x aplicación, .... Además, también hay cuentas tipo helpdesk y listas para un departamento o grupo que deben estar accesibles para varios usuarios simultaneamente.

Algo habitual en estos casos es, por un lado, que cada usuario gestione sus propias subscripciones a listas de correo, y por otro, para el caso de las cuentas tipo helpdesk de las que hemos hablado, el mandar una copia a cada uno de los n usuarios implicados. Obviamente, esto genera un montón de copias y mensajes repetidos que, aunque no tenga que suponer un enorme problema, sí que es más propenso a fallos, además de no dejar de ser "poco elegante".

La solución

Bien, aquí ya entramos en la forma en la que yo lo hago, que no tiene que ser ni mucho menos la mejor o la única.

Vamos a empezar por el caso más fácil, los buzones públicos:

Por usar un ejemplo concreto, al hablar de "buzón público" me voy a referir a listas de correo generales y accesibles para todo el mundo. El único control de acceso que se va a hacer sobre ellas es a nivel de sistema de ficheros, por lo que recomiendo que los usuarios del sistema pertenezcan a un grupo concreto (o varios) siempre que quieran acceder a una u otra lista.

Y digo usuarios del sistema porque cada lista va a ser un usuario de sistema (mapeada con una cuenta de correo), de tal manera que la suscripción será única. Si quisieramos hacer que todo el mundo tuviera acceso a la lista de usuarios de Postfix, crearíamos el usuario "postlista", por ejemplo, y nos suscribiríamos a la lista de Postfix con postlista _A_T_ forondarena.net (no existe ni existirá nunca, que conste, pero ahora ya tengo un spamtrap más :D ). Como podéis suponer, el correo que llegue a esta dirección se guardará físicamente en (esta parte de la configuración de Postfix/Dovecot os la dejo a vosotros):

  drwxrwx--- 5 postlista postlista 4096 oct  1 17:57   /home/postlista/Maildir

Volviendo a los permisos a nivel de sistema operativo, hay algo importante a tener en cuenta: Dovecot 2.x (y con ello su agente de entrega local que usamos en Postfix) crea los ficheros de correo con los mismos permisos que su directorio principal, y por lo tanto debemos hacer que Maildir tenga 770, por ejemplo (siempre que optemos por los permisos a nivel de sistema operativo para limitar el acceso).

Una vez hecho esto vamos a crear un directorio común que nos va a servir como referencia para que los distintos usuarios "sepan" donde están las listas. La estructura va a ser la siguiente:

  ls -lha /home/listas/Maildir
  lrwxrwxrwx 1 root  root    21 sep 22 22:03 .postlista -> /home/postlista/Maildir

En el directorio "/home/listas" (o cualquier otro, no es un usuario del sistema) vamos a crear un enlace simbólico para cada uno de los buzones públicos.

¿Pero cómo sabe un usuario que puede acceder a esa lista pública?

Para esto usaremos los namespaces de IMAP (El que quiera entrar en el detalle sobre lo que son, que busque el RFC).

Dovecot define los namespaces en un bloque similar al siguiente, a veces en un único dovecot.conf, a veces en otro fichero. Debian, por ejemplo, divide los diferentes apartados de configuración en ".conf" diferentes:

  namespace {
    type = public
    separator = /
    prefix = listas-public/
    location = maildir:/home/listas/Maildir:INDEX=~/Maildir/listas-public
    subscriptions = no
  }

Adaptad la configuración a vuestro gusto, pero lo importante es que cuando el usuario "pepe" se loguee vía IMAP, va a poder suscribirse a todo lo listado en el directorio "/home/listas/Maildir", que además va a aparecer en su Thunderbird en el subdirectorio listas-public. Además, queremos que cada usuario tenga los ficheros propios de control que usa Dovecot en su maildir privado.

Esta es la forma fácil de crear buzones públicos.

Sin embargo, IMAP define una extensión para ACLs (los interesados también tienen RFCs al respecto), con las que podemos definir un control de acceso mucho más detallado, y que nos permitirán hilar mucho más fino en lo que puede hacer el usuario x en el buzón del usuario z. Para que os hagáis una idea, y cogido directamente del wiki de Dovecot, las ACLs permiten establecer los siguientes permisos:

  • lookup: Mailbox is visible in mailbox list. Mailbox can be subscribed to
  • read: Mailbox can be opened for reading
  • write: Message flags and keywords can be changed, except \Seen and \Deleted
  • write-seen: \Seen flag can be changed
  • write-deleted: \Deleted flag can be changed
  • insert: Messages can be written or copied to the mailbox
  • post: Messages can be posted to the mailbox by LDA, e.g. from Sieve scripts
  • expunge: Messages can be expunged
  • create: Mailboxes can be created (or renamed) directly under this mailbox (but not necessarily under its children, see ACL Inheritance section above) (renaming also requires delete rights)
  • delete: Mailbox can be deleted
  • admin: Administration rights to the mailbox (currently: ability to change ACLs for mailbox)

En el wiki de Dovecot podéis ver que letra corresponde a cada permiso. Por ahora lo dejamos y pasamos a la creación de un buzón compartido. Por cierto, no sé si está claro que con "buzón" me puedo estar refiriendo también a carpetas como "Trash", "Sent", o cualquier otra que creemos a mano.

Vamos a suponer que tenemos un usuario "helpdesk" en el que recibimos todo el correo destinado a "helpdesk _A_T_ forondarena.net" (otro spamtrap).

  /home/helpdesk/Maildir

Helpdesk es otro usuario del sistema, y su Maildir tiene permisos 770, para limitar el acceso también a nivel de sistema operativo.

Ahora que ya tenemos el usuario del sistema y que estamos recibiendo correo, vamos a activar el soporte para ACLs en Dovecot. Es muy sencillo, sólo hay que añadir un par de plugins, ya sea en dovecot.conf, o en el fichero que defina vuestra distribución.

  ...
  mail_plugins = acl
  ...
  protocol imap {
    mail_plugins = $mail_plugins imap_acl
  }

Y una vez tenemos los plugins, añadimos la configuración básica:

  plugin {
     acl = vfile
  }

Recordemos que queremos que el usuario "pepe" pueda acceder a todo el buzón "helpdesk". Para conseguirlo, vamos a hacer login con el usuario helpdesk, y vamos a usar el propio protocolo IMAP para dar acceso a pepe:

  # openssl s_client -connect 192.168.10.20:993
  CONNECTED(00000003)
  ..... (información SSL) .....
  * OK [CAPABILITY IMAP4rev1 LITERAL+ SASL-IR LOGIN-REFERRALS ID ENABLE IDLE AUTH=PLAIN] Dovecot ready.
  . login helpdesk password
  . OK [CAPABILITY IMAP4rev1 LITERAL+ SASL-IR LOGIN-REFERRALS ID ENABLE IDLE SORT SORT=DISPLAY THREAD=REFERENCES THREAD=REFS MULTIAPPEND UNSELECT CHILDREN NAMESPACE UIDPLUS LIST-EXTENDED I18NLEVEL=1 CONDSTORE QRESYNC ESEARCH ESORT SEARCHRES WITHIN CONTEXT=SEARCH LIST-STATUS ACL RIGHTS=texk] Logged in
  . setacl INBOX pepe lrwstipekxacd
  . OK Setacl complete.

Dicho de otra forma, hemos permitido "lrwstipekxacd" (cada permiso de la tabla anterior es una de estas letras) en el INBOX de helpdesk al usuario pepe.

¿Pero cómo sabe pepe que puede acceder a helpdesk?

Pues con los namespaces, claro. Vamos a crear uno de tipo "shared":

  namespace {
    ...
    type = shared
    separator = /
    prefix = buzones-shared/%%u/
    location = maildir:%%h/Maildir:INDEX=~/Maildir/buzones-shared/%%u
    ...
  }

Igual que con los buzones públicos, cuando pepe acceda a su cuenta va a ver un directorio buzones-shared en el que aparecerá helpdesk. ¿Verdad?

Pues no, porque no hay forma (con un rendimiento razonable) de hacer que Dovecot se recorra todos los buzones configurados (todos esos %%h y %%u), y sepa a cuáles tiene acceso pepe. A fin de cuentas, pepe y helpdesk son usuarios completamente diferentes, y no hay un enlace en un directorio claramente identificado, como "/home/listas" en el caso de los buzones públicos.

¿Entonces qué?

Es aquí donde entran en juego lo que en Dovecot se llaman "directorios", y que vamos a usar para decir, en este caso a pepe, que puede acceder a helpdesk. La configuración es la siguiente:

  plugin {
    acl_shared_dict = file:/etc/dovecot/acls/shared-mailboxes
  }

Quien dice "file:" dice base de datos o fichero .db. En cualquier caso, para este ejemplo usaremos un fichero en texto plano, "/etc/dovecot/acls/shared-mailboxes". Tened en cuenta, una vez más, que pepe tiene que poder leer el contenido de este fichero, y que además le interesa poder crear un lock en el directorio mientras está trabajando con él (vigilad los permisos unix, usad el directorio que queráis).

El contenido de shared-mailboxes es el siguiente:

  shared/shared-boxes/user/pepe/helpdesk
  1

Es una sintaxis que me parece particularmente "retorcida", pero es lo que hay si no queréis usar base de datos.

Y con esto lo tenemos ya "casi todo" (ver notas al final del post). Cuando pepe abra su Thunderbird y liste las carpetas a las que puede suscribirse verá el inbox de helpdesk, en este caso a partir de la carpeta "buzones-shared".

Notas

Primera nota:

En Dovecot, una vez que definimos un namespace, también tenemos que definir explicitamente el namespace privado. Por lo tanto, la configuración completa, en lo que a namespaces se refiere, tiene que parecerse a esto:

  namespace {
    inbox = yes
    location =
    prefix =
    separator = /
    subscriptions = yes
    type = private
  }
  namespace {
    location = maildir:/home/listas/Maildir:INDEX=~/Maildir/listas-public
    prefix = listas-public/
    separator = /
    subscriptions = no
    type = public
  }
  namespace {
    location = maildir:%%h/Maildir:INDEX=~/Maildir/buzones-shared/%%u
    prefix = buzones-shared/%%u/
    separator = /
    subscriptions = no
    type = shared
  }

Una vez más, adaptad la configuración a vuestras preferencias. Por ejeplo, quizá os venga bien añadir un "list = children" en vuestros namespaces.

Segunda nota:

¿Si habéis hecho pruebas antes de llegar al final, os habéis dado cuenta de que, una vez configurados los buzones shared, las listas públicas han dejado de verse?

¡Claro!, hemos empezado a usar ACLs, así que hay que dar acceso a pepe a las listas. Muy fácil, nos logueamos con el usuario de la lista que queramos abrir, y damos acceso a pepe:

 ...
  * OK [CAPABILITY IMAP4rev1 LITERAL+ SASL-IR LOGIN-REFERRALS ID ENABLE IDLE AUTH=PLAIN] Dovecot ready.
  . login postlista password
  . OK [CAPABILITY IMAP4rev1 LITERAL+ SASL-IR LOGIN-REFERRALS ID ENABLE IDLE SORT SORT=DISPLAY THREAD=REFERENCES THREAD=REFS MULTIAPPEND UNSELECT CHILDREN NAMESPACE UIDPLUS LIST-EXTENDED I18NLEVEL=1 CONDSTORE QRESYNC ESEARCH ESORT SEARCHRES WITHIN CONTEXT=SEARCH LIST-STATUS ACL RIGHTS=texk] Logged in
  . setacl INBOX pepe lrwstipekxacd
  . OK Setacl complete.
  . logout

Tercera nota:

Los ficheros dovecot-acl tienen el siguiente contenido de ejemplo:

  user=pepe akxeilprwts

Pero no los editéis directamente. Usad setacl, getacl, myrights, ..., o mejor aún, alguna extensión de Thunderbird que lo haga por vosotros.

Y para terminar, tened en cuenta, una vez más, que este post no es una referencia para hacer copy/paste. Necesita trabajo (poco) si queréis hacerlo funcionar.


Computación distribuida con Hadoop I

dom, 17 ene 2010 by Foron

Allá por el año 2003, cuando Google ya dominaba el mundo de los buscadores, muchos administradores de sistemas nos preguntábamos por la tecnología que usarían para indexar páginas, calcular Pageranks, gestionar el almacenamiento ....

En ese momento Google publicó varios documentos al respecto, como este sobre MapReduce y este otro sobre su sistema de ficheros GFS (Google File System), que daban algunas pistas, y que a la larga han sido la base de varios proyectos, como Hadoop.

Pretendo que este blog sea sobre todo práctico, así que no voy a escribir demasiado ni sobre la teoría que hay detrás de MapReduce (programación funcional, map/reduce, bla bla bla) ni sobre Google y GFS. Para el que quiera leer algo más sobre la tecnología de Google recomiendo este excelente blog en el que se pueden encontrar varios posts sobre el buscador.

El problema

El problema es sencillo y se resume en el volumen de datos, cada vez mayor, que generan las aplicaciones; y que por supuesto hay que tratar. Por dar un ejemplo, en el 2008 Yahoo! gestionaba unos 5PB de almacenamiento para generar su "webmap" [1], que es uno de los componentes que usa su buscador. Otro ejemplo es Rackspace, y su división de correo, Mailtrust, que genera más de 150GB de logs de correo al día. Por supuesto, además de ser mucho volumen acomulado, es fundamental que se pueda tratar de una forma ágil para dar una respuesta rápida a clientes que piden cosas como "el correo que me mandó fulanito hace 3 meses no me ha llegado" o "quiero todos los correos recibidos en mi dominio en los últimos 5 meses". Esto es razonablemente sencillo cuando se controlan unos pocos cientos de GB, pero se hace mucho más difícil cuando la escala pasa al Tera.

El que haya nombrado estos dos ejemplos no es casual, ya que Yahoo! y Rackspace son dos de los usuarios más notables (junto a Facebook, LastFM, ...) del software que voy a describir en los siguientes posts: Hadoop.

¿Qué es Hadoop?

Resumiendo mucho, Hadoop es un framework software que intenta implementar los conceptos de MapReduce y GFS que presentó Google. Está pensado, por lo tanto, para el tratamiento distribuido de grandes cantidades de datos en máquinas de bajo coste y con poca fiabilidad.

Partiendo de esta base, parece claro que hace falta un sistema de ficheros pensado para almacenar mucha información y con tolerancia ante fallos. Aquí es donde aparece HDFS (Hadoop Distributed File System). Algunas de sus características:

  • El tamaño de bloque por defecto en este sistema de ficheros es de 64MB, pero muchas veces se aumenta a 128MB.
  • Ofrece la mencionada disponibilidad al dejar varias copias de cada bloque en máquinas diferentes.
  • Usa el concepto de "rack awareness", para saber dónde está cada bloque y qué proceso de cálculo puede acceder más rápido a él.
  • Su diseño facilita la lectura secuencial de datos (razonable en discos estándar de bajo coste), pero lo hace sacrificando otras características propias de los sistemas de ficheros POSIX. Por ejemplo, que nadie piense editar un fichero en HDFS y borrar la línea número 2000.

Por otro lado, el framework implementa el concepto MapReduce, que como ya he escrito consiste en dividir el trabajo entre n servidores para presentar el resultado después de forma coherente. Veamos un ejemplo:

Digamos [2] que tenemos un cluster de 4 máquinas Hadoop que guardan logs de apache de 40 servidores balanceados, para un total de 100GB al día. Nuestro objetivo es diseñar un trabajo MapReduce para sumar los accesos que ha hecho cada IP en todos los frontales.

Los logs van a ser sencillos y se van a limitar a "FECHA IP URL".

Servidor 1:

  FECHA 172.17.2.34 /index.html
  FECHA 172.17.7.13 /contenido/futbol.html
  FECHA 172.17.2.34 /img/banner.jpg
  FECHA 172.17.2.42 /index.html

Servidor 2:

  FECHA 172.17.2.34 /contenido/peliculas.html
  FECHA 172.17.7.13 /img/futbol.jpg

Servidor 3:

  FECHA 172.17.7.13 /index.html
  FECHA 172.17.2.34 /peliculas/bladerunner.html
  FECHA 172.17.2.42 /img/banner.jpg

Servidor 4:

  FECHA 172.17.2.42 /contenido/series.html
  FECHA 172.17.2.42 /img/series.jpg

A partir de estos datos Hadoop empezará la fase Map, que se ejecuta paralelamente en las máquinas del cluster, y que consiste en adaptar las líneas de log al formato clave< tabulador >valor.

Volviendo al ejemplo, la clave será la IP, y el valor la URL (no necesitamos la fecha). El resultado de la fase Map es el siguiente [3]:

  (172.17.2.34 [/index.html /img/banner.jpg /contenido/peliculas.html /peliculas/bladerunner.html])
  (172.17.2.42 [/index.html /img/banner.jpg /contenido/series.html /img/series.jpg])
  (172.17.7.13 [/contenido/futbol.html /img/futbol.jpg /index.html])

Hemos generado un listado ordenado por IP, en el que se identifican todos los accesos que ha habido.

Una vez hecho esto Hadoop entrega trozos de esta salida a n procesos Reduce, que en este sencillo ejemplo sólo tienen que contar las URL que aparecen en el valor, para generar algo como lo siguiente:

  (172.17.2.34    4)
  (172.17.2.42    4)
  (172.17.7.13    3)

Esto no es más que un ejemplo. En la realidad, Hadoop se ha usado para cosas tan diversas como convertir 4TB de artículos de periódico escaneados en formato tiff en 11 millones de pdfs; todo usando unas 100 instancias de amazon y en menos de 24 horas, por un total de 240$ sin contar ancho de banda, como se puede ver aquí.

El próximo post describirá una instalación básica del sistema, y el siguiente será una prueba algo más interesante de uso con logs de postfix+amavis, muy al estilo Mailtrust, y usando perl y quizá alguno de los proyectos de apoyo que han surgido alrededor de Hadoop; como Hive, que fue desarrollado por Facebook para crear trabajos MapReduce desde una consola con sintaxis muy similar a SQL.

Notas

[1]En el 2008 el webmap de Yahoo! era un grafo de un trillón de vértices(enlaces web) y 100 billones de nodos (URL únicas).
[2]Los tres o cuatro lectores de este blog se enfadan si no uso al menos un "digamos" o "supongamos" en cada post.
[3]En realidad hay más fases, como sort-suffle.
read more

Computación distribuida con Hadoop II

vie, 05 feb 2010 by Foron

Continúo la serie de posts sobre Hadoop describiendo, muy por encima, los componentes y la estructura que suelen tener este tipo de sistemas. Como ya he dicho en otros posts, este blog pretende ser sobre todo práctico, así que no me voy a extender demasiado.

Gráficamente un cluster Hadoop se puede parecer mucho a esto:

Infraestructura Hadoop

JobTracker:

En un cluster hay un único JobTracker. Su labor principal es la gestión de los TaskTrackers, entre los que distribuye los trabajos MapReduce que recibe. Es, por lo tanto, el interfaz principal que tienen los usuarios para acceder al cluster.

TaskTracker:

Un cluster tiene n TaskTrackers que se encargan de la ejecución de las tareas map y reduce en los nodos. Cada uno de los TaskTrackers gestiona la ejecución de estas tareas en una máquina del cluster. El JobTracker se encarga, a su vez, de controlarlos.

NameNode:

Las funciones principales del NameNode son el almacenamiento y la gestión de los metadatos del sistema de ficheros distribuido y ofrecer el interfaz principal que tiene el usuario para acceder al contenido HDFS. En un cluster hay un único proceso NameNode.

Sin los metadatos que mantiene el NameNode no se sabría en qué nodo está cada bloque, además de perderse la información sobre la estructura de directorios. Es, por lo tanto, el componente más importante del cluster, y debe estar redundado. Hadoop no ofrece de manera nativa ningún mecanismo de alta disponibilidad, pero sí dispone de herramientas que permiten replicar la información. Entre ellas está el NameNode secundario, que permite guardar una copia de los metadatos (en realidad hace más cosas) en una máquina diferente al NameNode. Eso sí, no es una copia en tiempo real, y no ofrece failover automático en caso de fallo del primario. Para esto necesitamos usar otro tipo de herramientas, entre las que destacan DRBD y Heartbeat.

DataNode:

Estos procesos ofrecen el servicio de almacenamiento de bloques para el sistema de ficheros distribuido. Son coordinados por el NameNode.

Estos son los procesos principales del sistema, pero hay otros, como el Balancer, que se encarga de equilibrar la distribución de los bloques entre los DataNodes, por ejemplo cuando se añade un nuevo servidor.

Mi laboratorio se limita a cuatro servidores: un NameNode+JobTracker y tres DataNode+TaskTracker, en cuatro máquinas virtuales de un core y 1G de memoria, para un total de 5G de almacenamiento.

  hadoop@fn140:/usr/local/hadoop/conf$ hadoop dfsadmin -report
  Configured Capacity: 8695013376 (8.1 GB)
  Present Capacity: 5455364096 (5.08 GB)
  DFS Remaining: 5455290368 (5.08 GB)
  DFS Used: 73728 (72 KB)
  DFS Used%: 0%
  Under replicated blocks: 0
  Blocks with corrupt replicas: 0
  Missing blocks: 0

  -------------------------------------------------
  Datanodes available: 3 (3 total, 0 dead)

  Name: 192.168.10.142:50010
  Decommission Status : Normal
  Configured Capacity: 2898337792 (2.7 GB)
  DFS Used: 24576 (24 KB)
  Non DFS Used: 1079918592 (1.01 GB)
  DFS Remaining: 1818394624(1.69 GB)
  DFS Used%: 0%
  DFS Remaining%: 62.74%
  Last contact: Sun Jan 31 16:10:58 CET 2010


  Name: 192.168.10.143:50010
  Decommission Status : Normal
  Configured Capacity: 2898337792 (2.7 GB)
  DFS Used: 24576 (24 KB)
  Non DFS Used: 1080020992 (1.01 GB)
  DFS Remaining: 1818292224(1.69 GB)
  DFS Used%: 0%
  DFS Remaining%: 62.74%
  Last contact: Sun Jan 31 16:10:58 CET 2010


  Name: 192.168.10.141:50010
  Decommission Status : Normal
  Configured Capacity: 2898337792 (2.7 GB)
  DFS Used: 24576 (24 KB)
  Non DFS Used: 1079709696 (1.01 GB)
  DFS Remaining: 1818603520(1.69 GB)
  DFS Used%: 0%
  DFS Remaining%: 62.75%
  Last contact: Sun Jan 31 16:10:56 CET 2010

En los próximos dos posts voy a dejar la "teoría" para pasar a la práctica, empezando por una de las aplicaciones más obvias de Hadoop, como es el tratamiento de logs; para seguir por una de las utilidades que existen para facilitar el desarrollo de aplicaciones MapReduce: pig.

read more

Computación distribuida con Hadoop III

dom, 21 mar 2010 by Foron

En este tercer post de la serie dejamos la teoría para ver un caso práctico de aplicación de Hadoop, que completaré en un cuarto post.

Como sabemos, gracias a Hadoop conseguimos un sistema con mucha capacidad de almacenamiento, a un coste por mega reducido, redundado, de rendimiento prácticamente lineal (más máquinas = más rendimiento) y sobre el que ejecutar de forma pararela todo tipo de aplicaciones que se puedan implementar como MapReduce.

Con estos datos, hay una utilidad práctica que viene inmediatamente a la mente: tratamiento de logs. Veamos:

  • Me gustaría decir lo contrario, pero personalmente no conozco a nadie que maneje el dinero destinado a proyectos que no ponga pegas a una inversión fuerte para el tratamiento de logs. El coste por mega de Hadoop es difícil de superar.
  • Si combinamos Hadoop, por ejemplo para datos con hasta un año de antiguedad, con un soporte en cinta para el resto, lo tendremos fácil para cumplir cualquier legislación en lo relacionado a retención de datos.
  • Sin llegar a los niveles de Facebook o Rackspace, es probable que el volumen de logs que guardemos durante un año sea lo suficientemente grande como para hacer difícil su tratamiento si un juez (por ejemplo) pide todos los correos desde la cuenta x a la cuenta z de los últimos siete meses, o todos los acceso al foro web de fulanito. MapReduce y Hadoop permiten ejecutar este tipo de tareas sin mayores problemas, y con un rendimiento prácticamente lineal.

En este entorno, planteo un ejemplo de tratamiento de logs de Postfix de lo más simple, con un filtro de contenido en modo post-queue (Amavis, por ejemplo, en su uso más habitual). Vamos, nada nuevo.

El objetivo sería, además de los puntos mencionados anteriormente, poder realizar cualquier tipo de consulta sobre los logs de meses (o años), en un tiempo razonable. Y digo tiempo razonable porque en este tipo de consultas no suele hacer falta tener el resultado al momento. Se acepta esperar durante 1,5 o 10 minutos hasta que se genere el resultado.

Resumiendo, tenemos una infraestructura parecida a esta:

Infraestructura Hadoop y Amavis

Todos los servidores envían los logs de manera centralizada a un syslog remoto, que además de hacer una copia diaria a cinta, almacena la información en Hadoop, usando alguno de los muchos interfaces que se ofrecen, como pueden ser FUSE o FTP.

Junto a esto tendremos una máquina, preferiblemente un interfaz web, para generar trabajos MapReduce. Básicamente necesitaremos un formulario (cuenta origen, destino, fechas, ...), y algo de control sobre los trabajos que ya se han realizado para mostrar los resultados.

Streaming

Se pueden crear trabajos MapReduce para Hadoop de varias formas, pero en este ejemplo me voy a limitar a lo que se llama "Streaming". Dicho de otra forma (aunque no sea del todo cierto), a programar aplicaciones MapReduce en perl o python (o en cualquier lenguaje que pueda leer y escribir en la entrada y salida estándar).

La gran ventaja que tiene el Streaming es que permite desarrollar prototipos de una forma rápida, aunque no sea lo más eficiente. En cualquier caso, sobra para este ejemplo.

La mejor forma de explicar lo que es el Streaming en Hadoop es ver el equivalente, en versión linux:

  cat datos_entrada.txt | mapper.pl | sort | reducer.pl

El mapper.pl recibirá los datos desde los ficheros de log, línea por línea, y los tratará, escribiendo por salida estándar con el formato clave "tabulador" valor.

Esta salida se ordena, y pasa a un reducer. Hadoop garantiza que todas las claves serán tratadas por el mismo reducer, aunque éste puede, y normalmente así lo hará, procesar varias claves (este es un buen momento para recordar que los ficheros de entrada pueden ser, por decir algo, 200GB de datos, sin que suponga ningún problema para Hadoop).

Volviendo al ejemplo, veamos unas cuantas líneas de log de postfix:

  Feb 20 14:27:03 server postfix/smtpd[17875]: E78D13073B2: client=mail.example.com[10.0.0.6]
  Feb 20 14:27:03 server postfix/cleanup[17822]: E78D13073B2: message-id=
  Feb 20 14:27:04 server postfix/qmgr[5370]: E78D13073B2: from=, size=20827, nrcpt=1 (queue active)
  Feb 20 14:27:04 server postfix/smtp[17980]: E78D13073B2: to=, relay=127.0.0.1[127.0.0.1]:10025, delay=0.6, delays=0.08/0/0/0.51, dsn=2.0.0, status=sent (250 Ok: queued as 6A737307316)
  Feb 20 14:27:04 server postfix/qmgr[5370]: E78D13073B2: removed

  Feb 20 14:27:04 server postfix/smtpd[17882]: 6A737307316: client=localhost[127.0.0.1]
  Feb 20 14:27:04 server postfix/cleanup[17817]: 6A737307316: message-id=
  Feb 20 14:27:04 server postfix/qmgr[5370]: 6A737307316: from=, size=21719, nrcpt=1 (queue active)
  Feb 20 14:27:04 server postfix/pipe[16491]: 6A737307316: to=, relay=maildrop, delay=0.16, delays=0.11/0.01/0/0.04, dsn=2.0.0, status=sent (delivered via maildrop service)
  Feb 20 14:27:04 server postfix/qmgr[5370]: 6A737307316: removed

Esta es la información que postfix loguea al recibir y entregar un mensaje usando un filtro after-queue. He ordenado los logs para que sea más claro, pero en la realidad no es así.

En este ejemplo se recibe un correo desde mail.example.com, y se le asigna el queue-id E78D13073B2. También se loguea el message-id, el origen y el tamaño.

Una vez se ha encolado el mensaje, se envía a Amavis, a través de 127.0.0.1:10025. Como se puede ver en el ejemplo, en este punto sabemos el destino del mensaje, y también si fue aceptado o no. Lo sabemos porque el log hace referencia al queue id que amavis reinjecta en postfix. De no haberlo aceptado no tendríamos un queue id en el log.

Este nuevo mensaje mantiene el message-id, el tamaño es algo mayor, y termina entregandose, en este caso en la máquina local, usando la aplicación courier-maildrop.

Bien, esto no es más que un ejemplo. En la realidad hay que tratar muchos casos: aliases, reenvíos a máquinas remotas, bounces, mensajes que pasan a deferred por no haber podido entregarse, ....

Además, e insistiendo en que esto no es más que un ejemplo, voy a asumir dos cosas, que aunque se cumplen a menudo, no son ciertas:

  • Que el queue-id es único.
  • Que el message-id es único.

Mapper.pl

El proceso comienza con Hadoop enviando los ficheros de entrada (logs) a la entrada estándar de los mapper.pl que estemos ejecutando en el cluster, que tratarán cada línea y la enviarán por salida estándar con el formato:

  queueidinformacion_separada_por_comas

Para separar la clave del valor se usa un tabulador. Normalmente, los campos del valor se separan con espacios, pero en este ejemplo he preferido las "comas".

Veamos un trozo del código:

 ..............
 while (<>)
 ..............
 if ($linea =~ /^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) .+ postfix\/smtpd\[\d+\]: (\w+): client=(\S+)$/) {
                 $estructura{fecha} = $1; $estructura{qid} = $2; $estructura{host} = $3;
                 print $estructura{qid} . "\t" . "Conexion," . join (",",$estructura{fecha},$estructura{host}) . "\n";
         }
          elsif ($linea =~ /^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) .+ postfix\/smtp\[\d+\]: (\w+): to\=\<([^ ]+)\>, relay=127.0.0.1.*status=(.*) (\w+)\)$/) {

                 $estructura{fecha} = $1; $estructura{qid} = $2; $estructura{to} = $3; $estructura{origto} = $3; $estructura{status} = $4; $estructura{relay} = "Antispam"; $estructura{rqid} = $5; $estructura{status} =~ s/,//g;
                 print $estructura{qid} . "\t" . "Antispam," . join (",",$estructura{fecha},$estructura{to},$estructura{to},$estructura{relay},$estructura{status},$estructura{rqid}) . "\n";
         }
 ...........
 }

Tenemos varios "if/elsif", que adaptan la línea del log de entrada al formato que quiero. Además, he añadido un texto con el tipo de entrada de log que estamos tratando, y que sabemos por el "if" en el que estamos (Antispam en este ejemplo), para luego usarlo en el reducer.

El código completo incluye también la detección de los mensajes que Amavis considera que son spam, en los que pone el rqid como 0. Además, en aquellos casos donde "no proceda" aplicar el concepto de rqid pongo -1 como valor. Todo esto servirá en el reducer.

Volviendo a los logs de ejemplo, el resultado de ejecutar el mapper sobre ellos es:

  E78D13073B2     Conexion,Feb 20 14:27:03,mail.example.com[10.0.0.6]
  E78D13073B2     Msgid,Jan 20 06:27:03,F0E59D47X9B343A7DFRAB1F0E45@EUW0600375
  E78D13073B2     Origen,Feb 20 14:27:04,prueba@example.com,20827
  E78D13073B2     Antispam,Feb 20 14:27:04,destino@example.org,destino@example.org,Antispam,sent (250 Ok: queued as,6A737307316)
  E78D13073B2     Cierre,Feb 20 14:27:04
  6A737307316     Conexion,Feb 20 14:27:04,localhost[127.0.0.1]
  6A737307316     Msgid,Feb 20 14:27:04,F0E59D47X9B343A7DFRAB1F0E45@EUW0600375
  6A737307316     Origen,Feb 20 14:27:04,prueba@example.com,21719
  6A737307316     Maildrop,Feb 20 14:27:04,destino@example.org,destino@example.org,Maildrop,sent (delivered via maildrop service),-1
  6A737307316     Cierre,Feb 20 14:27:04

Después de esto Hadoop ejecutaría un sort (en el ejemplo ya están ordenados por queue-id), y volvería a enviar el resultado a los reducer.pl del cluster. Por lo tanto, sabemos que un reducer va a recibir todas las líneas de log correspondientes a un mismo queue-id.

Reducer.pl

Pasamos directamente a ver algo del código del reducer.pl:

 my %mensaje = (qid => "",
             fecha => "",
             servidor => "",
             msgid => "",
             origen => "",
             size => "",
             to => "",
             origto => "",
             rqid => "",
             relay => "",
             status => "",
             cierre => ""
             );

 ..............
 while () {
     chomp;
     ($qid, $valor) = split /\t/;

         if ($qid ne $qidanterior) {
                 # tenemos un queueid nuevo
                 if ($qidanterior ne "") {
                         &mostrar_datos();

                         %mensaje = (qid => "",
                                 fecha => "",
                                 desdeip => "",
                                 msgid => "",
                                 origen => "",
                                 size => "",
                                 to => "",
                                 origto => "",
                                 rqid => "",
                                 status => "",
                                 cierre => ""
                         );
                 }
                 $qidanterior = $qid;
         }

         # seguimos procesando los datos del mismo qid
         $mensaje{qid} = $qid;
         @datos = split(",",$valor);

         switch ($datos[0]) {
                 case "Conexion" {
                         $mensaje{fecha} = $datos[1];
                         $mensaje{desdeip} = $datos[2];
                 }
                 case "Msgid" {
                         $mensaje{msgid} = $datos[2];
                 }
                 case "Origen" {
                         $mensaje{origen} = $datos[2];
                         $mensaje{size} = $datos[3];
                 }
                 case "Antispam" {
                         $mensaje{to} .= $datos[2] . "-separador-";
                         $mensaje{origto} .= $datos[3] . "-separador-";
                         $mensaje{status} .= $datos[1] . " " .$datos[2] . " " . $datos[3] . " " . $datos[4] . " " . $datos[5] . "-separador-";
                         $mensaje{rqid} = $datos[6];
                 }

 ................
 sub mostrar_datos {
      ................
      if ( ($mensaje{fecha} ne "") && ($mensaje{cierre} ne "") ) {
           # mensaje completo: Salida estandar
           # hay que revisar que se haya pasado por todas las fases
           # y realizar comprobaciones, que aqui no se hacen
           $fechaentrada = `date -u -d '$mensaje{fecha}' '+%s'`; chomp $fechaentrada;
           $fechasalida = `date -u -d '$mensaje{cierre}' '+%s'`; chomp $fechasalida;

           $datoscodificables = $mensaje{desdeip} .",".$mensaje{size}.",".$fechasalida.",".$mensaje{qid}.",".$mensaje{rqid}.",".$mensaje{status};
           $resto = encode_base64($datoscodificables,"");

           print $mensaje{msgid} . "\t" . $fechaentrada . " " . $mensaje{origen} . " " . $mensaje{to} . " " . $mensaje{origto} . $resto . "\n";

     } else {
           # mensaje incompleto. Guardar en otro fichero para poder seguir el tracking
           # Falta por hacer
     }

 }

Por lo tanto, cogemos cada línea de entrada, y ayudándonos de la descripción del tipo de línea que hemos añadido en el mapper, vamos completando la estructura de datos que después será el resultado (por fin) que queremos.

Un ejemplo de la ejecución de este trabajo en mi cluster de prueba:

  $ hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-0.20.2-streaming.jar -input /user/hadoop/mail_logs -output /user/hadoop/logs_formateados/domingo_14_03 -file /home/hadoop/mapper.pl -file /home/hadoop/reducer.pl -mapper "/usr/bin/perl -w mapper.pl" -reducer "/usr/bin/perl -w reducer.pl"

  packageJobJar: [/home/hadoop/mapper.pl, /home/hadoop/reducer.pl, /home/hadoop/hadoop-unjar4902963819550178151/] [] /tmp/streamjob534817239215339193.jar tmpDir=null
  10/03/18 22:20:02 INFO mapred.FileInputFormat: Total input paths to process : 4
  10/03/18 22:20:02 INFO streaming.StreamJob: getLocalDirs(): [/home/hadoop/mapred/local]
  10/03/18 22:20:02 INFO streaming.StreamJob: Running job: job_201003182146_0001
  10/03/18 22:20:02 INFO streaming.StreamJob: To kill this job, run:
  10/03/18 22:20:02 INFO streaming.StreamJob: /usr/local/hadoop/bin/hadoop job  -Dmapred.job.tracker=fn140:8021 -kill job_201003182146_0001
  10/03/18 22:20:02 INFO streaming.StreamJob: Tracking URL: http://fn140.forondarena.net:50030/jobdetails.jsp?jobid=job_201003182146_0001
  10/03/18 22:20:03 INFO streaming.StreamJob:  map 0%  reduce 0%
  10/03/18 22:20:17 INFO streaming.StreamJob:  map 8%  reduce 0%
  10/03/18 22:20:18 INFO streaming.StreamJob:  map 11%  reduce 0%
  10/03/18 22:20:20 INFO streaming.StreamJob:  map 15%  reduce 0%
  .....
  10/03/18 22:21:56 INFO streaming.StreamJob:  map 92%  reduce 25%
  10/03/18 22:21:59 INFO streaming.StreamJob:  map 100%  reduce 25%
  ....
  10/03/18 23:14:13 INFO streaming.StreamJob:  map 100%  reduce 98%
  10/03/18 23:15:55 INFO streaming.StreamJob:  map 100%  reduce 99%
  10/03/18 23:17:31 INFO streaming.StreamJob:  map 100%  reduce 100%
  10/03/18 23:18:25 INFO streaming.StreamJob: Job complete: job_201003182146_0001
  10/03/18 23:18:25 INFO streaming.StreamJob: Output: /user/hadoop/logs_formateados/domingo_14_03

El resultado es un fichero en la ruta "/user/hadoop/logs_formateados/domingo_14_03", con líneas formadas por un message-id, seguido por un tabulador, seguido del origen, el destino, el destino antes de la expansión de aliases, y del resto de la información, codificada en base64, todo separado por espacios.

No hace falta decir que el formato de salida en un entorno real se debería adaptar a lo que se necesitase. De hecho, Rackspace usa una estructura binaria, donde entre otras cosas se guarda la información de los diferentes saltos que hace un correo dentro de su sistema. Este ejemplo, por supuesto, es mucho más sencillo.

Al final del proceso hemos obtenido un fichero, que es el que de verdad queremos en nuestro cluster, con toda la información de cada mensaje que ha entrado en nuestra red en un periodo de tiempo concreto (digamos que cada 4 horas volcamos los logs de syslog a Hadoop y ejecutamos la tarea), como se ve en los siguientes ejemplos (sin la codificación en base64, para que sean más fáciles de seguir):

Mensaje que se entrega en un buzón local:

  ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org 1268225082 origen@example.org destino@example.com destino@example.com mail.example.org[10.0.2.34],2430,1268225085,B6A6A296E55,158AB296F24,Mar 10 12:44:45 destino@example.com destino@example.com Antispam sent (250 OK sent queued as 158AB296F24

  ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org 1268225085 origen@example.org destino@example.com destino@example.com localhost[127.0.0.1],3081,1268225085,158AB296F24,-1,Mar 10 12:44:45 destino@example.com destino@example.com Maildrop sent (delivered via maildrop service)

Por lo tanto:

  • El mensaje con id ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org
  • Entró en el sistema a las 1268225082
  • Desde el servidor mail.example.org[10.0.2.34]
  • El origen era origen@example.org
  • Y el destino destino@example.com (no es un alias)
  • Una vez pasado por el sistema antispam se reencola con queueid 158AB296F24
  • El mensaje de la entrega al servidor antispam es: sent (250 OK sent queued as 158AB296F24)
  • Una vez ha pasado el filtro antispam el mensaje se reencola a las 1268225085
  • Por supuesto desde el servidor localhost[127.0.0.1]
  • Y el mensaje de la entrega local es Mar 10 12:44:45 Maildrop sent (delivered via maildrop service), sólo al destino destino@example.com

Mensaje que se entrega a un servidor remoto:

  1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3 1268219047 origen@example.net destino@example.com destino@example.com mail.example.net[10.5.3.2],1901,1268219051,CA6782FABFB,B6C7A2FAACA,Mar 10 11:04:11 destino@example.com destino@example.com Antispam sent (250 OK sent queued as B6C7A2FAACA

  1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3 1268219049 origen@example.net destinoreal@remoto.example.org destino@example.com localhost[127.0.0.1],2686,1268219051,B6C7A2FAACA,-1,Mar 10 11:04:11 destinoreal@remoto.example.org destino@example.com mail.example.org[10.0.2.34]:25 sent (250 2.0.0 Ok: queued as 423872AB862)
  • Su messageid es 1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3
  • Entro en el sistema a las 1268219047
  • Desde el servidor mail.example.net[10.5.3.2]
  • Desde el origen origen@example.net
  • Y al destino destino@example.com, que en la fase antispam todavía no se expande a ningún alias
  • El mensaje se relaciona con B6C7A2FAACA
  • Y el mensaje de la entrega es Mar 10 11:04:11 destino@example.com destino @example.com Antispam sent (250 OK sent queued as B6C7A2FAACA)
  • Se reencola, a través de localhost, a las 1268219049
  • El mensaje pasa a ser la dirección real destinoreal@remoto.example.org, proveniente del alias destino@example.com
  • El mensaje se entrega en mail.example.org[10.0.2.34]:25
  • Con el mensaje sent (250 2.0.0 Ok: queued as 423A72AB262)

Mensaje que es spam:

  9545850218.UWD8KHA190@spammer.example.info 1268238332 spammer@example.net destino@example.com destino@example.com mail.example.info[10.8.2.12],2141,1268238333,F355429B3A5,0,Mar 10 16:25:33 destino@example.com destino@example.com Antispam sent (250 OK sent mensaje_rechazado por spam

En este caso el mensaje es spam, y no se relaciona con ningún otro queueid.

Mensaje que se entrega a varias cuentas:

  2010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com mail.example.org[10.0.2.34],15853,1268223532,5A4FE29B3B6,43A9B28B357,Mar 10 12:18:52 destinovarios@example.com destinovarios@example.com Antispam sent (250 OK sent queued as 43A9B28B357

  2010031011.AB263566.pc1@mailer.example.org 1268223532 usuario@example.org destinolocal@example.com-separador-destinoremoto@example.info destinovarios@example.com-separador-destinovarios@example.com localhost[127.0.0.1],16525,1268223532,43A9B28B357,-1,Mar 10 12:18:52 destinolocal@example.com destinovarios@example.com Maildrop sent (delivered via maildrop service)-separador-Mar 10 12:18:52 destinoremoto@example.info destinovarios@example.com mail.example.info[10.8.2.12]:25 sent (250 2.0.0 Ok: queued as 452362CA51A)
  • Ahora tenemos un mensaje con id 2010031011.AB263566.pc1@mailer.example.org
  • Que se envía desde usuario@example.org
  • Al destino destinovarios@example.com
  • Que a su vez es un alias a dos cuentas destinolocal@example.com y destinoremoto@example.info
  • Los mensajes de entrega a cada una de ellas son:
    • Mar 10 12:18:52 destinolocal@example.com Maildrop sent (delivered via maildrop service)
    • Mar 10 12:18:52 destinoremoto@example.info mail.example.info[10.8.2.12]:25 sent (250 2.0.0 Ok: queued as 452362CA51A)

Como se puede ver, ya tenemos toda la información necesaria, debidamente preformateada, por lo que ya no necesitamos los ficheros mail.log, que podríamos borrar de Hadoop (que no de las cintas).

La información contenida en estos ficheros nos servirá para preparar consultas, bien directamente con otros trabajos MapReduce, bien con otras herramientas, tal y como mostraré en el siguiente post de la serie, en el que nos aprovecharemos del formato de los ficheros para hacer consultas por message-id, cuenta origen, fecha o cuenta destino (según entró en el sistema o una vez expandidos los aliases).

En este punto hay que volver a recordar que Hadoop está pensado para trabajar con cantidades muy grandes de información, en los que hacer un simple "grep" o usar una base de datos relacional estándar resulta impracticable.

Por último, tened en cuenta que el código de este post, además de ser sólo un ejemplo, es completamente Alpha. Aparte de no funcionar del todo bien (tiene varios fallos que he detectado pero que no he corregido todavía), necesitaría tener en cuenta muchos más casos, como mensajes cuya entrega se retrasa durante días (que no deja de ser algo normal en los sistemas de correo reales).

En el cuarto y último post de la serie: acceso a Hadoop a través de una herramienta de apoyo, como pig.

read more

Computación distribuida con Hadoop IV

dom, 30 may 2010 by Foron

Antes de que me adelanten, y sobre todo por las durísimas presiones por parte de las tres personas que leen este blog :-), voy a terminar esta serie de posts, hablando sobre una de las herramientas que se han implementado para dar un mayor nivel de abstracción (o facilidad de uso, como se prefiera decir) a Hadoop.

Hadoop tiene un problema, o al menos un argumento que se usa en su contra: Desarrollar una aplicación MapReduce lleva demasiado tiempo. A pesar de que el Streaming del que hablamos en posts anteriores lo solucione en parte, es cierto que si queremos trabajar de forma "nativa" con el framework tenemos que estar preparados para escribir un buen puñado de líneas de código.

Para dar salida este problema han ido apareciendo distintas alternativas, entre las que destacan Hive (by Facebook) y Pig (by Yahoo). En este post me voy a centrar en esta última, aunque esto no significa, ni mucho menos, que sea mejor que Hive.

Gracias a Pig se pueden escribir "en 4 lineas" aplicaciones MapReduce. Dicho de otra forma, permite que nos centremos en el qué queremos hacer con nuestros datos, y no en el cómo lo hacemos. Se basa en dos componentes:

  • Un sencillo lenguaje para escribir los scripts, llamado "pig latin".
  • El ""compilador"", que trasforma los scripts en aplicaciones MapReduce para su ejecución en el cluster Hadoop.

Junto a esto, Pig tambén ofrece un shell interactivo (se llama grunt), para poder ejecutar comandos.

Bien, terminada la teoría (fácil, ¿no?), vamos a centrarnos en los datos, tal y como hemos dicho antes. (Para el que no haya leido los posts anteriores, este sería un buen momento).

Hasta ahora hemos estado preparando nuestra plataforma de logueo para poder disponer de años, y quizá terabytes, de logs en un mismo lugar, barato y redundado, del que poder sacar información útil en unos pocos minutos. Recordemos:

  1. Los logs de todos nuestros servidores se consolidan en una/varias máquinas syslog (o similar).
  2. Los logs se copian tal cual en Hadoop. A pesar de que vayan a estar replicados, no es mala idea tener una copia extra en cinta.
  3. Ejecutamos un primer script MapReduce que lee los scripts y los adapta al formato que queremos y sobre el que trabajaremos.
  4. Ya no necesitamos los ficheros originales, los podemos borrar de Hadoop. En caso de desastre, siempre podríamos recurrir a las cintas.

¿Cómo guardábamos los logs?

El resultado del reducer.pl son líneas como esta:

  010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com mail.example.org[10.0.2.34],15853,1268223532,5A4FE29B3B6,43A9B28B357,Mar 10 12:18:52 destinovarios@example.com destinovarios@example.com Antispam sent (250 OK sent queued as 43A9B28B357

Recordad que el script guarda mucha información formateada en base64, por lo que en realidad tenemos algo similar a esto:

  2010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com texto_en_base64

¿Por qué?

Porque todas las consultas que se suelen hacer son del tipo:

  • No he recibido un mensaje enviado entre las x y las z del martes, enviado desde esta cuenta, y con destino a esta otra.
  • Quiero todos los mensajes que se recibieron en tal buzón.

La estructura de logs que hemos preparado va a permitir que los scripts sean muy sencillos, sin ningún tipo de join o similar (ahora lo vemos). Dejaremos al interfaz web desde el que se van a ver los resultados el peso de formatear las entradas y hacerlas "visuales".

¿Interfaz web?

Nada mejor que un sencillo interfaz web en el que escribir los parámetros de búsqueda, preparar el script .pig, ejecutarlo, y mostrar los resultados una vez termine. Evidentemente, no voy a escribir aquí el formulario, pero por dar un ejemplo, imaginemos que queremos saber los correos que han llegado desde infojobs entre las 8 a.m. del 11 de marzo, y las 12 a.m. del mismo día.

Empezamos pasando las fechas a timestamp. Muy fácil:

  $ date -u -d 'Thu Mar 11 08:00:00 2010' '+%s'
  1268294400

  $ date -u -d 'Thu Mar 11 12:00:00 2010' '+%s'
  1268308800

Con esto y con la dirección origen ya tenemos todo lo necesario. Ahora lo normal sería escribir un fichero .pig, pero aquí vamos a usar el intérprete de comandos grunt para ver los resultados paso a paso:

  $ pig
  10/05/25 21:57:29 INFO pig.Main: Logging error messages to: /usr/local/pig-0.7.0/pig_1274817449369.log
  2010-05-25 21:57:29,688 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://fn140
  2010-05-25 21:57:30,149 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: fn140:8021
  grunt>

Empezamos cargando los datos:

  grunt> REGISTROS = LOAD '/user/hadoop/logs_formateados' USING PigStorage('\t') AS (msgid:chararray, fecha:long, origen:chararray, destinoreal:chararray, destinoentrada:chararray, resto:chararray);

Con este comando hemos cargado los datos desde el directorio /user/hadoop/logs_formateados en la variable REGISTROS. Los ficheros de entrada se separan con un tabulador [1], con el schema que hemos venido usando. Veamos, antes de seguir, si Pig ha entendido lo que realmente queremos:

  grunt> ILLUSTRATE REGISTROS
  2010-05-25 23:58:36,627 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://fn140
  2010-05-25 23:58:36,681 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: fn140:8021
  2010-05-25 23:58:36,791 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
  2010-05-25 23:58:36,792 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
  --------------------------------------------------------------------------------------------------------------------
  | REGISTROS | msgid: chararray | fecha: long | origen: chararray   | destinoreal: chararray | destinoentrada: chararray | resto: chararray |
  --------------------------------------------------------------------------------------------------------------------
  |           | 6B160A8CF29E064XA067E116FA2A458CAD8A66@GLOBL0310V01.exampleserv.com | 1268304751  | pr1user@example.com | mga@example.net        | mga@example.net           | texto_base64     |
  --------------------------------------------------------------------------------------------------------------------

Ahora que tenemos todos los registros, vamos a filtrarlos en base a los criterios que hemos definido:

  grunt> FILTRADOS = FILTER REGISTROS BY (origen matches '.+infojobs.+') AND (fecha > 1268294400) AND (fecha < 1268301600);

Hecho esto sólo nos queda agruparlos por msgid. Recordad, tenemos dos entradas por mensaje: una para el nivel antispam, y otra para la entrega. En realidad, en base al msgid podríamos seguir un mensaje por tantos saltos como tuviéramos (algo parecido a lo que Mailtrust ha documentado que hace):

  grunt> AGRUPADOS = GROUP FILTRADOS BY msgid;

Y con esto ya tenemos el resultado. Lo podríamos confirmar con el comando DUMP, que muestra los datos por salida estándar, pero lo vamos a guardar directamente en un fichero:

  grunt> STORE AGRUPADOS INTO '/tmp/fichero_resultados.txt' USING PigStorage();
  2010-05-26 22:17:03,942 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_LONG 2 time(s).
  2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.impl.logicalLayer.optimizer.PruneColumns - No column pruned for REGISTROS
  2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.impl.logicalLayer.optimizer.PruneColumns - No map keys pruned for REGISTROS
  2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.ResourceSchema - Insert two-level access to Resource Schema
  2010-05-26 22:17:03,983 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_LONG 2 time(s).
  2010-05-26 22:17:03,985 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - (Name: Store(hdfs://fn140/tmp/fichero_resultados.txt:PigStorage) - 1-1007 Operator Key: 1-1007)
  2010-05-26 22:17:03,986 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
  2010-05-26 22:17:03,986 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
  2010-05-26 22:17:04,019 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
  2010-05-26 22:17:05,180 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
  2010-05-26 22:17:05,188 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
  2010-05-26 22:17:05,191 [Thread-120] WARN  org.apache.hadoop.mapred.JobClient - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
  2010-05-26 22:17:05,545 [Thread-120] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
  2010-05-26 22:17:05,546 [Thread-120] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
  2010-05-26 22:17:06,382 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201005262134_0008
  2010-05-26 22:17:06,382 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://fn140:50030/jobdetails.jsp?jobid=job_201005262134_0008
  2010-05-26 22:17:06,384 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
  2010-05-26 22:17:19,214 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 10% complete
  2010-05-26 22:17:20,719 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 20% complete
  ...............
  2010-05-26 22:17:28,241 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 60% complete
  2010-05-26 22:17:41,788 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
  2010-05-26 22:17:41,788 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Successfully stored result in: "hdfs://fn140/tmp/fichero_resultados.txt"
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Records written : 72
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Bytes written : 44727
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Spillable Memory Manager spill count : 0
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Proactive spill count : 0
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

Vaya, parece que tenemos 72 usuarios suscritos a infojobs. Veamos un ejemplo de lo que se ha escrito en "/tmp/fichero_resultados.txt" (es un path de HDFS, no el /tmp local). Los saltos de línea los he puesto yo para que sea más visual, y he escrito la versión sin codificar en base64:

  1268304853778.112.POCFQELSVS@[hellboy07]
  {
      (1268304853778.112.POCFQELSVS@[hellboy07],1268308417,aviso_incripciones@push.infojobs.net,usuario@example.com,
        usuario@example.com,localhost[127.0.0.1],11854,1268308418,9ABC12FACB4,-1,
        Mar 11 11:53:38 usuario@example.com usuario@example.com Maildrop sent (delivered via maildrop service))
  ,
      (1268304853778.112.POCFQELSVS@[hellboy07],1268308417,aviso_incripciones@push.infojobs.net,usuario@example.com,
        usuario@example.com,push3.infojobs.net[79.171.25.71],11107,1268308418,3B0213FAECD,9ABC12FACB4,
        Mar 11 11:53:38 usuario@example.com usuario@example.com Antispam sent (250 OK queued as 9ABC12FACB4)
  }

Ahora sólo necesitamos que nuestro interfaz web formatee el resultado y lo saque por pantalla. Fácil y sencillo.

Resumiendo, con estas cuatro líneas podemos hacer cualquier búsqueda en los logs, tengan el tamaño que tengan:

  REGISTROS = LOAD '/user/hadoop/logs_formateados' USING PigStorage('\t') AS (msgid:chararray, fecha:long, origen:chararray, destinoreal:chararray, destinoentrada:chararray, resto:chararray);
  FILTRADOS = FILTER REGISTROS BY (origen matches '.+infojobs.+') AND (fecha > 1268294400) AND (fecha < 1268301600);
  AGRUPADOS = GROUP FILTRADOS BY msgid;
  STORE AGRUPADOS INTO '/tmp/fichero_resultados.txt' USING PigStorage();

Referencias

Hay muchos artículos y documentación disponible en la red sobre Hadoop. En cuanto a bibliografía, que yo sepa hay al menos tres libros disponibles:

  • Pro Hadoop, de la editorial Apress.
  • Hadoop: The Definitive Guide, de la editorial O'Reilly.
  • Hadoop in Action, de la editorial Manning.

Notas

[1]La verdad es que he hecho trampa. El reducer original separaba el id de mensaje del resto de datos con un tabulador, pero luego usaba un espacio simple. Esto es porque al principio tenía pensado trabajar otros aspectos de Hadoop y Pig, como el Chaining y las "user defined functions". Al final, para no tener que escribir más posts, he cambiado el reducer para que separe todos los campos con tabuladores.
read more