Con la evolución de los sistemas actuales, cada vez más grandes, con más elementos interconectados, con más necesidades de comunicación; con el cloud computing, las nubes, nieblas y demás parafernalia, llevamos ya tiempo viendo como las diferentes implementaciones de Advanced Message Queuing Protocol, como RabbitMQ, van ganando más y más seguidores.

Desde el punto de vista de la administración de sistemas, entornos como Nova (parte de Openstack) hacen uso de este protocolo, y de RabbitMQ más concretamente, para la comunicación entre sus múltiples componentes.

Hace unos años, y en desacuerdo con la evolución de AMQP, la gente de Imatix (ojo, es una de las empresas responsables de su implementación original), decidió apartarse de este protocolo y desarrollar una pequeña librería, zeroMQ, con la que se pudieran escribir aplicaciones con capacidades de comunicación avanzadas, pero de una manera sencilla; de tal manera que en lugar de un elemento más bien central en la infraestructura (AMQP), se tuviera un modelo más distribuido, en el que la mayoría del trabajo se delegase a los extremos (las aplicaciones), y no a ese nodo central. Usando una analogía de un documento de introducción a zeroMQ, estamos hablando de pasar de subversion (AMQP) a git (zeroMQ).

Bien, ahora que ya tengo vuestro interés (¿Verdad?), vamos a olvidarnos de la literatura y a buscar formas de aprovechar lo que zeroMQ puede ofrecernos para el trabajo diario del administrador de sistemas. Recordemos que al hablar de este tipo de tecnologías nos referimos al paso de mensajes entre procesos, a su gestión, a su enrutamiento, …. Y quien dice mensajes, cómo no, puede estar diciendo logs.

Al hablar de logs, por otro lado, hablamos de syslog, de servidores remotos a los que enviar datos, de ficheros que abrir y procesar, de plantillas para bases de datos en las que escribir los logs directamente, y quizá de sistemas más avanzados, como flume. ¿Sería posible usar lo bueno que tienen estas aplicaciones, y mejorarlas con lo que ofrece zeroMQ, sin volvernos locos en el intento?

Algo parecido debieron pensar en Aggregate Knowledge. Sencillamente, han escrito un par de plugins (de entrada y de salida) para rsyslog, de tal manera que podamos usar mucho de lo que nos ofrece zeroMQ (centrándome en el plugin de salida hablamos de sockets pub, push y rep básicamente) desde aplicaciones externas.

Dicho de otra forma, en una frase, podremos conectarnos en cualquier momento al rsyslog de un servidor, y ver lo que está logueando desde una aplicación que nosotros mismos habremos escrito, ya sea con Python, Perl, C, node.js, PHP, … o con cualquiera de los muchos lenguajes para los que hay un “binding”. De esta manera, podremos procesar datos en tiempo real o analizar lo que está pasando en una máquina, sin tener que acceder a ella y sin tener que ejecutar ningún tipo de software adicional en el servidor.

Antes de seguir, si queréis probarlo vosotros mismos, aquí os he dejado un backport de rsyslog 5.8.5 con soporte para zeromq para Debian Squeeze, y otro .deb de la misma versión de rsyslog para Debian Testing. En cualquier caso, recordad que son paquetes que se han creado en el momento en que se ha escrito este post. Por lo tanto, y más para Debian Testing, es muy posible que las versiones hayan cambiado cuando instaléis los paquetes, así que cuidado, bajo vuestra responsabilidad.

Aclarado esto, sigamos con el ejemplo.

Recordad que zeroMQ es una librería, y que como tal puede usarse en cualquier aplicación, sin límites. Accederemos a los logs de rsyslog (me estoy centrando en el plugin de salida) a través del socket adecuado, y a partir de ahí es cosa nuestra. En todo caso, hagamos lo que hagamos, probablemente podremos catalogar nuestra aplicación en uno de estos dos grandes grupos (es una simplificación, obviamente):

  1. Monitorización: A menudo sólo queremos ver lo que está pasando en un servidor. No necesitamos procesar los mensajes, nos vale con hacer un seguimiento de la actividad de una máquina. Cuando se configura rsyslog para usar sockets tipo “pub”, por ejemplo, estamos creando una especie de feed de datos. Todos los clientes que se conecten a ese socket (probablemente de tipo “sub”) recibirán los mismos datos, como si fuera una emisión de radio. Además, no necesitamos guardar los mensajes si no hay nadie escuchando, podemos descartarlos.
  2. Tratamiento: Los sockets tipo “push” balancean la carga automáticamente entre todos los clientes conectados (probablemente de tipo “pull”). Esto es perfecto cuando queremos procesar, por ejemplo, los logs de correo de un servidor. En un primer momento podríamos empezar con un único cliente escuchando y procesando los datos del servidor. Si en algún momento esta aplicación no pudiera con el volumen de logs, sería suficiente con arrancar una segunda instancia, de tal manera que se dividiría el trabajo entre las dos, automáticamente, y siempre que fueran lo suficientemente inteligentes, claro.

Podemos complicarnos mucho más, claro, pero lo dejaremos así por ahora. Como he dicho anteriormente, probablemente haya simplificado demasiado todo lo que puede hacerse, pero creo que estos dos escenarios describen bien el potencial de la librería.

Vamos a ver con un par de ejemplos lo sencillo que es todo esto. Empezamos con nuestro feed de logs.

# cat /etc/rsyslog.conf
...
*.* :‌omzeromq:bind=tcp://*:5557,pattern=pub;RSYSLOG_TraditionalFileFormat
...
*.*		-/var/log/syslog
...

Los clientes tendrán que conectarse al puerto 5557. Además, como se muestra en este ejemplo, el que usemos zeroMQ no significa que no podamos escribirlos también en otros ficheros.

En Fedora 15, si quisieramos escribir una pequeña aplicación con Python 3.x, sería suficiente con lo siguiente:

# yum install python3-zmq

# cat prueba_pub_sub.py
import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)

print ("Conectando a servidores…")
socket.connect ("tcp://servidor1.forondarena.net:5557")
socket.connect ("tcp://servidor2.forondarena.net:5557")
print ("Conectados a dos servidores")

socket.setsockopt(zmq.SUBSCRIBE, b"")

while 1:
	string = socket.recv()
	print ("Recibimos datos: ",string.decode())

Y, básicamente, esto es todo lo necesario. Es más, en este caso, nos conectamos a dos servidores, “servidor1.forondarena.net” y “servidor2.forondarena.net”, con lo que recibiremos todos los mensajes que generen ambos. Con “setsockopt” estamos poniendo un filtro para los mensajes, que en este caso es una cadena vacía, así que no aplicaremos ninguno. Si abrimos 10 terminales y ejecutamos este python en cada una, veremos como se muestran los mismos mensajes en todas ellas.

Con los sockets push/pull la cosa es igual de sencilla:

# cat /etc/rsyslog.conf
...
*.* :‌omzeromq:bind=tcp://*:5557,pattern=push;RSYSLOG_TraditionalFileFormat
...
*.*		-/var/log/syslog
...

La aplicación cliente tampoco tiene ningún misterio:

import zmq

context = zmq.Context()
socket = context.socket(zmq.PULL)

print ("Conectando a servidores…")
socket.connect ("tcp://servidor1.forondarena.net:5557")
print ("Conectados")

while 1:
        string = socket.recv()
        print ("Recibimos datos: ",string.decode())

En este caso, nos conectamos sólo a “servidor1″. La gran diferencia con el ejemplo anterior es que, si ejecutamos el script en dos terminales, veremos un mensaje en una, y el siguiente en otra (load balance). Este modelo es perfecto, por lo tanto, para los casos en los que las aplicaciones generan una única línea de log independiente por evento. En todo caso, como es nuestra aplicación la responsable del tratamiento de los mensajes, tenemos vía libre para hacer lo que queramos con los logs, ya sea insertándolos en MySQL una vez procesados, o en Hadoop, o en Cassandra, o en Redis, o en MongoDB, o en ….

En definitiva, creo que la unión de zeroMQ con rsyslog es una estupenda idea de la gente de Aggregate Knowledge. Además, el que zeroMQ pueda usarse casi con cualquier lenguaje de programación abre la ventana para que desarrollemos todo tipo de aplicaciones, ya sean en modo texto o gráficas, para consola o para interfaces web.

Notas

Este post es extremadamente superficial. AMQP, zeroMQ o rsyslog dan por sí mismos para páginas y páginas de blogs. Es más, ya las hay, y a montones. Aquí tenéis un puñado de referencias que os pueden servir para profundizar en el tema:

  • Introducción a zeroMQ. Estupenda introducción a zeroMQ. Empezad por aquí.
  • Guía de zeroMQ. Magnífica guía de zeroMQ, con ejemplos y propuestas de patrones.
  • Ejemplo de aplicación zeroMQ.
  • Otro ejemplo de uno de los patrones propuestos en la guía de zeroMQ, y unos benchmarks de Aggregate Knowledge.
  • Motivos por los que Imatix no está de acuerdo con la evolución de AMQP. Cuidado, es pelín denso.

No he querido incluir en el post los pasos para preparar los .deb de rsyslog. Si os interesa, dejad un comentario o un tweet y escribo algo al respecto.

 

Otro mini-post que vuelve a salirse completamente de la idea general de este blog. Ya hay mucha documentación sobre PXE y sobre instalaciones automatizadas tanto de CentOS como de Debian, pero bueno, a ver si le es útil a alguien.

En este caso, vamos a montar un sencillo servidor PXE, que podremos usar para hacer instalaciones de CentOS y Debian. Bueno, en realidad, podemos instalar cualquier cosa, pero en mi caso no suelo necesitar nada más.

Todo esto se debe a que, para las pruebas que hago en mi laboratorio, el mínimo de máquinas virtuales que necesito no baja de… tres, y al final tiendo a perder mucho tiempo con las instalaciones, y poco con el trabajo “de verdad”. Por eso, nada mejor que PXE, kickstart y debian-installer para agilizar el proceso.

Ojo, que todo esto, aunque perfectamente válido, no es lo que yo usaría, a priori, en un entorno real. Hay mucho software que automatiza lo que vamos a hacer aquí, con un interfaz web, scripts para actualizar repositorios, …. Por eso, creo que aplicaciones como Cobbler (por citar una) deben ser la primera opción.

Aún así, para los que queréis aprender cómo se montan estos “inventos”, aquí van unas pinceladas.

El problema

Necesitamos una forma rápida de instalar servidores. En mi caso, máquinas virtuales. Aunque hoy en día la gente de VmWare, RedHat o Virsh+libvirt ofrecen alternativas para clonar instancias como churros, vamos a optar por una solución genérica y mucho más divertida.

Las distribuciones que vamos instalar con este sistema son CentOS y Debian. Para el caso de las basadas en kickstart, la cosa está muy clara, pero para Debian, muchos os preguntaréis ¿Por qué no FAI? Bien, la respuesta rápida, en mi caso, es que no quiero instalar un servidor NFS, y la última vez que miré era un requisito. Como decía, buscad la solución que más os guste.

Otro problema posterior es la configuración “fina” de las instancias. Eso no lo voy a documentar aquí, pero como pista: cfengine, puppet, ….

La solución

He dicho que iba a ir rápido, y ya me estoy enrollando…. Venga, lo primero que necesitamos, como no, es una BIOS que permita instalar los clientes vía PXE. Por supuesto, tanto Qemu/KVM como VmWare lo permiten, así que ni una palabra más al respecto.

En cuanto al software necesario para el servidor de instalaciones, necesitamos un DHCPD razonable (por ejemplo el del ISC), un TFTPD (por ejemplo atftpd) y un servidor web, que en mi caso va a ser apache. Situemos todo en contexto:

  1. La máquina virtual arranca y “habla” con DHCPD.
  2. Además de la información IP, se trasmite, vía DHCP+TFTP, pxelinux.0.
  3. En el caso de Debian, también se aprovecha para dar una pista sobre dónde encontrar el fichero para debian-installer.
  4. Cargamos el menú de arranque con TFTP.
  5. El sistema usa apache para obtener los ficheros ks, debian-installer y los rpm, deb y demás parafernalia.

Empezamos por lo fácil: El contenido web. Tan sencillo como copiar el contenido de los CDs de instalación de las distribuciones a una ruta del arbol web.

# ls /var/www/instalaciones/centos/6.0-x86_64/
CentOS_BuildTag  EULA  images    Packages                  repodata              RPM-GPG-KEY-CentOS-Debug-6     RPM-GPG-KEY-CentOS-Testing-6
EFI              GPL   isolinux  RELEASE-NOTES-en-US.html  RPM-GPG-KEY-CentOS-6  RPM-GPG-KEY-CentOS-Security-6  TRANS.TBL

# ls /var/www/instalaciones/debian/squeeze/
autorun.inf  dists  firmware  g2ldr.mbr  install.amd  md5sum.txt  pool         README.mirrors.html  README.source  setup.exe  win32-loader.ini
css          doc    g2ldr     install    isolinux     pics        README.html  README.mirrors.txt   README.txt     tools

