Systemtap para detectar la actividad de ficheros de un proceso

lun, 18 jul 2011 by Foron
Update: La versión de github del script ya es capaz de monitorizar n pids en ejecución y n programas que todavía no lo estén.

Para retomar un poco el blog, y como hace mucho que no escribo nada sobre SystemTap, me he planteado otro sencillo ejercicio "de repaso".

La verdad es que en la actualidad ya hay un buen montón de artículos, documentación y ejemplos sobre lo que se puede hacer con SystemTap, ya sea para la inspección del kernel como para la de aplicaciones tipo mysql, postgresql o python. El rpm de Scientific Linux 6, por ejemplo, incluye más de 80 ejemplos de todo tipo, desde el análisis de procesos hasta el del tráfico de red.

Aún así, como digo, aquí va un ejemplo más.

El problema

Vamos a suponer que tenemos un proceso ejecutando en una máquina, y que queremos vigilar los ficheros que va abriendo.

Para ir un poco más lejos, queremos que se muestren también los ficheros abierto por los hijos de ese proceso principal (sólo un nivel por ahora).

Y ya puestos, queremos poder limitar la monitorización a los ficheros de ciertas rutas. Por ejemplo, podemos querer mostrar sólo los ficheros que se abrán en /etc, o en /lib.

Vamos, que el ejercicio es prácticamente una extensión de uno de los scripts de un post anterior de este mismo blog. Pero que le voy a hacer, sólo se me ocurren ejercicios sobre ficheros.

Resumiendo, y poniendo un ejemplo, tenemos un servidor postfix, con sus habituales procesos hijo:

  master─┬─pickup
         │─qmgr
         └─tlsmgr

Queremos saber que ficheros abren todos los procesos que cuelgan de master, ya sean los tres que se están ejecutando en este momento, ya sean los nuevos que se vayan creando. Además, queremos poder limitar el número de ficheros vigilados a una serie de rutas determinadas.

La solución

El núcleo de la solución está en dos "probes":

  • syscall.open.return
  • syscall.close.return

O sea, las llamadas al sistema open y close. Más concretamente, al momento en el que terminan de ejecutarse (return).

Además, vamos a usar el probe "begin" para leer argumentos, preparar variables y demás parafernalia.

Comenzamos:

  probe begin {
          procesoArgumento = $1
          printf ("El pid que vamos a tratar es %d\n", procesoArgumento);
          if (argc > 1) {
                  for (i=2; i <= argc; i++) {
                          ficherosEnPath[argv[i]] = argv[i]
                          numeroPaths += 1
                  }
          }
  }

Poca explicación hace falta. En $1 tenemos el primer argumento, que va a ser el pid a vigilar. A partir del segundo argumento, y de manera opcional, se pueden incluir una serie de rutas a monitorizar. Todas estas son llamadas válidas:

  stap -v monitorFicheros.stp $(pidof master)
  stap -v monitorFicheros.stp $(pidof master) /etc
  stap -v monitorFicheros.stp $(pidof master) /etc /usr/lib

Una vez definido este bloque básico, pasamos a la llamada al sistema "open":

  probe syscall.open.return {
         proceso = pid()
         padre = ppid()
         hilo = tid()
         ejecutable = execname()
         insertarEnTabla = 0

         if ( (procesoArgumento == proceso) || (procesoArgumento == padre) || (procesoArgumento == hilo) ) {
                 if ( (procesoArgumento == proceso) && (env_var("PWD") != "") ) {
                         pwd = env_var("PWD")
                 }
                 localpwd = (isinstr(env_var("PWD"), "/"))?env_var("PWD"):pwd;

                 filename = user_string($filename)
                 descriptor = $return

                 filename = (substr(filename, 0, 1) == "/")?filename:localpwd . "/" . filename;

                 if ([proceso,padre,hilo,descriptor] in tablaProcesos)  {
                         printf ("{codigo: \"error\", proceso: \"%s\", pid: %d, ppid: %d, tid: %d, fichero: \"%s\", descriptor: %d}\n", ejecutable, proceso, padre, hilo, filename, descriptor)
                 } else {
                         if (descriptor >= 0) {
                                 if (numeroPaths > 0 ) {
                                         foreach (ruta in ficherosEnPath) {
                                                 if (substr(filename, 0, strlen(ruta)) == ruta) {
                                                         insertarEnTabla = 1
                                                         break
                                                 }
                                         }
                                 }
                                 if ( (insertarEnTabla == 1) || (numeroPaths == 0) ) {
                                         tablaProcesos[proceso,padre,hilo,descriptor] = gettimeofday_ms()
                                         tablaFicheros[proceso,padre,hilo,descriptor] = filename
                                         printf ("{codigo: \"open\", proceso: \"%s\", pid: %d, ppid: %d, tid: %d, fichero: \"%s\", descriptor: %d, date: %d}\n", ejecutable, proceso, padre, hilo, filename, descriptor, tablaProcesos[proceso,padre,hilo,descriptor])
                                 }
                         }
                 }
         }
  }

A través de las funciones pid(), ppid() y tid() vemos si el proceso que está ejecutando open en este momento es el proceso que nos interesa o un hijo suyo.

Si cumple este requerimiento, pasamos al bloque que revisa si el fichero que se está abriendo está en el path que nos interesa. En este caso, para hacer el ejercicio más completo, he optado por dar unas cuantas vueltas "de más", y he usado la función env_var("PWD") para acceder al entorno del proceso. En la práctica esta no es la mejor forma, y por eso hay más control en el probe, ya que no siempre existe la variable PWD en el entorno de los procesos que llegan a este open.

La llamada al sistema open carga las variables $return (el valor de retorno de la función es el id del descriptor del fichero) y $filename (el nombre del fichero). Ojo! Cada llamada al sistema tiene unos argumentos, y con ello "ofrece" unas variables para nuestros scripts. Por ejemplo, mientras que en open tenemos filename, flags y mode; en close tenemos fd, filp, files, fdt y retval. El propio SystemTap, Google y el código fuente del kernel son ... vuestros amigos.

En cualquier caso, lo importante es que, cuando se dan todas las condiciones, hacemos lo siguiente:

  tablaProcesos[proceso,padre,hilo,descriptor] = gettimeofday_ms()
  tablaFicheros[proceso,padre,hilo,descriptor] = filename
  printf ("{codigo: \"open\", proceso: \"%s\", pid: %d, ppid: %d, tid: %d, fichero: \"%s\", descriptor: %d, date: %d}\n", ejecutable, proceso, padre, hilo, filename, descriptor, tablaProcesos[proceso,padre,hilo,descriptor])

Con el printf escribimos la información del fichero abierto por salida estándar. Las dos tablas (tablaProcesos y tablaFicheros) sirven para guardar el momento en el que se ha abierto el fichero (con una precisión de milisegundos), y el nombre del fichero en sí mismo, con lo que lo tendremos más a mano cuando pasemos a la llamada al sistema close. Para identificar un fichero usamos el índice formado con [el proceso, el padre, el hilo y el descriptor] del fichero que se ha abierto.

Con toda la información en su sitio ya tenemos la primera parte del script. Seguimos:

  probe syscall.close.return {
         proceso = pid()
         padre = ppid()
         hilo = tid()
         descriptor = $fd
         ejecutable = execname()

         if ( ( procesoArgumento == proceso ) || ( procesoArgumento == padre ) || (procesoArgumento == hilo ) ) {
                 if ([proceso,padre,hilo,descriptor] in tablaProcesos) {
                         filename = tablaFicheros[proceso,padre,hilo,descriptor]
                         date = gettimeofday_ms() - tablaProcesos[proceso,padre,hilo,descriptor]
                         printf ("{codigo: \"close\", proceso: \"%s\", pid: %d, ppid: %d, tid: %d, fichero: \"%s\", descriptor: %d, date: %d}\n", ejecutable, proceso, padre, hilo, filename, descriptor, date)
                         delete tablaProcesos[proceso,padre,hilo,descriptor]
                         delete tablaFicheros[proceso,padre,hilo,descriptor]
                 }
         }
  }

Fácil. Si se está haciendo un close de uno de los ficheros monitorizados, se calcula el tiempo que ha estado abierto y se muestra por la salida estándar.

