foron

 

Antes de que me adelanten, y sobre todo por las durísimas presiones por parte de las tres personas que leen este blog :-) , voy a terminar esta serie de posts, hablando sobre una de las herramientas que se han implementado para dar un mayor nivel de abstracción (o facilidad de uso, como se prefiera decir) a Hadoop.

Hadoop tiene un problema, o al menos un argumento que se usa en su contra: Desarrollar una aplicación MapReduce lleva demasiado tiempo. A pesar de que el Streaming del que hablamos en posts anteriores lo solucione en parte, es cierto que si queremos trabajar de forma “nativa” con el framework tenemos que estar preparados para escribir un buen puñado de líneas de código.

Para dar salida este problema han ido apareciendo distintas alternativas, entre las que destacan Hive (by Facebook) y Pig (by Yahoo). En este post me voy a centrar en esta última, aunque esto no significa, ni mucho menos, que sea mejor que Hive.

Gracias a Pig se pueden escribir “en 4 lineas” aplicaciones MapReduce. Dicho de otra forma, permite que nos centremos en el qué queremos hacer con nuestros datos, y no en el cómo lo hacemos. Se basa en dos componentes:

  • Un sencillo lenguaje para escribir los scripts, llamado “pig latin”.
  • El “”compilador”", que trasforma los scripts en aplicaciones MapReduce para su ejecución en el cluster Hadoop.

Junto a esto, Pig tambén ofrece un shell interactivo (se llama grunt), para poder ejecutar comandos.

Bien, terminada la teoría (fácil, ¿no?), vamos a centrarnos en los datos, tal y como hemos dicho antes. (Para el que no haya leido los posts anteriores, este sería un buen momento).

Hasta ahora hemos estado preparando nuestra plataforma de logueo para poder disponer de años, y quizá terabytes, de logs en un mismo lugar, barato y redundado, del que poder sacar información útil en unos pocos minutos. Recordemos:

  1. Los logs de todos nuestros servidores se consolidan en una/varias máquinas syslog (o similar).
  2. Los logs se copian tal cual en Hadoop. A pesar de que vayan a estar replicados, no es mala idea tener una copia extra en cinta.
  3. Ejecutamos un primer script MapReduce que lee los scripts y los adapta al formato que queremos y sobre el que trabajaremos.
  4. Ya no necesitamos los ficheros originales, los podemos borrar de Hadoop. En caso de desastre, siempre podríamos recurrir a las cintas.

¿Cómo guardábamos los logs?

El resultado del reducer.pl son líneas como esta:

010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com mail.example.org[10.0.2.34],15853,1268223532,5A4FE29B3B6,43A9B28B357,Mar 10 12:18:52 destinovarios@example.com destinovarios@example.com Antispam sent (250 OK sent queued as 43A9B28B357

Recordad que el script guarda mucha información formateada en base64, por lo que en realidad tenemos algo similar a esto:

2010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com texto_en_base64

¿Por qué?

Porque todas las consultas que se suelen hacer son del tipo:

  • No he recibido un mensaje enviado entre las x y las z del martes, enviado desde esta cuenta, y con destino a esta otra.
  • Quiero todos los mensajes que se recibieron en tal buzón.

La estructura de logs que hemos preparado va a permitir que los scripts sean muy sencillos, sin ningún tipo de join o similar (ahora lo vemos). Dejaremos al interfaz web desde el que se van a ver los resultados el peso de formatear las entradas y hacerlas “visuales”.

¿Interfaz web?

Nada mejor que un sencillo interfaz web en el que escribir los parámetros de búsqueda, preparar el script .pig, ejecutarlo, y mostrar los resultados una vez termine. Evidentemente, no voy a escribir aquí el formulario, pero por dar un ejemplo, imaginemos que queremos saber los correos que han llegado desde infojobs entre las 8 a.m. del 11 de marzo, y las 12 a.m. del mismo día.

Empezamos pasando las fechas a timestamp. Muy fácil:

$ date -u -d 'Thu Mar 11 08:00:00 2010' '+%s'
1268294400

$ date -u -d 'Thu Mar 11 12:00:00 2010' '+%s'
1268308800

Con esto y con la dirección origen ya tenemos todo lo necesario. Ahora lo normal sería escribir un fichero .pig, pero aquí vamos a usar el intérprete de comandos grunt para ver los resultados paso a paso:

$ pig
10/05/25 21:57:29 INFO pig.Main: Logging error messages to: /usr/local/pig-0.7.0/pig_1274817449369.log
2010-05-25 21:57:29,688 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://fn140
2010-05-25 21:57:30,149 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: fn140:8021
grunt>

Empezamos cargando los datos:

grunt> REGISTROS = LOAD '/user/hadoop/logs_formateados' USING PigStorage('\t') AS (msgid:chararray, fecha:long, origen:chararray, destinoreal:chararray, destinoentrada:chararray, resto:chararray);

Con este comando hemos cargado los datos desde el directorio /user/hadoop/logs_formateados en la variable REGISTROS. Los ficheros de entrada se separan con un tabulador [1], con el schema que hemos venido usando. Veamos, antes de seguir, si Pig ha entendido lo que realmente queremos:

grunt> ILLUSTRATE REGISTROS
2010-05-25 23:58:36,627 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://fn140
2010-05-25 23:58:36,681 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: fn140:8021
2010-05-25 23:58:36,791 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2010-05-25 23:58:36,792 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| REGISTROS     | msgid: chararray                                                    | fecha: long | origen: chararray   | destinoreal: chararray | destinoentrada: chararray | resto: chararray |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|               | 6B160A8CF29E064XA067E116FA2A458CAD8A66@GLOBL0310V01.exampleserv.com | 1268304751  | pr1user@example.com | mga@example.net        | mga@example.net           | texto_base64     |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Ahora que tenemos todos los registros, vamos a filtrarlos en base a los criterios que hemos definido:

grunt> FILTRADOS = FILTER REGISTROS BY (origen matches '.+infojobs.+') AND (fecha > 1268294400) AND (fecha < 1268301600);

Hecho esto sólo nos queda agruparlos por msgid. Recordad, tenemos dos entradas por mensaje: una para el nivel antispam, y otra para la entrega. En realidad, en base al msgid podríamos seguir un mensaje por tantos saltos como tuviéramos (algo parecido a lo que Mailtrust ha documentado que hace):

grunt> AGRUPADOS = GROUP FILTRADOS BY msgid;

Y con esto ya tenemos el resultado. Lo podríamos confirmar con el comando DUMP, que muestra los datos por salida estándar, pero lo vamos a guardar directamente en un fichero:

grunt> STORE AGRUPADOS INTO '/tmp/fichero_resultados.txt' USING PigStorage();
2010-05-26 22:17:03,942 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_LONG 2 time(s).
2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.impl.logicalLayer.optimizer.PruneColumns - No column pruned for REGISTROS
2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.impl.logicalLayer.optimizer.PruneColumns - No map keys pruned for REGISTROS
2010-05-26 22:17:03,946 [main] INFO  org.apache.pig.ResourceSchema - Insert two-level access to Resource Schema
2010-05-26 22:17:03,983 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_LONG 2 time(s).
2010-05-26 22:17:03,985 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - (Name: Store(hdfs://fn140/tmp/fichero_resultados.txt:PigStorage) - 1-1007 Operator Key: 1-1007)
2010-05-26 22:17:03,986 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2010-05-26 22:17:03,986 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2010-05-26 22:17:04,019 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2010-05-26 22:17:05,180 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2010-05-26 22:17:05,188 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2010-05-26 22:17:05,191 [Thread-120] WARN  org.apache.hadoop.mapred.JobClient - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2010-05-26 22:17:05,545 [Thread-120] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2010-05-26 22:17:05,546 [Thread-120] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
2010-05-26 22:17:06,382 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201005262134_0008
2010-05-26 22:17:06,382 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://fn140:50030/jobdetails.jsp?jobid=job_201005262134_0008
2010-05-26 22:17:06,384 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2010-05-26 22:17:19,214 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 10% complete
2010-05-26 22:17:20,719 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 20% complete
...............
2010-05-26 22:17:28,241 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 60% complete
2010-05-26 22:17:41,788 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2010-05-26 22:17:41,788 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Successfully stored result in: "hdfs://fn140/tmp/fichero_resultados.txt"
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Records written : 72
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Bytes written : 44727
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Spillable Memory Manager spill count : 0
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Proactive spill count : 0
2010-05-26 22:17:41,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

Vaya, parece que tenemos 72 usuarios suscritos a infojobs. Veamos un ejemplo de lo que se ha escrito en "/tmp/fichero_resultados.txt" (es un path de HDFS, no el /tmp local). Los saltos de línea los he puesto yo para que sea más visual, y he escrito la versión sin codificar en base64:

1268304853778.112.POCFQELSVS@[hellboy07]
{
    (1268304853778.112.POCFQELSVS@[hellboy07],1268308417,aviso_incripciones@push.infojobs.net,usuario@example.com,usuario@example.com,localhost[127.0.0.1],11854,1268308418,9ABC12FACB4,-1,Mar 11 11:53:38 usuario@example.com usuario@example.com Maildrop sent (delivered via maildrop service))
,
    (1268304853778.112.POCFQELSVS@[hellboy07],1268308417,aviso_incripciones@push.infojobs.net,usuario@example.com,usuario@example.com,push3.infojobs.net[79.171.25.71],11107,1268308418,3B0213FAECD,9ABC12FACB4,Mar 11 11:53:38 usuario@example.com usuario@example.com Antispam sent (250 OK queued as 9ABC12FACB4)
}

Ahora sólo necesitamos que nuestro interfaz web formatee el resultado y lo saque por pantalla. Fácil y sencillo.

Resumiendo, con estas cuatro líneas podemos hacer cualquier búsqueda en los logs, tengan el tamaño que tengan:

REGISTROS = LOAD '/user/hadoop/logs_formateados' USING PigStorage('\t') AS (msgid:chararray, fecha:long, origen:chararray, destinoreal:chararray, destinoentrada:chararray, resto:chararray);
FILTRADOS = FILTER REGISTROS BY (origen matches '.+infojobs.+') AND (fecha > 1268294400) AND (fecha < 1268301600);
AGRUPADOS = GROUP FILTRADOS BY msgid;
STORE AGRUPADOS INTO '/tmp/fichero_resultados.txt' USING PigStorage();

Referencias

Hay muchos artículos y documentación disponible en la red sobre Hadoop. En cuanto a bibliografía, que yo sepa hay al menos tres libros disponibles:

  • Pro Hadoop, de la editorial Apress.
  • Hadoop: The Definitive Guide, de la editorial O'Reilly.
  • Hadoop in Action, de la editorial Manning.

[1]: La verdad es que he hecho trampa. El reducer original separaba el id de mensaje del resto de datos con un tabulador, pero luego usaba un espacio simple. Esto es porque al principio tenía pensado trabajar otros aspectos de Hadoop y Pig, como el Chaining y las "user defined functions".
Al final, para no tener que escribir más posts, he cambiado el reducer para que separe todos los campos con tabuladores.

 

En este tercer post de la serie dejamos la teoría para ver un caso práctico de aplicación de Hadoop, que completaré en un cuarto post.

Como sabemos, gracias a Hadoop conseguimos un sistema con mucha capacidad de almacenamiento, a un coste por mega reducido, redundado, de rendimiento prácticamente lineal (más máquinas = más rendimiento) y sobre el que ejecutar de forma pararela todo tipo de aplicaciones que se puedan implementar como MapReduce.

Con estos datos, hay una utilidad práctica que viene inmediatamente a la mente: tratamiento de logs. Veamos:

  • Me gustaría decir lo contrario, pero personalmente no conozco a nadie que maneje el dinero destinado a proyectos que no ponga pegas a una inversión fuerte para el tratamiento de logs. El coste por mega de Hadoop es difícil de superar.
  • Si combinamos Hadoop, por ejemplo para datos con hasta un año de antiguedad, con un soporte en cinta para el resto, lo tendremos fácil para cumplir cualquier legislación en lo relacionado a retención de datos.
  • Sin llegar a los niveles de Facebook o Rackspace, es probable que el volumen de logs que guardemos durante un año sea lo suficientemente grande como para hacer difícil su tratamiento si un juez (por ejemplo) pide todos los correos desde la cuenta x a la cuenta z de los últimos siete meses, o todos los acceso al foro web de fulanito. MapReduce y Hadoop permiten ejecutar este tipo de tareas sin mayores problemas, y con un rendimiento prácticamente lineal.

En este entorno, planteo un ejemplo de tratamiento de logs de Postfix de lo más simple, con un filtro de contenido en modo post-queue (Amavis, por ejemplo, en su uso más habitual). Vamos, nada nuevo.

El objetivo sería, además de los puntos mencionados anteriormente, poder realizar cualquier tipo de consulta sobre los logs de meses (o años), en un tiempo razonable. Y digo tiempo razonable porque en este tipo de consultas no suele hacer falta tener el resultado al momento. Se acepta esperar durante 1,5 o 10 minutos hasta que se genere el resultado.

Resumiendo, tenemos una infraestructura parecida a esta:

Postfix Amavis con Hadoop

Todos los servidores envían los logs de manera centralizada a un syslog remoto, que además de hacer una copia diaria a cinta, almacena la información en Hadoop, usando alguno de los muchos interfaces que se ofrecen, como pueden ser FUSE o FTP.

Junto a esto tendremos una máquina, preferiblemente un interfaz web, para generar trabajos MapReduce. Básicamente necesitaremos un formulario (cuenta origen, destino, fechas, …), y algo de control sobre los trabajos que ya se han realizado para mostrar los resultados.

Streaming

Se pueden crear trabajos MapReduce para Hadoop de varias formas, pero en este ejemplo me voy a limitar a lo que se llama “Streaming”. Dicho de otra forma (aunque no sea del todo cierto), a programar aplicaciones MapReduce en perl o python (o en cualquier lenguaje que pueda leer y escribir en la entrada y salida estándar).

La gran ventaja que tiene el Streaming es que permite desarrollar prototipos de una forma rápida, aunque no sea lo más eficiente. En cualquier caso, sobra para este ejemplo.

La mejor forma de explicar lo que es el Streaming en Hadoop es ver el equivalente, en versión linux:

cat datos_entrada.txt | mapper.pl | sort | reducer.pl

El mapper.pl recibirá los datos desde los ficheros de log, línea por línea, y los tratará, escribiendo por salida estándar con el formato clave “tabulador” valor.

Esta salida se ordena, y pasa a un reducer. Hadoop garantiza que todas las claves serán tratadas por el mismo reducer, aunque éste puede, y normalmente así lo hará, procesar varias claves (este es un buen momento para recordar que los ficheros de entrada pueden ser, por decir algo, 200GB de datos, sin que suponga ningún problema para Hadoop).

Volviendo al ejemplo, veamos unas cuantas líneas de log de postfix:

Feb 20 14:27:03 server postfix/smtpd[17875]: E78D13073B2: client=mail.example.com[10.0.0.6]
Feb 20 14:27:03 server postfix/cleanup[17822]: E78D13073B2: message-id=<F0E59D47X9B343A7DFRAB1F0E45@EUW0600375>
Feb 20 14:27:04 server postfix/qmgr[5370]: E78D13073B2: from=<prueba@example.com>, size=20827, nrcpt=1 (queue active)
Feb 20 14:27:04 server postfix/smtp[17980]: E78D13073B2: to=<destino@example.org>, relay=127.0.0.1[127.0.0.1]:10025, delay=0.6, delays=0.08/0/0/0.51, dsn=2.0.0, status=sent (250 Ok: queued as 6A737307316)
Feb 20 14:27:04 server postfix/qmgr[5370]: E78D13073B2: removed

Feb 20 14:27:04 server postfix/smtpd[17882]: 6A737307316: client=localhost[127.0.0.1]
Feb 20 14:27:04 server postfix/cleanup[17817]: 6A737307316: message-id=<F0E59D47X9B343A7DFRAB1F0E45@EUW0600375>
Feb 20 14:27:04 server postfix/qmgr[5370]: 6A737307316: from=<prueba@example.com>, size=21719, nrcpt=1 (queue active)
Feb 20 14:27:04 server postfix/pipe[16491]: 6A737307316: to=<destino@example.org>, relay=maildrop, delay=0.16, delays=0.11/0.01/0/0.04, dsn=2.0.0, status=sent (delivered via maildrop service)
Feb 20 14:27:04 server postfix/qmgr[5370]: 6A737307316: removed

Esta es la información que postfix loguea al recibir y entregar un mensaje usando un filtro after-queue. He ordenado los logs para que sea más claro, pero en la realidad no es así.

En este ejemplo se recibe un correo desde mail.example.com, y se le asigna el queue-id E78D13073B2. También se loguea el message-id, el origen y el tamaño.

Una vez se ha encolado el mensaje, se envía a Amavis, a través de 127.0.0.1:10025. Como se puede ver en el ejemplo, en este punto sabemos el destino del mensaje, y también si fue aceptado o no. Lo sabemos porque el log hace referencia al queue id que amavis reinjecta en postfix. De no haberlo aceptado no tendríamos un queue id en el log.

Este nuevo mensaje mantiene el message-id, el tamaño es algo mayor, y termina entregandose, en este caso en la máquina local, usando la aplicación courier-maildrop.

Bien, esto no es más que un ejemplo. En la realidad hay que tratar muchos casos: aliases, reenvíos a máquinas remotas, bounces, mensajes que pasan a deferred por no haber podido entregarse, ….

Además, e insistiendo en que esto no es más que un ejemplo, voy a asumir dos cosas, que aunque se cumplen a menudo, no son ciertas:

  • Que el queue-id es único.
  • Que el message-id es único.

Mapper.pl

El proceso comienza con Hadoop enviando los ficheros de entrada (logs) a la entrada estándar de los mapper.pl que estemos ejecutando en el cluster, que tratarán cada línea y la enviarán por salida estándar con el formato:

queueid<tabulador>informacion_separada_por_comas

Para separar la clave del valor se usa un tabulador. Normalmente, los campos del valor se separan con espacios, pero en este ejemplo he preferido las “comas”.

Veamos un trozo del código:

..............
while (<>)
..............
if ($linea =~ /^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) .+ postfix\/smtpd\[\d+\]: (\w+): client=(\S+)$/) {
                $estructura{fecha} = $1; $estructura{qid} = $2; $estructura{host} = $3;
                print $estructura{qid} . "\t" . "Conexion," . join (",",$estructura{fecha},$estructura{host}) . "\n";
        }
         elsif ($linea =~ /^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) .+ postfix\/smtp\[\d+\]: (\w+): to\=\<([^ ]+)\>, relay=127.0.0.1.*status=(.*) (\w+)\)$/) {

                $estructura{fecha} = $1; $estructura{qid} = $2; $estructura{to} = $3; $estructura{origto} = $3; $estructura{status} = $4; $estructura{relay} = "Antispam"; $estructura{rqid} = $5; $estructura{status} =~ s/,//g;
                print $estructura{qid} . "\t" . "Antispam," . join (",",$estructura{fecha},$estructura{to},$estructura{to},$estructura{relay},$estructura{status},$estructura{rqid}) . "\n";
        }
...........
}