Sigamos con la configuración para DHCP. Hemos dicho que, además de las obvias IPs, también van a ayudar a las Debian con la definición del fichero para debian-installer. Hay tres bloques de configuración que os quiero enseñar:

# less /etc/dhcp/dhcpd.conf
...
subnet 192.168.10.0 netmask 255.255.255.0 {
   option routers 192.168.10.1;
   option domain-name-servers 192.168.10.1;
   option domain-name "forondarena.net";
   option tftp-server-name "inst.forondarena.net";
   filename "/instalaciones/pxelinux.0";
}
host centos1 {
   hardware ethernet 52:54:00:30:38:c6;
   server-name "fn134.forondarena.net";
   fixed-address 192.168.10.134;
}
host debian1 {
   hardware ethernet 52:54:00:da:4a:92;
   server-name "fn135.forondarena.net";
   fixed-address 192.168.10.135;
   if substring (option vendor-class-identifier, 0, 3) = "d-i" {
     filename "http://inst.forondarena.net/instalaciones/squeeze_preseed_135.cfg";
        }
}
...

Como veis, para CentOS no trasmitimos nada relacionado con el fichero kickstart, pero para Debian sí. ¿Por qué? Pues para enriquecer un poco el post con algo diferente, la verdad. Hay más de una forma de hacerlo.

Con esto ya tenemos la primera fase del arranque. Lo siguiente: El “boot menu”. Y para esto necesitamos TFTP:

/srv/tftp/instalaciones/
├── pxelinux.0
├── pxelinux.cfg
│   ├── C0A80A86
│   └── C0A80A87
├── squeeze-x86_64
│   ├── initrd.gz
│   └── vmlinuz
├── centos-6.0-x86_64
│   ├── initrd.img
│   └── vmlinuz
├── msgs
│   ├── centos
│   │   ├── boot.msg
│   │   └── general.msg
│   └── debian
│       ├── f1.txt
│       └── f2.txt

La mayoría de estos ficheros se pueden bajar casi desde cualquier sitio:


http://ftp.cz.debian.org/debian/dists/squeeze/main/installer-amd64/current/images/netboot/debian-installer/amd64/

Como el contenido de “msg” es obvio, vamos a los fichero importantes, C0A80A86 y C0A80A87. ¿Qué clase de nombres son estos? Pues son las IPs, en hexadecimal, de nuestras dos máquinas virtuales:

