Introducció

L’equip de Service Management Analytics de BaseTIS (SMA) és l’equip encarregat de crear una sèrie de reports de l’evolució del servei que presten per a diferents proveïdors. Aquesta tasca originalment es feia partint d’una sèrie d’Excels i generant PowerPoints que ensenyessin l’evolució del servei prestat pels proveïdors. Llavors es va decidir tenir un grup de persones deslligades del dia a dia de la gestió per mirar de fer aquests informes el més automàtic possible, i aquí va ser quan va sorgir SMA. A part d’automatitzar informes, també s’automatitzen altres tipus de tasques repetitives, entre les quals destaca esborrar tickets amb dades incoherents.

En aquest post parlaré de com automatitzar totes aquestes tasques, que s’han d’executar cada dia. Les primeres comencen a les 6:15h del matí i, a mida que van acabant, es van executant les següents. Aquestes tasques són, bàsicament, scripts de Python que, majoritàriament, generen una sèrie d’informes a partir d’unes dades extretes prèviament.

A la segona part del post faré un petit tutorial sobre com automatitzar un petit workflow amb Python, així que aquest article també et pot ser útil si ets un desenvolupador interessat en l’automatització de tasques.

 

Automatització

Al principi, el workflow que feia SMA era molt senzill: hi havia un script que extreia les dades, un segon script que les processava, un altre que generava els informes i un últim script que les penjava al seu lloc. A més, cada vegada que un script acabava, s’enviava un missatge per un canal de Slack amb el resultat (la idea era que, en un futur, també es pogués reiniciar el workflow, o fins i tot només certes tasques, mitjançant missatges d’Slack). Aquests scripts s’executaven seqüencialment, utilitzant un cinquè script que els anava llançant.

Com us podeu imaginar, aquesta no és la millor idea. A mida que s’anaven afegint scripts (ja que cada vegada hi havia més fonts de dades i més informes a publicar, a part d’altres tasques), la gestió de les tasques es feia cada vegada més complicada. Per començar, les dependències entre les diferents tasques i els errors eren difícils de gestionar (òbviament, hi ha tasques que s’han de fer abans d’unes altres, com extreure les dades abans de processar-les): imagineu que hi ha dos informes a publicar i cadascun utilitza dades de fonts diferents. Primer, extraiem les primeres, i funciona, però hi ha un error a l’hora d’extreure les segones. Com podem dir-li al programa que igualment pot generar el primer informe, però no el segon?

Tot això es va començar gestionant des del mateix script “mestre” de Python, amb moltes sentències condicionals, però a mesura que augmentava el nombre de tasques estava més clar que es necessitava un canvi. Aleshores va ser quan vam entrar en joc el Tomàs Ortega i jo (Roger Romero): com a becaris de SMA, la nostra primera tasca va ser millorar el sistema de gestió de dependències i errors.

El primer que vam fer va ser documentar-nos sobre dues llibreries de Python: Airflow i Luigi. Aquestes serveixen per al que volíem fer: programar workflows amb Python, gestionant les dependències entre tasques. Les llibreries, a més, aporten certes funcionalitats molt útils com, per exemple, una interfície gràfica des de la qual veure (i, amb Airflow, controlar) l’estat del workflow, o l’execució en paral·lel de tasques que no depenen entre elles.

Una vegada ens vam documentar sobre les llibreries, vam començar a fer petites proves amb workflows de poques tasques, per a entrar en contacte amb el seu funcionament i decidir quina s’adaptava millor al nostre projecte.

Aquesta taula resumeix les conclusions que vam obtenir una vegada vam acabar les proves:

 

 

Com podeu veure, Luigi ens oferia dues funcionalitats importants que Airflow no té: l’ús amb Windows, ja que a SMA treballem amb aquest sistema operatiu, i la facilitat per a reiniciar una tasca des d’Slack, ja que com he comentat abans era una cosa que volíem implementar en un futur. D’aquesta manera, ens vam decantar per Luigi, i vam implementar el nostre workflow amb aquesta llibreria.

A continuació, parlaré de com crear un petit workflow amb Luigi, i integrar-ho amb un bot de Slack, que envii missatges per un canal cada vegada que acabi una tasca.
 

Implementació d’un petit workflow amb Luigi

Aspectes tècnics

