Apache Spark Streaming com Python

9
 min reading

Faremos uma demonstração simples, porém muito objetiva de como utilizar o ApacheSpark com o modulo de streaming, para capturar todos os arquivos CSVs disponibilizadosem uma pasta especifica da rede. Será utilizado o Pyspark, banco de dados PostgreSql paraarmazenamento dos dados, três arquivos csv com dados dummy e um jar de conexão ao postgres. Apache Spark, Pyspark, Streaming, PostgreSQL

Apache Spark Streaming com Python

1.Introdução

 

Uma visão geral do Apache Spark. O apache spark é um mecanismo de análise unificado para processamento de dados em grande escala. Ele fornece APIs de alto nível em Java, Scala,Python e R e um mecanismo otimizado que oferece suporte a gráficos de execução geral. Ele também fornece suporte a um rico conjunto de ferramentas de nível superior, incluindo SpakSQL para SQL e processamento de dados estruturados, MLlib para aprendizado de máquina,GraphX para processamento de gráfico e Streams estruturados para computação incremental e processamento defluxo.

Uma visão geral do Spark Streaming. O Spark Streaming é uma extensão da API do Spark principal que permite o processamento escalonável, de alto rendimento e tolerante a falhas defluxos de dados ao vivo. Os dados podem ser ingeridos a partir de muitas fontes, como Kafka,Kinesis, ou TCP, e pode ser processado por meio de algoritmos complexos expressos com funções de alto nível como map, reduce, joine window. Finalmente, os dados processados podem ser enviados para sistemas de arquivos, bancos de dados e painéis ativos. Na verdade,você pode aplicar o aprendizado de máquina do Spark e os algoritmos de processamento de gráficos em fluxos de dados.

PostgreSQL   é  um   sistema   gerenciador  de   banco   de  dados   objeto   relacional  (SGBD),desenvolvido como projeto de código aberto

2.Metodologia

 

O código foi desenvolvido com python e spark (pyspark), usando a API de spark streaming. Foi criado um script spark_streaming_csv.py. O script é executado através dos park-submit,indicando através da flag - - jars o caminho onde está o jar de conexão ao postgre SQL, e logo em seguida o nome do script. Abaixo o código desenvolvido.

from pyspark.sql import SparkSession

from pyspark.sql.types import *

from pyspark.streaming import *

if __name__=="__main__":

spark =SparkSession.builder.appName("StreamingCSV").getOrCreate

()userSchema= StructType().add("id", "string").add("tipo","string").add("nome","string").add("ppu", "string")

df =spark.readStream.option("sep",",").schema(userSchema).csv("/home/nivas/testestream")

diretorio= "/home/nivas/temp"

defatualizabanco(dataf, batchId):

dataf.write.format("jdbc")\

.option("url","jdbc:postgresql://localhost:5438/banco")\

.option("dbtable","streaming.usuarios")\

.option("user","*********************")\

.option("password","*****************")\

.option("driver","org.postgresql.Driver")\

.mode("append")\

.save()

Stcal =df.writeStream.foreachBatch(atualizabanco).trigger(processingTime="5 second").option("checkpointlocation",diretorio).start()

Stcal.awaitTermination()

Importamos as libs necessárias, criamos o spark session, demos nome a aplicação, criamos o schema utilizando o struct type. Após isso usamos o spark streaming para ler todos os arquivos .csv que forem salvos em uma pasta específica. Apontamos o separador como a virgula, e também o schema que criamos. Apontamos um diretório de arquivos temporários, e logo em seguida criamos uma função para gravar os dados no banco postgre Sql.

A função é escrita usando o modo write com formato JDBC, apontamos a url de conexão,tabela a qual os dados serão gravados, driver, usuário e senha. Mode append para gravar os dados, e logo em seguida usamos a função criada atualiza banco no contexto do write streaming, com trigger de execução de 5 segundos.

Utilizamos uma base de dados em csv, com id numérico, tipo como nome do ministério, nome tipo.Todos retirados de base de dados aberto da plataforma de dados do governo federal (http://landpageh.cgu.gov.br/dadosabertos/index.php?url=http://repositorio.dados.gov.br/segrt/pensionistas/PENSIONISTAS_112021.zip).

Após o script ser executado, ele ficara aguardando algum arquivo .csv ser criado nesta pasta para iniciar o processo de leitura e gravação. E a cada arquivo que chegar ele fará o processo no arquivo,leitura, gravação e continuara aguardando. Os arquivos já lidos permanecerão na pasta e somente os novos serão lidos, mantendo assim o histórico dos arquivos na pasta.

 

3.Experimentos e Resultados

Subimos nossa aplicação:

O spark streaming está apontando para a pasta que definimos para colocar os arquivos .csv a serem ingeridos:

Vamos pegar nosso arquivo csv e colocar na pasta:

pasta: Nesse momento, rapidamente o  spark streaming realiza a ingestão na tabela,vamos dar um select no postgre para conferir os dados:

4.Conclusões

A biblioteca Spark Streaming, parte do ecossistema Apache Spark, é usada para processamento de streaming de dados em tempo real. Neste artigo, aprendemos como usar a API do Spark Streaming para processar uma base de dados em csv e ingeri la em um  banco de dados relacional.

Autores: Nivaldo Oliveira de Melo , Pedro Henrique Rodrigues Alves