Computación distribuida con Hadoop III

dom, 21 mar 2010 by Foron

En este tercer post de la serie dejamos la teoría para ver un caso práctico de aplicación de Hadoop, que completaré en un cuarto post.

Como sabemos, gracias a Hadoop conseguimos un sistema con mucha capacidad de almacenamiento, a un coste por mega reducido, redundado, de rendimiento prácticamente lineal (más máquinas = más rendimiento) y sobre el que ejecutar de forma pararela todo tipo de aplicaciones que se puedan implementar como MapReduce.

Con estos datos, hay una utilidad práctica que viene inmediatamente a la mente: tratamiento de logs. Veamos:

  • Me gustaría decir lo contrario, pero personalmente no conozco a nadie que maneje el dinero destinado a proyectos que no ponga pegas a una inversión fuerte para el tratamiento de logs. El coste por mega de Hadoop es difícil de superar.
  • Si combinamos Hadoop, por ejemplo para datos con hasta un año de antiguedad, con un soporte en cinta para el resto, lo tendremos fácil para cumplir cualquier legislación en lo relacionado a retención de datos.
  • Sin llegar a los niveles de Facebook o Rackspace, es probable que el volumen de logs que guardemos durante un año sea lo suficientemente grande como para hacer difícil su tratamiento si un juez (por ejemplo) pide todos los correos desde la cuenta x a la cuenta z de los últimos siete meses, o todos los acceso al foro web de fulanito. MapReduce y Hadoop permiten ejecutar este tipo de tareas sin mayores problemas, y con un rendimiento prácticamente lineal.

En este entorno, planteo un ejemplo de tratamiento de logs de Postfix de lo más simple, con un filtro de contenido en modo post-queue (Amavis, por ejemplo, en su uso más habitual). Vamos, nada nuevo.

El objetivo sería, además de los puntos mencionados anteriormente, poder realizar cualquier tipo de consulta sobre los logs de meses (o años), en un tiempo razonable. Y digo tiempo razonable porque en este tipo de consultas no suele hacer falta tener el resultado al momento. Se acepta esperar durante 1,5 o 10 minutos hasta que se genere el resultado.

Resumiendo, tenemos una infraestructura parecida a esta:

Infraestructura Hadoop y Amavis

Todos los servidores envían los logs de manera centralizada a un syslog remoto, que además de hacer una copia diaria a cinta, almacena la información en Hadoop, usando alguno de los muchos interfaces que se ofrecen, como pueden ser FUSE o FTP.

Junto a esto tendremos una máquina, preferiblemente un interfaz web, para generar trabajos MapReduce. Básicamente necesitaremos un formulario (cuenta origen, destino, fechas, ...), y algo de control sobre los trabajos que ya se han realizado para mostrar los resultados.

Streaming

Se pueden crear trabajos MapReduce para Hadoop de varias formas, pero en este ejemplo me voy a limitar a lo que se llama "Streaming". Dicho de otra forma (aunque no sea del todo cierto), a programar aplicaciones MapReduce en perl o python (o en cualquier lenguaje que pueda leer y escribir en la entrada y salida estándar).

La gran ventaja que tiene el Streaming es que permite desarrollar prototipos de una forma rápida, aunque no sea lo más eficiente. En cualquier caso, sobra para este ejemplo.

La mejor forma de explicar lo que es el Streaming en Hadoop es ver el equivalente, en versión linux:

  cat datos_entrada.txt | mapper.pl | sort | reducer.pl

El mapper.pl recibirá los datos desde los ficheros de log, línea por línea, y los tratará, escribiendo por salida estándar con el formato clave "tabulador" valor.

Esta salida se ordena, y pasa a un reducer. Hadoop garantiza que todas las claves serán tratadas por el mismo reducer, aunque éste puede, y normalmente así lo hará, procesar varias claves (este es un buen momento para recordar que los ficheros de entrada pueden ser, por decir algo, 200GB de datos, sin que suponga ningún problema para Hadoop).