Per començar, parlaré una mica sobre com funciona aquesta llibreria. El primer que s’ha de saber és que, quan volem executar una pipeline amb Luigi, s’ha d’executar primer el Luigi Scheduler (escribint luigid a una consola). Aquest és un programa que s’encarrega de gestionar les dependències i repartir les tasques entre els diferents Workers, que bàsicament són “treballadors” que s’encarreguen d’executar les tasques. A més, també és l’encarregat de mostrar la interfície gràfica (de la qual parlaré més endavant).

Una altra cosa molt important a tenir en compte és que, quan volem executar un workflow amb Luigi, no se li diu alguna cosa de l’estil “executa’m aquest workflow”. El que se li ha de dir és una tasca concreta a executar i aleshores el Scheduler mirarà si totes les tasques de les quals depèn aquesta ja han estat executades. En cas que així sigui, executarà la tasca que li hem dit. En cas contrari, executarà les que no hagin estat executades (mirant de què depenen, i així recursivament) i després la que nosaltres li hem dit.

Per últim, també cal conèixer com sap Luigi si una tasca ha estat executada o no: quan es programa cada tasca, s’ha d’explicitar un fitxer on aquesta tasca generarà una sortida quan acabi. Així, Luigi, quan vol saber si una tasca ja s’ha executat, va a buscar aquest fitxer: només considera que ja s’ha executat la tasca si el fitxer existeix i conté alguna cosa.

D’aquesta manera, si un workflow falla a la meitat, es pot tornar a executar i només s’executaran les tasques que no hagin generat sortida.
 

Programar amb Luigi

Amb Luigi, cada tasca s’ha de definir com a una classe de Python que hereda de la classe luigi.Task. Una vegada s’ha creat la classe (amb el nom de la tasca), se li poden definir certes funcions (si no se li sobreescriu alguna d’elles, el Scheduler utilitzarà la funció que ve a luigi.Task per defecte):

  • Run: aquesta funció ha de contenir el codi que s’executarà (és a dir, el codi de la tasca en sí).

  • Output: aquesta funció ha de retornar el fitxer que el Scheduler mirarà per a saber si ja s’ha executat la tasca.

  • Requires: aquesta funció ha de retornar una llista amb les tasques de les que depèn la tasca en qüestió.
     

A part d’aquestes tres, que són les més importants, es poden sobreescriure altres funcions, d’entre les quals destaca on_failure (codi que s’executarà si la tasca falla).
 

Exemple

Imaginem que tenim cinc scripts de Python que volem executar, amb certes dependències entre ells. Per a donar-li una mica de gràcia al workflow, suposem que els dos primers s’encarreguen d’extreure dades d’una font, el tercer de processar-les i els dos últims de generar uns informes. D’aquesta manera, l’arbre de dependències quedaria així:
 

A més, també volem que a l’acabar cada tasca, s’envii un missatge per un canal d’Slack amb el resultat de la tasca.

El primer que hem de veure és que com que amb Luigi no s’executa el workflow, sinó una tasca en concret (i totes de les quals depèn), s’ha de crear una nova tasca, RunAll, que depengui de GenerarInforme1 i GenerarInforme2. Així, quan volguem executar el workflow sencer, senzillament li direm a Luigi que executi RunAll.

Ara ja ens podem posar a programar: el primer que s’ha de fer és importar os (per a llegir variables d’entorn), luigi slackclient (que ens permetrà enviar missatges d’Slack).

import os
import luigi
from slackclient import SlackClient

A continuació, comencem a programar les classes. Per a fer més senzilla la integració amb Slack, el que farem serà crear una classe, StandardTask, que heredi de luigi.Task. Després, la resta de classes heredaran de StandardTask.
 

class StandardTask(luigi.Task):
    """
        Classe de la qual heredaran les tasques
    """

    def output(self):
        """
            Path on es guarda l'output de la tasca
        """
        return luigi.LocalTarget("./luigi/" + self.name + '.txt')

    def run_std(self):
        """
            Cada tasca l'ha de sobreescriure amb el codi que executa
        """
        return

    def escriu_output(self):
        """
            Escriu a l'output
        """
        with open(self.output(), 'w') as out:
            out.write('Tasca feta')

    def envia_missatge_slack(self, resultat=True):
        """
            Envia un missatge de Slack
        """
        slack_client = SlackClient(os.system('SLACK_TOKEN'))

        text = "*{}:* :heavy_check_mark:" if resultat else "*{}:* :x:"
        text = text.format(self.name)

        slack_client.api_call(
            "chat.postMessage",
            channel=os.system('SLACK_CHANNEL'),
            text=text,
            username=os.system('SLACK_USERNAME'),
            icon_emoji=os.system('SLACK_ICON')
        )

    def run(self):
        """
            Funció que executarà el Worker de Luigi
        """
        self.run_std()
        self.escriu_output()
        self.envia_missatge_slack(True)

    def on_failure(self, exception):
        """
            Funció que es crida només si la tasca falla
        """
        self.envia_missatge_slack(False)

        super().on_failure(exception)