Un ejemplo de ejecución del monitor es el siguiente:

  # stap -v monitorFicheros.stp $(pidof master)
  Pass 1: parsed user script and 76 library script(s) using 96688virt/22448res/2744shr kb, in 120usr/10sys/133real ms.
  Pass 2: analyzed script: 8 probe(s), 22 function(s), 7 embed(s), 8 global(s) using 251840virt/48200res/3952shr kb, in 420usr/120sys/551real ms.
  Pass 3: using cached /root/.systemtap/cache/02/stap_02e370ac4942b188c248e7ec11ac8e2c_19586.c
  Pass 4: using cached /root/.systemtap/cache/02/stap_02e370ac4942b188c248e7ec11ac8e2c_19586.ko
  Pass 5: starting run.

  El pid que vamos a tratar es 1082
  {codigo: "open", proceso: "smtpd", pid: 5817, ppid: 1082, tid: 5817, fichero: "/etc/hosts", descriptor: 12, date: 1310920975000}
  {codigo: "close", proceso: "smtpd", pid: 5817, ppid: 1082, tid: 5817, fichero: "/etc/hosts", descriptor: 12, date: 0}

Sorpresa! Los printf se formatéan como datos json, por si alguna vez quisiera hacer algo con node.js y aplicaciones como log.io.

En cuanto a los "date: 0", se debe a que la precisión de milisegundos es demasiado baja para estos ficheros y para esta prueba sin tráfico de correo de ningún tipo.

El código completo (en este post sólo falta la definición de variables globales) está accesible en github, como siempre.

Limitaciones y trabajo futuro

Como ya he comentado, este script está lejos de ser óptimo. Además de mi propia incapacidad, en algunos momentos he optado por dar más vueltas de las necesarias para usar así más funciones y variables.

Mi primer objetivo era permitir la monitorización de procesos que todavía no estuvieran en ejecución. En esta versión, sin embargo, el proceso principal sí tiene que estar ejecutando antes de lanzar el stap. Esto no es algo técnicamente complicado, pero no se hace solo, y no aporta demasiado desde el punto de vista conceptual al post, así que lo he dejado.

¿Y qué pasa si el pidof da más de un pid? Me habéis pillado. Por ahora sólo trato un pid.

Para el que esté interesado, SystemTap permite guardar la salida estándar en un fichero usando el parámetro -o. A partir de aquí es trivial que otro script perl, python, bash, node.js o lo que sea trabaje con él.

read more

Websockets para administradores de sistemas

sáb, 26 feb 2011 by Foron

Ya lo he escrito en el post anterior: Node.js es un proyecto muy interesante, muy adecuado como backend en muchos entornos, y particularmente en aquellos que dependen en gran medida del rendimiento I/O. Como ejemplo, aquí va uno de los últimos casos de éxito.

En este post, además de volver a usar Node.js y Redis (otro proyecto cada vez más importante), voy a dar cuatro pinceladas sobre una de las nuevas tendencias que van apareciendo en la web: WebSockets.

Para completar el ejercicio, escribiré un mini-proyecto para graficar todo tipo de valores numéricos en un plasmoid de KDE. Bueno, en realidad en cualquier navegador con soporte WebSockets (la lista es reducida, como veremos más adelante). Así, pasaremos a un punto de vista más de administrador de sistemas que de desarrollador web.

¿Qué son los WebSockets?

Internet es cada vez más "web". Las aplicaciones son cada vez más dinámicas, complejas e interactivas. Esto no es malo, obviamente, pero supone que mientras que los usuarios tienen más y más medios para interactuar con las webs, los administradores de estos entornos han visto como lo que antes se podía gestionar sin mayores problemas, ahora necesita todo tipo de caches e "inventos" sólo porque el sistema de votación de videos (por ejemplo) triplica la cantidad de accesos al sitio y a la base de datos.

Estos problemas no son, ni mucho menos, nuevos. Durante los últimos años se ha ido desarrollando mucha tecnología, para mejorar la experiencia de usuario, por supuesto, pero también para que la comunicación cliente/servidor sea más controlable y eficaz. Lo último: WebSockets.

Simplificando mucho, los WS básicamente ofrecen un canal full duplex entre el cliente y el servidor. De esta manera, en lugar de abrir una conexión web tradicional vía ajax (o similar) cada vez que se necesite un dato, el navegador crea un único canal permanente con el servidor, y lo usa tanto para enviar como para recibir datos. Esto último es importante, ya que el servidor puede usar la vía de comunicación activamente, por ejemplo para enviar un broadcast a todos sus clientes, y no sólo como respuesta ante una petición.

WS orientado a la administración de sistemas

Entonces, tenemos una nueva tecnología, muy orientada a la web, y por lo tanto muy fácil de usar. Además, ofrece algo que, aún sin ser tiempo real, es lo suficientemente ágil como para usarse para cosas como la monitorización de sistemas, que es precisamente el ejemplo con el que voy a seguir en este post.

Mi planteamiento inicial era hacer una pequeña aplicación que mostrase los datos de una base de datos rrd. Sin embargo, esto centraba el ejercicio demasiado en torno a la programación del módulo para Node.js, así que he preferido reutilizar los conceptos de otros posts anteriores, limitando el servidor WebSockets al envío de los datos que encuentre en una base de datos Redis.

Para hacerlo más interesante, el cliente será un plasmoid de KDE, aunque no deja de ser una página web normal y corriente, teniendo en cuenta que voy a usar el motor WebKit para su implementación (se pueden programar plasmoids en C, python, javascript, ...)

Las limitaciones

El protocolo WebSockets todavía es un draft del IETF. Ya sólo teniendo esto en cuenta podemos decir que está "verde".

En cuanto a navegadores, se necesita lo último de lo último en versiones (Opera, Firefox, Chrome, Safari...). Además, en el caso de Firefox (y Opera), el soporte, aunque presente, está deshabilitado por algunas lagunas en torno a la seguridad del protocolo cuando interactúa con un proxy. Ahora bien, para activarlo noy hay más que cambiar una clave (network.websocket.override-security-block).

Como ejemplo, para escribir este post he usado Ubuntu 10.10 y Fedora 14, con sus versiones respectivas de KDE, y con soporte para ejecutar plasmoids WebKit (cada distribución tiene su nombre de paquete, así que lo buscáis vosotros :-D ).

En la práctica, si queréis usar WebSockets en una aplicación "de verdad", hoy en día os recomiendo usar algo como socket.io, que es capaz de detectar lo que el navegador soporta, y en función de eso usar WS o simularlo con flash.

Bien, empecemos con la aplicación.

El servidor

Para guardar los datos vamos a usar Redis, con un hash que vamos a llamar "datos", y en el que guardaremos los valores a graficar. Traducido a perl:

  my %datos;
  $datos{todos} = 40;
  $datos{server1} = 10;
  $datos{server2} = 20;
  $datos{server3} = 10;

Esto en Redis:

  redis-cli hset datos todos 40
  redis-cli hset datos server1 10
  redis-cli hset datos server1 20
  redis-cli hset datos server1 10

Para las pruebas he usado un bucle bash:

  while true; do redis-cli hset datos todos $((10 + $RANDOM % 20)); sleep 1; done;
  ...

Existen varias formas de escribir un servidor WebSockets, pero pocas tan fácilies como Node.js y su módulo websocket-server. Tambien vamos a usar el módulo de Redis, así que:

  npm install websocket-server
  npm install hiredis redis

Una vez instalados los componentes, es el momento de escribir las 70 líneas que hacen falta para tener un servidor 100% funcional y capaz de gestionar tantos gráficos como queramos:

  var net = require('net');
  var sys = require("sys");
  var ws = require("websocket-server");
  var redis = require("redis");
  var puertoredis = 6379;
  var hostredis = 'localhost';

  var server = ws.createServer({debug: true});
  var client = redis.createClient(puertoredis,hostredis);
  var conexion;
  var mensaje;

  client.on("error", function (err) {
      sys.log("Redis  error en la conexion a: " + client.host + " en el puerto: " + client.port + " - " + err);
  });

  server.addListener("connection", function(conn){
      var graficomostrado = "";
      var servidores = [];
      sys.log("Conexion abierta: " + conn.id);
      client.hkeys("datos", function (err, replies) {
          replies.forEach(function (reply, i) {
              sys.log("Respuesta: " + reply);
              servidores.push(reply);
          });
          graficomostrado = servidores[0];
          mensaje = {"titulo": graficomostrado, "todos": servidores};
          conn.send(JSON.stringify(mensaje));
      });
      conn.addListener("message", function(message){
          var dato = JSON.parse(message);
          if (dato) {
              if (dato.orden == "changeGraphic") {
                  graficomostrado = dato.grafico;
                  conn.send(JSON.stringify({changedGraphic: graficomostrado}));
                  client.hget("datos", graficomostrado, function (err, reply) {
                      mensaje = {"inicio":[(new Date()).getTime(), reply]};
                      conn.send(JSON.stringify(mensaje));
                  });
              } else if (dato.orden == "getData") {
                  client.hget("datos", graficomostrado, function (err, reply) {
                      mensaje = {"actualizacion":[(new Date()).getTime(), reply]};
                      conn.send(JSON.stringify(mensaje));
                  });
              }
          }
      });
  });
  server.addListener("error", function(){
      sys.log("Error de algun tipo en la conexion");
  });
  server.addListener("disconnected", function(conn){
      sys.log("Desconectada la sesion " + conn.id);
  });
  server.listen(80);