Tenemos varios “if/elsif”, que adaptan la línea del log de entrada al formato que quiero. Además, he añadido un texto con el tipo de entrada de log que estamos tratando, y que sabemos por el “if” en el que estamos (Antispam en este ejemplo), para luego usarlo en el reducer.

El código completo incluye también la detección de los mensajes que Amavis considera que son spam, en los que pone el rqid como 0. Además, en aquellos casos donde “no proceda” aplicar el concepto de rqid pongo -1 como valor. Todo esto servirá en el reducer.

Volviendo a los logs de ejemplo, el resultado de ejecutar el mapper sobre ellos es:

E78D13073B2     Conexion,Feb 20 14:27:03,mail.example.com[10.0.0.6]
E78D13073B2     Msgid,Jan 20 06:27:03,F0E59D47X9B343A7DFRAB1F0E45@EUW0600375
E78D13073B2     Origen,Feb 20 14:27:04,prueba@example.com,20827
E78D13073B2     Antispam,Feb 20 14:27:04,destino@example.org,destino@example.org,Antispam,sent (250 Ok: queued as,6A737307316)
E78D13073B2     Cierre,Feb 20 14:27:04
6A737307316     Conexion,Feb 20 14:27:04,localhost[127.0.0.1]
6A737307316     Msgid,Feb 20 14:27:04,F0E59D47X9B343A7DFRAB1F0E45@EUW0600375
6A737307316     Origen,Feb 20 14:27:04,prueba@example.com,21719
6A737307316     Maildrop,Feb 20 14:27:04,destino@example.org,destino@example.org,Maildrop,sent (delivered via maildrop service),-1
6A737307316     Cierre,Feb 20 14:27:04

Después de esto Hadoop ejecutaría un sort (en el ejemplo ya están ordenados por queue-id), y volvería a enviar el resultado a los reducer.pl del cluster. Por lo tanto, sabemos que un reducer va a recibir todas las líneas de log correspondientes a un mismo queue-id.

Reducer.pl

Pasamos directamente a ver algo del código del reducer.pl:

my %mensaje = (qid => "",
            fecha => "",
            servidor => "",
            msgid => "",
            origen => "",
            size => "",
            to => "",
            origto => "",
            rqid => "",
            relay => "",
            status => "",
            cierre => ""
            );