Volviendo al ejemplo, veamos unas cuantas líneas de log de postfix:

  Feb 20 14:27:03 server postfix/smtpd[17875]: E78D13073B2: client=mail.example.com[10.0.0.6]
  Feb 20 14:27:03 server postfix/cleanup[17822]: E78D13073B2: message-id=
  Feb 20 14:27:04 server postfix/qmgr[5370]: E78D13073B2: from=, size=20827, nrcpt=1 (queue active)
  Feb 20 14:27:04 server postfix/smtp[17980]: E78D13073B2: to=, relay=127.0.0.1[127.0.0.1]:10025, delay=0.6, delays=0.08/0/0/0.51, dsn=2.0.0, status=sent (250 Ok: queued as 6A737307316)
  Feb 20 14:27:04 server postfix/qmgr[5370]: E78D13073B2: removed

  Feb 20 14:27:04 server postfix/smtpd[17882]: 6A737307316: client=localhost[127.0.0.1]
  Feb 20 14:27:04 server postfix/cleanup[17817]: 6A737307316: message-id=
  Feb 20 14:27:04 server postfix/qmgr[5370]: 6A737307316: from=, size=21719, nrcpt=1 (queue active)
  Feb 20 14:27:04 server postfix/pipe[16491]: 6A737307316: to=, relay=maildrop, delay=0.16, delays=0.11/0.01/0/0.04, dsn=2.0.0, status=sent (delivered via maildrop service)
  Feb 20 14:27:04 server postfix/qmgr[5370]: 6A737307316: removed

Esta es la información que postfix loguea al recibir y entregar un mensaje usando un filtro after-queue. He ordenado los logs para que sea más claro, pero en la realidad no es así.

En este ejemplo se recibe un correo desde mail.example.com, y se le asigna el queue-id E78D13073B2. También se loguea el message-id, el origen y el tamaño.

Una vez se ha encolado el mensaje, se envía a Amavis, a través de 127.0.0.1:10025. Como se puede ver en el ejemplo, en este punto sabemos el destino del mensaje, y también si fue aceptado o no. Lo sabemos porque el log hace referencia al queue id que amavis reinjecta en postfix. De no haberlo aceptado no tendríamos un queue id en el log.

Este nuevo mensaje mantiene el message-id, el tamaño es algo mayor, y termina entregandose, en este caso en la máquina local, usando la aplicación courier-maildrop.

Bien, esto no es más que un ejemplo. En la realidad hay que tratar muchos casos: aliases, reenvíos a máquinas remotas, bounces, mensajes que pasan a deferred por no haber podido entregarse, ....

Además, e insistiendo en que esto no es más que un ejemplo, voy a asumir dos cosas, que aunque se cumplen a menudo, no son ciertas:

  • Que el queue-id es único.
  • Que el message-id es único.

Mapper.pl

El proceso comienza con Hadoop enviando los ficheros de entrada (logs) a la entrada estándar de los mapper.pl que estemos ejecutando en el cluster, que tratarán cada línea y la enviarán por salida estándar con el formato:

  queueidinformacion_separada_por_comas

Para separar la clave del valor se usa un tabulador. Normalmente, los campos del valor se separan con espacios, pero en este ejemplo he preferido las "comas".

Veamos un trozo del código:

 ..............
 while (<>)
 ..............
 if ($linea =~ /^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) .+ postfix\/smtpd\[\d+\]: (\w+): client=(\S+)$/) {
                 $estructura{fecha} = $1; $estructura{qid} = $2; $estructura{host} = $3;
                 print $estructura{qid} . "\t" . "Conexion," . join (",",$estructura{fecha},$estructura{host}) . "\n";
         }
          elsif ($linea =~ /^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) .+ postfix\/smtp\[\d+\]: (\w+): to\=\<([^ ]+)\>, relay=127.0.0.1.*status=(.*) (\w+)\)$/) {

                 $estructura{fecha} = $1; $estructura{qid} = $2; $estructura{to} = $3; $estructura{origto} = $3; $estructura{status} = $4; $estructura{relay} = "Antispam"; $estructura{rqid} = $5; $estructura{status} =~ s/,//g;
                 print $estructura{qid} . "\t" . "Antispam," . join (",",$estructura{fecha},$estructura{to},$estructura{to},$estructura{relay},$estructura{status},$estructura{rqid}) . "\n";
         }
 ...........
 }

Tenemos varios "if/elsif", que adaptan la línea del log de entrada al formato que quiero. Además, he añadido un texto con el tipo de entrada de log que estamos tratando, y que sabemos por el "if" en el que estamos (Antispam en este ejemplo), para luego usarlo en el reducer.

El código completo incluye también la detección de los mensajes que Amavis considera que son spam, en los que pone el rqid como 0. Además, en aquellos casos donde "no proceda" aplicar el concepto de rqid pongo -1 como valor. Todo esto servirá en el reducer.