Así de fácil es. Cuando un cliente establezca una conexión se llamará a:

  server.addListener("connection", function(conn) {....

En este ejemplo, el trabajo fundamental de este bloque es enviar al cliente el listado de datos disponibles vía JSON:

  ....
  mensaje = {"titulo": graficomostrado, "todos": servidores};
  conn.send(JSON.stringify(mensaje));

A partir de aquí, la conexión está establecida. Cada vez que el cliente la use se llamará a:

  conn.addListener("message", function(message) {....

En este caso, el servidor atenderá las peticiones de cambio de gráfico y de actualización de gráfico:

  var dato = JSON.parse(message);
  if (dato.orden == "changeGraphic") {
      graficomostrado = dato.grafico;
      conn.send(JSON.stringify({changedGraphic: graficomostrado}));
      client.hget("datos", graficomostrado, function (err, reply) {
          mensaje = {"inicio":[(new Date()).getTime(), reply]};
          conn.send(JSON.stringify(mensaje));
      });
  } else if (dato.orden == "getData") {
      client.hget("datos", graficomostrado, function (err, reply) {
          mensaje = {"actualizacion":[(new Date()).getTime(), reply]};
          conn.send(JSON.stringify(mensaje));
      });
  }

Y con hacer que el servidor escuche en el puerto 80... ¡A correr!

  server.listen(80);

El cliente

El código del servidor y del cliente está disponible en Github, así que voy a limitar la explicación a lo fundamental, dejando a un lado el uso de Jquery, Jquery-ui o de la excelente librería Highcharts, que he usado a pesar de no ser completamente libre, porque me ha permitido terminar el ejemplo mucho más rápido.

Una aplicación que quiere usar WebSockets debe comprobar si el navegador lo permite:

  if (!("WebSocket" in window))   {
    // No tiene soporte websockets, deshabilitamos botones.
    $('#containergraficos').empty();
    $('#containergraficos').html('

Sin soporte WebSockets.

'); $("#conectar").button({ disabled: true }); } else { // Sí tiene soporte websockets socket = new WebSocket(host); }

A partir de aquí, sólo queda reaccionar ante cada evento:

  socket.onopen = function(){
     .......
  }
  socket.onmessage = function(msg){
     // código íntegro en github
     var resultado = JSON.parse(msg.data);
     if (resultado.inicio) {
        var datos = [];
        datos.push({x: parseInt(resultado.inicio[0]), y: parseInt(resultado.inicio[1])});
        options.series[0].data = datos;
        chart = new Highcharts.Chart(options);
     } else if (resultado.actualizacion) {
        chart.series[0].addPoint({x: parseInt(resultado.actualizacion[0]), y: parseInt(resultado.actualizacion[1])});
     } else if (resultado.changedGraphic) {
        $("#grafico").button("option", "label", resultado.changedGraphic);
     } else if (resultado.titulo) {
        $("#grafico").button("option", "label", resultado.titulo);
        var mensaje = {"orden": "changeGraphic", "grafico": resultado.titulo};
        socket.send(JSON.stringify(mensaje));
     }
  }
  socket.onclose = function(){
     ...
  }
  socket.onerror = function() {
     ...
  }

En definitiva, unos pocos mensajes JSON de un lado para otro del socket.

Además, cada 5 segundos vamos a hacer que Highcharts pida datos nuevos:

  events: {
     load: function() {
        var series = this.series[0];
        setInterval(function() {
           if (socket) {
              var mensaje = {"orden": "getData", "grafico": grafico};
              socket.send(JSON.stringify(mensaje));
           } else {
              var x = (new Date()).getTime();
              series.addPoint([x, 0], true, true);
           }
        }, 5000); //5 segundos
     }
  }

El plasmoid

Evidentemente, hasta ahora no hemos hecho nada que no se pueda ejecutar en un navegador. Ahora lo empaquetaremos en un plasmoid, aunque no deja de ser más que un zip con todo el contenido y un fichero metadata.desktop con la descripción del propio plasmoid, que entre otros inlcuye:

  [Desktop Entry]
  Name=Forondarenanet
  Comment=Pruebas para forondarena.net
  Icon=chronometer
  Type=Service
  X-KDE-ServiceTypes=Plasma/Applet
  X-Plasma-API=webkit
  X-Plasma-MainScript=code/index.html
  X-Plasma-DefaultSize=600,530

KDE tiene una utilidad para probar los plasmoids sin instalarlos, llamada "plasmoidviewer". Una vez probado, se puede instalar con:

  plasmapkg -i forondarenanet.plasmoid

Recordad que el fichero no es más que un zip. En este caso, la estructura es la siguiente:

  ./metadata.desktop
  ./contents
  ./contents/code
  ./contents/code/css
  ./contents/code/css/images
  ./contents/code/css/images/ui-bg_gloss-wave_16_121212_500x100.png
  ...
  ./contents/code/css/images/ui-icons_cd0a0a_256x240.png
  ./contents/code/css/jquery-ui-1.8.9.custom.css
  ./contents/code/css/forondarenanet.css
  ./contents/code/index.html
  ./contents/code/js
  ./contents/code/js/jquery-1.4.4.min.js
  ./contents/code/js/highcharts.js
  ./contents/code/js/forondarenanet.js
  ./contents/code/js/jquery-ui-1.8.9.custom.min.js
  ./contents/code/img

Y poco más. Para terminar con un ejemplo, cuatro plasmoids mostrando gráficos diferentes:

Pantallazo plasmoids

Que por supuesto, sólo generan 4 conexiones, a pesar de estar actualizándose cada 5 segundos:

  # netstat -nt
  Active Internet connections (w/o servers)
  Proto Recv-Q Send-Q Local Address               Foreign Address             State
  tcp        0      0 192.168.10.136:36903        192.168.10.131:80           ESTABLISHED
  tcp        0      0 192.168.10.136:36904        192.168.10.131:80           ESTABLISHED
  tcp        0      0 192.168.10.136:36905        192.168.10.131:80           ESTABLISHED
  tcp        0      0 192.168.10.136:36906        192.168.10.131:80           ESTABLISHED
read more

Servidor pop-before-smtp en 67 líneas con nodejs y redis

mié, 01 dic 2010 by Foron

Llevaba tiempo, la verdad, queriendo escribir un post sobre una de esas nuevas herramientas que aparecen de vez en cuando, que prometen un montón de cosas, pero que en realidad nunca sabemos hasta dónde van a llegar. En este caso me estoy refiriendo a node.js.

Buscando algo que programar se me ha ocurrido escribir un sencillo servidor popbeforesmtp con node.js. La verdad es que resulta algo paradógico usar algo tan nuevo para esto, cuando pbs (vamos abreviando) es uno de los sistemas de autentificación más odiados del mundo del correo electrónico, propio de una época en la que el spam casi ni existía (buff!!). Como decía, no es más que una excusa para hacer algo "práctico" con node.js.

Los tres lectores de mi blog saben lo que es pbs, pero probablemente no hayan oido hablar sobre node.js. A ver si sacamos algo en claro de todo esto.

El problema

Digamos que tenemos un sistema con unos cuantos servidores SMTP que usamos para enviar correo. Aunque normalmente usamos algún tipo de validación ESMTP, tenemos que mantener la compatibilidad con algunos clientes de correo que han venido enviando correo sin ningún problema sólo a través de validación POP.

Pop before smtp permite que un cliente de correo pueda enviar un mensaje una vez se haya validado vía POP. El servidor SMTP, por lo tanto, debe ser capaz de guardar un histórico de conexiones y de permitir envíos, normalmente durante unos pocos minutos, sin pedir ningún otro tipo de credencial.

Ya tenemos todos los elementos que necesitamos sobre la mesa. Veamos:

  • Un servidor SMTP sobre el que implementar "el invento".
  • Un servidor POP desde el que obtener la información de validación.
  • Una aplicación que relacione ambos mundos, y que sea capaz de funcionar tanto en una infraestructura de una máquina como en clusters de varios servidores POP y SMTP (con todo lo que esto significa).

Un servidor SMTP

Actualmente tenemos en el mercado open source cuatro tres servidores SMTP que podemos usar:

  • Postfix
  • Exim
  • Sendmail
  • Qmail

Cada uno usa sus propios mecanismos de auntentificación. Para este ejemplo me voy a centrar en Postfix.

Postfix tiene muchos controles "nativos" para permitir o denegar envíos de correo. Se pueden establecer políticas de acceso en base a IPs origen, cuentas origen, destino, helo, contenido, .... De hecho, también se pueden escribir reglas compuestas que combinen las anteriores.

Sin embargo, para pbs necesitamos mantener un listado de IPs para las que se permiten envíos(fácil), pero que expiren cada x minutos.

Para estos casos más elaborados Postfix ofrece lo que se llama access policy delegation, y que no es más que un sencillo API para conectarse con una aplicación externa que se encargará de tomar las decisiones de validación.

Dicho de otra forma, Postfix enviará a un socket (UNIX o TCP) la información de la sesión SMTP que se quiere comprobar, y recibirá por ese mismo socket el "OK" (o no) correspondiente.

El software que vamos a escribir, por lo tanto, va a recibir esto por un socket:

  request=smtpd_access_policy
  protocol_state=RCPT
  protocol_name=ESMTP
  client_address=192.168.10.131
  client_name=fn131.example.org
  reverse_client_name=fn131.example.org
  helo_name=example.org
  sender=patata@example.org
  recipient=nospam@example.com
  ......

A lo que tiene que responder generalmente o "OK", o "reject", o "dunno". [1]

Un servidor POP

El que más nos guste (Dovecot, Courier, ...). El único requerimiento es que escriba en un log parseable los login de usuarios.

Una aplicación encargada de la política

Aquí está lo interesante de este post. Veamos lo que necesitamos:

  • Una aplicación, en nuestro caso un servidor TCP, que reciba las conexiones desde Postfix y entregue las respuestas.
  • Una base de datos de algún tipo que almacene la información. A ser posible, debe poder distribuirse entre muchos servidores SMTP de forma transparente.
  • Un procesador de logs POP capaz de insertar registros en la base de datos que usará la aplicación.

Empecemos por la base de datos. Estamos hablando básicamente de "algo" sencillo, en lo que guardar un listado de IPs, y que además auto-expire los registros pasados unos minutos. Además, debe ser lo más rápida posible y permitir un gran número de conexiones simultáneas.

Vaya, si estamos hablando de....Memcached.

Pros:

  • Estable. Muchos años en entornos muy importantes.
  • Perfecta para guardar claves (IPs) y valores (cualquier cosa en este caso. El rcpt nos podría venir bien).
  • Expira automáticamente los registros antiguos.
  • Tiene interfaces perl, python, ....
  • Rápida.

Cons:

  • ¿Cómo usamos una base de datos común en un cluster?
  • ¿Perdemos la información de logins ante un fallo o reinicio de la aplicación?

Por lo tanto, tenemos dos problemas. Memcache no está especialmente pensado para funcionar en "modo cluster", sincronizando su contenido en n servidores; y además pierde el contenido cuando se reinicia. O sea, si tenemos que reiniciar Memcached perdemos los datos de usuarios que ya se han validado (aunque sólo sea cosa de una media hora).

Obviamente, las ventajas superan a los inconvenientes. Sin embargo, en el "mercado" tenemos una aplicación que, además de tener los pros, supera los cons: Redis, que permite un modelo master/slave, y además permite volcar periódicamente la información de memoria a disco para su recuperación en caso de caida. Es más, permite tener una especie de redo log en disco.

Bien, ya tenemos el almacén de información. ¿Cómo añadimos los datos? Usaremos cualquier aplicación de las muchas que permiten ejecutar una acción en función de haber encontrado un patrón en un log. Un ejemplo en perl es mailwatch, pero se puede usar cualquier otra. En el fondo, lo único que hay que hacer es conectarse al nodo master de Redis, y después ejecutar el comando "SETEX <host_validado> <segundos> <rcpt>".

Bien, sólo nos queda la aplicación. Necesitamos que:

  • Escuche en un puerto TCP.
  • Esté muy orientado a eventos.
  • Tenga un muy buen rendimiento.
  • Sea sencillo de programar.

Y aquí es donde aparece node.js.

node.js

Hay algo indudable en la informática de hoy en día: Cada vez está más orientada a la web. En este entorno se usan muchos lenguajes, desde php a .net; pero si hay uno que ha usado "casi" todo el mundo, ese es javascript.

Node.js es una forma de llevar javascript fuera del contexto de los navegadores web. Básicamente es una especie de wrapper para V8, el compilador de javascript de Google. Y esto es algo muy importante, porque estamos hablando de uno de los compiladores más avanzados que hay hoy en día. Evidentemente, node.js no consigue el rendimiento de C, pero supera con cierta facilidad a perl o python, todo además sin perder la familiaridad y versatilidad de javascript.

Node.js, además de usar V8, implementa algunas de las funcionalidades que hacen falta para ejecutarse fuera de un navegador, como la manipulación de datos binarios.

La otra característica de node es que está completamente orientado a eventos, y por lo tanto necesita un cierto cambio de chip por parte del programador. En node.js casi nada es síncrono: Cuando queremos acceder a un fichero, por ejemplo, en lugar de esperar a que el disco se sitúe en la posición adecuada y lea los datos, seguimos con la ejecución del programa y con el resto de eventos, hasta que los datos estén disponibles. Según Yahoo, gracias a este modelo un único procesador Xeon a 2.5GHz ejecutando un proxy inverso puede servir cerca de 2100 peticiones por segundo [2].

Dejo la instalación de node.js (y de Redis, y de mailgraph, ...) para el lector. Veamos el código de nuestro servidor:

Empezamos por la configuración de Postfix, en /etc/postfix/main.cf:

  smtpd_recipient_restrictions = .... check_policy_service inet:127.0.0.1:10000 ....

Nuestro servidor va a escuchar en localhost y en el puerto 10000.

Y ahora vamos con el código, en sus 67 líneas, aunque para ser correctos, falta la mayoría de la gestión de errores:

 var net = require('net');
 var sys = require("sys");
 var redis = require("redis");

 var puertoplcy = 10000;
 var hostplcy = 'localhost';

 var puertoredis = 6379;
 var hostredis = 'localhost';

Hemos definido las variables y las librerías que vamos a usar, incluyendo una "third party" para acceso a Redis que hemos instalado con (npm install redis).

 var server = net.createServer(function (stream) {

         var client = redis.createClient(puertoredis,hostredis);

         client.on("error", function (err) {
                 sys.log("Redis  error en la conexion a: " + client.host + " en el puerto: " + client.port + " - " + err);
         });

         stream.setEncoding('utf8');
         stream.setNoDelay(true);

         stream.on('connect', function () {
                 sys.log("Conexion nueva.");
         });

         stream.on('data', function (data) {
                 sys.log("Correo entrante:\n" + data);
                 var clientaddr, tmpclientaddr;
                 var rcpt, tmprcpt;

                 var datos = data.split('\n');
                 for (var i = 0; i < datos.length; i++ ) {
                         if (datos[i] && datos[i].match(/client_address=/) ) {
                                 tmpclientaddr = datos[i].split("=");
                                 clientaddr = tmpclientaddr[1].trim();
                                 sys.log("Extraido el client address: " + clientaddr);
                         } else if (datos[i] && datos[i].match(/recipient=/) ) {
                                 tmprcpt = datos[i].split("=");
                                 rcpt = tmprcpt[1].trim();
                                 sys.log("Extraido el rcpt to: " + rcpt);
                         }
                 }

                 client.get(clientaddr, function (err, reply) {
                         if (err) {
                                 sys.log("Get error: " + err);
                         }

                         if (reply) {
                                 sys.log("Se ha encontrado el host: " + clientaddr + " que ha usado: " + reply);
                                 stream.write('action=ok\n\n');
                         } else {
                                 sys.log("El host: " + clientaddr + " con el usuario: " + rcpt + " no se ha validado");
                                 stream.write('action=reject 550 Relay denegado.\n\n');
                         }
                 });
         });

         stream.on('end', function () {
                 sys.log("Fin de la conexion");
                 stream.end();
         });

 });

Este es el servidor, con un poco de logueo por consola que en condiciones normales se debería quitar. Hemos definido eventos para conexiones nuevas, para cuando entran datos nuevos, ...; tenemos un callback para cuando leemos datos de Redis, .....

Al final, si tenemos el dato devolvemos un "action=ok\n\n", y si no lo tenemos pasamos un "action=reject 550 Relay denegado.\n\n", aunque esto puede variar en función de la configuración de Postfix.

Y por último, hacemos que el servidor escuche en el puerto 10000 de localhost:

  server.listen(puertoplcy, hostplcy);

Y ya está. Todo esto lo guardamos en un fichero (pbs_filter.js por ejemplo), y lo ejecutamos desde una shell:

  /usr/local/bin/node pbs_filter.js

Bueno, para terminar, una imagen del planteamiento:

Esquema ejemplo

En definitiva, node.js es otro más de esos proyectos que aparecen periódicamente y que cogen cierta fuerza. Esto no es nada nuevo; hay muchos proyectos abandonados que tuvieron "su momento", pero creo que tiene ciertas características que lo hacen interesante. El tiempo dirá si tiene futuro.

Notas

[1]postfix tiene muy buena documentación. RTFineM.
[2]Node.js, igual que Memcached y otros, no es multi-hilo. En el mismo enlace de Yahoo hay una interesante discusión sobre el uso de varios cores.
read more

Computación distribuida con Hadoop IV

dom, 30 may 2010 by Foron

Antes de que me adelanten, y sobre todo por las durísimas presiones por parte de las tres personas que leen este blog :-), voy a terminar esta serie de posts, hablando sobre una de las herramientas que se han implementado para dar un mayor nivel de abstracción (o facilidad de uso, como se prefiera decir) a Hadoop.

Hadoop tiene un problema, o al menos un argumento que se usa en su contra: Desarrollar una aplicación MapReduce lleva demasiado tiempo. A pesar de que el Streaming del que hablamos en posts anteriores lo solucione en parte, es cierto que si queremos trabajar de forma "nativa" con el framework tenemos que estar preparados para escribir un buen puñado de líneas de código.

Para dar salida este problema han ido apareciendo distintas alternativas, entre las que destacan Hive (by Facebook) y Pig (by Yahoo). En este post me voy a centrar en esta última, aunque esto no significa, ni mucho menos, que sea mejor que Hive.

Gracias a Pig se pueden escribir "en 4 lineas" aplicaciones MapReduce. Dicho de otra forma, permite que nos centremos en el qué queremos hacer con nuestros datos, y no en el cómo lo hacemos. Se basa en dos componentes:

  • Un sencillo lenguaje para escribir los scripts, llamado "pig latin".
  • El ""compilador"", que trasforma los scripts en aplicaciones MapReduce para su ejecución en el cluster Hadoop.

Junto a esto, Pig tambén ofrece un shell interactivo (se llama grunt), para poder ejecutar comandos.

Bien, terminada la teoría (fácil, ¿no?), vamos a centrarnos en los datos, tal y como hemos dicho antes. (Para el que no haya leido los posts anteriores, este sería un buen momento).

Hasta ahora hemos estado preparando nuestra plataforma de logueo para poder disponer de años, y quizá terabytes, de logs en un mismo lugar, barato y redundado, del que poder sacar información útil en unos pocos minutos. Recordemos:

  1. Los logs de todos nuestros servidores se consolidan en una/varias máquinas syslog (o similar).
  2. Los logs se copian tal cual en Hadoop. A pesar de que vayan a estar replicados, no es mala idea tener una copia extra en cinta.
  3. Ejecutamos un primer script MapReduce que lee los scripts y los adapta al formato que queremos y sobre el que trabajaremos.
  4. Ya no necesitamos los ficheros originales, los podemos borrar de Hadoop. En caso de desastre, siempre podríamos recurrir a las cintas.

¿Cómo guardábamos los logs?

El resultado del reducer.pl son líneas como esta:

  010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com mail.example.org[10.0.2.34],15853,1268223532,5A4FE29B3B6,43A9B28B357,Mar 10 12:18:52 destinovarios@example.com destinovarios@example.com Antispam sent (250 OK sent queued as 43A9B28B357

Recordad que el script guarda mucha información formateada en base64, por lo que en realidad tenemos algo similar a esto:

  2010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com texto_en_base64

¿Por qué?

Porque todas las consultas que se suelen hacer son del tipo:

  • No he recibido un mensaje enviado entre las x y las z del martes, enviado desde esta cuenta, y con destino a esta otra.
  • Quiero todos los mensajes que se recibieron en tal buzón.

La estructura de logs que hemos preparado va a permitir que los scripts sean muy sencillos, sin ningún tipo de join o similar (ahora lo vemos). Dejaremos al interfaz web desde el que se van a ver los resultados el peso de formatear las entradas y hacerlas "visuales".

¿Interfaz web?

Nada mejor que un sencillo interfaz web en el que escribir los parámetros de búsqueda, preparar el script .pig, ejecutarlo, y mostrar los resultados una vez termine. Evidentemente, no voy a escribir aquí el formulario, pero por dar un ejemplo, imaginemos que queremos saber los correos que han llegado desde infojobs entre las 8 a.m. del 11 de marzo, y las 12 a.m. del mismo día.

Empezamos pasando las fechas a timestamp. Muy fácil:

  $ date -u -d 'Thu Mar 11 08:00:00 2010' '+%s'
  1268294400

  $ date -u -d 'Thu Mar 11 12:00:00 2010' '+%s'
  1268308800

Con esto y con la dirección origen ya tenemos todo lo necesario. Ahora lo normal sería escribir un fichero .pig, pero aquí vamos a usar el intérprete de comandos grunt para ver los resultados paso a paso:

  $ pig
  10/05/25 21:57:29 INFO pig.Main: Logging error messages to: /usr/local/pig-0.7.0/pig_1274817449369.log
  2010-05-25 21:57:29,688 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://fn140
  2010-05-25 21:57:30,149 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: fn140:8021
  grunt>

Empezamos cargando los datos:

  grunt> REGISTROS = LOAD '/user/hadoop/logs_formateados' USING PigStorage('\t') AS (msgid:chararray, fecha:long, origen:chararray, destinoreal:chararray, destinoentrada:chararray, resto:chararray);

Con este comando hemos cargado los datos desde el directorio /user/hadoop/logs_formateados en la variable REGISTROS. Los ficheros de entrada se separan con un tabulador [1], con el schema que hemos venido usando. Veamos, antes de seguir, si Pig ha entendido lo que realmente queremos:

  grunt> ILLUSTRATE REGISTROS
  2010-05-25 23:58:36,627 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://fn140
  2010-05-25 23:58:36,681 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: fn140:8021
  2010-05-25 23:58:36,791 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
  2010-05-25 23:58:36,792 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
  --------------------------------------------------------------------------------------------------------------------
  | REGISTROS | msgid: chararray | fecha: long | origen: chararray   | destinoreal: chararray | destinoentrada: chararray | resto: chararray |
  --------------------------------------------------------------------------------------------------------------------
  |           | 6B160A8CF29E064XA067E116FA2A458CAD8A66@GLOBL0310V01.exampleserv.com | 1268304751  | pr1user@example.com | mga@example.net        | mga@example.net           | texto_base64     |
  --------------------------------------------------------------------------------------------------------------------

Ahora que tenemos todos los registros, vamos a filtrarlos en base a los criterios que hemos definido:

  grunt> FILTRADOS = FILTER REGISTROS BY (origen matches '.+infojobs.+') AND (fecha > 1268294400) AND (fecha < 1268301600);

Hecho esto sólo nos queda agruparlos por msgid. Recordad, tenemos dos entradas por mensaje: una para el nivel antispam, y otra para la entrega. En realidad, en base al msgid podríamos seguir un mensaje por tantos saltos como tuviéramos (algo parecido a lo que Mailtrust ha documentado que hace):

  grunt> AGRUPADOS = GROUP FILTRADOS BY msgid;

Y con esto ya tenemos el resultado. Lo podríamos confirmar con el comando DUMP, que muestra los datos por salida estándar, pero lo vamos a guardar directamente en un fichero:

  grunt> STORE AGRUPADOS INTO '/tmp/fichero_resultados.txt' USING PigStorage();
  2010-05-26 22:17:03,942 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_LONG 2 time(s).
  2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.impl.logicalLayer.optimizer.PruneColumns - No column pruned for REGISTROS
  2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.impl.logicalLayer.optimizer.PruneColumns - No map keys pruned for REGISTROS
  2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.ResourceSchema - Insert two-level access to Resource Schema
  2010-05-26 22:17:03,983 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_LONG 2 time(s).
  2010-05-26 22:17:03,985 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - (Name: Store(hdfs://fn140/tmp/fichero_resultados.txt:PigStorage) - 1-1007 Operator Key: 1-1007)
  2010-05-26 22:17:03,986 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
  2010-05-26 22:17:03,986 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
  2010-05-26 22:17:04,019 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
  2010-05-26 22:17:05,180 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
  2010-05-26 22:17:05,188 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
  2010-05-26 22:17:05,191 [Thread-120] WARN  org.apache.hadoop.mapred.JobClient - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
  2010-05-26 22:17:05,545 [Thread-120] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
  2010-05-26 22:17:05,546 [Thread-120] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
  2010-05-26 22:17:06,382 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201005262134_0008
  2010-05-26 22:17:06,382 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://fn140:50030/jobdetails.jsp?jobid=job_201005262134_0008
  2010-05-26 22:17:06,384 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
  2010-05-26 22:17:19,214 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 10% complete
  2010-05-26 22:17:20,719 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 20% complete
  ...............
  2010-05-26 22:17:28,241 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 60% complete
  2010-05-26 22:17:41,788 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
  2010-05-26 22:17:41,788 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Successfully stored result in: "hdfs://fn140/tmp/fichero_resultados.txt"
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Records written : 72
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Bytes written : 44727
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Spillable Memory Manager spill count : 0
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Proactive spill count : 0
  2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

Vaya, parece que tenemos 72 usuarios suscritos a infojobs. Veamos un ejemplo de lo que se ha escrito en "/tmp/fichero_resultados.txt" (es un path de HDFS, no el /tmp local). Los saltos de línea los he puesto yo para que sea más visual, y he escrito la versión sin codificar en base64:

  1268304853778.112.POCFQELSVS@[hellboy07]
  {
      (1268304853778.112.POCFQELSVS@[hellboy07],1268308417,aviso_incripciones@push.infojobs.net,usuario@example.com,
        usuario@example.com,localhost[127.0.0.1],11854,1268308418,9ABC12FACB4,-1,
        Mar 11 11:53:38 usuario@example.com usuario@example.com Maildrop sent (delivered via maildrop service))
  ,
      (1268304853778.112.POCFQELSVS@[hellboy07],1268308417,aviso_incripciones@push.infojobs.net,usuario@example.com,
        usuario@example.com,push3.infojobs.net[79.171.25.71],11107,1268308418,3B0213FAECD,9ABC12FACB4,
        Mar 11 11:53:38 usuario@example.com usuario@example.com Antispam sent (250 OK queued as 9ABC12FACB4)
  }

Ahora sólo necesitamos que nuestro interfaz web formatee el resultado y lo saque por pantalla. Fácil y sencillo.

Resumiendo, con estas cuatro líneas podemos hacer cualquier búsqueda en los logs, tengan el tamaño que tengan:

  REGISTROS = LOAD '/user/hadoop/logs_formateados' USING PigStorage('\t') AS (msgid:chararray, fecha:long, origen:chararray, destinoreal:chararray, destinoentrada:chararray, resto:chararray);
  FILTRADOS = FILTER REGISTROS BY (origen matches '.+infojobs.+') AND (fecha > 1268294400) AND (fecha < 1268301600);
  AGRUPADOS = GROUP FILTRADOS BY msgid;
  STORE AGRUPADOS INTO '/tmp/fichero_resultados.txt' USING PigStorage();

Referencias

Hay muchos artículos y documentación disponible en la red sobre Hadoop. En cuanto a bibliografía, que yo sepa hay al menos tres libros disponibles:

  • Pro Hadoop, de la editorial Apress.
  • Hadoop: The Definitive Guide, de la editorial O'Reilly.
  • Hadoop in Action, de la editorial Manning.

Notas

[1]La verdad es que he hecho trampa. El reducer original separaba el id de mensaje del resto de datos con un tabulador, pero luego usaba un espacio simple. Esto es porque al principio tenía pensado trabajar otros aspectos de Hadoop y Pig, como el Chaining y las "user defined functions". Al final, para no tener que escribir más posts, he cambiado el reducer para que separe todos los campos con tabuladores.
read more

Computación distribuida con Hadoop III

dom, 21 mar 2010 by Foron

En este tercer post de la serie dejamos la teoría para ver un caso práctico de aplicación de Hadoop, que completaré en un cuarto post.

Como sabemos, gracias a Hadoop conseguimos un sistema con mucha capacidad de almacenamiento, a un coste por mega reducido, redundado, de rendimiento prácticamente lineal (más máquinas = más rendimiento) y sobre el que ejecutar de forma pararela todo tipo de aplicaciones que se puedan implementar como MapReduce.

Con estos datos, hay una utilidad práctica que viene inmediatamente a la mente: tratamiento de logs. Veamos:

  • Me gustaría decir lo contrario, pero personalmente no conozco a nadie que maneje el dinero destinado a proyectos que no ponga pegas a una inversión fuerte para el tratamiento de logs. El coste por mega de Hadoop es difícil de superar.
  • Si combinamos Hadoop, por ejemplo para datos con hasta un año de antiguedad, con un soporte en cinta para el resto, lo tendremos fácil para cumplir cualquier legislación en lo relacionado a retención de datos.
  • Sin llegar a los niveles de Facebook o Rackspace, es probable que el volumen de logs que guardemos durante un año sea lo suficientemente grande como para hacer difícil su tratamiento si un juez (por ejemplo) pide todos los correos desde la cuenta x a la cuenta z de los últimos siete meses, o todos los acceso al foro web de fulanito. MapReduce y Hadoop permiten ejecutar este tipo de tareas sin mayores problemas, y con un rendimiento prácticamente lineal.

En este entorno, planteo un ejemplo de tratamiento de logs de Postfix de lo más simple, con un filtro de contenido en modo post-queue (Amavis, por ejemplo, en su uso más habitual). Vamos, nada nuevo.

El objetivo sería, además de los puntos mencionados anteriormente, poder realizar cualquier tipo de consulta sobre los logs de meses (o años), en un tiempo razonable. Y digo tiempo razonable porque en este tipo de consultas no suele hacer falta tener el resultado al momento. Se acepta esperar durante 1,5 o 10 minutos hasta que se genere el resultado.

Resumiendo, tenemos una infraestructura parecida a esta:

Infraestructura Hadoop y Amavis

Todos los servidores envían los logs de manera centralizada a un syslog remoto, que además de hacer una copia diaria a cinta, almacena la información en Hadoop, usando alguno de los muchos interfaces que se ofrecen, como pueden ser FUSE o FTP.

Junto a esto tendremos una máquina, preferiblemente un interfaz web, para generar trabajos MapReduce. Básicamente necesitaremos un formulario (cuenta origen, destino, fechas, ...), y algo de control sobre los trabajos que ya se han realizado para mostrar los resultados.

Streaming

Se pueden crear trabajos MapReduce para Hadoop de varias formas, pero en este ejemplo me voy a limitar a lo que se llama "Streaming". Dicho de otra forma (aunque no sea del todo cierto), a programar aplicaciones MapReduce en perl o python (o en cualquier lenguaje que pueda leer y escribir en la entrada y salida estándar).

La gran ventaja que tiene el Streaming es que permite desarrollar prototipos de una forma rápida, aunque no sea lo más eficiente. En cualquier caso, sobra para este ejemplo.

La mejor forma de explicar lo que es el Streaming en Hadoop es ver el equivalente, en versión linux:

  cat datos_entrada.txt | mapper.pl | sort | reducer.pl

El mapper.pl recibirá los datos desde los ficheros de log, línea por línea, y los tratará, escribiendo por salida estándar con el formato clave "tabulador" valor.

Esta salida se ordena, y pasa a un reducer. Hadoop garantiza que todas las claves serán tratadas por el mismo reducer, aunque éste puede, y normalmente así lo hará, procesar varias claves (este es un buen momento para recordar que los ficheros de entrada pueden ser, por decir algo, 200GB de datos, sin que suponga ningún problema para Hadoop).

Volviendo al ejemplo, veamos unas cuantas líneas de log de postfix:

  Feb 20 14:27:03 server postfix/smtpd[17875]: E78D13073B2: client=mail.example.com[10.0.0.6]
  Feb 20 14:27:03 server postfix/cleanup[17822]: E78D13073B2: message-id=
  Feb 20 14:27:04 server postfix/qmgr[5370]: E78D13073B2: from=, size=20827, nrcpt=1 (queue active)
  Feb 20 14:27:04 server postfix/smtp[17980]: E78D13073B2: to=, relay=127.0.0.1[127.0.0.1]:10025, delay=0.6, delays=0.08/0/0/0.51, dsn=2.0.0, status=sent (250 Ok: queued as 6A737307316)
  Feb 20 14:27:04 server postfix/qmgr[5370]: E78D13073B2: removed

  Feb 20 14:27:04 server postfix/smtpd[17882]: 6A737307316: client=localhost[127.0.0.1]
  Feb 20 14:27:04 server postfix/cleanup[17817]: 6A737307316: message-id=
  Feb 20 14:27:04 server postfix/qmgr[5370]: 6A737307316: from=, size=21719, nrcpt=1 (queue active)
  Feb 20 14:27:04 server postfix/pipe[16491]: 6A737307316: to=, relay=maildrop, delay=0.16, delays=0.11/0.01/0/0.04, dsn=2.0.0, status=sent (delivered via maildrop service)
  Feb 20 14:27:04 server postfix/qmgr[5370]: 6A737307316: removed

Esta es la información que postfix loguea al recibir y entregar un mensaje usando un filtro after-queue. He ordenado los logs para que sea más claro, pero en la realidad no es así.

En este ejemplo se recibe un correo desde mail.example.com, y se le asigna el queue-id E78D13073B2. También se loguea el message-id, el origen y el tamaño.

Una vez se ha encolado el mensaje, se envía a Amavis, a través de 127.0.0.1:10025. Como se puede ver en el ejemplo, en este punto sabemos el destino del mensaje, y también si fue aceptado o no. Lo sabemos porque el log hace referencia al queue id que amavis reinjecta en postfix. De no haberlo aceptado no tendríamos un queue id en el log.

Este nuevo mensaje mantiene el message-id, el tamaño es algo mayor, y termina entregandose, en este caso en la máquina local, usando la aplicación courier-maildrop.

Bien, esto no es más que un ejemplo. En la realidad hay que tratar muchos casos: aliases, reenvíos a máquinas remotas, bounces, mensajes que pasan a deferred por no haber podido entregarse, ....

Además, e insistiendo en que esto no es más que un ejemplo, voy a asumir dos cosas, que aunque se cumplen a menudo, no son ciertas:

  • Que el queue-id es único.
  • Que el message-id es único.

Mapper.pl

El proceso comienza con Hadoop enviando los ficheros de entrada (logs) a la entrada estándar de los mapper.pl que estemos ejecutando en el cluster, que tratarán cada línea y la enviarán por salida estándar con el formato:

  queueidinformacion_separada_por_comas

Para separar la clave del valor se usa un tabulador. Normalmente, los campos del valor se separan con espacios, pero en este ejemplo he preferido las "comas".

Veamos un trozo del código:

 ..............
 while (<>)
 ..............
 if ($linea =~ /^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) .+ postfix\/smtpd\[\d+\]: (\w+): client=(\S+)$/) {
                 $estructura{fecha} = $1; $estructura{qid} = $2; $estructura{host} = $3;
                 print $estructura{qid} . "\t" . "Conexion," . join (",",$estructura{fecha},$estructura{host}) . "\n";
         }
          elsif ($linea =~ /^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) .+ postfix\/smtp\[\d+\]: (\w+): to\=\<([^ ]+)\>, relay=127.0.0.1.*status=(.*) (\w+)\)$/) {

                 $estructura{fecha} = $1; $estructura{qid} = $2; $estructura{to} = $3; $estructura{origto} = $3; $estructura{status} = $4; $estructura{relay} = "Antispam"; $estructura{rqid} = $5; $estructura{status} =~ s/,//g;
                 print $estructura{qid} . "\t" . "Antispam," . join (",",$estructura{fecha},$estructura{to},$estructura{to},$estructura{relay},$estructura{status},$estructura{rqid}) . "\n";
         }
 ...........
 }

Tenemos varios "if/elsif", que adaptan la línea del log de entrada al formato que quiero. Además, he añadido un texto con el tipo de entrada de log que estamos tratando, y que sabemos por el "if" en el que estamos (Antispam en este ejemplo), para luego usarlo en el reducer.

El código completo incluye también la detección de los mensajes que Amavis considera que son spam, en los que pone el rqid como 0. Además, en aquellos casos donde "no proceda" aplicar el concepto de rqid pongo -1 como valor. Todo esto servirá en el reducer.

Volviendo a los logs de ejemplo, el resultado de ejecutar el mapper sobre ellos es:

  E78D13073B2     Conexion,Feb 20 14:27:03,mail.example.com[10.0.0.6]
  E78D13073B2     Msgid,Jan 20 06:27:03,F0E59D47X9B343A7DFRAB1F0E45@EUW0600375
  E78D13073B2     Origen,Feb 20 14:27:04,prueba@example.com,20827
  E78D13073B2     Antispam,Feb 20 14:27:04,destino@example.org,destino@example.org,Antispam,sent (250 Ok: queued as,6A737307316)
  E78D13073B2     Cierre,Feb 20 14:27:04
  6A737307316     Conexion,Feb 20 14:27:04,localhost[127.0.0.1]
  6A737307316     Msgid,Feb 20 14:27:04,F0E59D47X9B343A7DFRAB1F0E45@EUW0600375
  6A737307316     Origen,Feb 20 14:27:04,prueba@example.com,21719
  6A737307316     Maildrop,Feb 20 14:27:04,destino@example.org,destino@example.org,Maildrop,sent (delivered via maildrop service),-1
  6A737307316     Cierre,Feb 20 14:27:04

Después de esto Hadoop ejecutaría un sort (en el ejemplo ya están ordenados por queue-id), y volvería a enviar el resultado a los reducer.pl del cluster. Por lo tanto, sabemos que un reducer va a recibir todas las líneas de log correspondientes a un mismo queue-id.

Reducer.pl

Pasamos directamente a ver algo del código del reducer.pl:

 my %mensaje = (qid => "",
             fecha => "",
             servidor => "",
             msgid => "",
             origen => "",
             size => "",
             to => "",
             origto => "",
             rqid => "",
             relay => "",
             status => "",
             cierre => ""
             );

 ..............
 while () {
     chomp;
     ($qid, $valor) = split /\t/;

         if ($qid ne $qidanterior) {
                 # tenemos un queueid nuevo
                 if ($qidanterior ne "") {
                         &mostrar_datos();

                         %mensaje = (qid => "",
                                 fecha => "",
                                 desdeip => "",
                                 msgid => "",
                                 origen => "",
                                 size => "",
                                 to => "",
                                 origto => "",
                                 rqid => "",
                                 status => "",
                                 cierre => ""
                         );
                 }
                 $qidanterior = $qid;
         }

         # seguimos procesando los datos del mismo qid
         $mensaje{qid} = $qid;
         @datos = split(",",$valor);

         switch ($datos[0]) {
                 case "Conexion" {
                         $mensaje{fecha} = $datos[1];
                         $mensaje{desdeip} = $datos[2];
                 }
                 case "Msgid" {
                         $mensaje{msgid} = $datos[2];
                 }
                 case "Origen" {
                         $mensaje{origen} = $datos[2];
                         $mensaje{size} = $datos[3];
                 }
                 case "Antispam" {
                         $mensaje{to} .= $datos[2] . "-separador-";
                         $mensaje{origto} .= $datos[3] . "-separador-";
                         $mensaje{status} .= $datos[1] . " " .$datos[2] . " " . $datos[3] . " " . $datos[4] . " " . $datos[5] . "-separador-";
                         $mensaje{rqid} = $datos[6];
                 }

 ................
 sub mostrar_datos {
      ................
      if ( ($mensaje{fecha} ne "") && ($mensaje{cierre} ne "") ) {
           # mensaje completo: Salida estandar
           # hay que revisar que se haya pasado por todas las fases
           # y realizar comprobaciones, que aqui no se hacen
           $fechaentrada = `date -u -d '$mensaje{fecha}' '+%s'`; chomp $fechaentrada;
           $fechasalida = `date -u -d '$mensaje{cierre}' '+%s'`; chomp $fechasalida;

           $datoscodificables = $mensaje{desdeip} .",".$mensaje{size}.",".$fechasalida.",".$mensaje{qid}.",".$mensaje{rqid}.",".$mensaje{status};
           $resto = encode_base64($datoscodificables,"");

           print $mensaje{msgid} . "\t" . $fechaentrada . " " . $mensaje{origen} . " " . $mensaje{to} . " " . $mensaje{origto} . $resto . "\n";

     } else {
           # mensaje incompleto. Guardar en otro fichero para poder seguir el tracking
           # Falta por hacer
     }

 }

Por lo tanto, cogemos cada línea de entrada, y ayudándonos de la descripción del tipo de línea que hemos añadido en el mapper, vamos completando la estructura de datos que después será el resultado (por fin) que queremos.

Un ejemplo de la ejecución de este trabajo en mi cluster de prueba:

  $ hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-0.20.2-streaming.jar -input /user/hadoop/mail_logs -output /user/hadoop/logs_formateados/domingo_14_03 -file /home/hadoop/mapper.pl -file /home/hadoop/reducer.pl -mapper "/usr/bin/perl -w mapper.pl" -reducer "/usr/bin/perl -w reducer.pl"

  packageJobJar: [/home/hadoop/mapper.pl, /home/hadoop/reducer.pl, /home/hadoop/hadoop-unjar4902963819550178151/] [] /tmp/streamjob534817239215339193.jar tmpDir=null
  10/03/18 22:20:02 INFO mapred.FileInputFormat: Total input paths to process : 4
  10/03/18 22:20:02 INFO streaming.StreamJob: getLocalDirs(): [/home/hadoop/mapred/local]
  10/03/18 22:20:02 INFO streaming.StreamJob: Running job: job_201003182146_0001
  10/03/18 22:20:02 INFO streaming.StreamJob: To kill this job, run:
  10/03/18 22:20:02 INFO streaming.StreamJob: /usr/local/hadoop/bin/hadoop job  -Dmapred.job.tracker=fn140:8021 -kill job_201003182146_0001
  10/03/18 22:20:02 INFO streaming.StreamJob: Tracking URL: http://fn140.forondarena.net:50030/jobdetails.jsp?jobid=job_201003182146_0001
  10/03/18 22:20:03 INFO streaming.StreamJob:  map 0%  reduce 0%
  10/03/18 22:20:17 INFO streaming.StreamJob:  map 8%  reduce 0%
  10/03/18 22:20:18 INFO streaming.StreamJob:  map 11%  reduce 0%
  10/03/18 22:20:20 INFO streaming.StreamJob:  map 15%  reduce 0%
  .....
  10/03/18 22:21:56 INFO streaming.StreamJob:  map 92%  reduce 25%
  10/03/18 22:21:59 INFO streaming.StreamJob:  map 100%  reduce 25%
  ....
  10/03/18 23:14:13 INFO streaming.StreamJob:  map 100%  reduce 98%
  10/03/18 23:15:55 INFO streaming.StreamJob:  map 100%  reduce 99%
  10/03/18 23:17:31 INFO streaming.StreamJob:  map 100%  reduce 100%
  10/03/18 23:18:25 INFO streaming.StreamJob: Job complete: job_201003182146_0001
  10/03/18 23:18:25 INFO streaming.StreamJob: Output: /user/hadoop/logs_formateados/domingo_14_03

El resultado es un fichero en la ruta "/user/hadoop/logs_formateados/domingo_14_03", con líneas formadas por un message-id, seguido por un tabulador, seguido del origen, el destino, el destino antes de la expansión de aliases, y del resto de la información, codificada en base64, todo separado por espacios.

No hace falta decir que el formato de salida en un entorno real se debería adaptar a lo que se necesitase. De hecho, Rackspace usa una estructura binaria, donde entre otras cosas se guarda la información de los diferentes saltos que hace un correo dentro de su sistema. Este ejemplo, por supuesto, es mucho más sencillo.

Al final del proceso hemos obtenido un fichero, que es el que de verdad queremos en nuestro cluster, con toda la información de cada mensaje que ha entrado en nuestra red en un periodo de tiempo concreto (digamos que cada 4 horas volcamos los logs de syslog a Hadoop y ejecutamos la tarea), como se ve en los siguientes ejemplos (sin la codificación en base64, para que sean más fáciles de seguir):

Mensaje que se entrega en un buzón local:

  ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org 1268225082 origen@example.org destino@example.com destino@example.com mail.example.org[10.0.2.34],2430,1268225085,B6A6A296E55,158AB296F24,Mar 10 12:44:45 destino@example.com destino@example.com Antispam sent (250 OK sent queued as 158AB296F24

  ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org 1268225085 origen@example.org destino@example.com destino@example.com localhost[127.0.0.1],3081,1268225085,158AB296F24,-1,Mar 10 12:44:45 destino@example.com destino@example.com Maildrop sent (delivered via maildrop service)

Por lo tanto:

  • El mensaje con id ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org
  • Entró en el sistema a las 1268225082
  • Desde el servidor mail.example.org[10.0.2.34]
  • El origen era origen@example.org
  • Y el destino destino@example.com (no es un alias)
  • Una vez pasado por el sistema antispam se reencola con queueid 158AB296F24
  • El mensaje de la entrega al servidor antispam es: sent (250 OK sent queued as 158AB296F24)
  • Una vez ha pasado el filtro antispam el mensaje se reencola a las 1268225085
  • Por supuesto desde el servidor localhost[127.0.0.1]
  • Y el mensaje de la entrega local es Mar 10 12:44:45 Maildrop sent (delivered via maildrop service), sólo al destino destino@example.com

Mensaje que se entrega a un servidor remoto:

  1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3 1268219047 origen@example.net destino@example.com destino@example.com mail.example.net[10.5.3.2],1901,1268219051,CA6782FABFB,B6C7A2FAACA,Mar 10 11:04:11 destino@example.com destino@example.com Antispam sent (250 OK sent queued as B6C7A2FAACA

  1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3 1268219049 origen@example.net destinoreal@remoto.example.org destino@example.com localhost[127.0.0.1],2686,1268219051,B6C7A2FAACA,-1,Mar 10 11:04:11 destinoreal@remoto.example.org destino@example.com mail.example.org[10.0.2.34]:25 sent (250 2.0.0 Ok: queued as 423872AB862)
  • Su messageid es 1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3
  • Entro en el sistema a las 1268219047
  • Desde el servidor mail.example.net[10.5.3.2]
  • Desde el origen origen@example.net
  • Y al destino destino@example.com, que en la fase antispam todavía no se expande a ningún alias
  • El mensaje se relaciona con B6C7A2FAACA
  • Y el mensaje de la entrega es Mar 10 11:04:11 destino@example.com destino @example.com Antispam sent (250 OK sent queued as B6C7A2FAACA)
  • Se reencola, a través de localhost, a las 1268219049
  • El mensaje pasa a ser la dirección real destinoreal@remoto.example.org, proveniente del alias destino@example.com
  • El mensaje se entrega en mail.example.org[10.0.2.34]:25
  • Con el mensaje sent (250 2.0.0 Ok: queued as 423A72AB262)

Mensaje que es spam:

  9545850218.UWD8KHA190@spammer.example.info 1268238332 spammer@example.net destino@example.com destino@example.com mail.example.info[10.8.2.12],2141,1268238333,F355429B3A5,0,Mar 10 16:25:33 destino@example.com destino@example.com Antispam sent (250 OK sent mensaje_rechazado por spam

En este caso el mensaje es spam, y no se relaciona con ningún otro queueid.

Mensaje que se entrega a varias cuentas:

  2010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com mail.example.org[10.0.2.34],15853,1268223532,5A4FE29B3B6,43A9B28B357,Mar 10 12:18:52 destinovarios@example.com destinovarios@example.com Antispam sent (250 OK sent queued as 43A9B28B357

  2010031011.AB263566.pc1@mailer.example.org 1268223532 usuario@example.org destinolocal@example.com-separador-destinoremoto@example.info destinovarios@example.com-separador-destinovarios@example.com localhost[127.0.0.1],16525,1268223532,43A9B28B357,-1,Mar 10 12:18:52 destinolocal@example.com destinovarios@example.com Maildrop sent (delivered via maildrop service)-separador-Mar 10 12:18:52 destinoremoto@example.info destinovarios@example.com mail.example.info[10.8.2.12]:25 sent (250 2.0.0 Ok: queued as 452362CA51A)
  • Ahora tenemos un mensaje con id 2010031011.AB263566.pc1@mailer.example.org
  • Que se envía desde usuario@example.org
  • Al destino destinovarios@example.com
  • Que a su vez es un alias a dos cuentas destinolocal@example.com y destinoremoto@example.info
  • Los mensajes de entrega a cada una de ellas son:
    • Mar 10 12:18:52 destinolocal@example.com Maildrop sent (delivered via maildrop service)
    • Mar 10 12:18:52 destinoremoto@example.info mail.example.info[10.8.2.12]:25 sent (250 2.0.0 Ok: queued as 452362CA51A)

Como se puede ver, ya tenemos toda la información necesaria, debidamente preformateada, por lo que ya no necesitamos los ficheros mail.log, que podríamos borrar de Hadoop (que no de las cintas).

La información contenida en estos ficheros nos servirá para preparar consultas, bien directamente con otros trabajos MapReduce, bien con otras herramientas, tal y como mostraré en el siguiente post de la serie, en el que nos aprovecharemos del formato de los ficheros para hacer consultas por message-id, cuenta origen, fecha o cuenta destino (según entró en el sistema o una vez expandidos los aliases).

En este punto hay que volver a recordar que Hadoop está pensado para trabajar con cantidades muy grandes de información, en los que hacer un simple "grep" o usar una base de datos relacional estándar resulta impracticable.

Por último, tened en cuenta que el código de este post, además de ser sólo un ejemplo, es completamente Alpha. Aparte de no funcionar del todo bien (tiene varios fallos que he detectado pero que no he corregido todavía), necesitaría tener en cuenta muchos más casos, como mensajes cuya entrega se retrasa durante días (que no deja de ser algo normal en los sistemas de correo reales).

En el cuarto y último post de la serie: acceso a Hadoop a través de una herramienta de apoyo, como pig.

read more