$ IP_ADDR="192.168.10.134"; printf '%02X' ${IP_ADDR//./ }; echo
C0A80A86
$ IP_ADDR="192.168.10.135"; printf '%02X' ${IP_ADDR//./ }; echo
C0A80A87

El contenido, para CentOS, en C0A80A86:

timeout 100
prompt 1
display msgs/centos/boot.msg
F1 msgs/centos/boot.msg
F2 msgs/centos/general.msg

default Centos6

label Centos6
  menu label ^Centos 6 amd64
  kernel centos-6.0-x86_64/vmlinuz
  append initrd=centos-6.0-x86_64/initrd.img ramdisk_size=6878 ip=dhcp ks=http://192.168.10.40/instalaciones/ks_rh6_134.ks

Como veis, hemos especificado la ruta web para el fichero kickstart.

Para Debian, en C0A80A87:

# less C0A80A87
menu hshift 15
menu width 49
prompt 1
display msgs/debian/f1.txt
timeout 100
f1 msgs/debian/f1.txt
f2 msgs/debian/f2.txt
menu title Installer boot menu
menu color title	* #FFFFFFFF *
...
menu tabmsgrow 18
menu tabmsg Press ENTER to boot or TAB to edit a menu entry

default install

label install
	menu label ^Debian Squeeze Texto amd64
	menu default
	kernel squeeze-x86_64/vmlinuz
	append video=vesa:ywrap,mtrr vga=788 initrd=squeeze-x86_64/initrd.gz auto=true priority=critical locale=es_ES console-keymaps-at/keymap=es --

En este caso, no hemos dicho dónde encontrar el fichero para debian-installer. Ya lo sabemos.

Y ya casi por último, veamos los ficheros kickstart y debian-installer, a los que vamos a acceder desde apache:

# ls -1  /var/www/instalaciones/
...
ks_rh6_134.ks
squeeze_preseed_135.cfg

Todos conocéis el formato de los .ks; son simples y fáciles de leer.

#version=RHEL6
install
url --url=http://192.168.10.40/instalaciones/centos/6.0-x86_64/
lang es_ES.UTF-8
keyboard es
network --device eth0 --onboot yes --bootproto static --ip 192.168.10.134 --netmask 255.255.255.0 --gateway 192.168.10.1 --nameserver 192.168.10.1 --hostname fn134.forondarena.net
rootpw  --iscrypted contraseña
firewall --service=ssh
authconfig --enableshadow --passalgo=sha512 --enablefingerprint
selinux --enforcing
timezone --utc Europe/Madrid
bootloader --location=mbr --driveorder=vda --append="crashkernel=auto rhgb quiet"
clearpart --all --drives=vda
part /boot --fstype=ext4 --size=200
part pv.gjsqnW-TYE3-SDwW-646R-0SSI-hhDa-ZF6bXm --grow --size=200
volgroup vg_primario --pesize=4096 pv.gjsqnW-TYE3-SDwW-646R-0SSI-hhDa-ZF6bXm
logvol / --fstype=ext4 --name=lv_root --vgname=vg_primario --size=7400
logvol swap --name=lv_swap --vgname=vg_primario --size=588
##repo --name="CentOS"  --baseurl=http://192.168.10.1/instalaciones/centos/6.0-x86_64/ --cost=100

halt

%packages
@core
@server-policy
@spanish-support
openssh*
%end

Es un ejemplo autogenerado para una máquina virtual tipo KVM. No le hagáis mucho caso al contenido en sí. Lo dicho, tenéis un kilo de documentación, e incluso aplicaciones para hacer estos ficheros.

Otra cosa muy diferente es debian-installer. La verdad, no conozco a mucha gente que no se haya atascado con esto en algún momento. El concepto es sencillo, con una opción para cada una de las opciones de cada uno de los menús que pueden aparecer durante el proceso de instalación. El problema es que, esto tan fácil de decir, es un listado …. largo; y con algunos algoritmos, como el del cálculo de espacio para particiones, nada claros. Además, como suele ser demasiado frecuente, el que decidió los nombres de las opciones…. bueno, que no me parece demasiado intuitivo, aunque si estás familiarizado con Debian te haces enseguida.

No voy a escribir un ejemplo completo. Os vais a tener que conformar con una pequeña muestra:

# cat /var/www/instalaciones/squeeze_preseed_135.cfg
...
# Keyboard selection.
d-i console-tools/archs select at
d-i console-keymaps-at/keymap select es
d-i keyboard-configuration/xkb-keymap select es
### Network configuration
d-i netcfg/choose_interface select eth0
d-i netcfg/disable_dhcp boolean true
# Static network configuration.
d-i netcfg/get_nameservers string 192.168.10.1
d-i netcfg/get_ipaddress string 192.168.10.135
d-i netcfg/get_netmask string 255.255.255.0
d-i netcfg/get_gateway string 192.168.10.1
d-i netcfg/confirm_static boolean true
...
# Root password, encrypted using an MD5 hash.
d-i passwd/root-password-crypted password contraseña
...
### Partitioning
d-i partman-auto/disk string /dev/sda
d-i partman-auto/method string lvm
d-i partman-lvm/device_remove_lvm boolean true
d-i partman-lvm/device_remove_lvm_span boolean true
d-i partman-auto/purge_lvm_from_device boolean true
d-i partman-auto-lvm/new_vg_name string vg_forondarenanet
d-i partman-basicmethods/method_only boolean false
d-i partman-auto/expert_recipe string boot-root :: \
100 100 100 ext3 \
        $defaultignore{ } \
        $primary{ } \
        device{ /dev/sda } \
        $bootable{ } \
        method{ format } \
        format{ } \
        use_filesystem{ } \
        filesystem{ ext3 } \
        mountpoint{ /boot } \
        . \
200 1000 -1 ext4 \
        $defaultignore{ } \
        $primary{ } \
        device{ /dev/sda } \
        method{ lvm } \
        vg_name{ vg_forondarenanet } \
        . \
300 4000 -1 ext4 \
        $lvmok{ } \
        lv_name{ lvroot } \
        in_vg{ vg_forondarenanet } \
        method{ format } \
        format{ } \
        use_filesystem{ } \
        filesystem{ ext4 } \
        mountpoint{ / } \
        . \
512 512 512 linux-swap \
        $lvmok{ } \
        in_vg{ vg_forondarenanet } \
        lv_name{ lvswap } \
        method{ swap } \
        format{ } \
        .
d-i partman-auto/choose_recipe select boot-root

d-i partman/confirm_write_new_label boolean true
d-i partman/choose_partition select finish
d-i partman/confirm boolean true
d-i partman/confirm_nooverwrite boolean true
d-i partman/confirm_nochanges boolean true
d-i partman-lvm/confirm boolean true
d-i partman-lvm/confirm_nooverwrite boolean true
d-i partman-lvm/confirm_nochanges boolean true
...
# Individual additional packages to install
d-i pkgsel/include string openssh-server less
d-i pkgsel/upgrade select safe-upgrade
popularity-contest popularity-contest/participate boolean false
d-i finish-install/reboot_in_progress note
d-i debian-installer/exit/halt boolean true
# This will power off the machine instead of just halting it.
d-i debian-installer/exit/poweroff boolean true
...

Y así podría seguir…

En este caso es una máquina virtual para ESX. Me interesa que veáis la parte de particionado. Aunque no os lo creáis, he definido una partición primaria /boot de unos 100M, y el resto en LVM, con aproximadamente 512M de swap, y el resto de espacio disponible para raíz.

Y poco más hace falta. Ahora bien, en este post, las notas que suelo añadir al final son más importantes que nunca.

Notas

Nota 1: Los ficheros de instalación automática de los ejemplos hacen un halt una vez finalizada la instalación. ¿Por qué? Porque no tengo automatizado que la BIOS cambie por arte de magia de arranque por PXE a arranque desde disco.

Nota 2: Más que una nota, un consejo. No os volváis locos con la configuración de ficheros de sistema, usuarios y aplicaciones. Tenéis herramientas mucho mejores para la configuración “fina”, así que plantearos añadir puppet, cfengine, chef, … en los bloques “%packages” y “d-i pkgsel/include”, y dejarles que hagan su trabajo al arrancar la máquina. Para CentOS (desde puppetlabs):

%post
....
/sbin/chkconfig --level 345 puppet on
/bin/echo "$PUPPETIP puppet" >> /etc/hosts
/bin/echo "nameserver $NAMESERVERIP" >> /etc/resolv.conf
hostname $hostname
# Write out the hostname to a file for reboot.
/bin/echo -e "NETWORKING=yes\nHOSTNAME=$hostname" > /etc/sysconfig/network
/usr/sbin/puppetd -tv

y, como mini-ejemplo, un poco diferente, para Debian:

d-i preseed/late_command string in-target wget -P /tmp/ http://servidor_web/script.sh;
  in-target chmod +x /tmp/script.sh; in-target /tmp/script.sh

dejando que ese “script.sh” haga cosas como “sed -i ‘s/START=no/START=yes/’”, siguiendo la estructura de Debian de ficheros en default y todas estas cosas.

Nota 3: Lo más importante. Este post no es suficiente para hacer que las instalaciones automáticas os funcionen. Necesita trabajo. Si os atascáis con algo, tweet o comentario, y lo intentaremos solucionar.

 

Ahora que ando reestructurando mi laboratorio, voy a aprovechar para documentar un par de aplicaciones que estoy moviendo al nuevo hierro.

En realidad, esto se sale un poco del objetivo de este blog, sobre todo teniendo en cuenta que ya hay kilos de documentación sobre, en este caso, Dovecot; pero bueno, a mí me va a servir como referencia rápida, y quizá os sea de utilidad a alguno de los cuatro que pasáis por aquí.

Empezamos con los buzones compartidos en Dovecot. Como siempre, pretendo ser lo más práctico posible, así que lo mejor que se me ha ocurrido es describir exactamente la forma en la que yo mismo tengo montado “el invento”.

El problema

Antes de nada, hablemos sobre las aplicaciones que debemos tener funcionando antes de empezar:

Necesitamos un servidor de correo propio (probablemente Postfix) en el que recibir los mensajes, ya sea porque el MX apunta a él o porque usamos software tipo Fetchmail, por ejemplo. Además, tenemos varios usuarios en nuestro sistema que acceden a su correo a través de Dovecot.

Entre el correo que reciben, además del privado para cada uno, es muy probable que los usuarios estén suscritos a varias listas: de seguridad, de usuarios, anuncios de nuevas versiones de x aplicación, …. Además, también hay cuentas tipo helpdesk y listas para un departamento o grupo que deben estar accesibles para varios usuarios simultaneamente.

Algo habitual en estos casos es, por un lado, que cada usuario gestione sus propias subscripciones a listas de correo, y por otro, para el caso de las cuentas tipo helpdesk de las que hemos hablado, el mandar una copia a cada uno de los n usuarios implicados. Obviamente, esto genera un montón de copias y mensajes repetidos que, aunque no tenga que suponer un enorme problema, sí que es más propenso a fallos, además de no dejar de ser “poco elegante”.

La solución

Bien, aquí ya entramos en la forma en la que yo lo hago, que no tiene que ser ni mucho menos la mejor o la única.

Vamos a empezar por el caso más fácil, los buzones públicos:

Por usar un ejemplo concreto, al hablar de “buzón público” me voy a referir a listas de correo generales y accesibles para todo el mundo. El único control de acceso que se va a hacer sobre ellas es a nivel de sistema de ficheros, por lo que recomiendo que los usuarios del sistema pertenezcan a un grupo concreto (o varios) siempre que quieran acceder a una u otra lista.

Y digo usuarios del sistema porque cada lista va a ser un usuario de sistema (mapeada con una cuenta de correo), de tal manera que la suscripción será única. Si quisieramos hacer que todo el mundo tuviera acceso a la lista de usuarios de Postfix, crearíamos el usuario “postlista”, por ejemplo, y nos suscribiríamos a la lista de Postfix con postlista _A_T_ forondarena.net (no existe ni existirá nunca, que conste, pero ahora ya tengo un spamtrap más :D ). Como podéis suponer, el correo que llegue a esta dirección se guardará físicamente en (esta parte de la configuración de Postfix/Dovecot os la dejo a vosotros):

drwxrwx--- 5 postlista postlista 4096 oct  1 17:57   /home/postlista/Maildir

Volviendo a los permisos a nivel de sistema operativo, hay algo importante a tener en cuenta: Dovecot 2.x (y con ello su agente de entrega local que usamos en Postfix) crea los ficheros de correo con los mismos permisos que su directorio principal, y por lo tanto debemos hacer que Maildir tenga 770, por ejemplo (siempre que optemos por los permisos a nivel de sistema operativo para limitar el acceso).

Una vez hecho esto vamos a crear un directorio común que nos va a servir como referencia para que los distintos usuarios “sepan” donde están las listas. La estructura va a ser la siguiente:

ls -lha /home/listas/Maildir
lrwxrwxrwx 1 root  root    21 sep 22 22:03 .postlista -> /home/postlista/Maildir

En el directorio “/home/listas” (o cualquier otro, no es un usuario del sistema) vamos a crear un enlace simbólico para cada uno de los buzones públicos.

¿Pero cómo sabe un usuario que puede acceder a esa lista pública?

Para esto usaremos los namespaces de IMAP (El que quiera entrar en el detalle sobre lo que son, que busque el RFC).

Dovecot define los namespaces en un bloque similar al siguiente, a veces en un único dovecot.conf, a veces en otro fichero. Debian, por ejemplo, divide los diferentes apartados de configuración en “.conf” diferentes:

namespace {
  type = public
  separator = /
  prefix = listas-public/
  location = maildir:/home/listas/Maildir:INDEX=~/Maildir/listas-public
  subscriptions = no
}

Adaptad la configuración a vuestro gusto, pero lo importante es que cuando el usuario “pepe” se loguee vía IMAP, va a poder suscribirse a todo lo listado en el directorio “/home/listas/Maildir”, que además va a aparecer en su Thunderbird en el subdirectorio listas-public. Además, queremos que cada usuario tenga los ficheros propios de control que usa Dovecot en su maildir privado.

Esta es la forma fácil de crear buzones públicos.

Sin embargo, IMAP define una extensión para ACLs (los interesados también tienen RFCs al respecto), con las que podemos definir un control de acceso mucho más detallado, y que nos permitirán hilar mucho más fino en lo que puede hacer el usuario x en el buzón del usuario z. Para que os hagáis una idea, y cogido directamente del wiki de Dovecot, las ACLs permiten establecer los siguientes permisos:

  • lookup: Mailbox is visible in mailbox list. Mailbox can be subscribed to
  • read: Mailbox can be opened for reading
  • write: Message flags and keywords can be changed, except \Seen and \Deleted
  • write-seen: \Seen flag can be changed
  • write-deleted: \Deleted flag can be changed
  • insert: Messages can be written or copied to the mailbox
  • post: Messages can be posted to the mailbox by LDA, e.g. from Sieve scripts
  • expunge: Messages can be expunged
  • create: Mailboxes can be created (or renamed) directly under this mailbox (but not necessarily under its children, see ACL Inheritance section above) (renaming also requires delete rights)
  • delete: Mailbox can be deleted
  • admin: Administration rights to the mailbox (currently: ability to change ACLs for mailbox)

En el wiki de Dovecot podéis ver que letra corresponde a cada permiso. Por ahora lo dejamos y pasamos a la creación de un buzón compartido. Por cierto, no sé si está claro que con “buzón” me puedo estar refiriendo también a carpetas como “Trash”, “Sent”, o cualquier otra que creemos a mano.

Vamos a suponer que tenemos un usuario “helpdesk” en el que recibimos todo el correo destinado a “helpdesk _A_T_ forondarena.net” (otro spamtrap).

/home/helpdesk/Maildir

Helpdesk es otro usuario del sistema, y su Maildir tiene permisos 770, para limitar el acceso también a nivel de sistema operativo.

Ahora que ya tenemos el usuario del sistema y que estamos recibiendo correo, vamos a activar el soporte para ACLs en Dovecot. Es muy sencillo, sólo hay que añadir un par de plugins, ya sea en dovecot.conf, o en el fichero que defina vuestra distribución.

...
mail_plugins = acl
...
protocol imap {
  mail_plugins = $mail_plugins imap_acl
}

Y una vez tenemos los plugins, añadimos la configuración básica:

plugin {
   acl = vfile
}

Recordemos que queremos que el usuario “pepe” pueda acceder a todo el buzón “helpdesk”. Para conseguirlo, vamos a hacer login con el usuario helpdesk, y vamos a usar el propio protocolo IMAP para dar acceso a pepe:

# openssl s_client -connect 192.168.10.20:993
CONNECTED(00000003)
..... (información SSL) .....
* OK [CAPABILITY IMAP4rev1 LITERAL+ SASL-IR LOGIN-REFERRALS ID ENABLE IDLE AUTH=PLAIN] Dovecot ready.
. login helpdesk password
. OK [CAPABILITY IMAP4rev1 LITERAL+ SASL-IR LOGIN-REFERRALS ID ENABLE IDLE SORT SORT=DISPLAY THREAD=REFERENCES THREAD=REFS MULTIAPPEND UNSELECT CHILDREN NAMESPACE UIDPLUS LIST-EXTENDED I18NLEVEL=1 CONDSTORE QRESYNC ESEARCH ESORT SEARCHRES WITHIN CONTEXT=SEARCH LIST-STATUS ACL RIGHTS=texk] Logged in
. setacl INBOX pepe lrwstipekxacd
. OK Setacl complete.

Dicho de otra forma, hemos permitido “lrwstipekxacd” (cada permiso de la tabla anterior es una de estas letras) en el INBOX de helpdesk al usuario pepe.

¿Pero cómo sabe pepe que puede acceder a helpdesk?

Pues con los namespaces, claro. Vamos a crear uno de tipo “shared”:

namespace {
  ...
  type = shared
  separator = /
  prefix = buzones-shared/%%u/
  location = maildir:%%h/Maildir:INDEX=~/Maildir/buzones-shared/%%u
  ...
}

Igual que con los buzones públicos, cuando pepe acceda a su cuenta va a ver un directorio buzones-shared en el que aparecerá helpdesk. ¿Verdad?

Pues no, porque no hay forma (con un rendimiento razonable) de hacer que Dovecot se recorra todos los buzones configurados (todos esos %%h y %%u), y sepa a cuáles tiene acceso pepe. A fin de cuentas, pepe y helpdesk son usuarios completamente diferentes, y no hay un enlace en un directorio claramente identificado, como “/home/listas” en el caso de los buzones públicos.

¿Entonces qué?

Es aquí donde entran en juego lo que en Dovecot se llaman “directorios”, y que vamos a usar para decir, en este caso a pepe, que puede acceder a helpdesk. La configuración es la siguiente:

plugin {
  acl_shared_dict = file:/etc/dovecot/acls/shared-mailboxes
}

Quien dice “file:” dice base de datos o fichero .db. En cualquier caso, para este ejemplo usaremos un fichero en texto plano, “/etc/dovecot/acls/shared-mailboxes”. Tened en cuenta, una vez más, que pepe tiene que poder leer el contenido de este fichero, y que además le interesa poder crear un lock en el directorio mientras está trabajando con él (vigilad los permisos unix, usad el directorio que queráis).

El contenido de shared-mailboxes es el siguiente:

shared/shared-boxes/user/pepe/helpdesk
1

Es una sintaxis que me parece particularmente “retorcida”, pero es lo que hay si no queréis usar base de datos.

Y con esto lo tenemos ya “casi todo” (ver notas al final del post). Cuando pepe abra su Thunderbird y liste las carpetas a las que puede suscribirse verá el inbox de helpdesk, en este caso a partir de la carpeta “buzones-shared”.

Notas

Primera nota:

En Dovecot, una vez que definimos un namespace, también tenemos que definir explicitamente el namespace privado. Por lo tanto, la configuración completa, en lo que a namespaces se refiere, tiene que parecerse a esto:

namespace {
  inbox = yes
  location =
  prefix =
  separator = /
  subscriptions = yes
  type = private
}
namespace {
  location = maildir:/home/listas/Maildir:INDEX=~/Maildir/listas-public
  prefix = listas-public/
  separator = /
  subscriptions = no
  type = public
}
namespace {
  location = maildir:%%h/Maildir:INDEX=~/Maildir/buzones-shared/%%u
  prefix = buzones-shared/%%u/
  separator = /
  subscriptions = no
  type = shared
}

Una vez más, adaptad la configuración a vuestras preferencias. Por ejeplo, quizá os venga bien añadir un “list = children” en vuestros namespaces.

Segunda nota:

¿Si habéis hecho pruebas antes de llegar al final, os habéis dado cuenta de que, una vez configurados los buzones shared, las listas públicas han dejado de verse?

¡Claro!, hemos empezado a usar ACLs, así que hay que dar acceso a pepe a las listas. Muy fácil, nos logueamos con el usuario de la lista que queramos abrir, y damos acceso a pepe:

...
* OK [CAPABILITY IMAP4rev1 LITERAL+ SASL-IR LOGIN-REFERRALS ID ENABLE IDLE AUTH=PLAIN] Dovecot ready.
. login postlista password
. OK [CAPABILITY IMAP4rev1 LITERAL+ SASL-IR LOGIN-REFERRALS ID ENABLE IDLE SORT SORT=DISPLAY THREAD=REFERENCES THREAD=REFS MULTIAPPEND UNSELECT CHILDREN NAMESPACE UIDPLUS LIST-EXTENDED I18NLEVEL=1 CONDSTORE QRESYNC ESEARCH ESORT SEARCHRES WITHIN CONTEXT=SEARCH LIST-STATUS ACL RIGHTS=texk] Logged in
. setacl INBOX pepe lrwstipekxacd
. OK Setacl complete.
. logout

Tercera nota:

Los ficheros dovecot-acl tienen el siguiente contenido de ejemplo:

user=pepe akxeilprwts

Pero no los editéis directamente. Usad setacl, getacl, myrights, …, o mejor aún, alguna extensión de Thunderbird que lo haga por vosotros.

Y para terminar, tened en cuenta, una vez más, que este post no es una referencia para hacer copy/paste. Necesita trabajo (poco) si queréis hacerlo funcionar.

 

Pasamos a la tercera parte de la serie, en la que ya vamos a ver un ejemplo concreto de lo que puede ser un sencillo interfaz para la gestión de logs usando Solandra. Como siempre, todo el código y los ficheros de configuración más importantes están en Github.

Recordemos los objetivos que nos hemos marcado:

  • Ser capaces de gestionar un volumen muy importante de logs, con la máxima escalabilidad y disponibilidad.
  • Poder añadir los logs en el sistema de una forma sencilla.
  • Tener un interfaz web desde el que poder visualizar los datos.

Ya hemos hablado sobre la escalabilidad de Cassandra en los posts anteriores, así que no vamos a volver a entrar en este punto. Veamos los otros dos:

Inserción de datos en el sistema

Solandra es básicamente una adaptación de Solr, por lo que en realidad vamos a tratar conceptos propios de esta aplicación en lo que queda de post. Cuando veamos algo específico de Solandra, lo señalaré.

Uno de los elementos más importantes de la configuración del sistema es el schema; que viene a ser el lugar en el que se definen los atributos que forman cada “documento” (correo en este caso) que se quiere indexar. Como veremos más adelante, una vez creada esta estructura de datos, usaremos el comando curl para insertarla en el cluster a través de una url determinada. Además, Solandra permite trabajar con varios schemas diferentes de manera simultanea.

Simplicando un poco, el schema está compuesto por dos grandes bloques: La definición de los tipos de datos y la lista de atributos que forman cada documento.

Solr ofrece muchos tipos de datos ya creados de antemano: numéricos, texto, fechas, …. Por si esto fuera poco, la aplicación permite definir nuevos tipos siempre que se considere necesario. Para la gestión de logs de correo, por ejemplo, nos podría venir bien un tipo específico para las direcciones email:

...
<fieldType name="email" class="solr.TextField" >
   <analyzer>
      <tokenizer class="solr.PatternTokenizerFactory" pattern="@" />
      <filter class="solr.LowerCaseFilterFactory" />
      <filter class="solr.TrimFilterFactory" />
   </analyzer>
</fieldType>
...

Este tipo se basa en el TextField clásico, pero forma los tokens alrededor de la “@”. De esta manera, facilitaremos las búsquedas tanto en base a la parte local de las direcciones como al dominio. Además, elimina los espacios y convierte las mayúsculas en minúsculas.

El siguiente paso es la definición de los atributos que componen cada documento, y que por supuesto van a ser de alguno de los tipos disponibles en el schema.

<fields>
    <field name="id" type="uuid" indexed="true" stored="true" required="true" />
    <field name="in_from" type="email" indexed="true" stored="true" />
    <field name="in_size" type="tint" indexed="false" stored="true" />
    <field name="in_fentrada" type="tdate" indexed="true" stored="true" />
    <field name="in_to" type="email" indexed="true" stored="true" />
    <field name="in_to_adicional" type="email" indexed="true" stored="true" multiValued="true" />
    <field name="in_fsalida" type="tdate" indexed="true" stored="true" multiValued="true" />
    <field name="in_estado" type="string" indexed="false" stored="true" multiValued="true" />

    <dynamicField name="*" type="ignored" multiValued="true" />
</fields>

Casi no hace falta explicar nada. En este sencillo ejemplo tenemos un identificador, un origen, un tamaño, una fecha de entrada y una fecha de entrega. Además, con cada mensaje vamos a guardar el estado de entrega para cada destino (pueden haberse realizado varios intentos, por ejemplo a causa del greylisting), y la lista de destinatarios adicionales para los que iba dirigido.

Obviamente, es una estructura limitada. En un entorno real se debería guardar mucha más información (antivirus, antispam, expansión de aliases, …).

Y con esto ya lo tenemos. Sólo queda volcar el schema en el cluster:

curl http://ip_cluster:8983/solandra/schema/correo --data-binary @/root/schema_correo.xml -H 'Content-type:text/xml; charset=utf-8'

Como hemos dicho, al igual que un “/schema/correo”, se podría definir un “/schema/web”, por ejemplo, y usarlos simultáneamente.

El volcado de datos

El volcado de datos se puede hacer de varias formas, pero vamos a limitarnos al uso de ficheros xml. Lo mejor es ver un ejemplo:

<add allowDups="false">
   <doc>
      <field name="in_from">origenspam@example.com</field>
      <field name="id">b309842a-1cf7-11e0-9759-f896edbeae14</field>
      <field name="in_fentrada">2011-04-30T00:17:24Z</field>
      <field name="in_size">941</field>
      <field name="in_to">destino1@target.example.net</field>
      <field name="in_fsalida">2011-04-30T00:17:25Z</field>
      <field name="in_estado">0 - 192.168.10.24_accepted_message./Remote_host_said:_250_2.0.0_Ok:_queued_as_DF2BD74CA71/</field>
      <field name="in_to_adicional">destino1@target.example.net</field>
      <field name="in_to_adicional">destino2@target.example.net</field>
      <field name="in_to_adicional">destino3@target.example.net</field>
      <field name="in_to_adicional">destino4@target.example.net</field>
   </doc>
   <doc>
      <field name="in_from">origenspam@example.com</field>
      <field name="id">b30991b7-1cf7-11e0-9759-e819bb7f7b58</field>
      <field name="in_fentrada">2011-04-30T00:17:24Z</field>
      <field name="in_size">941</field>
      <field name="in_to">destino2@target.example.net</field>
      <field name="in_fsalida">2011-04-30T00:17:24Z</field>
      <field name="in_estado">0 - 192.168.10.24_accepted_message./Remote_host_said:_250_2.0.0_Ok:_queued_as_A945A21CD6B/</field>
      <field name="in_to_adicional">destino1@target.example.net</field>
      <field name="in_to_adicional">destino2@target.example.net</field>
      <field name="in_to_adicional">destino3@target.example.net</field>
      <field name="in_to_adicional">destino4@target.example.net</field>
   </doc>
   <doc>
      <field name="in_from">cliente1@example.net</field>
      <field name="id">b43de232-1cf7-11e0-9759-d71ac36a357f</field>
      <field name="in_fentrada">2011-04-30T00:04:41Z</field>
      <field name="in_size">6077</field>
      <field name="in_to">greylister@example.com</field>
      <field name="in_fsalida">2011-04-30T00:07:27Z</field>
      <field name="in_estado">0 - 172.16.0.14_does_not_like_recipient./Remote_host_said:_450_4.7.1_<cliente1@example.net>:_Sender_address_rejected:_Message_delayed_now.Retry_later,_please./Giving_up_on_172.16.0.14./</field>
      <field name="in_fsalida">2011-04-30T00:12:38Z</field>
      <field name="in_estado">1 - 172.16.0.14_does_not_like_recipient./Remote_host_said:_450_4.7.1_<cliente1@example.net>:_Sender_address_rejected:_Message_delayed_now._Retry_later,_please./Giving_up_on_172.16.0.14./</field>
      <field name="in_fsalida">2011-04-30T00:31:53Z</field>
      <field name="in_estado">2 - 172.16.0.14_does_not_like_recipient./Remote_host_said:_451_4.3.0_<greylister@example.com>:_Temporary_lookup_failure/Giving_up_on_172.16.0.14./</field>
      <field name="in_fsalida">2011-04-30T01:04:43Z</field>
      <field name="in_estado">3 - 172.16.0.14_accepted_message./Remote_host_said:_250_2.0.0_Ok:_queued_as_CB6A3D4A217/</field>
      <field name="in_to_adicional">greylister@example.com</field>
   </doc>
</add>

El volcado, otra vez, es muy sencillo.

curl http://ip_cluster:8983/solandra/correo/update -F stream.file=/tmp/volcado.xml

La conversión de logs desde el más que probable modo texto de syslog a xml, y de ahí al cluster de Solandra, queda fuera de esta serie de posts. De hecho, un servidor piensa que esto es lo realmente importante y difícil para llevar este proyecto a la práctica de una manera “seria”.

El Interfaz

¿Qué mejor que un interfaz web para mostrar los datos que hemos almacenado en Solandra? ¡Sorpresa! la gente detŕas del proyecto ajax-solr ya ha hecho la mayoría del trabajo, así que sólo nos queda modificar un puñado de ficheros, algo de código JavaScript, y ya lo tendremos.

Un detalle más: No queremos permitir el acceso directo a Solandra desde la web, así que necesitamos un proxy que filtre las consultas y las redirija al puerto de Solandra (tcp/8983 por defecto), y que en mi laboratorio escucha en localhost. En este caso, como casi siempre que quiero programar algún tipo de servicio para Internet sin dedicarle mucho tiempo, he usado node.js. Para este ejemplo, y por jugar un poco con GeoIP, he escrito un sencillo proxy que permite conexiones sólo si tienen como referer forondarena.net y si vienen desde Europa o América del Norte. Como no podría ser de otra forma, el código de este proxy también está disponible en Github.


(Nota: Esta demo es un Solr normal, pero el funcionamiento es idéntico al de Solandra).

Conclusiones

El “mercado” está lleno de soluciones de todo tipo que nos pueden ayudar en la gestión de logs. Hay aplicaciones comerciales, como Splunk, sistemas basados en software libre, como Solr, tecnología que nos puede permitir crecer “ilimitadamente”, como Cassandra, Hadoop o Hbase, pero que requieren algo de trabajo; y también tenemos los mágnificos sistemas de bases de datos, como Mysql o Postgresql. ¿Cuál elegir?

En una primera fase, una buena base de datos con un sencillo interfaz web o un Solr estándar pueden servir perfectamente para gestionar todo tipo de logs. De hecho, tanto Mysql como Solr ofrecen alternativas para el particionado que pueden permitir este esquema en la segunda, tercera o cuarta fase.

Un buen consejo que escuche hace tiempo es el de “no arreglar lo que no está roto”. Sólo deberíamos plantearnos el uso de tecnología que probablemente no conozcamos tan bien como las anteriores cuando realmente sea necesario. Llegado ese momento, adelante. Como siempre, la comunidad detrás del software libre es activa y está dispuesta a ayudar. Por si esto fuera poco, cada vez son más comunes las empresas que ofrecen servicios alrededor de este tipo de soluciones, y que pueden asesorarnos llegado el caso.

 

Seguimos con el segundo post de la serie, en el que pasamos a dar una descripción un poco más técnica de los componentes necesarios para poner en marcha todo lo descrito en el primero. No vamos a entrar en demasiado detalle. En todo caso, una vez conocidas las aplicaciones es más fácil buscar información en la red.

Aunque Solandra puede encargarse de la instalación de Cassandra, aquí vamos a usar los componentes por separado.

Cassandra
Cassandra es un tipo de base de datos creado siguiendo los principios propuestos por Dynamo (Amazon) y por BigTable (Google). El que quiera entrar en detalle tiene bibliografía y mucha documentación disponible.

Explicar el modelo de datos, la replicación o los niveles de consistencia va más allá del objetivo de este post. Lo más interesante en nuestro contexto es dejar claro que Cassandra es una base de datos completamente distribuida y descentralizada, en la que todas las máquinas del cluster cumplen el mismo y único rol, sin distinciones entre “maestros”, “esclavos”, “catálogos”, …. Esto significa que añadir capacidad a un cluster de Cassandra supone básicamente añadir más hierro. Nada más.

La instalación puede complicarse todo lo que queramos, pero lo básico es:

# Cuidado con la versión
cd /tmp/ && wget http://apache.rediris.es/cassandra/0.8.2/apache-cassandra-0.8.2-bin.tar.gz
cd /usr/local/ && tar xvzf /tmp/apache-cassandra-0.8.2-bin.tar.gz

A partir de aquí se crean los directorios de datos y logs (por ejemplo /var/lib/cassandra y /var/log/cassandra), y se adaptan los ficheros de configuración (que están en /usr/local/apache-cassandra-0.8.2/conf). El que más nos interesa ahora es cassandra.yaml. En github hay un ejemplo de configuración de este fichero, aunque sea casi por defecto, de cada uno de los tres nodos que he usado en esta prueba.

Hay un par de opciones de configuración que pueden servir para comprender la estructura del sistema. Un cluster de Cassandra se entiende como un anillo en el que cada uno de los nodos gestiona un volumen determinado de datos (replicación aparte). Básicamente estamos hablando de una serie de claves (hashes) que se distribuyen de una forma más o menos equilibrada entre todas las máquinas. Aunque no sea estrictamente necesario, como para esta prueba he usado un número fijo de nodos (3), he asignado ya desde el comienzo un 33% de datos a cada uno. Por supuesto, en un entorno real en el que se añaden y quitan nodos dinámicamente la gestión es diferente. La configuración en mi laboratorio es la siguiente:

# Nodo 1
initial_token: 0
# Nodo 2
initial_token: 56713727820156410577229101238628035242
# Nodo 3
initial_token: 113427455640312821154458202477256070485

La otra opción que merece la pena comentar es “seed_provider”. Cassandra usa un protocolo tipo Gossip para distribuir la información entre los nodos. Esto significa que, cuando se añade un nuevo miembro al cluster, es suficiente con indicarle un servidor activo del mismo. El protocolo se encarga de propagar esta nueva información en todos los nodos. Por lo tanto, la configuración se limita a especificar una (o varias) IPs activas:

seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: 192.168.10.145

Hay mucha más opciones de configuración, por supuesto, pero lo dejamos aquí.

En este momento ya se podría ejecutar Cassandra, pero esperaremos a instalar Solandra.

Solandra
Hablemos antes de lo que es Solr, una vez más, muy muy por encima.

Solr es una plataforma de búsqueda construida sobre la librería Lucene. Simplificando mucho, y en el contexto de este post, ofrece un interfaz XML (es el que nos interesa aquí, pero no el único) para añadir documentos, y un API HTTP a través de cual recibir resultados en formato JSON (entre otros).

Todo se verá más claro cuando creemos el schema para la gestión de logs. Para el que quiera profundizar más en Solr, aquí tiene un libro.

Volvamos a Solandra. La instalación es sencilla. Una vez descargado el tar.gz desde github, y con ant y los binarios de java en el path, ejecutamos lo siguiente en cada uno de los nodos:

# El nombre del fichero cambia
cd /usr/local && tar xvzf /tmp/tjake-Solandra-4f3eda9.tar.gz
cd tjake-Solandra-4f3eda9/
ant -Dcassandra=/usr/local/apache-cassandra-0.8.2 cassandra-dist

Si todo va bien, en unos minutos la salida estándar mostrará lo siguiente:

....
cassandra-dist:
     [copy] Copying 36 files to /usr/local/apache-cassandra/lib
     [copy] Copying 8 files to /usr/local/apache-cassandra/conf
     [copy] Copying 1 file to /usr/local/apache-cassandra/bin
     [echo] Libraries successfully copied into cassandra distribution
     [echo] Start the cassandra server with /usr/local/apache-cassandra/bin/solandra command

BUILD SUCCESSFUL
Total time: 2 minutes 43 seconds

Durante la instalación deberían haberse copiado las librerías y scripts en el arbol de Cassandra. Incluyendo algunos ficheros de configuración, que en este caso dejamos por defecto, aunque lo normal sería que los adaptásemos.

Sin más, vamos a arrancar Solandra (y con ello Cassandra), en cada nodo. Como no hemos hecho ningún cambio en el logueo, vamos a ver todos los mensajes por la salida estándar:

cd /usr/local/apache-cassandra-0.8.2/bin/
./solandra &

Y con esto ya debería estar todo listo. El siguiente paso es añadir el schema (parecido a como se haría en un Solr estándar), volcar los datos, y preparar el interfaz web. Pero esto es cosa de un tercer post. No pensaba escribirlo, pero bueno, este ya es demasiado largo.

 

Allá por el 2008, Rackspace(Mailtrust) publicaba algunos datos sobre la forma en la que había ido evolucionado su sistema de gestión de logs de la infraestructura de correo electrónico, y que por aquel entonces ya superaba holgadamente los 100GB de crecimiento diario. Junto a este documento, en una de las referencias bibliográficas sobre Hadoop, esta misma empresa explicaba, con algo de detalle técnico, la forma en la que habían implementado su sistema, basado sobre todo en Hadoop y Lucene+Solr.

Aunque Hadoop siga siendo una solución magnífica para la gestión de logs en el contexto del Software Libre (siempre hablando de volúmenes de datos realmente muy grandes), en esta serie de posts vamos a ver cómo podemos llevar la idea de Rackspace a la práctica usando otro tipo de tecnología, y más concretamente, Cassandra.

En realidad, como mis tres lectores no quieren posts demasiado largos, en lugar de entrar en los detalles de lo que sería una implementación más o menos “casera”, vamos a usar una de las aplicaciones que Datastax (una empresa que da servicios comerciales para Cassandra) está potenciando como parte de su ecosistema alrededor de Cassandra, y que se llama Solandra.

El problema
Repasemos, simplificando un poco, la evolución de Rackspace:

  1. En una primera fase, los logs se almacenan en máquinas individuales. Cuando hay alguna incidencia, algún técnico tiene que entrar a hacer un grep. Si el negocio va bien, llegará un momento en el que el tiempo perdido haciendo estas búsquedas será, por lo menos, “crispante”.
  2. En la segunda fase, los logs pasan a gestionarse a través de un syslog centralizado. En realidad, esta no es más que una versión algo mejorada de la primera evolución, pero al menos facilita el trabajo. En cualquier caso, en el fondo se sigue perdiendo mucho tiempo en la búsqueda manual en logs.
  3. La solución más natural en este punto es volcar los datos a una base de datos, y con ello a algún tipo de interfaz web. Dejaremos a un lado el desarrollo del frontend y de los scripts que cargan los datos en la bbdd (que pueden no ser en absoluto triviales, en función de la complejidad de la plataforma).

Hasta aquí, vale, todo es razonablemente sencillo. Sin embargo, cuando se gestionan digamos que 25 millones de mensajes al día, y cuando se quieren mantener 2 años de información (es un decir), nos encontramos con un problema.

¿Cómo se soluciona?

Aquí ya cada uno toma decisiones en función de su capacidad, su presupuesto, los perfiles que tiene disponibles, …. En algunos casos, mantener 30 días en base de datos (lo que genera la mayoría de incidentes), puede ser suficiente. En otros casos, se trabaja con los mecanismos que ofrecen las bases de datos para escalar (el framework Gizzard de Twitter es un estupendo ejemplo, aunque no hablemos de logs). Y por último, algunos pasan a otras soluciones, ya sean de pago o libres. En el caso de Rackspace, por ejemplo, su opción fue Hadoop y Lucene+Solr.

Una vez más, cada una de estas opciones puede ser “la mejor” en función del entorno en el que se desarrolle. Pero claro, si quiero seguir con este post tengo que optar por la tercera alternativa, obviamente :) .

Vamos por partes:

  1. Queremos almacenar un volumen de datos muy significativo. Para unos pocos GB todo esto no tiene demasiado sentido.
  2. Queremos que lo único necesario para aumentar la capacidad sea añadir nuevo hardware. Nada más. Ni cambios en la programación, ni cambios en la arquitectura.
  3. Queremos poder hacer consultas complejas sobre estos logs, en base a origen, destino, rangos de fechas, …. Por ejemplo, sería estupendo poder consultar todo el spam enviado a cuentas tipo info@ en toda la plataforma en un periodo de tiempo concreto.
  4. Aunque el tiempo real no es un requerimiento, es preferible poder hacer estas consultas y recibir los resultados al momento, en una misma sesión de navegador.

Para conseguir los puntos 1 y 2 Hadoop es una solución estupenda. Para 3 y 4 es necesario más trabajo. El acceso “casi inmediato” a los datos se conseguiría con alternativas como HBase, tan de moda ahora que Facebook ha empezado a usarlo. Además, siempre disponemos de Pig y Hive para simplificar las consultas. Ahora bien, de una u otra manera, con Hadoop es bastante probable que tengamos que programar bastante código.

La otra alternativa viene de la mano de Cassandra. Una vez más, los puntos 1 y 2 son inmediatos con esta tecnología. Al igual que con Hadoop, 3 y 4 no lo son; pero gracias a la aplicación llamada Solandra, que no deja de ser un Solr que guarda sus índices en Cassandra, podemos conseguir la capacidad de búsqueda de Lucene, el interfaz tipo REST que ofrece Solr, y la escalabilidad de Cassandra. Todo en uno.

El post se ha alargado un poco. Dejamos la parte práctica para el segundo (y último) mensaje de esta serie.

 

Update: La versión de github del script ya es capaz de monitorizar n pids en ejecución y n programas que todavía no lo estén.

Para retomar un poco el blog, y como hace mucho que no escribo nada sobre SystemTap, me he planteado otro sencillo ejercicio “de repaso”.

La verdad es que en la actualidad ya hay un buen montón de artículos, documentación y ejemplos sobre lo que se puede hacer con SystemTap, ya sea para la inspección del kernel como para la de aplicaciones tipo mysql, postgresql o python. El rpm de Scientific Linux 6, por ejemplo, incluye más de 80 ejemplos de todo tipo, desde el análisis de procesos hasta el del tráfico de red.

Aún así, como digo, aquí va un ejemplo más.

El problema
Vamos a suponer que tenemos un proceso ejecutando en una máquina, y que queremos vigilar los ficheros que va abriendo.

Para ir un poco más lejos, queremos que se muestren también los ficheros abierto por los hijos de ese proceso principal (sólo un nivel por ahora).

Y ya puestos, queremos poder limitar la monitorización a los ficheros de ciertas rutas. Por ejemplo, podemos querer mostrar sólo los ficheros que se abrán en /etc, o en /lib.

Vamos, que el ejercicio es prácticamente una extensión de uno de los scripts de un post anterior de este mismo blog. Pero que le voy a hacer, sólo se me ocurren ejercicios sobre ficheros.

Resumiendo, y poniendo un ejemplo, tenemos un servidor postfix, con sus habituales procesos hijo:

master─┬─pickup
       │─qmgr
       └─tlsmgr

Queremos saber que ficheros abren todos los procesos que cuelgan de master, ya sean los tres que se están ejecutando en este momento, ya sean los nuevos que se vayan creando. Además, queremos poder limitar el número de ficheros vigilados a una serie de rutas determinadas.

La solución
El núcleo de la solución está en dos “probes”:

  • syscall.open.return
  • syscall.close.return

O sea, las llamadas al sistema open y close. Más concretamente, al momento en el que terminan de ejecutarse (return).

Además, vamos a usar el probe “begin” para leer argumentos, preparar variables y demás parafernalia.

Comenzamos:

probe begin {
        procesoArgumento = $1
        printf ("El pid que vamos a tratar es %d\n", procesoArgumento);
        if (argc > 1) {
                for (i=2; i <= argc; i++) {
                        ficherosEnPath[argv[i]] = argv[i]
                        numeroPaths += 1
                }
        }
}

Poca explicación hace falta. En $1 tenemos el primer argumento, que va a ser el pid a vigilar. A partir del segundo argumento, y de manera opcional, se pueden incluir una serie de rutas a monitorizar. Todas estas son llamadas válidas:

stap -v monitorFicheros.stp $(pidof master)
stap -v monitorFicheros.stp $(pidof master) /etc
stap -v monitorFicheros.stp $(pidof master) /etc /usr/lib

Una vez definido este bloque básico, pasamos a la llamada al sistema "open":

probe syscall.open.return {
        proceso = pid()
        padre = ppid()
        hilo = tid()
        ejecutable = execname()
        insertarEnTabla = 0

        if ( (procesoArgumento == proceso) || (procesoArgumento == padre) || (procesoArgumento == hilo) ) {
                if ( (procesoArgumento == proceso) && (env_var("PWD") != "") ) {
                        pwd = env_var("PWD")
                }
                localpwd = (isinstr(env_var("PWD"), "/"))?env_var("PWD"):pwd;

                filename = user_string($filename)
                descriptor = $return

                filename = (substr(filename, 0, 1) == "/")?filename:localpwd . "/" . filename;

                if ([proceso,padre,hilo,descriptor] in tablaProcesos)  {
                        printf ("{codigo: \"error\", proceso: \"%s\", pid: %d, ppid: %d, tid: %d, fichero: \"%s\", descriptor: %d}\n", ejecutable, proceso, padre, hilo, filename, descriptor)
                } else {
                        if (descriptor >= 0) {
                                if (numeroPaths > 0 ) {
                                        foreach (ruta in ficherosEnPath) {
                                                if (substr(filename, 0, strlen(ruta)) == ruta) {
                                                        insertarEnTabla = 1
                                                        break
                                                }
                                        }
                                }
                                if ( (insertarEnTabla == 1) || (numeroPaths == 0) ) {
                                        tablaProcesos[proceso,padre,hilo,descriptor] = gettimeofday_ms()
                                        tablaFicheros[proceso,padre,hilo,descriptor] = filename
                                        printf ("{codigo: \"open\", proceso: \"%s\", pid: %d, ppid: %d, tid: %d, fichero: \"%s\", descriptor: %d, date: %d}\n", ejecutable, proceso, padre, hilo, filename, descriptor, tablaProcesos[proceso,padre,hilo,descriptor])
                                }
                        }
                }
        }
}

A través de las funciones pid(), ppid() y tid() vemos si el proceso que está ejecutando open en este momento es el proceso que nos interesa o un hijo suyo.

Si cumple este requerimiento, pasamos al bloque que revisa si el fichero que se está abriendo está en el path que nos interesa. En este caso, para hacer el ejercicio más completo, he optado por dar unas cuantas vueltas "de más", y he usado la función env_var("PWD") para acceder al entorno del proceso. En la práctica esta no es la mejor forma, y por eso hay más control en el probe, ya que no siempre existe la variable PWD en el entorno de los procesos que llegan a este open.

La llamada al sistema open carga las variables $return (el valor de retorno de la función es el id del descriptor del fichero) y $filename (el nombre del fichero). Ojo! Cada llamada al sistema tiene unos argumentos, y con ello "ofrece" unas variables para nuestros scripts. Por ejemplo, mientras que en open tenemos filename, flags y mode; en close tenemos fd, filp, files, fdt y retval. El propio SystemTap, Google y el código fuente del kernel son ... vuestros amigos.

En cualquier caso, lo importante es que, cuando se dan todas las condiciones, hacemos lo siguiente:

tablaProcesos[proceso,padre,hilo,descriptor] = gettimeofday_ms()
tablaFicheros[proceso,padre,hilo,descriptor] = filename
printf ("{codigo: \"open\", proceso: \"%s\", pid: %d, ppid: %d, tid: %d, fichero: \"%s\", descriptor: %d, date: %d}\n", ejecutable, proceso, padre, hilo, filename, descriptor, tablaProcesos[proceso,padre,hilo,descriptor])

Con el printf escribimos la información del fichero abierto por salida estándar. Las dos tablas (tablaProcesos y tablaFicheros) sirven para guardar el momento en el que se ha abierto el fichero (con una precisión de milisegundos), y el nombre del fichero en sí mismo, con lo que lo tendremos más a mano cuando pasemos a la llamada al sistema close. Para identificar un fichero usamos el índice formado con [el proceso, el padre, el hilo y el descriptor] del fichero que se ha abierto.

Con toda la información en su sitio ya tenemos la primera parte del script. Seguimos:

probe syscall.close.return {
        proceso = pid()
        padre = ppid()
        hilo = tid()
        descriptor = $fd
        ejecutable = execname()

        if ( ( procesoArgumento == proceso ) || ( procesoArgumento == padre ) || (procesoArgumento == hilo ) ) {
                if ([proceso,padre,hilo,descriptor] in tablaProcesos) {
                        filename = tablaFicheros[proceso,padre,hilo,descriptor]
                        date = gettimeofday_ms() - tablaProcesos[proceso,padre,hilo,descriptor]
                        printf ("{codigo: \"close\", proceso: \"%s\", pid: %d, ppid: %d, tid: %d, fichero: \"%s\", descriptor: %d, date: %d}\n", ejecutable, proceso, padre, hilo, filename, descriptor, date)
                        delete tablaProcesos[proceso,padre,hilo,descriptor]
                        delete tablaFicheros[proceso,padre,hilo,descriptor]
                }
        }
}

Fácil. Si se está haciendo un close de uno de los ficheros monitorizados, se calcula el tiempo que ha estado abierto y se muestra por la salida estándar.

Un ejemplo de ejecución del monitor es el siguiente:

# stap -v monitorFicheros.stp $(pidof master)
Pass 1: parsed user script and 76 library script(s) using 96688virt/22448res/2744shr kb, in 120usr/10sys/133real ms.
Pass 2: analyzed script: 8 probe(s), 22 function(s), 7 embed(s), 8 global(s) using 251840virt/48200res/3952shr kb, in 420usr/120sys/551real ms.
Pass 3: using cached /root/.systemtap/cache/02/stap_02e370ac4942b188c248e7ec11ac8e2c_19586.c
Pass 4: using cached /root/.systemtap/cache/02/stap_02e370ac4942b188c248e7ec11ac8e2c_19586.ko
Pass 5: starting run.

El pid que vamos a tratar es 1082
{codigo: "open", proceso: "smtpd", pid: 5817, ppid: 1082, tid: 5817, fichero: "/etc/hosts", descriptor: 12, date: 1310920975000}
{codigo: "close", proceso: "smtpd", pid: 5817, ppid: 1082, tid: 5817, fichero: "/etc/hosts", descriptor: 12, date: 0}

Sorpresa! Los printf se formatéan como datos json, por si alguna vez quisiera hacer algo con node.js y aplicaciones como log.io.

En cuanto a los "date: 0", se debe a que la precisión de milisegundos es demasiado baja para estos ficheros y para esta prueba sin tráfico de correo de ningún tipo.

El código completo (en este post sólo falta la definición de variables globales) está accesible en github, como siempre.

Limitaciones y trabajo futuro
Como ya he comentado, este script está lejos de ser óptimo. Además de mi propia incapacidad, en algunos momentos he optado por dar más vueltas de las necesarias para usar así más funciones y variables.

Mi primer objetivo era permitir la monitorización de procesos que todavía no estuvieran en ejecución. En esta versión, sin embargo, el proceso principal sí tiene que estar ejecutando antes de lanzar el stap. Esto no es algo técnicamente complicado, pero no se hace solo, y no aporta demasiado desde el punto de vista conceptual al post, así que lo he dejado.

¿Y qué pasa si el pidof da más de un pid? Me habéis pillado. Por ahora sólo trato un pid.

Para el que esté interesado, SystemTap permite guardar la salida estándar en un fichero usando el parámetro -o. A partir de aquí es trivial que otro script perl, python, bash, node.js o lo que sea trabaje con él.

 

Ya lo he escrito en el post anterior: Node.js es un proyecto muy interesante, muy adecuado como backend en muchos entornos, y particularmente en aquellos que dependen en gran medida del rendimiento I/O. Como ejemplo, aquí va uno de los últimos casos de éxito.

En este post, además de volver a usar Node.js y Redis (otro proyecto cada vez más importante), voy a dar cuatro pinceladas sobre una de las nuevas tendencias que van apareciendo en la web: WebSockets.

Para completar el ejercicio, escribiré un mini-proyecto para graficar todo tipo de valores numéricos en un plasmoid de KDE. Bueno, en realidad en cualquier navegador con soporte WebSockets (la lista es reducida, como veremos más adelante). Así, pasaremos a un punto de vista más de administrador de sistemas que de desarrollador web.

¿Qué son los WebSockets?

Internet es cada vez más “web”. Las aplicaciones son cada vez más dinámicas, complejas e interactivas. Esto no es malo, obviamente, pero supone que mientras que los usuarios tienen más y más medios para interactuar con las webs, los administradores de estos entornos han visto como lo que antes se podía gestionar sin mayores problemas, ahora necesita todo tipo de caches e “inventos” sólo porque el sistema de votación de videos (por ejemplo) triplica la cantidad de accesos al sitio y a la base de datos.

Estos problemas no son, ni mucho menos, nuevos. Durante los últimos años se ha ido desarrollando mucha tecnología, para mejorar la experiencia de usuario, por supuesto, pero también para que la comunicación cliente/servidor sea más controlable y eficaz. Lo último: WebSockets.

Simplificando mucho, los WS básicamente ofrecen un canal full duplex entre el cliente y el servidor. De esta manera, en lugar de abrir una conexión web tradicional vía ajax (o similar) cada vez que se necesite un dato, el navegador crea un único canal permanente con el servidor, y lo usa tanto para enviar como para recibir datos. Esto último es importante, ya que el servidor puede usar la vía de comunicación activamente, por ejemplo para enviar un broadcast a todos sus clientes, y no sólo como respuesta ante una petición.

WS orientado a la administración de sistemas

Entonces, tenemos una nueva tecnología, muy orientada a la web, y por lo tanto muy fácil de usar. Además, ofrece algo que, aún sin ser tiempo real, es lo suficientemente ágil como para usarse para cosas como la monitorización de sistemas, que es precisamente el ejemplo con el que voy a seguir en este post.

Mi planteamiento inicial era hacer una pequeña aplicación que mostrase los datos de una base de datos rrd. Sin embargo, esto centraba el ejercicio demasiado en torno a la programación del módulo para Node.js, así que he preferido reutilizar los conceptos de otros posts anteriores, limitando el servidor WebSockets al envío de los datos que encuentre en una base de datos Redis.

Para hacerlo más interesante, el cliente será un plasmoid de KDE, aunque no deja de ser una página web normal y corriente, teniendo en cuenta que voy a usar el motor WebKit para su implementación (se pueden programar plasmoids en C, python, javascript, …)

Las limitaciones

El protocolo WebSockets todavía es un draft del IETF. Ya sólo teniendo esto en cuenta podemos decir que está “verde”.

En cuanto a navegadores, se necesita lo último de lo último en versiones (Opera, Firefox, Chrome, Safari…). Además, en el caso de Firefox (y Opera), el soporte, aunque presente, está deshabilitado por algunas lagunas en torno a la seguridad del protocolo cuando interactúa con un proxy. Ahora bien, para activarlo noy hay más que cambiar una clave (network.websocket.override-security-block).

Como ejemplo, para escribir este post he usado Ubuntu 10.10 y Fedora 14, con sus versiones respectivas de KDE, y con soporte para ejecutar plasmoids WebKit (cada distribución tiene su nombre de paquete, así que lo buscáis vosotros :-D ).

En la práctica, si queréis usar WebSockets en una aplicación “de verdad”, hoy en día os recomiendo usar algo como socket.io, que es capaz de detectar lo que el navegador soporta, y en función de eso usar WS o simularlo con flash.

Bien, empecemos con la aplicación.

El servidor

Para guardar los datos vamos a usar Redis, con un hash que vamos a llamar “datos”, y en el que guardaremos los valores a graficar. Traducido a perl:

my %datos;
$datos{todos} = 40;
$datos{server1} = 10;
$datos{server2} = 20;
$datos{server3} = 10;

Esto en Redis:

redis-cli hset datos todos 40
redis-cli hset datos server1 10
redis-cli hset datos server1 20
redis-cli hset datos server1 10

Para las pruebas he usado un bucle bash:

while true; do redis-cli hset datos todos $((10 + $RANDOM % 20)); sleep 1; done;
...

Existen varias formas de escribir un servidor WebSockets, pero pocas tan fácilies como Node.js y su módulo websocket-server. Tambien vamos a usar el módulo de Redis, así que:

npm install websocket-server
npm install hiredis redis

Una vez instalados los componentes, es el momento de escribir las 70 líneas que hacen falta para tener un servidor 100% funcional y capaz de gestionar tantos gráficos como queramos:

var net = require('net');
var sys = require("sys");
var ws = require("websocket-server");
var redis = require("redis");
var puertoredis = 6379;
var hostredis = 'localhost';

var server = ws.createServer({debug: true});
var client = redis.createClient(puertoredis,hostredis);
var conexion;
var mensaje;

client.on("error", function (err) {
	sys.log("Redis  error en la conexion a: " + client.host + " en el puerto: " + client.port + " - " + err);
});

server.addListener("connection", function(conn){
	var graficomostrado = "";
	var servidores = [];
	sys.log("Conexion abierta: " + conn.id);
	client.hkeys("datos", function (err, replies) {
		replies.forEach(function (reply, i) {
			sys.log("Respuesta: " + reply);
			servidores.push(reply);
		});
		graficomostrado = servidores[0];
		mensaje = {"titulo": graficomostrado, "todos": servidores};
		conn.send(JSON.stringify(mensaje));
	});
	conn.addListener("message", function(message){
		var dato = JSON.parse(message);
		if (dato) {
			if (dato.orden == "changeGraphic") {
				graficomostrado = dato.grafico;
				conn.send(JSON.stringify({changedGraphic: graficomostrado}));
				client.hget("datos", graficomostrado, function (err, reply) {
					mensaje = {"inicio":[(new Date()).getTime(), reply]};
					conn.send(JSON.stringify(mensaje));
				});
			} else if (dato.orden == "getData") {
				client.hget("datos", graficomostrado, function (err, reply) {
					mensaje = {"actualizacion":[(new Date()).getTime(), reply]};
					conn.send(JSON.stringify(mensaje));
				});
			}
		}
	});
});
server.addListener("error", function(){
	sys.log("Error de algun tipo en la conexion");
});
server.addListener("disconnected", function(conn){
	sys.log("Desconectada la sesion " + conn.id);
});
server.listen(80);

Así de fácil es. Cuando un cliente establezca una conexión se llamará a:

server.addListener("connection", function(conn) {....

En este ejemplo, el trabajo fundamental de este bloque es enviar al cliente el listado de datos disponibles vía JSON:

....
mensaje = {"titulo": graficomostrado, "todos": servidores};
conn.send(JSON.stringify(mensaje));

A partir de aquí, la conexión está establecida. Cada vez que el cliente la use se llamará a:

conn.addListener("message", function(message) {....

En este caso, el servidor atenderá las peticiones de cambio de gráfico y de actualización de gráfico:

var dato = JSON.parse(message);
if (dato.orden == "changeGraphic") {
	graficomostrado = dato.grafico;
	conn.send(JSON.stringify({changedGraphic: graficomostrado}));
	client.hget("datos", graficomostrado, function (err, reply) {
		mensaje = {"inicio":[(new Date()).getTime(), reply]};
		conn.send(JSON.stringify(mensaje));
	});
} else if (dato.orden == "getData") {
	client.hget("datos", graficomostrado, function (err, reply) {
		mensaje = {"actualizacion":[(new Date()).getTime(), reply]};
		conn.send(JSON.stringify(mensaje));
	});
}

Y con hacer que el servidor escuche en el puerto 80… ¡A correr!

server.listen(80);

El cliente

El código del servidor y del cliente está disponible en Github, así que voy a limitar la explicación a lo fundamental, dejando a un lado el uso de Jquery, Jquery-ui o de la excelente librería Highcharts, que he usado a pesar de no ser completamente libre, porque me ha permitido terminar el ejemplo mucho más rápido.

Una aplicación que quiere usar WebSockets debe comprobar si el navegador lo permite:

if (!("WebSocket" in window))   {
  // No tiene soporte websockets, deshabilitamos botones.
  $('#containergraficos').empty();
  $('#containergraficos').html('<p>Sin soporte WebSockets.</p>');
  $("#conectar").button({ disabled: true });
} else {
  // Sí tiene soporte websockets
 socket = new WebSocket(host);
}

A partir de aquí, sólo queda reaccionar ante cada evento:

socket.onopen = function(){
   .......
}
socket.onmessage = function(msg){
   // código íntegro en github
   var resultado = JSON.parse(msg.data);
   if (resultado.inicio) {
      var datos = [];
      datos.push({x: parseInt(resultado.inicio[0]), y: parseInt(resultado.inicio[1])});
      options.series[0].data = datos;
      chart = new Highcharts.Chart(options);
   } else if (resultado.actualizacion) {
      chart.series[0].addPoint({x: parseInt(resultado.actualizacion[0]), y: parseInt(resultado.actualizacion[1])});
   } else if (resultado.changedGraphic) {
      $("#grafico").button("option", "label", resultado.changedGraphic);
   } else if (resultado.titulo) {
      $("#grafico").button("option", "label", resultado.titulo);
      var mensaje = {"orden": "changeGraphic", "grafico": resultado.titulo};
      socket.send(JSON.stringify(mensaje));
   }
}
socket.onclose = function(){
   ...
}
socket.onerror = function() {
   ...
}

En definitiva, unos pocos mensajes JSON de un lado para otro del socket.

Además, cada 5 segundos vamos a hacer que Highcharts pida datos nuevos:

events: {
   load: function() {
      var series = this.series[0];
      setInterval(function() {
         if (socket) {
            var mensaje = {"orden": "getData", "grafico": grafico};
            socket.send(JSON.stringify(mensaje));
         } else {
            var x = (new Date()).getTime();
            series.addPoint([x, 0], true, true);
         }
      }, 5000); //5 segundos
   }
}

El plasmoid

Evidentemente, hasta ahora no hemos hecho nada que no se pueda ejecutar en un navegador. Ahora lo empaquetaremos en un plasmoid, aunque no deja de ser más que un zip con todo el contenido y un fichero metadata.desktop con la descripción del propio plasmoid, que entre otros inlcuye:

[Desktop Entry]
Name=Forondarenanet
Comment=Pruebas para forondarena.net
Icon=chronometer
Type=Service
X-KDE-ServiceTypes=Plasma/Applet
X-Plasma-API=webkit
X-Plasma-MainScript=code/index.html
X-Plasma-DefaultSize=600,530

KDE tiene una utilidad para probar los plasmoids sin instalarlos, llamada “plasmoidviewer”. Una vez probado, se puede instalar con:

plasmapkg -i forondarenanet.plasmoid

Recordad que el fichero no es más que un zip. En este caso, la estructura es la siguiente:

./metadata.desktop
./contents
./contents/code
./contents/code/css
./contents/code/css/images
./contents/code/css/images/ui-bg_gloss-wave_16_121212_500x100.png
...
./contents/code/css/images/ui-icons_cd0a0a_256x240.png
./contents/code/css/jquery-ui-1.8.9.custom.css
./contents/code/css/forondarenanet.css
./contents/code/index.html
./contents/code/js
./contents/code/js/jquery-1.4.4.min.js
./contents/code/js/highcharts.js
./contents/code/js/forondarenanet.js
./contents/code/js/jquery-ui-1.8.9.custom.min.js
./contents/code/img

Y poco más. Para terminar con un ejemplo, cuatro plasmoids mostrando gráficos diferentes:


Que por supuesto, sólo generan 4 conexiones, a pesar de estar actualizándose cada 5 segundos:

# netstat -nt
Active Internet connections (w/o servers)
Proto Recv-Q Send-Q Local Address               Foreign Address             State
tcp        0      0 192.168.10.136:36903        192.168.10.131:80           ESTABLISHED
tcp        0      0 192.168.10.136:36904        192.168.10.131:80           ESTABLISHED
tcp        0      0 192.168.10.136:36905        192.168.10.131:80           ESTABLISHED
tcp        0      0 192.168.10.136:36906        192.168.10.131:80           ESTABLISHED
 

Llevaba tiempo, la verdad, queriendo escribir un post sobre una de esas nuevas herramientas que aparecen de vez en cuando, que prometen un montón de cosas, pero que en realidad nunca sabemos hasta dónde van a llegar. En este caso me estoy refiriendo a node.js.

Buscando algo que programar se me ha ocurrido escribir un sencillo servidor popbeforesmtp con node.js. La verdad es que resulta algo paradógico usar algo tan nuevo para esto, cuando pbs (vamos abreviando) es uno de los sistemas de autentificación más odiados del mundo del correo electrónico, propio de una época en la que el spam casi ni existía (buff!!). Como decía, no es más que una excusa para hacer algo “práctico” con node.js.

Los tres lectores de mi blog saben lo que es pbs, pero probablemente no hayan oido hablar sobre node.js. A ver si sacamos algo en claro de todo esto.

El problema
Digamos que tenemos un sistema con unos cuantos servidores SMTP que usamos para enviar correo. Aunque normalmente usamos algún tipo de validación ESMTP, tenemos que mantener la compatibilidad con algunos clientes de correo que han venido enviando correo sin ningún problema sólo a través de validación POP.

Pop before smtp permite que un cliente de correo pueda enviar un mensaje una vez se haya validado vía POP. El servidor SMTP, por lo tanto, debe ser capaz de guardar un histórico de conexiones y de permitir envíos, normalmente durante unos pocos minutos, sin pedir ningún otro tipo de credencial.

Ya tenemos todos los elementos que necesitamos sobre la mesa. Veamos:

  • Un servidor SMTP sobre el que implementar “el invento”.
  • Un servidor POP desde el que obtener la información de validación.
  • Una aplicación que relacione ambos mundos, y que sea capaz de funcionar tanto en una infraestructura de una máquina como en clusters de varios servidores POP y SMTP (con todo lo que esto significa).

Un servidor SMTP
Actualmente tenemos en el mercado open source cuatro tres servidores SMTP que podemos usar:

  • Postfix
  • Exim
  • Sendmail
  • Qmail

Cada uno usa sus propios mecanismos de auntentificación. Para este ejemplo me voy a centrar en Postfix.

Postfix tiene muchos controles “nativos” para permitir o denegar envíos de correo. Se pueden establecer políticas de acceso en base a IPs origen, cuentas origen, destino, helo, contenido, …. De hecho, también se pueden escribir reglas compuestas que combinen las anteriores.

Sin embargo, para pbs necesitamos mantener un listado de IPs para las que se permiten envíos(fácil), pero que expiren cada x minutos.

Para estos casos más elaborados Postfix ofrece lo que se llama access policy delegation, y que no es más que un sencillo API para conectarse con una aplicación externa que se encargará de tomar las decisiones de validación.

Dicho de otra forma, Postfix enviará a un socket (UNIX o TCP) la información de la sesión SMTP que se quiere comprobar, y recibirá por ese mismo socket el “OK” (o no) correspondiente.

El software que vamos a escribir, por lo tanto, va a recibir esto por un socket:

request=smtpd_access_policy
protocol_state=RCPT
protocol_name=ESMTP
client_address=192.168.10.131
client_name=fn131.example.org
reverse_client_name=fn131.example.org
helo_name=example.org
sender=patata@example.org
recipient=nospam@example.com
......

A lo que tiene que responder generalmente o “OK”, o “reject”, o “dunno”. [1]

Un servidor POP
El que más nos guste (Dovecot, Courier, …). El único requerimiento es que escriba en un log parseable los login de usuarios.

Una aplicación encargada de la política
Aquí está lo interesante de este post. Veamos lo que necesitamos:

  • Una aplicación, en nuestro caso un servidor TCP, que reciba las conexiones desde Postfix y entregue las respuestas.
  • Una base de datos de algún tipo que almacene la información. A ser posible, debe poder distribuirse entre muchos servidores SMTP de forma transparente.
  • Un procesador de logs POP capaz de insertar registros en la base de datos que usará la aplicación.

Empecemos por la base de datos. Estamos hablando básicamente de “algo” sencillo, en lo que guardar un listado de IPs, y que además auto-expire los registros pasados unos minutos. Además, debe ser lo más rápida posible y permitir un gran número de conexiones simultáneas.

Vaya, si estamos hablando de….Memcached.

Pros:

  • Estable. Muchos años en entornos muy importantes.
  • Perfecta para guardar claves (IPs) y valores (cualquier cosa en este caso. El rcpt nos podría venir bien).
  • Expira automáticamente los registros antiguos.
  • Tiene interfaces perl, python, ….
  • Rápida.

Cons:

  • ¿Cómo usamos una base de datos común en un cluster?
  • ¿Perdemos la información de logins ante un fallo o reinicio de la aplicación?

Por lo tanto, tenemos dos problemas. Memcache no está especialmente pensado para funcionar en “modo cluster”, sincronizando su contenido en n servidores; y además pierde el contenido cuando se reinicia. O sea, si tenemos que reiniciar Memcached perdemos los datos de usuarios que ya se han validado (aunque sólo sea cosa de una media hora).

Obviamente, las ventajas superan a los inconvenientes. Sin embargo, en el “mercado” tenemos una aplicación que, además de tener los pros, supera los cons: Redis, que permite un modelo master/slave, y además permite volcar periódicamente la información de memoria a disco para su recuperación en caso de caida. Es más, permite tener una especie de redo log en disco.

Bien, ya tenemos el almacén de información. ¿Cómo añadimos los datos? Usaremos cualquier aplicación de las muchas que permiten ejecutar una acción en función de haber encontrado un patrón en un log. Un ejemplo en perl es mailwatch, pero se puede usar cualquier otra. En el fondo, lo único que hay que hacer es conectarse al nodo master de Redis, y después ejecutar el comando “SETEX <host_validado> <segundos> <rcpt>”.

Bien, sólo nos queda la aplicación. Necesitamos que:

  • Escuche en un puerto TCP.
  • Esté muy orientado a eventos.
  • Tenga un muy buen rendimiento.
  • Sea sencillo de programar.

Y aquí es donde aparece node.js.

node.js
Hay algo indudable en la informática de hoy en día: Cada vez está más orientada a la web. En este entorno se usan muchos lenguajes, desde php a .net; pero si hay uno que ha usado “casi” todo el mundo, ese es javascript.

Node.js es una forma de llevar javascript fuera del contexto de los navegadores web. Básicamente es una especie de wrapper para V8, el compilador de javascript de Google. Y esto es algo muy importante, porque estamos hablando de uno de los compiladores más avanzados que hay hoy en día. Evidentemente node.js no consigue el rendimiento de C, pero supera con cierta facilidad a perl o python, todo además sin perder la sencillez de javascript.

Node.js, además de usar V8, implementa algunas de las funcionalidades que hacen falta para ejecutarse fuera de un navegador, como la manipulación de datos binarios.

La otra característica de node es que está completamente orientado a eventos, y por lo tanto necesita un cierto cambio de chip por parte del programador. En node.js casi nada es síncrono: Cuando queremos acceder a un fichero, por ejemplo, en lugar de esperar a que el disco se sitúe en la posición adecuada y lea los datos, seguimos con la ejecución del programa y con el resto de eventos, hasta que los datos estén disponibles. Según Yahoo, gracias a este modelo un único procesador Xeon a 2.5GHz ejecutando un proxy inverso puede servir cerca de 2100 peticiones por segundo [2].

Dejo la instalación de node.js (y de Redis, y de mailgraph, …) para el lector. Veamos el código de nuestro servidor:

Empezamos por la configuración de Postfix, en /etc/postfix/main.cf:

smtpd_recipient_restrictions = .... check_policy_service inet:127.0.0.1:10000 ....

Nuestro servidor va a escuchar en localhost y en el puerto 10000.

Y ahora vamos con el código, en sus 67 líneas, aunque para ser correctos, falta la mayoría de la gestión de errores:

var net = require('net');
var sys = require("sys");
var redis = require("redis");

var puertoplcy = 10000;
var hostplcy = 'localhost';

var puertoredis = 6379;
var hostredis = 'localhost';

Hemos definido las variables y las librerías que vamos a usar, incluyendo una “third party” para acceso a Redis que hemos instalado con (npm install redis).

var server = net.createServer(function (stream) {

        var client = redis.createClient(puertoredis,hostredis);

        client.on("error", function (err) {
                sys.log("Redis  error en la conexion a: " + client.host + " en el puerto: " + client.port + " - " + err);
        });

        stream.setEncoding('utf8');
        stream.setNoDelay(true);

        stream.on('connect', function () {
                sys.log("Conexion nueva.");
        });

        stream.on('data', function (data) {
                sys.log("Correo entrante:\n" + data);
                var clientaddr, tmpclientaddr;
                var rcpt, tmprcpt;

                var datos = data.split('\n');
                for (var i = 0; i < datos.length; i++ ) {
                        if (datos[i] && datos[i].match(/client_address=/) ) {
                                tmpclientaddr = datos[i].split("=");
                                clientaddr = tmpclientaddr[1].trim();
                                sys.log("Extraido el client address: " + clientaddr);
                        } else if (datos[i] && datos[i].match(/recipient=/) ) {
                                tmprcpt = datos[i].split("=");
                                rcpt = tmprcpt[1].trim();
                                sys.log("Extraido el rcpt to: " + rcpt);
                        }
                }

                client.get(clientaddr, function (err, reply) {
                        if (err) {
                                sys.log("Get error: " + err);
                        }

                        if (reply) {
                                sys.log("Se ha encontrado el host: " + clientaddr + " que ha usado: " + reply);
                                stream.write('action=ok\n\n');
                        } else {
                                sys.log("El host: " + clientaddr + " con el usuario: " + rcpt + " no se ha validado");
                                stream.write('action=reject 550 Relay denegado.\n\n');
                        }
                });
        });

        stream.on('end', function () {
                sys.log("Fin de la conexion");
                stream.end();
        });

});

Este es el servidor, con un poco de logueo por consola que en condiciones normales se debería quitar. Hemos definido eventos para conexiones nuevas, para cuando entran datos nuevos, ...; tenemos un callback para cuando leemos datos de Redis, .....

Al final, si tenemos el dato devolvemos un "action=ok\n\n", y si no lo tenemos pasamos un "action=reject 550 Relay denegado.\n\n", aunque esto puede variar en función de la configuración de Postfix.

Y por último, hacemos que el servidor escuche en el puerto 10000 de localhost:

server.listen(puertoplcy, hostplcy);

Y ya está. Todo esto lo guardamos en un fichero (pbs_filter.js por ejemplo), y lo ejecutamos desde una shell:

   /usr/local/bin/node pbs_filter.js

Bueno, para terminar, una imagen del planteamiento:

En definitiva, node.js es otro más de esos proyectos que aparecen periódicamente y que cogen cierta fuerza. Esto no es nada nuevo; hay muchos proyectos abandonados que tuvieron "su momento", pero creo que tiene ciertas características que lo hacen interesante. El tiempo dirá si tiene futuro.


[1] postfix tiene muy buena documentación. RTFineM
[2] Node.js, igual que Memcached y otros, no es multi-hilo. En el mismo enlace de Yahoo hay una interesante discusión sobre el uso de varios cores.

 

Antes de que me adelanten, y sobre todo por las durísimas presiones por parte de las tres personas que leen este blog :-) , voy a terminar esta serie de posts, hablando sobre una de las herramientas que se han implementado para dar un mayor nivel de abstracción (o facilidad de uso, como se prefiera decir) a Hadoop.

Hadoop tiene un problema, o al menos un argumento que se usa en su contra: Desarrollar una aplicación MapReduce lleva demasiado tiempo. A pesar de que el Streaming del que hablamos en posts anteriores lo solucione en parte, es cierto que si queremos trabajar de forma “nativa” con el framework tenemos que estar preparados para escribir un buen puñado de líneas de código.

Para dar salida este problema han ido apareciendo distintas alternativas, entre las que destacan Hive (by Facebook) y Pig (by Yahoo). En este post me voy a centrar en esta última, aunque esto no significa, ni mucho menos, que sea mejor que Hive.

Gracias a Pig se pueden escribir “en 4 lineas” aplicaciones MapReduce. Dicho de otra forma, permite que nos centremos en el qué queremos hacer con nuestros datos, y no en el cómo lo hacemos. Se basa en dos componentes:

  • Un sencillo lenguaje para escribir los scripts, llamado “pig latin”.
  • El “”compilador”", que trasforma los scripts en aplicaciones MapReduce para su ejecución en el cluster Hadoop.

Junto a esto, Pig tambén ofrece un shell interactivo (se llama grunt), para poder ejecutar comandos.

Bien, terminada la teoría (fácil, ¿no?), vamos a centrarnos en los datos, tal y como hemos dicho antes. (Para el que no haya leido los posts anteriores, este sería un buen momento).

Hasta ahora hemos estado preparando nuestra plataforma de logueo para poder disponer de años, y quizá terabytes, de logs en un mismo lugar, barato y redundado, del que poder sacar información útil en unos pocos minutos. Recordemos:

  1. Los logs de todos nuestros servidores se consolidan en una/varias máquinas syslog (o similar).
  2. Los logs se copian tal cual en Hadoop. A pesar de que vayan a estar replicados, no es mala idea tener una copia extra en cinta.
  3. Ejecutamos un primer script MapReduce que lee los scripts y los adapta al formato que queremos y sobre el que trabajaremos.
  4. Ya no necesitamos los ficheros originales, los podemos borrar de Hadoop. En caso de desastre, siempre podríamos recurrir a las cintas.

¿Cómo guardábamos los logs?

El resultado del reducer.pl son líneas como esta:

010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com mail.example.org[10.0.2.34],15853,1268223532,5A4FE29B3B6,43A9B28B357,Mar 10 12:18:52 destinovarios@example.com destinovarios@example.com Antispam sent (250 OK sent queued as 43A9B28B357

Recordad que el script guarda mucha información formateada en base64, por lo que en realidad tenemos algo similar a esto:

2010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com texto_en_base64

¿Por qué?

Porque todas las consultas que se suelen hacer son del tipo:

  • No he recibido un mensaje enviado entre las x y las z del martes, enviado desde esta cuenta, y con destino a esta otra.
  • Quiero todos los mensajes que se recibieron en tal buzón.

La estructura de logs que hemos preparado va a permitir que los scripts sean muy sencillos, sin ningún tipo de join o similar (ahora lo vemos). Dejaremos al interfaz web desde el que se van a ver los resultados el peso de formatear las entradas y hacerlas “visuales”.

¿Interfaz web?

Nada mejor que un sencillo interfaz web en el que escribir los parámetros de búsqueda, preparar el script .pig, ejecutarlo, y mostrar los resultados una vez termine. Evidentemente, no voy a escribir aquí el formulario, pero por dar un ejemplo, imaginemos que queremos saber los correos que han llegado desde infojobs entre las 8 a.m. del 11 de marzo, y las 12 a.m. del mismo día.

Empezamos pasando las fechas a timestamp. Muy fácil:

$ date -u -d 'Thu Mar 11 08:00:00 2010' '+%s'
1268294400

$ date -u -d 'Thu Mar 11 12:00:00 2010' '+%s'
1268308800

Con esto y con la dirección origen ya tenemos todo lo necesario. Ahora lo normal sería escribir un fichero .pig, pero aquí vamos a usar el intérprete de comandos grunt para ver los resultados paso a paso:

$ pig
10/05/25 21:57:29 INFO pig.Main: Logging error messages to: /usr/local/pig-0.7.0/pig_1274817449369.log
2010-05-25 21:57:29,688 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://fn140
2010-05-25 21:57:30,149 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: fn140:8021
grunt>

Empezamos cargando los datos:

grunt> REGISTROS = LOAD '/user/hadoop/logs_formateados' USING PigStorage('\t') AS (msgid:chararray, fecha:long, origen:chararray, destinoreal:chararray, destinoentrada:chararray, resto:chararray);

Con este comando hemos cargado los datos desde el directorio /user/hadoop/logs_formateados en la variable REGISTROS. Los ficheros de entrada se separan con un tabulador [1], con el schema que hemos venido usando. Veamos, antes de seguir, si Pig ha entendido lo que realmente queremos:

grunt> ILLUSTRATE REGISTROS
2010-05-25 23:58:36,627 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://fn140
2010-05-25 23:58:36,681 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: fn140:8021
2010-05-25 23:58:36,791 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2010-05-25 23:58:36,792 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| REGISTROS     | msgid: chararray                                                    | fecha: long | origen: chararray   | destinoreal: chararray | destinoentrada: chararray | resto: chararray |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|               | 6B160A8CF29E064XA067E116FA2A458CAD8A66@GLOBL0310V01.exampleserv.com | 1268304751  | pr1user@example.com | mga@example.net        | mga@example.net           | texto_base64     |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Ahora que tenemos todos los registros, vamos a filtrarlos en base a los criterios que hemos definido:

grunt> FILTRADOS = FILTER REGISTROS BY (origen matches '.+infojobs.+') AND (fecha > 1268294400) AND (fecha < 1268301600);

Hecho esto sólo nos queda agruparlos por msgid. Recordad, tenemos dos entradas por mensaje: una para el nivel antispam, y otra para la entrega. En realidad, en base al msgid podríamos seguir un mensaje por tantos saltos como tuviéramos (algo parecido a lo que Mailtrust ha documentado que hace):

grunt> AGRUPADOS = GROUP FILTRADOS BY msgid;

Y con esto ya tenemos el resultado. Lo podríamos confirmar con el comando DUMP, que muestra los datos por salida estándar, pero lo vamos a guardar directamente en un fichero:

grunt> STORE AGRUPADOS INTO '/tmp/fichero_resultados.txt' USING PigStorage();
2010-05-26 22:17:03,942 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_LONG 2 time(s).
2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.impl.logicalLayer.optimizer.PruneColumns - No column pruned for REGISTROS
2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.impl.logicalLayer.optimizer.PruneColumns - No map keys pruned for REGISTROS
2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.ResourceSchema - Insert two-level access to Resource Schema
2010-05-26 22:17:03,983 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_LONG 2 time(s).
2010-05-26 22:17:03,985 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - (Name: Store(hdfs://fn140/tmp/fichero_resultados.txt:PigStorage) - 1-1007 Operator Key: 1-1007)
2010-05-26 22:17:03,986 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2010-05-26 22:17:03,986 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2010-05-26 22:17:04,019 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2010-05-26 22:17:05,180 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2010-05-26 22:17:05,188 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2010-05-26 22:17:05,191 [Thread-120] WARN  org.apache.hadoop.mapred.JobClient - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2010-05-26 22:17:05,545 [Thread-120] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2010-05-26 22:17:05,546 [Thread-120] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
2010-05-26 22:17:06,382 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201005262134_0008
2010-05-26 22:17:06,382 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://fn140:50030/jobdetails.jsp?jobid=job_201005262134_0008
2010-05-26 22:17:06,384 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2010-05-26 22:17:19,214 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 10% complete
2010-05-26 22:17:20,719 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 20% complete
...............
2010-05-26 22:17:28,241 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 60% complete
2010-05-26 22:17:41,788 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2010-05-26 22:17:41,788 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Successfully stored result in: "hdfs://fn140/tmp/fichero_resultados.txt"
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Records written : 72
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Bytes written : 44727
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Spillable Memory Manager spill count : 0
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Proactive spill count : 0
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

Vaya, parece que tenemos 72 usuarios suscritos a infojobs. Veamos un ejemplo de lo que se ha escrito en "/tmp/fichero_resultados.txt" (es un path de HDFS, no el /tmp local). Los saltos de línea los he puesto yo para que sea más visual, y he escrito la versión sin codificar en base64:

1268304853778.112.POCFQELSVS@[hellboy07]
{
    (1268304853778.112.POCFQELSVS@[hellboy07],1268308417,aviso_incripciones@push.infojobs.net,usuario@example.com,usuario@example.com,localhost[127.0.0.1],11854,1268308418,9ABC12FACB4,-1,Mar 11 11:53:38 usuario@example.com usuario@example.com Maildrop sent (delivered via maildrop service))
,
    (1268304853778.112.POCFQELSVS@[hellboy07],1268308417,aviso_incripciones@push.infojobs.net,usuario@example.com,usuario@example.com,push3.infojobs.net[79.171.25.71],11107,1268308418,3B0213FAECD,9ABC12FACB4,Mar 11 11:53:38 usuario@example.com usuario@example.com Antispam sent (250 OK queued as 9ABC12FACB4)
}

Ahora sólo necesitamos que nuestro interfaz web formatee el resultado y lo saque por pantalla. Fácil y sencillo.

Resumiendo, con estas cuatro líneas podemos hacer cualquier búsqueda en los logs, tengan el tamaño que tengan:

REGISTROS = LOAD '/user/hadoop/logs_formateados' USING PigStorage('\t') AS (msgid:chararray, fecha:long, origen:chararray, destinoreal:chararray, destinoentrada:chararray, resto:chararray);
FILTRADOS = FILTER REGISTROS BY (origen matches '.+infojobs.+') AND (fecha > 1268294400) AND (fecha < 1268301600);
AGRUPADOS = GROUP FILTRADOS BY msgid;
STORE AGRUPADOS INTO '/tmp/fichero_resultados.txt' USING PigStorage();

Referencias

Hay muchos artículos y documentación disponible en la red sobre Hadoop. En cuanto a bibliografía, que yo sepa hay al menos tres libros disponibles:

  • Pro Hadoop, de la editorial Apress.
  • Hadoop: The Definitive Guide, de la editorial O'Reilly.
  • Hadoop in Action, de la editorial Manning.

[1]: La verdad es que he hecho trampa. El reducer original separaba el id de mensaje del resto de datos con un tabulador, pero luego usaba un espacio simple. Esto es porque al principio tenía pensado trabajar otros aspectos de Hadoop y Pig, como el Chaining y las "user defined functions".
Al final, para no tener que escribir más posts, he cambiado el reducer para que separe todos los campos con tabuladores.

© 2012 forondarenanet Suffusion theme by Sayontan Sinha