Volviendo a los logs de ejemplo, el resultado de ejecutar el mapper sobre ellos es:

  E78D13073B2     Conexion,Feb 20 14:27:03,mail.example.com[10.0.0.6]
  E78D13073B2     Msgid,Jan 20 06:27:03,F0E59D47X9B343A7DFRAB1F0E45@EUW0600375
  E78D13073B2     Origen,Feb 20 14:27:04,prueba@example.com,20827
  E78D13073B2     Antispam,Feb 20 14:27:04,destino@example.org,destino@example.org,Antispam,sent (250 Ok: queued as,6A737307316)
  E78D13073B2     Cierre,Feb 20 14:27:04
  6A737307316     Conexion,Feb 20 14:27:04,localhost[127.0.0.1]
  6A737307316     Msgid,Feb 20 14:27:04,F0E59D47X9B343A7DFRAB1F0E45@EUW0600375
  6A737307316     Origen,Feb 20 14:27:04,prueba@example.com,21719
  6A737307316     Maildrop,Feb 20 14:27:04,destino@example.org,destino@example.org,Maildrop,sent (delivered via maildrop service),-1
  6A737307316     Cierre,Feb 20 14:27:04

Después de esto Hadoop ejecutaría un sort (en el ejemplo ya están ordenados por queue-id), y volvería a enviar el resultado a los reducer.pl del cluster. Por lo tanto, sabemos que un reducer va a recibir todas las líneas de log correspondientes a un mismo queue-id.

Reducer.pl