..............
while () {
    chomp;
    ($qid, $valor) = split /\t/;

        if ($qid ne $qidanterior) {
                # tenemos un queueid nuevo
                if ($qidanterior ne "") {
                        &mostrar_datos();

                        %mensaje = (qid => "",
                                fecha => "",
                                desdeip => "",
                                msgid => "",
                                origen => "",
                                size => "",
                                to => "",
                                origto => "",
                                rqid => "",
                                status => "",
                                cierre => ""
                        );
                }
                $qidanterior = $qid;
        }

        # seguimos procesando los datos del mismo qid
        $mensaje{qid} = $qid;
        @datos = split(",",$valor);

        switch ($datos[0]) {
                case "Conexion" {
                        $mensaje{fecha} = $datos[1];
                        $mensaje{desdeip} = $datos[2];
                }
                case "Msgid" {
                        $mensaje{msgid} = $datos[2];
                }
                case "Origen" {
                        $mensaje{origen} = $datos[2];
                        $mensaje{size} = $datos[3];
                }
                case "Antispam" {
                        $mensaje{to} .= $datos[2] . "-separador-";
                        $mensaje{origto} .= $datos[3] . "-separador-";
                        $mensaje{status} .= $datos[1] . " " .$datos[2] . " " . $datos[3] . " " . $datos[4] . " " . $datos[5] . "-separador-";
                        $mensaje{rqid} = $datos[6];
                }

................
sub mostrar_datos {
     ................
     if ( ($mensaje{fecha} ne "") && ($mensaje{cierre} ne "") ) {
          # mensaje completo: Salida estandar
          # hay que revisar que se haya pasado por todas las fases
          # y realizar comprobaciones, que aqui no se hacen
          $fechaentrada = `date -u -d '$mensaje{fecha}' '+%s'`; chomp $fechaentrada;
          $fechasalida = `date -u -d '$mensaje{cierre}' '+%s'`; chomp $fechasalida;

          $datoscodificables = $mensaje{desdeip} .",".$mensaje{size}.",".$fechasalida.",".$mensaje{qid}.",".$mensaje{rqid}.",".$mensaje{status};
          $resto = encode_base64($datoscodificables,"");

          print $mensaje{msgid} . "\t" . $fechaentrada . " " . $mensaje{origen} . " " . $mensaje{to} . " " . $mensaje{origto} . $resto . "\n";

    } else {
          # mensaje incompleto. Guardar en otro fichero para poder seguir el tracking
          # Falta por hacer
    }

}

Por lo tanto, cogemos cada línea de entrada, y ayudándonos de la descripción del tipo de línea que hemos añadido en el mapper, vamos completando la estructura de datos que después será el resultado (por fin) que queremos.

Un ejemplo de la ejecución de este trabajo en mi cluster de prueba:

$ hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-0.20.2-streaming.jar -input /user/hadoop/mail_logs -output /user/hadoop/logs_formateados/domingo_14_03 -file /home/hadoop/mapper.pl -file /home/hadoop/reducer.pl -mapper "/usr/bin/perl -w mapper.pl" -reducer "/usr/bin/perl -w reducer.pl"

packageJobJar: [/home/hadoop/mapper.pl, /home/hadoop/reducer.pl, /home/hadoop/hadoop-unjar4902963819550178151/] [] /tmp/streamjob534817239215339193.jar tmpDir=null
10/03/18 22:20:02 INFO mapred.FileInputFormat: Total input paths to process : 4
10/03/18 22:20:02 INFO streaming.StreamJob: getLocalDirs(): [/home/hadoop/mapred/local]
10/03/18 22:20:02 INFO streaming.StreamJob: Running job: job_201003182146_0001
10/03/18 22:20:02 INFO streaming.StreamJob: To kill this job, run:
10/03/18 22:20:02 INFO streaming.StreamJob: /usr/local/hadoop/bin/hadoop job  -Dmapred.job.tracker=fn140:8021 -kill job_201003182146_0001
10/03/18 22:20:02 INFO streaming.StreamJob: Tracking URL: http://fn140.forondarena.net:50030/jobdetails.jsp?jobid=job_201003182146_0001
10/03/18 22:20:03 INFO streaming.StreamJob:  map 0%  reduce 0%
10/03/18 22:20:17 INFO streaming.StreamJob:  map 8%  reduce 0%
10/03/18 22:20:18 INFO streaming.StreamJob:  map 11%  reduce 0%
10/03/18 22:20:20 INFO streaming.StreamJob:  map 15%  reduce 0%
.....
10/03/18 22:21:56 INFO streaming.StreamJob:  map 92%  reduce 25%
10/03/18 22:21:59 INFO streaming.StreamJob:  map 100%  reduce 25%
....
10/03/18 23:14:13 INFO streaming.StreamJob:  map 100%  reduce 98%
10/03/18 23:15:55 INFO streaming.StreamJob:  map 100%  reduce 99%
10/03/18 23:17:31 INFO streaming.StreamJob:  map 100%  reduce 100%
10/03/18 23:18:25 INFO streaming.StreamJob: Job complete: job_201003182146_0001
10/03/18 23:18:25 INFO streaming.StreamJob: Output: /user/hadoop/logs_formateados/domingo_14_03

El resultado es un fichero en la ruta “/user/hadoop/logs_formateados/domingo_14_03″, con líneas formadas por un message-id, seguido por un tabulador, seguido del origen, el destino, el destino antes de la expansión de aliases, y del resto de la información, codificada en base64, todo separado por espacios.

No hace falta decir que el formato de salida en un entorno real se debería adaptar a lo que se necesitase. De hecho, Rackspace usa una estructura binaria, donde entre otras cosas se guarda la información de los diferentes saltos que hace un correo dentro de su sistema. Este ejemplo, por supuesto, es mucho más sencillo.

Al final del proceso hemos obtenido un fichero, que es el que de verdad queremos en nuestro cluster, con toda la información de cada mensaje que ha entrado en nuestra red en un periodo de tiempo concreto (digamos que cada 4 horas volcamos los logs de syslog a Hadoop y ejecutamos la tarea), como se ve en los siguientes ejemplos (sin la codificación en base64, para que sean más fáciles de seguir):

Mensaje que se entrega en un buzón local:

ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org 1268225082 origen@example.org destino@example.com destino@example.com mail.example.org[10.0.2.34],2430,1268225085,B6A6A296E55,158AB296F24,Mar 10 12:44:45 destino@example.com destino@example.com Antispam sent (250 OK sent queued as 158AB296F24

ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org 1268225085 origen@example.org destino@example.com destino@example.com localhost[127.0.0.1],3081,1268225085,158AB296F24,-1,Mar 10 12:44:45 destino@example.com destino@example.com Maildrop sent (delivered via maildrop service)

Por lo tanto:

  • El mensaje con id ED56A03DA2274E90CC2DX6DC158CA01365E@mailer.example.org
  • Entró en el sistema a las 1268225082
  • Desde el servidor mail.example.org[10.0.2.34]
  • El origen era origen@example.org
  • Y el destino destino@example.com (no es un alias)
  • Una vez pasado por el sistema antispam se reencola con queueid 158AB296F24
  • El mensaje de la entrega al servidor antispam es: sent (250 OK sent queued as 158AB296F24)
  • Una vez ha pasado el filtro antispam el mensaje se reencola a las 1268225085
  • Por supuesto desde el servidor localhost[127.0.0.1]
  • Y el mensaje de la entrega local es Mar 10 12:44:45 Maildrop sent (delivered via maildrop service), sólo al destino destino@example.com

Mensaje que se entrega a un servidor remoto:

1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3 1268219047 origen@example.net destino@example.com destino@example.com mail.example.net[10.5.3.2],1901,1268219051,CA6782FABFB,B6C7A2FAACA,Mar 10 11:04:11 destino@example.com destino@example.com Antispam sent (250 OK sent queued as B6C7A2FAACA

1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3 1268219049 origen@example.net destinoreal@remoto.example.org destino@example.com localhost[127.0.0.1],2686,1268219051,B6C7A2FAACA,-1,Mar 10 11:04:11 destinoreal@remoto.example.org destino@example.com mail.example.org[10.0.2.34]:25 sent (250 2.0.0 Ok: queued as 423872AB862)
    En este caso tenemos un alias que se reescribe para entrega remota:

  • Su messageid es 1982C18.15A.1B26824474D6.JavaMail.root@10.1.2.3
  • Entro en el sistema a las 1268219047
  • Desde el servidor mail.example.net[10.5.3.2]
  • Desde el origen origen@example.net
  • Y al destino destino@example.com, que en la fase antispam todavía no se expande a ningún alias
  • El mensaje se relaciona con B6C7A2FAACA
  • Y el mensaje de la entrega es Mar 10 11:04:11 destino@example.com destino@example.com Antispam sent (250 OK sent queued as B6C7A2FAACA)
  • Se reencola, a través de localhost, a las 1268219049
  • El mensaje pasa a ser la dirección real destinoreal@remoto.example.org,
    proveniente del alias destino@example.com
  • El mensaje se entrega en mail.example.org[10.0.2.34]:25
  • Con el mensaje sent (250 2.0.0 Ok: queued as 423A72AB262)

Mensaje que es spam:

9545850218.UWD8KHA190@spammer.example.info 1268238332 spammer@example.net destino@example.com destino@example.com mail.example.info[10.8.2.12],2141,1268238333,F355429B3A5,0,Mar 10 16:25:33 destino@example.com destino@example.com Antispam sent (250 OK sent mensaje_rechazado por spam

En este caso el mensaje es spam, y no se relaciona con ningún otro queueid.

Mensaje que se entrega a varias cuentas:

2010031011.AB263566.pc1@mailer.example.org 1268223531 usuario@example.org destinovarios@example.com destinovarios@example.com mail.example.org[10.0.2.34],15853,1268223532,5A4FE29B3B6,43A9B28B357,Mar 10 12:18:52 destinovarios@example.com destinovarios@example.com Antispam sent (250 OK sent queued as 43A9B28B357

2010031011.AB263566.pc1@mailer.example.org 1268223532 usuario@example.org destinolocal@example.com-separador-destinoremoto@example.info destinovarios@example.com-separador-destinovarios@example.com localhost[127.0.0.1],16525,1268223532,43A9B28B357,-1,Mar 10 12:18:52 destinolocal@example.com destinovarios@example.com Maildrop sent (delivered via maildrop service)-separador-Mar 10 12:18:52 destinoremoto@example.info destinovarios@example.com mail.example.info[10.8.2.12]:25 sent (250 2.0.0 Ok: queued as 452362CA51A)
  • Ahora tenemos un mensaje con id 2010031011.AB263566.pc1@mailer.example.org
  • Que se envía desde usuario@example.org
  • Al destino destinovarios@example.com
  • Que a su vez es un alias a dos cuentas destinolocal@example.com y destinoremoto@example.info
  • Los mensajes de entrega a cada una de ellas son:
    • Mar 10 12:18:52 destinolocal@example.com Maildrop sent (delivered via maildrop service)
    • Mar 10 12:18:52 destinoremoto@example.info mail.example.info[10.8.2.12]:25 sent (250 2.0.0 Ok: queued as 452362CA51A)

Como se puede ver, ya tenemos toda la información necesaria, debidamente preformateada, por lo que ya no necesitamos los ficheros mail.log, que podríamos borrar de Hadoop (que no de las cintas).

La información contenida en estos ficheros nos servirá para preparar consultas, bien directamente con otros trabajos MapReduce, bien con otras herramientas, tal y como mostraré en el siguiente post de la serie, en el que nos aprovecharemos del formato de los ficheros para hacer consultas por message-id, cuenta origen, fecha o cuenta destino (según entró en el sistema o una vez expandidos los aliases).

En este punto hay que volver a recordar que Hadoop está pensado para trabajar con cantidades muy grandes de información, en los que hacer un simple “grep” o usar una base de datos relacional estándar resulta impracticable.

Por último, tened en cuenta que el código de este post, además de ser sólo un ejemplo, es completamente Alpha. Aparte de no funcionar del todo bien (tiene varios fallos que he detectado pero que no he corregido todavía), necesitaría tener en cuenta muchos más casos, como mensajes cuya entrega se retrasa durante días (que no deja de ser algo normal en los sistemas de correo reales).

En el cuarto y último post de la serie: acceso a Hadoop a través de una herramienta de apoyo, como pig.

 

Continúo la serie de posts sobre Hadoop describiendo, muy por encima, los componentes y la estructura que suelen tener este tipo de sistemas. Como ya he dicho en otros posts, este blog pretende ser sobre todo práctico, así que no me voy a extender demasiado.

Gráficamente un cluster Hadoop se puede parecer mucho a esto:

Estructura

JobTracker:
En un cluster hay un único JobTracker. Su labor principal es la gestión de los TaskTrackers, entre los que distribuye los trabajos MapReduce que recibe. Es, por lo tanto, el interfaz principal que tienen los usuarios para acceder al cluster.

TaskTracker:
Un cluster tiene n TaskTrackers que se encargan de la ejecución de las tareas map y reduce en los nodos. Cada uno de los TaskTrackers gestiona la ejecución de estas tareas en una máquina del cluster. El JobTracker se encarga, a su vez, de controlarlos.

NameNode:
Las funciones principales del NameNode son el almacenamiento y la gestión de los metadatos del sistema de ficheros distribuido y ofrecer el interfaz principal que tiene el usuario para acceder al contenido HDFS. En un cluster hay un único proceso NameNode.
Sin los metadatos que mantiene el NameNode no se sabría en qué nodo está cada bloque, además de perderse la información sobre la estructura de directorios. Es, por lo tanto, el componente más importante del cluster, y debe estar redundado. Hadoop no ofrece de manera nativa ningún mecanismo de alta disponibilidad, pero sí dispone de herramientas que permiten replicar la información. Entre ellas está el NameNode secundario, que permite guardar una copia de los metadatos (en realidad hace más cosas) en una máquina diferente al NameNode. Eso sí, no es una copia en tiempo real, y no ofrece failover automático en caso de fallo del primario. Para esto necesitamos usar otro tipo de herramientas, entre las que destacan DRBD y Heartbeat.

DataNode:
Estos procesos ofrecen el servicio de almacenamiento de bloques para el sistema de ficheros distribuido. Son coordinados por el NameNode.

Estos son los procesos principales del sistema, pero hay otros, como el Balancer, que se encarga de equilibrar la distribución de los bloques entre los DataNodes, por ejemplo cuando se añade un nuevo servidor.

Mi laboratorio se limita a cuatro servidores: un NameNode+JobTracker y tres DataNode+TaskTracker, en cuatro máquinas virtuales de un core y 1G de memoria, para un total de 5G de almacenamiento.

hadoop@fn140:/usr/local/hadoop/conf$ hadoop dfsadmin -report
Configured Capacity: 8695013376 (8.1 GB)
Present Capacity: 5455364096 (5.08 GB)
DFS Remaining: 5455290368 (5.08 GB)
DFS Used: 73728 (72 KB)
DFS Used%: 0%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0

-------------------------------------------------
Datanodes available: 3 (3 total, 0 dead)

Name: 192.168.10.142:50010
Decommission Status : Normal
Configured Capacity: 2898337792 (2.7 GB)
DFS Used: 24576 (24 KB)
Non DFS Used: 1079918592 (1.01 GB)
DFS Remaining: 1818394624(1.69 GB)
DFS Used%: 0%
DFS Remaining%: 62.74%
Last contact: Sun Jan 31 16:10:58 CET 2010

Name: 192.168.10.143:50010
Decommission Status : Normal
Configured Capacity: 2898337792 (2.7 GB)
DFS Used: 24576 (24 KB)
Non DFS Used: 1080020992 (1.01 GB)
DFS Remaining: 1818292224(1.69 GB)
DFS Used%: 0%
DFS Remaining%: 62.74%
Last contact: Sun Jan 31 16:10:58 CET 2010

Name: 192.168.10.141:50010
Decommission Status : Normal
Configured Capacity: 2898337792 (2.7 GB)
DFS Used: 24576 (24 KB)
Non DFS Used: 1079709696 (1.01 GB)
DFS Remaining: 1818603520(1.69 GB)
DFS Used%: 0%
DFS Remaining%: 62.75%
Last contact: Sun Jan 31 16:10:56 CET 2010

En los próximos dos posts voy a dejar la “teoría” para pasar a la práctica, empezando por una de las aplicaciones más obvias de Hadoop, como es el tratamiento de logs; para seguir por una de las utilidades que existen para facilitar el desarrollo de aplicaciones MapReduce: pig.

 

Allá por el año 2003, cuando Google ya dominaba el mundo de los buscadores, muchos administradores de sistemas nos preguntábamos por la tecnología que usarían para indexar páginas, calcular Pageranks, gestionar el almacenamiento ….

En ese momento Google publicó varios documentos al respecto, como este sobre MapReduce y este otro sobre su sistema de ficheros GFS (Google File System), que daban algunas pistas, y que a la larga han sido la base de varios proyectos, como Hadoop.

Pretendo que este blog sea sobre todo práctico, así que no voy a escribir demasiado ni sobre la teoría que hay detrás de MapReduce (programación funcional, map/reduce, bla bla bla) ni sobre Google y GFS. Para el que quiera leer algo más sobre la tecnología de Google recomiendo este excelente blog en el que se pueden encontrar varios posts sobre el buscador.

El problema
El problema es sencillo y se resume en el volumen de datos, cada vez mayor, que generan las aplicaciones; y que por supuesto hay que tratar. Por dar un ejemplo, en el 2008 Yahoo! gestionaba unos 5PB de almacenamiento para generar su “webmap”[1], que es uno de los componentes que usa su buscador.
Otro ejemplo es Rackspace, y su división de correo, Mailtrust, que genera más de 150GB de logs de correo al día. Por supuesto, además de ser mucho volumen acomulado, es fundamental que se pueda tratar de una forma ágil para dar una respuesta rápida a clientes que piden cosas como “el correo que me mandó fulanito hace 3 meses no me ha llegado” o “quiero todos los correos recibidos en mi dominio en los últimos 5 meses”. Esto es razonablemente sencillo cuando se controlan unos pocos cientos de GB, pero se hace mucho más difícil cuando la escala pasa al Tera.

El que haya nombrado estos dos ejemplos no es casual, ya que Yahoo! y Rackspace son dos de los usuarios más notables (junto a Facebook, LastFM, …) del software que voy a describir en los siguientes posts: Hadoop.

¿Qué es Hadoop?

Resumiendo mucho, Hadoop es un framework software que intenta implementar los conceptos de MapReduce y GFS que presentó Google. Está pensado, por lo tanto, para el tratamiento distribuido de grandes cantidades de datos en máquinas de bajo coste y con poca fiabilidad.

Partiendo de esta base, parece claro que hace falta un sistema de ficheros pensado para almacenar mucha información y con tolerancia ante fallos. Aquí es donde aparece HDFS (Hadoop Distributed File System). Algunas de sus características:

  • El tamaño de bloque por defecto en este sistema de ficheros es de 64MB, pero muchas veces se aumenta a 128MB.
  • Ofrece la mencionada disponibilidad al dejar varias copias de cada bloque en máquinas diferentes.
  • Usa el concepto de “rack awareness”, para saber dónde está cada bloque y qué proceso de cálculo puede acceder más rápido a él.
  • Su diseño facilita la lectura secuencial de datos (razonable en discos estándar de bajo coste), pero lo hace sacrificando otras características propias de los sistemas de ficheros POSIX. Por ejemplo, que nadie piense editar un fichero en HDFS y borrar la línea número 2000.

Por otro lado, el framework implementa el concepto MapReduce, que como ya he escrito consiste en dividir el trabajo entre n servidores para presentar el resultado después de forma coherente. Veamos un ejemplo:

Digamos[2] que tenemos un cluster de 4 máquinas Hadoop que guardan logs de apache de 40 servidores balanceados, para un total de 100GB al día. Nuestro objetivo es diseñar un trabajo MapReduce para sumar los accesos que ha hecho cada IP en todos los frontales.

Los logs van a ser sencillos y se van a limitar a “FECHA IP URL”.

Servidor 1:

FECHA 172.17.2.34 /index.html
FECHA 172.17.7.13 /contenido/futbol.html
FECHA 172.17.2.34 /img/banner.jpg
FECHA 172.17.2.42 /index.html

Servidor 2:

FECHA 172.17.2.34 /contenido/peliculas.html
FECHA 172.17.7.13 /img/futbol.jpg

Servidor 3:

FECHA 172.17.7.13 /index.html
FECHA 172.17.2.34 /peliculas/bladerunner.html
FECHA 172.17.2.42 /img/banner.jpg

Servidor 4:

FECHA 172.17.2.42 /contenido/series.html
FECHA 172.17.2.42 /img/series.jpg

A partir de estos datos Hadoop empezará la fase Map, que se ejecuta paralelamente en las máquinas del cluster, y que consiste en adaptar las líneas de log al formato clave< tabulador >valor.

Volviendo al ejemplo, la clave será la IP, y el valor la URL (no necesitamos la fecha).
El resultado de la fase Map es el siguiente[3]:

(172.17.2.34 [/index.html /img/banner.jpg /contenido/peliculas.html /peliculas/bladerunner.html])
(172.17.2.42 [/index.html /img/banner.jpg /contenido/series.html /img/series.jpg])
(172.17.7.13 [/contenido/futbol.html /img/futbol.jpg /index.html])

Hemos generado un listado ordenado por IP, en el que se identifican todos los accesos que ha habido.

Una vez hecho esto Hadoop entrega trozos de esta salida a n procesos Reduce, que en este sencillo ejemplo sólo tienen que contar las URL que aparecen en el valor, para generar algo como lo siguiente:

(172.17.2.34    4)
(172.17.2.42    4)
(172.17.7.13    3)

Esto no es más que un ejemplo. En la realidad, Hadoop se ha usado para cosas tan diversas como convertir 4TB de artículos de periódico escaneados en formato tiff en 11 millones de pdfs; todo usando unas 100 instancias de amazon y en menos de 24 horas, por un total de 240$ sin contar ancho de banda, como se puede ver aquí.

El próximo post describirá una instalación básica del sistema, y el siguiente será una prueba algo más interesante de uso con logs de postfix+amavis, muy al estilo Mailtrust, y usando perl y quizá alguno de los proyectos de apoyo que han surgido alrededor de Hadoop; como Hive, que fue desarrollado por Facebook para crear trabajos MapReduce desde una consola con sintaxis muy similar a SQL.


[1] En el 2008 el webmap de Yahoo! era un grafo de un trillón de vértices(enlaces web) y 100 billones de nodos (URL únicas).
[2] Los tres o cuatro lectores de este blog se enfadan si no uso al menos un “digamos” o “supongamos” en cada post.
[3] En realidad hay más fases, como sort-suffle.

 

Bueno, en realidad el nombre del post no se corresponde con este pequeño artículo, en el que sólo voy a ejecutar un “dd” y un “hdparm” sobre un par de particiones, en discos diferentes, bajo un mismo grupo LVM.

Resumiendo:

Tengo dos discos SATA de 500G, baratos, de poco más de 50 euros cada uno. Cada disco tiene dos particiones de 100G independientes, y otras dos de 400G como particiones LVM, para dar un total de 800G disponibles para el gestor de volúmenes.

Una de las particiones de 100G la dejo vacía mientras no la necesite. La otra la uso para la raíz y /boot del sistema.

En un principio sólo pongo /home en LVM, y luego, según se valla llenando “/”, voy creando otros volúmenes, por ejemplo para /var. Todo un poco rebuscado, pero así mantengo fresco lo poco que sé sobre LVM.

La máquina es una i7 a 2.80GHz, con memoria de sobra. Además, está completamente idle. Como hago copias de seguridad a menudo no me preocupa demasiado que se fastidien los discos. Por lo tanto, me he ahorrado el fakeraid de la placa base y el raid por software de linux. Por cierto, esta prueba se ha hecho sobre una Debian Squeeze de 64 bits básica, y un kernel 2.6.32 compilado a mano.

La creación de los volúmenes no tiene misterio y usa los parámetros por defecto. En otros posts quizá publique pruebas diferentes:

pvcreate /dev/sda2
pvcreate /dev/sdb2

vgcreate vg_prueba /dev/sda2 /dev/sdb2

lvcreate -i2 -L200G -nlv_home vg_prueba

mkfs.ext3 /dev/mapper/vg_prueba-lv_home

mount /dev/mapper/vg_prueba-lv_home /home

Como se puede ver, he utilizado un stripe de 2 y he formateado la partición como ext3.
Ejecutamos ahora escrituras secuenciales con dd sobre ese punto de montaje:

# time dd if=/dev/zero of=/home/prueba/test_escritura bs=1024 count=1048576
1048576+0 records in
1048576+0 records out
1073741824 bytes (1,1 GB) copied, 2,94397 s, 365 MB/s

real	0m2.945s
user	0m0.098s
sys	0m2.700s

Y ahora hdparm:

# hdparm -tT /dev/mapper/vg_prueba-lv_home 

/dev/mapper/vg_prueba-lv_home:
 Timing cached reads:   18472 MB in  2.00 seconds = 9245.75 MB/sec
 Timing buffered disk reads:  716 MB in  3.01 seconds = 238.02 MB/sec

Lo dicho, más adelante quizá publique alguna prueba más con diferentes sistemas de ficheros o parámetros LVM.

Update 1: He hecho una instalación de 32 bits para ver los resultados. En este caso el dd no llegaba ni remotamente a 250 MB/s.
Update 2: He hecho otra prueba con otros módulos de memoria que me han dejado, y los resultados son muuuy diferentes. Siempre hablo del más sencillo “dd” que se puede hacer, claro:

time dd if=/dev/zero of=/home/foron/test_escritura2 bs=1024 count=1048576
1048576+0 records in
1048576+0 records out
1073741824 bytes (1,1 GB) copied, 1,97434 s, 544 MB/s

real	0m1.975s
user	0m0.097s
sys	0m1.863s
 

A veces (seguro que pocas), nos encontramos ante una máquina que por algún motivo ha dejado de trabajar como se esperaba.
Digamos que el rendimiento o el IO del equipo baja, sin motivo visible. Digamos que no hay logs que indiquen problemas, ni errores del sistema. Nada.
Este es el momento para el kung fu, el voodoo o todo tipo de técnicas adivinatorias que tanto usamos los informáticos. Muchas veces es suficiente, pero no siempre, y no hay nada peor que no ser capaz de dar una explicación a un problema.

systemtap

Supongamos que tenemos la arquitectura de la imagen, con un par de balanceadores de carga, unas cuantas máquinas iguales que pueden ser servidores web, ftp, pop, smtp o cualquier otra cosa, y un par de servidores de almacenamiento nfs.
Teniendo en cuenta que los balanceadores de carga suelen dar la posibilidad de asignar diferentes pesos a cada máquina balanceada, ¿Por qué no usar uno de estos servidores para la monitorización del sistema? Es perfectamente posible enviar un poco menos de tráfico a la máquina destinada a SystemTap, de tal manera que no afecte al rendimiento global, para que podamos darnos un mecanismo para tener nuestra infraestructura controlada.

En este post sólo voy a referenciar algunos scripts que están disponibles en la web y en el redpaper. Además, el propio software viene con muchos ejemplos. En CentOS se puede instalar el rpm “systemtap-testsuite” para probar varios scripts.

Vamos a empezar con un par de ejemplos para usar más en un entorno académico que en la práctica.

Digamos que queremos aprender “lo que pasa” cuando hacemos un “ls” en un directorio de una partición ext3. Con este sencillo script:

#! /usr/bin/env stap

probe module("ext3").function("*").call
{
   printf("%s -> %s\n", thread_indent(1), probefunc())
}
probe module("ext3").function("*").return
{
   printf("%s <- %s\n", thread_indent(-1), probefunc())
}

Vamos a hacer un printf cada vez que comience y termine la ejecución de cualquier función del módulo ext3. En el printf vamos a tabular el nombre de la función que se ha ejecutado. El resultado es algo así:

   ...
     2 ls(31789): <- ext3_permission
     0 ls(31789): -> ext3_dirty_inode
     8 ls(31789):  -> ext3_journal_start_sb
    21 ls(31789):  <- ext3_journal_start_sb
    26 ls(31789):  -> ext3_mark_inode_dirty
    31 ls(31789):   -> ext3_reserve_inode_write
    35 ls(31789):    -> ext3_get_inode_loc
    39 ls(31789):     -> __ext3_get_inode_loc
    52 ls(31789):     <- __ext3_get_inode_loc
    56 ls(31789):    <- ext3_get_inode_loc
    61 ls(31789):   <- ext3_reserve_inode_write
    67 ls(31789):   -> ext3_mark_iloc_dirty
    74 ls(31789):   <- ext3_mark_iloc_dirty
    77 ls(31789):  <- ext3_mark_inode_dirty
    83 ls(31789):  -> __ext3_journal_stop
    87 ls(31789):  <- __ext3_journal_stop
    90 ls(31789): <- ext3_dirty_inode
     0 ls(31789): -> ext3_permission
   ...

Como he dicho, no es algo demasiado práctico, pero puede servir a algún profesor para suspender a un buen porcentaje de pobres alumnos :-)
Sigamos con algún otro ejemplo. Digamos que queremos programar un "nettop" que nos diga qué conexiones se están abriendo en una máquina en cada momento. Con un script similar al siguiente lo tendremos en unos minutos:

#! /usr/bin/env stap

global ifxmit, ifrecv

probe netdev.transmit
{
  ifxmit[pid(), dev_name, execname(), uid()] <<< length
}

probe netdev.receive
{
  ifrecv[pid(), dev_name, execname(), uid()] <<< length
}

function print_activity()
{
  printf("%5s %5s %-7s %7s %7s %7s %7s %-15s\n",
         "PID", "UID", "DEV", "XMIT_PK", "RECV_PK",
         "XMIT_KB", "RECV_KB", "COMMAND")

  foreach ([pid, dev, exec, uid] in ifrecv-) {
    n_xmit = @count(ifxmit[pid, dev, exec, uid])
    n_recv = @count(ifrecv[pid, dev, exec, uid])
    printf("%5d %5d %-7s %7d %7d %7d %7d %-15s\n",
           pid, uid, dev, n_xmit, n_recv,
           n_xmit ? @sum(ifxmit[pid, dev, exec, uid])/1024 : 0,
           n_recv ? @sum(ifrecv[pid, dev, exec, uid])/1024 : 0,
           exec)
  }
  print("\n")
  delete ifxmit
  delete ifrecv
}

probe timer.ms(5000), end, error
{
  print_activity()
}

¿Qué es lo que hemos hecho? Hemos generado tres "probes". Por un lado, cuando se recibe o trasmite a través de la red añadimos a los arrays "ifxmit, ifrecv" información sobre qué proceso y en qué interfaz ha enviado o recibido. Por otro lado, cada 5000 ms o cuando haya un error o termine el script mostramos la información por pantalla. El resultado puede ser algo similar a lo siguiente:

  PID   UID DEV     XMIT_PK RECV_PK XMIT_KB RECV_KB COMMAND
31485   500 eth0          1       1       0       0 sshd           

y pasados unos miles de milisegundos ...

  PID   UID DEV     XMIT_PK RECV_PK XMIT_KB RECV_KB COMMAND
15337    48 eth0          4       5       5       0 httpd
31485   500 eth0          1       1       0       0 sshd

Como se ve, tengo una sesión ssh abierta permanentemente, y he consultado una página web en el servidor monitorizado.
Por supuesto, esto puede no ser muy práctico en servidores muy activos, pero puede dar alguna idea a algún administrador.
Igual que hemos hecho un "nettop", también podemos hacer un "disktop" que muestre un resultado como el siguiente:

Sat Apr 11 17:22:03 2009 , Average:   0Kb/sec, Read:       0Kb, Write:      0Kb
     UID      PID     PPID                       CMD   DEVICE    T        BYTES
      48    15342    15239                     httpd     dm-0    W          210
      48    15343    15239                     httpd     dm-0    W          210
      48    15337    15239                     httpd     dm-0    W          210
      48    15336    15239                     httpd     dm-0    W          210

Lo que hecho es hacer el mismo wget varias veces. Para el que tenga curiosidad, los procesos httpd (que por cierto usa el usuario id=48) ejecutando en el servidor son:

# pstree -p | grep http
        |-httpd(15239)-+-httpd(15336)
        |              |-httpd(15337)
        |              |-httpd(15338)
        |              |-httpd(15339)
        |              |-httpd(15340)
        |              |-httpd(15341)
        |              |-httpd(15342)
        |              `-httpd(15343)

El script disktop está disponible en "/usr/share/systemtap/testsuite/systemtap.examples/io" del rpm "systemtap-testsuite".

Volvamos al primer ejemplo del post. Teníamos un grupo de servidores que han dejado de funcionar como deben. Digamos que sospechamos de la velocidad con la que nuestros servidores nfs están sirviendo el contenido de los servidores web.
Podemos probar aquellos ficheros que necesiten más de 1 segundo para abrirse:

#!/usr/bin/stap

global open , names
probe begin {
        printf("%10s %12s %30s\n","Process" , "Open time(s)" , "File Name")
}
probe kernel.function("sys_open").return{
        open[execname(),task_tid(task_current()),$return] = gettimeofday_us()
        names[execname(),task_tid(task_current()),$return] = user_string($filename)
}
probe kernel.function("sys_close"){
        open_time_ms = (gettimeofday_us() - open[execname(),task_tid(task_current()), $fd])
        open_time_s = open_time_ms / 1000000
        if ((open_time_s >= 1) && (names[execname(),task_tid(task_current()), $fd] != "")) {
                printf("%10s %6d.%.5d %30s\n", execname(),
open_time_s,open_time_ms%1000000,names[execname(),task_tid(task_current()), $fd])
        }
}

Con este sencillo script estaría hecho:

   Process Open time(s)                      File Name
     httpd      8.471285 /var/www/html/lectura_lenta.html

En este caso al servidor web le ha costado 8.5 segundos servir el fichero lectura_lenta.html. Después será responsabilidad nuestra buscar los problemas.

En definitiva, systemtap es una herramienta muy completa, pero que como tal requiere algo de práctica para ser útil. No sé si alguna vez va a tener mucho éxito en entornos de producción, pero no está de más saber que existe.

 

Empiezo otra serie de dos posts. En este caso sobre algo que tenía “pendiente” desde hace ya tiempo. De hecho, normalmente escribiría mis propios ejemplos para publicarlos aquí, pero para ir más rápido me voy a limitar a referenciar los scripts que usaré para mostrar las funcionalidades de ….. SystemTap.
SystemTap es una herramienta que sirve para monitorizar en tiempo real lo que está pasando con un kernel linux. Quitando el detalle de que el kernel tiene que tener ciertas opciones compiladas (luego las comento), SystemTap tiene la gran ventaja de no requerir ningún tipo de reinicio para empezar a trabajar.

¿Qué podemos monitorizar con SystemTap?
Pues ….. prácticamente todo lo que queramos. La segunda parte de este post tratará algunos ejemplos, pero por dar alguna pista, con este software podemos desde vigilar los procesos que más I/O están generando a programarnos un “nettop” que diga los procesos que más están usando la red.

¿Cómo funciona SystemTap?
Con SystemTap se le dice al kernel que ejecute una rutina cuando ocurre un evento, que puede basarse, por citar dos ejemplos, en el tiempo (cada n segundos) o en una llamada del sistema (al ejecutarse un vfs_read).
Para esto se usa un lenguaje de programación propio con el que se definen los “probe points” y las diferentes funciones. A pesar de ofrecer muchas posibilidades, el propio lenguaje tiene un control especial sobre los bucles infinitos, el acceso a memoria o la recursividad, por poner tres ejemplos. ¿Por qué tanto control? Pues porque a partir de este código se genera un módulo de kernel que se carga en el sistema. Claro, no hace falta decir las consecuencias de un bucle “mal hecho” a tan bajo nivel.

¿Qué necesita el kernel de linux para poder usar SystemTap?
Para empezar, cómo no, necesitamos un núcleo capaz de cargar módulos. Después, deberemos activar el soporte para los distintos tipos de debug que tiene el kernel, como el del sistema de ficheros, el del propio kernel o los kprobes. Traducido a formato .config, necesitamos las opciones CONFIG_DEBUG_INFO, CONFIG_KPROBES, CONFIG_RELAY, CONFIG_DEBUG_FS, CONFIG_MODULES y CONFIG_MODULES_UNLOAD.
Algunas distribuciones incluyen kernels específicos con estas opciones ya activadas.

¿Por qué SystemTap y no strace o gdb?
Bueno, esta seguramente sea una buena pregunta, al menos hasta ver los ejemplos de la segunda parte de este post; pero resumiendo, con SystemTap podemos:

  • Ver de forma integrada y unificada lo que pasa en el kernel y en las aplicaciones que ejecuta.
  • Probar aplicaciones multihilo.
  • Monitorizar aplicaciones multiproceso, como las cliente-servidor, en las que ambos componentes son procesos independientes.
  • Monitorizar en tiempo real y a prácticamente la velocidad de ejecución original.
  • Escribir nuestros propios monitores que den detalles que aplicaciones “generalistas” no son capaces de dar.

¿Cuáles son los aspectos negativos del invento?
Evidentemente, no todos son ventajas con SystemTap. Podríamos hablar de los inconvenientes técnicos, porque las opciones de debug del kernel ralentizan un poco la velocidad del sistema, pero yo creo que los mayores problemas vienen por el aprendizaje necesario para usar el software. Para empezar, hay que tener un cierto conocimiento del kernel de linux; después, hay que saber las posibilidades del lenguaje de programación, y por último hay que ser capaz de interpretar los resultados. Todo esto desmoralizará a más de uno, seguro, que preferirá seguir con top, htop, vmstat y demás familia antes de meterse en este embolado.
Afortunadamente, ya hay multitud de scripts disponibles en Internet para todo tipo de situaciones.

Suelo terminar los posts con una referencia bibliográfica. En este caso no creo que haya ningún libro sobre SystemTap, pero sí que hay un redpaper de IBM (que debería pasar a ser un redBOOK pronto :-) ): SystemTap: Instrumenting the Linux Kernel for Analyzing Performance and Functional Problems

 

En este último post de la serie vamos a ver, como siempre muy por encima, la funcionalidad “active response” que ofrece ossec. En definitiva, se trata de ser capaces de ejecutar un script cuando se activa un evento. Esto normalmente se usa para añadir reglas en firewalls, para añadir reglas tcp wrapper o para bloquear usuarios, pero en el fondo se puede hacer cualquier cosa que se pueda escribir en un programa.
Por ejemplo, si una de las máquinas monitorizadas es un servidor de correo se podría añadir una IP que generase una alerta a una lista de acceso, o a una base de datos RBL. O también se podrían añadir reglas de modsecurity en un servidor web.

Para el ejemplo de este post, digamos que como administradores ya estamos usando psad para crear reglas en nuestro firewall. Digamos, además, que sólo queremos que sea psad el responsable de las reglas dinámicas en el cortafuegos.

Infraestructura ossec

Rootkits, accesos no permitidos, …. No parece necesario justificar la necesidad de tener el firewall monitorizado, ¿Verdad?

Nuestro objetivo es crear reglas para psad desde ossec.
Primero definimos el comando en ossec.conf para poder usarlo más adelante en la configuración de la respuesta activa.

<command>
  <name>psad</name>
  <executable>psad.sh</executable>
  <expect>user,srcip</expect>
</command>

El script “psad.sh” (que guardaremos en $ossec_instalacion/active-response/bin con permisos de ejecución) tiene el siguiente contenido:

#!/bin/bash
# Anyade una regla de psad usando la IP origen. Tambien pasamos el usuario, pero no lo vamos a usar.
# Entrada: user, srcip
# Salida: Nada. Ejecuta una regla psad
ACCION=$1
USUARIO=$2
IP=$3

psad -fw-block-ip $IP

Ahora sólo queda configurar la respuesta activa en ossec.conf.

<active-response>
  <disabled>no</disabled>
  <command>psad</command>
  <location>defined-agent</location>
  <agent_id>001</agent_id>
  <level>10</level>
  <rules_group>authentication_failures</rules_group>
</active-response>

Estamos diciendo que vamos a ejecutar en el agente 001 (el firewall) el comando psad cuando ocurra un evento de nivel 10 y del grupo authentication_failures (por ejemplo la regla 5720).

Veamos si funciona bien intentando loguearnos varias veces desde una máquina (después de haber recargado la configuración de ossec). Este es el resultado

Nov  9 00:50:03 firewall psad: added iptables auto-block against 192.168.10.133 for 3600 seconds

Chain PSAD_BLOCK (1 references)
target     prot opt source               destination
DROP       all  --  192.168.10.133       0.0.0.0/0

Como siempre, y para aquellos que quieran profundizar más, recomiendo el libro http://www.elsevierdirect.com/product.jsp?isbn=9781597492409.

 

En mi primer post sobre ossec he mostrado los pasos para hacer una instalación básica y completamente estándar de ossec. En este post vamos a ver lo que hay por debajo del software.

Ossec usa tres componentes: un detector de rootkits, una herramienta para revisar la integridad del sistema, y el analizador de logs. Todas ellas se configuran en el fichero ossec.conf.

Este fichero de configuración tiene una estructura muy clara, en formato xml, en el que se definen varios bloques.

En este post sólo voy a dar algunas ideas básicas sobre el formato con el que se definen las reglas que describen los eventos detectables en los logs.

Tanto el chequeo de integridad como el detector de rootkits se configuran de una forma muy similar, así que lo dejaré para que quien quiera se pelee con ello.

En ossec.conf se definen los logs que se quieren monitorizar dentro de secciones “localfile”. Un ejemplo:

  <localfile>
    <log_format>syslog</log_format>
    <location>/var/log/auth.log</location>
  </localfile>

En este caso se dice que queremos vigilar auth.log, del tipo syslog. Existen varios formatos definidos, como apache o snort-full, por citar dos ejemplos. Syslog se usa en los casos en los que se loguea un único evento por linea. Sort-full, por poner otro ejemplo, está adaptado al tipo de log que deja snort cuando usa el formato de salida full.

Una vez definidos los formatos, empezamos a hablar de reglas. El primer paso es que ossec detecte qué tipo de entrada de log es. Para esto se usan los decoders (decoder.xml) Veamos un ejemplo.

<decoder name="sshd">
<program_name>^sshd</program_name>
</decoder>

Este decoder “se activa” cuando la entrada de log es generada por el programa sshd (sshd[5493] en el siguiente ejemplo):

Feb  3 19:22:33 server sshd[5493]: Accepted publickey for prueba from 192.168.10.2 port 50560 ssh2

Pero, a pesar de la importancia de esta sencilla regla (después veremos por qué), necesitamos extender el decoder para que sea útil.

<decoder name="ssh-failed">
<parent>sshd</parent>
<prematch>^Failed \S+ </prematch>
  <regex offset="after_prematch">^for (\S+) from (\S+) port \d+ \w+$</regex>
  <order>user, srcip</order>
</decoder>

Gracias a la primera y simple regla “sshd” nos aseguramos que ossec sólo va a analizar la entrada de log contra esta segunda regla, mucho más compleja, si dicho log está relacionado con ssh. Esto es una gran mejora en el rendimiento del sistema.

Sin entrar en explicaciones detalladas, esta regla se cumple con este tipo de log:

Jul 26 22:06:10 server sshd[3727]: Failed password for prueba from 192.168.10.2 port 50519 ssh2

Y además crea dos “variables”; una con el usuario que ha intentado conectarse y otra con la IP origen.

Teniendo estos datos ya podemos empezar con las reglas “de verdad”. Veamos algo del fichero sshd_rules.xml

  <rule id="5700" level="0" noalert="1">
    <decoded_as>sshd</decoded_as>
    <description>SSHD messages grouped.</description>
  </rule>

<rule id="5716" level="5">
    <if_sid>5700</if_sid>
    <match>^Failed|^error: PAM: Authentication</match>
    <description>SSHD authentication failed.</description>
    <group>authentication_failed,</group>
  </rule>

De manera similar a los decoders, se define una primera regla básica para ssh, que ayudará a ossec en no tener que analizar reglas de apache (por ejmplo) en logs de ssh.

La segunda regla (5716) generará una alerta de nivel 5 cuando haya un intento de login erroneo.
Pero también se pueden crear reglas compuestas que activarán alertas de mayor nivel si hay x intentos fallidos desde una misma IP

  <rule id="5720" level="10" frequency="6">
    <if_matched_sid>5716</if_matched_sid>
    <same_source_ip />
    <description>Multiple SSHD authentication failures.</description>
    <group>authentication_failures,</group>
  </rule>

Gracias al formato xml es muy sencillo añadir nuevas reglas. También se pueden enviar correos con las alertas a cuentas de correo diferentes en base, por ejemplo, al agente que ha generado la alerta o al nivel de la misma.

En el tercer y último post de la serie vamos a integrar ossec con psad.

 

Antes de seguir con el segundo post sobre ossec, voy a dar un par de pistas sobre cómo ver el tráfico que pasa por una sesión tcp. Esto sí que es todo un mundo, así que me limito, como casi siempre, a dar cuatro detalles para que quien quiera se haga una idea de lo que se puede hacer y siga investigando.

Por supuesto, el que se pueda “espiar” lo que se está trasmitiendo no significa que debamos (ni podamos) hacerlo, al menos si no queremos tener problemas legales.

Vamos a empezar por lo básico. Necesitamos capturar tráfico para poderlo analizar después con cierta tranquilidad. Para esto usamos el conocido tcpdump, aunque podemos usar otros, como por ejemplo, snort.

tcpdump -n -i eth1 -s 1515 -U -w /tmp/captura.pcap '(tcp port 20) or (tcp port 21) or (tcp port 25)'
tcpdump: listening on eth1, link-type EN10MB (Ethernet), capture size 1515 bytes

Para saber lo que hacen los parámetros nada mejor que el manual :-)

Dejamos esta terminal abierta y con el comando en ejecución. Vamos a ir analizando lo que se vuelca en captura.pcap en otra terminal.

Primera prueba.

Desde otra máquina vamos a hacer un telnet al puerto 25 y a mandar un correo.

telnet 192.168.10.1 25
Trying 192.168.10.1...
Connected to 192.168.10.1.
Escape character is '^]'.
220 smtp.example.com ESMTP Postfix
helo prueba.example.com
250 smtp.example.com
rset
250 2.0.0 Ok
helo prueba.example.com
250 smtp.example.com
mail from: prueba@example.com
250 2.1.0 Ok
rcpt to: prueba1@example.com
250 2.1.5 Ok
data
354 End data with .
Subject: Titulo del correo
Este texto se ve en la captura
.
250 2.0.0 Ok: queued as 1F7426C420
quit
221 2.0.0 Bye
Connection closed by foreign host.

Nuestro volcado tiene datos…. vamos a ver que tiene usando tcpflow.

# tcpflow -r captura.pcap -c port 25
192.168.010.001.00025-192.168.010.002.41403: 220 smtp.example.com ESMTP Postfix
192.168.010.002.41403-192.168.010.001.00025: helo prueba.example.com
192.168.010.001.00025-192.168.010.002.41403: 250 smtp.example.com
192.168.010.002.41403-192.168.010.001.00025: rset
192.168.010.001.00025-192.168.010.002.41403: 250 2.0.0 Ok
192.168.010.002.41403-192.168.010.001.00025: helo prueba.example.com
192.168.010.001.00025-192.168.010.002.41403: 250 smtp.example.com
192.168.010.002.41403-192.168.010.001.00025: mail from: prueba@example.com
192.168.010.001.00025-192.168.010.002.41403: 250 2.1.0 Ok
192.168.010.002.41403-192.168.010.001.00025: rcpt to: prueba1@example.com
192.168.010.001.00025-192.168.010.002.41403: 250 2.1.5 Ok
192.168.010.002.41403-192.168.010.001.00025: data
192.168.010.001.00025-192.168.010.002.41403: 354 End data with .
192.168.010.002.41403-192.168.010.001.00025: Subject: Titulo del correo
192.168.010.002.41403-192.168.010.001.00025: Este texto se ve en la captura
192.168.010.002.41403-192.168.010.001.00025: .
192.168.010.001.00025-192.168.010.002.41403: 250 2.0.0 Ok: queued as 1F7426C420
192.168.010.002.41403-192.168.010.001.00025: quit
192.168.010.001.00025-192.168.010.002.41403: 221 2.0.0 Bye

Sorpresa, tcpflow ha generado, en esta caso por salida estándar (-c), todo lo que ha sido capturado en el puerto 25.

Segunda prueba.

Bien, vamos con algo un poco diferente, pero igual de fácil (recordad que esto no son más que ideas)

Ahora vamos a suponer que he programado un rootkit que se llama exploit, y que lo voy a subir por ftp a la máquina que estamos monitorizando.

ftp 192.168.10.1
Connected to 192.168.10.1.
220 Este es un servidor privado. Por favor cierre la sesion inmediatamente.
Name (192.168.10.1:prueba):
331 Please specify the password.
Password:
230 Login successful.
Remote system type is UNIX.
Using binary mode to transfer files.
ftp> put exploit
local: exploit remote: exploit
200 PORT command successful. Consider using PASV.
150 Ok to send data.
226 File receive OK.
101992 bytes sent in 0.00 secs (29433.1 kB/s)
ftp> quit
221 Goodbye.

Muy bien, ahora veamos lo que tenemos, empezando por el puerto 21, y después por el 20.

# tcpflow -r captura.pcap -c port 21
192.168.010.001.00021-192.168.010.002.50427: 220 Este es un servidor privado. Por favor cierre la sesion inmediatamente.
192.168.010.002.50427-192.168.010.001.00021: USER prueba
192.168.010.001.00021-192.168.010.002.50427: 331 Please specify the password.
192.168.010.002.50427-192.168.010.001.00021: PASS secreto
192.168.010.001.00021-192.168.010.002.50427: 230 Login successful.
192.168.010.002.50427-192.168.010.001.00021: SYST
192.168.010.001.00021-192.168.010.002.50427: 215 UNIX Type: L8
192.168.010.002.50427-192.168.010.001.00021: TYPE I
192.168.010.001.00021-192.168.010.002.50427: 200 Switching to Binary mode.
192.168.010.002.50427-192.168.010.001.00021: PORT 192,168,10,2,205,32
192.168.010.001.00021-192.168.010.002.50427: 200 PORT command successful. Consider using PASV.
192.168.010.002.50427-192.168.010.001.00021: STOR exploit
192.168.010.001.00021-192.168.010.002.50427: 150 Ok to send data.
192.168.010.001.00021-192.168.010.002.50427: 226 File receive OK.
192.168.010.002.50427-192.168.010.001.00021: QUIT
192.168.010.001.00021-192.168.010.002.50427: 221 Goodbye.

Vamos a hacer ahora que tcpflow genere un fichero con el tráfico del puerto 20. Para esto quitamos el parámetro -c.

# tcpflow -r captura.pcap  port 20
# file 192.168.010.002.52512-192.168.010.001.00020
192.168.010.002.52512-192.168.010.001.00020: ELF 64-bit LSB executable, x86-64, version 1 (SYSV), dynamically linked (uses shared libs), for GNU/Linux 2.6.8, stripped

Sorpresa, tenemos un fichero binario ….. con el exploit. Bueno, en realidad en una copia de /bin/ls, pero vale para el ejemplo

# strings 192.168.010.002.52512-192.168.010.001.00020

.......
Usage: %s [OPTION]... [FILE]...
List information about the FILEs (the current directory by default).
Sort entries alphabetically if none of -cftuvSUX nor --sort.
Mandatory arguments to long options are mandatory for short options too.
  -a, --all                  do not ignore entries starting with .
  -A, --almost-all           do not list implied . and ..
      --author               with -l, print the author of each file
  -b, --escape               print octal escapes for nongraphic characters
      --block-size=SIZE      use SIZE-byte blocks
.......

¿Qué pasa en la realidad, cuando tenemos cientos o miles de sesiones de correo o ftp y queremos ver una concreta? Por un lado debemos guardar el tráfico que queremos investigar, claro, pero luego debemos saber qué sesiones se han establecido, a qué hora, cuánto tráfico ha pasado por ellas, entre que puertos, …. Para esto hay mucho software, pero un buen ejemplo es argus. A partir de aquí, podemos generar expresiones más elaboradas para usar en tcpflow (algo más que “port 20″)

© 2012 forondarenanet Suffusion theme by Sayontan Sinha