Com podem veure, aquesta classe té la funció output, que retorna un fitxer .txt amb el nom de la tasca i que està guardat a una carpeta amb nom “luigi” (no per res en especial, només per a tenir les coses ben ordenades).

A més, té la funció run (la que el Worker executarà). Aquesta funció crida primer a run_std, que és la que sobreescriurem a les tasques. Quan acaba crida a escriu_output, que obre el fitxer .txt amb el nom de la tasca i hi escriu “Tasca feta”. Per últim, crida a envia_missatge_slack(True), que envia un missatge a un canal de Slack amb el resultat que se li passi (en aquest cas, True).

Per últim, tenim la funció on_failure que es crida només si la tasca falla, i crida a envia_missatge_slack(False).

Ara, podem programar les tasques en si:
 

class ExtreureDades1(StandardTask):
    """
        Tasca que extreu dades d'una font
    """

    def run_std(self):
        import extreuredades1
        extreuredades1.main()


class ExtreureDades2(StandardTask):
    """
        Tasca que extreu dades d'una font
    """

    def run_std(self):
        import extreuredades2
        extreuredades2.main()


class ProcessarDades(StandardTask):
    """
        Tasca que processa les dades
    """

    def requires(self):
        return [ExtreureDades1(), ExtreureDades2()]

    def run_std(self):
        import processadades
        processadades.main()


class GenerarInforme1(StandardTask):
    """
        Tasca que genera un informa a partir de les dades
    """

    def requires(self):
        return ProcessarDades()

    def run_std(self):
        import generarinforme1
        generarinforme1.main()


class GenerarInforme2(StandardTask):
    """
        Tasca que genera un informa a partir de les dades
    """

    def requires(self):
        return ProcessarDades()

    def run_std(self):
        import generarinforme2
        generarinforme2.main()


class RunAll(StandardTask):
    def requires(self):
        return [GenerarInforme1(), GenerarInforme2()]

Com podem veure, com que hereden de StandardTask, només hem de definir dues funcions per a cada tasca:

  • requires, que en el cas de ExtreureDades1 i ExtreureDades2 no cal definir ja que no depenen de cap tasca.

  • run_std, que conté el codi que s’executa. En aquest cas, s’importa el script que toca i s’executa el seu main. La tasca RunAll no en necessita ja que no cal que executi res, simplement serveix per a que s’executin GenerarInforme1 GenerarInforme2
     

Ara que ja tenim el workflow programat, només ens cal definir el main de l’script, que li dirà a Luigi que executi RunAll. En aquest cas, el main seria:
 

if __name__ == '__main__':
    luigi.build([RunAll()])

L’únic que fa és executar RunAll.
 

Execució

Ara que ja tenim el workflow programat (podem desar el script com a workflow.py, per exemple), només ens queda executar-lo. El primer que hem de fer és obrir una consola i executar luigid. Per a executar el workflow, obrim una altra consola i executem la comanda.
 

python workflow.py

D’aquesta manera, ja tindrem el workflow en execució.

Per últim, si volem veure l’estat del workflow, només fa falta que anem a la url localhost:8082 amb qualsevol navegador web;  allà podrem veure informació sobre quines tasques ja s’han executat, quin ha estat el seu resultat, un arbre de dependències generat per Luigi, etc.

 

Conclusions

Com podeu veure, l’automatització de tasques repetititves és una eina molt potent per a estalviar-nos temps i assegurar-nos que no hi ha errors. Amb aquest post, he volgut reflexar la nostra experiència a SMA amb l’automatització, i de quina manera hem enrobustit tot el procés per a poder escalabilitzar-lo amb cada vegada més tasques. A més, he volgut ensenyar com s’utilitza Luigi per a qualsevol persona interessada en el tema.

Si teniu qualsevol dubte o en voleu aprendre més, no dubteu en enviar-me un correu a:

roger.romero@basetis.com

Imatge de capçalera de Pixabay
Airflow,desenvolupament,Open,processos,Python,sma,