MH-EX-001: Análisis básico de malware con Yara¶
En este ejemplo práctico crearemos una pequeña herramienta que nos permita analizar, de forma muy básica, malware usando Yara.
Yara es una herramienta que tiene como objetivo ayudar a los investigadores de malware a clasificar y detectar malware.
El uso de Yara puede ser tan sencillo o complejo como queramos. La herramienta propuesta tiene como objetivo ser un PoC simple, funcional y práctico. Siéntase libre de copiar el código y ampliarlo como necesite.
Problema¶
Analizar muestras de malware en batch puede puede no resultar sencillo, sobre todo si las muestras son de gran tamaño o tenemos que aplicar muchas reglas.
La principal dificultad suele ser el correcto diseño, implementación y documentación de la aplicación.
Solución¶
La solución propuesta implementa un sistema de procesamiento paralelo, distribuido y automatizado de análisis de muestras, en el que se pueden añadir nuevas muestras en cualquier momento, quedando en cola de espera para ser atendidas.
La aplicación generará un resultado en formato CSV compatible con Excel.
Las soluciones aquí propuestas siguen los casos de estudio propuestos en el bloque de desarrollo.
Cómo¶
Funcionamiento¶
Ayuda en la linea de comandos¶
Nuestra aplicación se ejecuta a través de la linea de comandos. Para ver todas las opciones disponibles tan solo tenemos que escribir:
python start.py --helpY como resultado:
usage: start.py [-h] -p SAMPLE_PATH [-v VERBOSITY] [-o OUTPUT_FILE] [--rules-path RULES_PATH] OMSTD Malware optional arguments: -h, --help show this help message and exit -p SAMPLE_PATH, --sample-path SAMPLE_PATH binary sample path -v VERBOSITY enable verbose mode -o OUTPUT_FILE output file name --rules-path RULES_PATH yara rules path (default .rules/)
Ejecución básica¶
El funcionamiento es muy simple. El analizador está hecho usando Celery, por lo que hay que seguir los mismos pasos explicados en el anexo del caso de estudio BH-001 para ejecutar la aplicación: Pasos para lanzar Celery.
Una vez lanzado el servicio de Celery, así como sus dependencias, nuestra aplicación ya está a la espera de recibir muestras para ser analizadas. Para esto ejecutaremos:
python start.py -p samples/sample.txt -o resultsAquí podemos ver la salida generada: Un CSV con la información:
HelloWorld,True,Data: 'Hello world' (flags: 19 # offset: 0),1,Rule HelloWorld was found a match in binary 'sample.txt' with tags: No tags HelloWorld,True,Data: 'Hello world' (flags: 19 # offset: 0),0,Rule HelloWorld was found a match in binary 'sample.txt' with tags: No tags Another,False,,1,Rule Another was NOT found a match in binary 'sample.txt' with tags: No tags SeeYou,True,Data: 'See you' (flags: 19 # offset: 12),1,Rule SeeYou was found a match in binary 'sample.txt' with tags: No tags Another,False,,0,Rule Another was NOT found a match in binary 'sample.txt' with tags: No tags
Desglose de las piezas¶
Las piezas que compondrán la herramienta son las siguientes:
- Servicio que recibe nuevas muestras.
- Generador de resultados.
- Medio para añadir nuevas muestras
Todo el código fuente lo puedes descargar y probar aquí.
La estructura y organización del proyecto sigue las directrices de ST-001.
Recepción de muestras¶
El servicio de recepción de muestras es un tarea de Celery, a la espera de recibir nueva información para analizar de forma asíncrona y en background.
El motor de análisis es muy sencillo. El análisis se hace en dos partes:
- El análisis con Yara.
- La interpretación y transformación de resultados.
Ambas partes están definidas como dos funciones en el fichero yara_task.py.
Analizador con Yara
La función yara_task, convertida en tarea con el decorador @celery.task, es la encargada de hacer la llamada a Yara y:
- Cargar las reglas Yara, con el método yara.compile(), indicándole el listado de ficheros “*.yara” con las reglas.
- Usando partials (LP-003) construimos la llamada al callback. Dicho callback será llamado por la librería de Yara, cada vez que ésta ejecute con éxito una regla.
- Finalmente se lanza la orden para hacer el “match”, rules.match(...), indicándole el partial anteriormente construido.
En el siguiente código se puede ver estos puntos:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @celery.task def yara_task(input_parameters): """ Celery task that process a binary sample. :param input_parameters: Parameters object with global input parameters :type input_parameters: Parameters """ # Load Yara rules and match with binary rules = yara.compile(filepaths=input_parameters.yara_rules_files) # Make custom callback function callback_function = partial(yara_callback, input_parameters) # Run Yara rules! rules.match(input_parameters.sample_path, callback=callback_function)Analizador de resultados
La función yara_callback actúa como callback que Yara llamará cuando termine de procesar cada una de las reglas.
En ella, y tras comprobarse la validez de los parámetros de entrada, se llevan a cabo las siguientes acciones: #. Transformar los datos de entrada del formato Yara al formato interno, del tipo Results, según SP-003 #. Enviar la información, de forma asíncrona, a la tarea que se encarga de almacenar los resultados: celery.send_task("framework.tasks.export_results_task.export_to_csv", ...).
En el siguiente código se puede ver estos puntos:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 # ---------------------------------------------------------------------- def yara_callback(input_parameters, yara_results): """ Call back for Yara match. Retreive Yara yara_results, as dictionary, and call Celery task that stores yara_results. :param input_parameters: Parameters object with global input parameters :type input_parameters: Parameters :param yara_results: Raw Yara match yara_results in format: {'matches': True, 'rule': 'HelloWorld', 'namespace': '0', 'tags': [], 'meta': {}, 'strings': [{'identifier': '$a', 'flags': 19, 'data': 'Hello world', 'offset': 0}]} :type yara_results: dict """ # Inputs validations if not isinstance(yara_results, dict): raise TypeError("Expected dict, got '%s' instead" % type(yara_results)) if yara_results is None: print("Yara rules returned not yara_results") return # Yara input format: # # {'matches': True, # 'meta': {}, # 'namespace': '0', # 'rule': 'HelloWorld', # 'strings': [{'data': 'Hello world', # 'flags': 19, # 'identifier': '$a', # 'offset': 0}], # 'tags': []} # r_rule = yara_results['rule'] r_matches = yara_results['matches'] r_tags = ",".join(yara_results['tags']) if yara_results['tags'] else "No tags" r_namespace = int(yara_results['namespace']) # Fixing payload r_payload = "#".join(["Data: '%s' (flags: %s # offset: %s)" % (x['data'], x['flags'], x['offset']) for x in yara_results['strings']]) # Make Results structure r = Results(rule=r_rule, matches=r_matches, payload=r_payload, namespace=r_namespace, description="Rule %s was %sfound a match in " "binary '%s' with tags: %s" % (r_rule, "" if r_matches else "NOT ", input_parameters.sample_file_name, r_tags)) # Send info to exporter celery.send_task("framework.tasks.export_results_task.export_to_csv", (input_parameters, r))
Generador de resultados¶
Los resultados serán generados de forma asíncrona, al igual que la recepción de éstos. Para ello, se ha creado una tarea de Celery, a la espera de recibir nueva información, con la que generar los resultados deseados.
La herramienta genera los resultados a través de la función yara_callback(...). Ésta, una vez hecha la transformación, hace una llamada la a la tarea generadora de resultados, llamada: export_to_csv(...).
Hay varias formas de llamar a una tarea de Celery, como se puede estudiar en BH-001. En este caso nos hemos decantado por la opción my_task.send_task("..."), como se ha visto en la :ref:` sección anterior <mh-001-results-analysis-section>`.
La función, convertida en tarea, export_to_csv(...) hace 3 cosas muy sencillas:
- Una comprobación mínima de los parámetros de entrada.
- Abre un fichero, siguiendo las recomendaciones de LP-005, en modo “append” o para añadir información al final de éste.
- Escribe una nueva linea en el fichero en formato csv
Tal y como podemos ver en la lineas señaladas:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @celery.task def export_to_csv(input_params, results): """ Export results to CSV file. :param input_params: Global Parameters instance with input configuration :type input_params: Parameters :param results: Results object with analysis results. :type results: Results """ if not isinstance(input_params, Parameters): raise TypeError("Expected Parameters, got '%s' instead" % type(input_params)) with open("%s.csv" % input_params.output_file, "a") as f: csv_writer = csv.writer(f) # Title # csv_writer.writerow(["# Rule", "matches", "payload", "description"]) csv_writer.writerow([ results.rule, results.matches, results.payload, results.namespace, results.description ])
Añadir nuevas muestras¶
Añadir nuevas muestras, es equivalente a decir: enviar nuevas muestras a la cola de análisis, para que sean procesadas cuando toque su turno.
Para este PoC se ha optado por algo sencillo: un linea de comandos (en secciones anteriores se muestra cómo ejecutarse).
Hemos tenido en cuenta las siguientes medidas o recomendaciones:
- Hemos usado la librería standard argparser, siguiendo la sintaxis de la documentación oficial, y teniendo en cuenta las buenas prácticas especificadas en IT-001.
- Además, hemos prevenido la ejecución accidental o a destiempo aplicando LP-004.
- Hemos centralizado la ejecución en un punto, usando ST-004, dejando preparado el proyecto para otras interfaces de usuario.
- Hemos usado un punto central para almacenar los parámetros de ejecución del usuario, como se recomienda en ST-002.
Tal y como podemos ver en la lineas señaladas:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import argparse from api import Parameters, run_all # ---------------------------------------------------------------------- if __name__ == '__main__': parser = argparse.ArgumentParser(description='OMSTD Malware') parser.add_argument("-p", "--sample-path", type=str, dest="sample_path", help="binary sample path", required=True) parser.add_argument("-v", dest="verbosity", type=int, help="enable verbose mode", default=False) parser.add_argument("-o", dest="output_file", type=str, help="output file name", default=None) parser.add_argument("--rules-path", dest="rules_path", type=str, help="yara rules path (default .rules/)", default=None) params = parser.parse_args() input_parameters = Parameters(sample_path=params.sample_path, output_file=params.output_file, verbosity=params.verbosity, rules_path=params.rules_path) run_all(input_parameters)La ejecución tiene lugar a través de la función run_all(...), incluido en el fichero api.py. Si observamos el código, podemos comprobar que lo que hace dicha función es una llamada a la tarea de análisis, con sintaxis Celery:
1 2 3 4 5 6 7 8 9 10 from lib.data import Parameters, Results from framework.celery.celery import celery from framework.tasks.yara_task import yara_task # ---------------------------------------------------------------------- def run_all(input_parameters): # Display results yara_task.delay(input_parameters)
Anexo¶
Se puede encontrar más información, así como reglas Yara predefinidas en las siguientes URL:
- https://github.com/arbor/yara
- https://github.com/1aN0rmus/Yara
- https://github.com/0pc0deFR/YaraRules
- https://github.com/kevthehermit/yaraMail
- https://github.com/3vangel1st/Yara
- https://github.com/frankenwino/yara-rules/
- https://github.com/nxdamian/YARA-Public
- https://github.com/jackcr/yara-memory
- https://github.com/0pc0deFR/YaraRules
- https://github.com/sysforensics/YaraRules
- https://github.com/Neo23x0/Yara-BRG
Otros enlaces interesantes: