Google Cloud Composer (integrado ao Apache Airflow) simplificado

9
 min reading

O Google Cloud Platform oferece um serviço chamado Cloud Composer para agendar e monitorar pipelines que se estendem por ambientes híbridos e com várias nuvens. Esse serviço é baseado no projeto de código aberto Apache Airflow e operado usando python.

Google Cloud Composer (integrado ao Apache Airflow) simplificado

1. Introdução

O Objetivo deste tutorial,  é passar os conceitos básicos de Cloud Composer / Airflow para orquestração de pipelines dedados construídos em GCP.

2. Passo a passo

Vamos supor que o ambiente do Airflow já foi criado para seu projeto do GCP. Digite composer na barra de pesquisa do GCP, que abre uma Interface conforme mostrado abaixo:

Nome: É o nome do seu projeto no qual o ambiente do Composer foi criado, este será anexado ao seu projeto. Se você deseja criar uma programação entre projetos, é necessário estabelecer uma conexão de projeto https://cloud.google.com/composer/docs/how-to/managing/connections

Airflow: Isso abrirá a interface de programação , onde você pode ver o fluxo de trabalho, acionar um fluxo de trabalho (DAG), verificar os registros etc.

DAGs: Directed Acyclic Graph é uma coleção de todas as tarefas que você deseja executar, organizadas de uma forma que reflita seus relacionamentos e dependências. Um DAG é definido em um script Python, que representa a estruturados DAGs (tarefas e suas dependências) como código.

DAG de amostra : vamos criar e entender um DAG de amostra, digamos que você queira criar e agendar um fluxo de trabalho, que pega uma determinada tabela e transforma em um arquivo . Csv e faz a entrega em um determinado Bucket , para ser consumido pela área solicitante.  

-----------------------------------------  início do código ------------------------------------------------

importairflow

fromairflow

import DAG

fromairflow.operators.bash_operator import BashOperator

from datetime import datetime,timedelta

default_args = {

'start_date': datetime(2021, 4, 16),

'retries': 36,  

'retry_delay':timedelta(minutes=5),

'depends_on_past': False,

}

 

dag = DAG(    

'FOOD_quarta,    

default_args=default_args,    

description='FOOD_quarta,    

schedule_interval='0 9  * * WED,    

catchup=False,    

max_active_runs=1,    

concurrency=3

)

 

FOOD_segunda = BashOperator(

task_id='FOOD_quarta,

bash_command='bq extract--destination_format CSV --field_delimiter ";"--print_header=TRUE  vtex.tbl_cie_foodgs://cie_vtex/FOOD/QUARTA/CIE_FOOD_$( date +"%Y%m%d" ).csv',

dag=dag,

depends_on_past=False)

 

FOOD_quarta

-----------------------------------------  fim do código------------------------------------------------

Em uma DAG, primeiro você precisa importar as bibliotecas que deseja usar, neste exemplo, importei os modelos de biblioteca padrão usados para executar o DAG,depois o operador from airflow.operators.bash_operator import BashOperator e ,em seguida, as bibliotecas from datetime import datetime, timedelta para agendamento .

Agora definimos nossos locais de intervalo e argumento padrão, em default_args para oDAG, como data de início, novas tentativas e há muitos outros que podem ser definidos conforme necessário, mas aqui defini apenas alguns que são necessários.

Agora o nome da DAG , sendo o schedule_interval o item mais importante para os agendamentos.

Neste caso o processo executa todas as quartas-feiras. ás 09:00 horas AM. Usei como apoio o linkhttps://airflow.apache.org/docs/apache-airflow/1.10.1/scheduler.html novamente,há muitos parâmetros que podem ser definidos, usei apenas os que são necessários.

Por fim a execução da tarefa, com o comando :

bash_command = 'bq extract--destination_format CSV --field_delimiter ";" --print_header=TRUEvtex.tbl_cie_food.  

Extraio o conteúdo da tabela vtex.tbl_cie_food para um arquivo no format CSV , com header e delimitador ; (ponto e virgula).

Após a extração o arquivo é enviado para o bucket do projeto:

gs://cie_vtex/FOOD/QUARTA/CIE_FOOD_$(date +"%Y%m%d" ).csv'

Onde  $( date +"%Y%m%d" ).csv  criará arquivos com Ano/Mes/Dia a cada nova execução da DAG.

Salve este arquivo como <<file_name>> .py e coloque-o na pasta DAGs (esta pasta pode ser aberta clicando na opção DAGs na GUI e arrastando o arquivo para dentro da pasta.

Em seguida, abra o ambiente do Airflow clicando na opção Airflow

Clique nas DAG escolhidas, e isso abrirá os detalhes do fluxo de trabalho conforme mostrado abaixo, Existem vários estágios mencionados como agendado, ignorado,sucesso, falha, execução, etc. Aqui, todas as DAGs estão em sucesso, sendo mostrado como Verde Escuro.

Em Task instance details , todos os detalhes da execução

Então,isso é tudo para criar e agendar uma DAG. Há muito que você pode explorar e fazer por meio do Composer / Airflow. Este foi um exemplo simples e prático aplicado no ambiente de produção, em GCP.