Pasamos directamente a ver algo del código del reducer.pl:

 my %mensaje = (qid => "",
             fecha => "",
             servidor => "",
             msgid => "",
             origen => "",
             size => "",
             to => "",
             origto => "",
             rqid => "",
             relay => "",
             status => "",
             cierre => ""
             );

 ..............
 while () {
     chomp;
     ($qid, $valor) = split /\t/;

         if ($qid ne $qidanterior) {
                 # tenemos un queueid nuevo
                 if ($qidanterior ne "") {
                         &mostrar_datos();

                         %mensaje = (qid => "",
                                 fecha => "",
                                 desdeip => "",
                                 msgid => "",
                                 origen => "",
                                 size => "",
                                 to => "",
                                 origto => "",
                                 rqid => "",
                                 status => "",
                                 cierre => ""
                         );
                 }
                 $qidanterior = $qid;
         }

         # seguimos procesando los datos del mismo qid
         $mensaje{qid} = $qid;
         @datos = split(",",$valor);

         switch ($datos[0]) {
                 case "Conexion" {
                         $mensaje{fecha} = $datos[1];
                         $mensaje{desdeip} = $datos[2];
                 }
                 case "Msgid" {
                         $mensaje{msgid} = $datos[2];
                 }
                 case "Origen" {
                         $mensaje{origen} = $datos[2];
                         $mensaje{size} = $datos[3];
                 }
                 case "Antispam" {
                         $mensaje{to} .= $datos[2] . "-separador-";
                         $mensaje{origto} .= $datos[3] . "-separador-";
                         $mensaje{status} .= $datos[1] . " " .$datos[2] . " " . $datos[3] . " " . $datos[4] . " " . $datos[5] . "-separador-";
                         $mensaje{rqid} = $datos[6];
                 }

 ................
 sub mostrar_datos {
      ................
      if ( ($mensaje{fecha} ne "") && ($mensaje{cierre} ne "") ) {
           # mensaje completo: Salida estandar
           # hay que revisar que se haya pasado por todas las fases
           # y realizar comprobaciones, que aqui no se hacen
           $fechaentrada = `date -u -d '$mensaje{fecha}' '+%s'`; chomp $fechaentrada;
           $fechasalida = `date -u -d '$mensaje{cierre}' '+%s'`; chomp $fechasalida;

           $datoscodificables = $mensaje{desdeip} .",".$mensaje{size}.",".$fechasalida.",".$mensaje{qid}.",".$mensaje{rqid}.",".$mensaje{status};
           $resto = encode_base64($datoscodificables,"");

           print $mensaje{msgid} . "\t" . $fechaentrada . " " . $mensaje{origen} . " " . $mensaje{to} . " " . $mensaje{origto} . $resto . "\n";

     } else {
           # mensaje incompleto. Guardar en otro fichero para poder seguir el tracking
           # Falta por hacer
     }

 }

Por lo tanto, cogemos cada línea de entrada, y ayudándonos de la descripción del tipo de línea que hemos añadido en el mapper, vamos completando la estructura de datos que después será el resultado (por fin) que queremos.

Un ejemplo de la ejecución de este trabajo en mi cluster de prueba:

  $ hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-0.20.2-streaming.jar -input /user/hadoop/mail_logs -output /user/hadoop/logs_formateados/domingo_14_03 -file /home/hadoop/mapper.pl -file /home/hadoop/reducer.pl -mapper "/usr/bin/perl -w mapper.pl" -reducer "/usr/bin/perl -w reducer.pl"

  packageJobJar: [/home/hadoop/mapper.pl, /home/hadoop/reducer.pl, /home/hadoop/hadoop-unjar4902963819550178151/] [] /tmp/streamjob534817239215339193.jar tmpDir=null
  10/03/18 22:20:02 INFO mapred.FileInputFormat: Total input paths to process : 4
  10/03/18 22:20:02 INFO streaming.StreamJob: getLocalDirs(): [/home/hadoop/mapred/local]
  10/03/18 22:20:02 INFO streaming.StreamJob: Running job: job_201003182146_0001
  10/03/18 22:20:02 INFO streaming.StreamJob: To kill this job, run:
  10/03/18 22:20:02 INFO streaming.StreamJob: /usr/local/hadoop/bin/hadoop job  -Dmapred.job.tracker=fn140:8021 -kill job_201003182146_0001
  10/03/18 22:20:02 INFO streaming.StreamJob: Tracking URL: http://fn140.forondarena.net:50030/jobdetails.jsp?jobid=job_201003182146_0001
  10/03/18 22:20:03 INFO streaming.StreamJob:  map 0%  reduce 0%
  10/03/18 22:20:17 INFO streaming.StreamJob:  map 8%  reduce 0%
  10/03/18 22:20:18 INFO streaming.StreamJob:  map 11%  reduce 0%
  10/03/18 22:20:20 INFO streaming.StreamJob:  map 15%  reduce 0%
  .....
  10/03/18 22:21:56 INFO streaming.StreamJob:  map 92%  reduce 25%
  10/03/18 22:21:59 INFO streaming.StreamJob:  map 100%  reduce 25%
  ....
  10/03/18 23:14:13 INFO streaming.StreamJob:  map 100%  reduce 98%
  10/03/18 23:15:55 INFO streaming.StreamJob:  map 100%  reduce 99%
  10/03/18 23:17:31 INFO streaming.StreamJob:  map 100%  reduce 100%
  10/03/18 23:18:25 INFO streaming.StreamJob: Job complete: job_201003182146_0001
  10/03/18 23:18:25 INFO streaming.StreamJob: Output: /user/hadoop/logs_formateados/domingo_14_03

El resultado es un fichero en la ruta "/user/hadoop/logs_formateados/domingo_14_03", con líneas formadas por un message-id, seguido por un tabulador, seguido del origen, el destino, el destino antes de la expansión de aliases, y del resto de la información, codificada en base64, todo separado por espacios.

No hace falta decir que el formato de salida en un entorno real se debería adaptar a lo que se necesitase. De hecho, Rackspace usa una estructura binaria, donde entre otras cosas se guarda la información de los diferentes saltos que hace un correo dentro de su sistema. Este ejemplo, por supuesto, es mucho más sencillo.

Al final del proceso hemos obtenido un fichero, que es el que de verdad queremos en nuestro cluster, con toda la información de cada mensaje que ha entrado en nuestra red en un periodo de tiempo concreto (digamos que cada 4 horas volcamos los logs de syslog a Hadoop y ejecutamos la tarea), como se ve en los siguientes ejemplos (sin la codificación en base64, para que sean más fáciles de seguir):

Mensaje que se entrega en un buzón local:

  ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org 1268225082 origen@example.org destino@example.com destino@example.com mail.example.org[10.0.2.34],2430,1268225085,B6A6A296E55,158AB296F24,Mar 10 12:44:45 destino@example.com destino@example.com Antispam sent (250 OK sent queued as 158AB296F24

  ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org 1268225085 origen@example.org destino@example.com destino@example.com localhost[127.0.0.1],3081,1268225085,158AB296F24,-1,Mar 10 12:44:45 destino@example.com destino@example.com Maildrop sent (delivered via maildrop service)

Por lo tanto:

  • El mensaje con id ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org
  • Entró en el sistema a las 1268225082
  • Desde el servidor mail.example.org[10.0.2.34]
  • El origen era origen@example.org
  • Y el destino destino@example.com (no es un alias)
  • Una vez pasado por el sistema antispam se reencola con queueid 158AB296F24
  • El mensaje de la entrega al servidor antispam es: sent (250 OK sent queued as 158AB296F24)
  • Una vez ha pasado el filtro antispam el mensaje se reencola a las 1268225085
  • Por supuesto desde el servidor localhost[127.0.0.1]
  • Y el mensaje de la entrega local es Mar 10 12:44:45 Maildrop sent (delivered via maildrop service), sólo al destino destino@example.com

Mensaje que se entrega a un servidor remoto:

  1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3 1268219047 origen@example.net destino@example.com destino@example.com mail.example.net[10.5.3.2],1901,1268219051,CA6782FABFB,B6C7A2FAACA,Mar 10 11:04:11 destino@example.com destino@example.com Antispam sent (250 OK sent queued as B6C7A2FAACA

  1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3 1268219049 origen@example.net destinoreal@remoto.example.org destino@example.com localhost[127.0.0.1],2686,1268219051,B6C7A2FAACA,-1,Mar 10 11:04:11 destinoreal@remoto.example.org destino@example.com mail.example.org[10.0.2.34]:25 sent (250 2.0.0 Ok: queued as 423872AB862)
  • Su messageid es 1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3
  • Entro en el sistema a las 1268219047
  • Desde el servidor mail.example.net[10.5.3.2]
  • Desde el origen origen@example.net
  • Y al destino destino@example.com, que en la fase antispam todavía no se expande a ningún alias
  • El mensaje se relaciona con B6C7A2FAACA
  • Y el mensaje de la entrega es Mar 10 11:04:11 destino@example.com destino @example.com Antispam sent (250 OK sent queued as B6C7A2FAACA)
  • Se reencola, a través de localhost, a las 1268219049
  • El mensaje pasa a ser la dirección real destinoreal@remoto.example.org, proveniente del alias destino@example.com
  • El mensaje se entrega en mail.example.org[10.0.2.34]:25
  • Con el mensaje sent (250 2.0.0 Ok: queued as 423A72AB262)

Mensaje que es spam:

  9545850218.UWD8KHA190@spammer.example.info 1268238332 spammer@example.net destino@example.com destino@example.com mail.example.info[10.8.2.12],2141,1268238333,F355429B3A5,0,Mar 10 16:25:33 destino@example.com destino@example.com Antispam sent (250 OK sent mensaje_rechazado por spam

En este caso el mensaje es spam, y no se relaciona con ningún otro queueid.

Mensaje que se entrega a varias cuentas:

  2010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com mail.example.org[10.0.2.34],15853,1268223532,5A4FE29B3B6,43A9B28B357,Mar 10 12:18:52 destinovarios@example.com destinovarios@example.com Antispam sent (250 OK sent queued as 43A9B28B357

  2010031011.AB263566.pc1@mailer.example.org 1268223532 usuario@example.org destinolocal@example.com-separador-destinoremoto@example.info destinovarios@example.com-separador-destinovarios@example.com localhost[127.0.0.1],16525,1268223532,43A9B28B357,-1,Mar 10 12:18:52 destinolocal@example.com destinovarios@example.com Maildrop sent (delivered via maildrop service)-separador-Mar 10 12:18:52 destinoremoto@example.info destinovarios@example.com mail.example.info[10.8.2.12]:25 sent (250 2.0.0 Ok: queued as 452362CA51A)
  • Ahora tenemos un mensaje con id 2010031011.AB263566.pc1@mailer.example.org
  • Que se envía desde usuario@example.org
  • Al destino destinovarios@example.com
  • Que a su vez es un alias a dos cuentas destinolocal@example.com y destinoremoto@example.info
  • Los mensajes de entrega a cada una de ellas son:
    • Mar 10 12:18:52 destinolocal@example.com Maildrop sent (delivered via maildrop service)
    • Mar 10 12:18:52 destinoremoto@example.info mail.example.info[10.8.2.12]:25 sent (250 2.0.0 Ok: queued as 452362CA51A)

Como se puede ver, ya tenemos toda la información necesaria, debidamente preformateada, por lo que ya no necesitamos los ficheros mail.log, que podríamos borrar de Hadoop (que no de las cintas).

La información contenida en estos ficheros nos servirá para preparar consultas, bien directamente con otros trabajos MapReduce, bien con otras herramientas, tal y como mostraré en el siguiente post de la serie, en el que nos aprovecharemos del formato de los ficheros para hacer consultas por message-id, cuenta origen, fecha o cuenta destino (según entró en el sistema o una vez expandidos los aliases).

En este punto hay que volver a recordar que Hadoop está pensado para trabajar con cantidades muy grandes de información, en los que hacer un simple "grep" o usar una base de datos relacional estándar resulta impracticable.

Por último, tened en cuenta que el código de este post, además de ser sólo un ejemplo, es completamente Alpha. Aparte de no funcionar del todo bien (tiene varios fallos que he detectado pero que no he corregido todavía), necesitaría tener en cuenta muchos más casos, como mensajes cuya entrega se retrasa durante días (que no deja de ser algo normal en los sistemas de correo reales).

En el cuarto y último post de la serie: acceso a Hadoop a través de una herramienta de apoyo, como pig.


Comments