logo

SQL PySpark

Apache Spark é o software de maior sucesso da Apache Software Foundation e projetado para computação rápida. Vários setores estão usando o Apache Spark para encontrar suas soluções. PySpark SQL é um módulo do Spark que integra processamento relacional com a API de programação funcional do Spark. Podemos extrair os dados usando uma linguagem de consulta SQL. Podemos usar as mesmas consultas da linguagem SQL.

Se você tiver um conhecimento básico de RDBMS, o PySpark SQL será fácil de usar, onde você poderá estender a limitação do processamento de dados relacionais tradicional. O Spark também oferece suporte à Hive Query Language, mas há limitações do banco de dados Hive. Spark SQL foi desenvolvido para remover as desvantagens do banco de dados Hive. Vamos dar uma olhada nas seguintes desvantagens do Hive:

Desvantagens do Hive

  • Ele não pode retomar o processamento, o que significa que se a execução falhar no meio de um fluxo de trabalho, você não poderá retomar de onde ficou travado.
  • Não podemos descartar os bancos de dados criptografados em cascata quando a lixeira está habilitada. Isso leva ao erro de execução. Para descartar esse tipo de banco de dados, os usuários devem utilizar a opção Purge.
  • As consultas ad-hoc são executadas usando MapReduce, que é lançado pelo Hive, mas quando analisamos o banco de dados de tamanho médio, atrasa o desempenho.
  • O Hive não dá suporte à operação de atualização ou exclusão.
  • É limitado ao suporte à subconsulta.

Essas desvantagens são as razões para desenvolver o Apache SQL.

Breve introdução ao PySpark SQL

PySpark oferece suporte ao processamento relacional integrado com a programação funcional do Spark. Ele fornece suporte para diversas fontes de dados para possibilitar a tessitura de consultas SQL com transformações de código, resultando assim em uma ferramenta muito poderosa.

PySpark SQL estabelece a conexão entre o RDD e a tabela relacional. Ele fornece uma integração muito mais próxima entre o processamento relacional e processual por meio da API declarativa do Dataframe, que é integrada ao código Spark.

Usando SQL, ele pode ser facilmente acessível a mais usuários e melhorar a otimização dos atuais. Ele também oferece suporte a uma ampla variedade de fontes de dados e algoritmos em Big Data.

Recurso do PySpark SQL

Os recursos do PySpark SQL são fornecidos abaixo:

1) Acesso a dados de consistência

Ele fornece acesso consistente aos dados, o que significa que o SQL oferece suporte a uma maneira compartilhada de acessar uma variedade de fontes de dados, como Hive, Avro, Parquet, JSON e JDBC. Ele desempenha um papel significativo na acomodação de todos os usuários existentes no Spark SQL.

2) Incorporação com Spark

As consultas SQL do PySpark são integradas aos programas Spark. Podemos usar as consultas dentro dos programas Spark.

Uma de suas maiores vantagens é que os desenvolvedores não precisam gerenciar manualmente a falha de estado ou manter o aplicativo sincronizado com trabalhos em lote.

3) Conectividade Padrão

Ele fornece conexão por meio de JDBC ou ODBC, e esses dois são os padrões do setor para conectividade para ferramentas de business intelligence.

4) Funções definidas pelo usuário

PySpark SQL tem uma função definida pelo usuário (UDFs) combinada com linguagem. UDF é usado para definir uma nova função baseada em coluna que estende o vocabulário da DSL do Spark SQL para transformar DataFrame.

5) Compatibilidade do Hive

PySpark SQL executa consultas Hive não modificadas em dados atuais. Permite total compatibilidade com os dados atuais do Hive.

Módulo SQL PySpark

Algumas classes importantes de Spark SQL e DataFrames são as seguintes:

    pyspark.sql.SparkSession:Representa o principal ponto de entrada para Quadro de dados e funcionalidade SQL.pyspark.sql.DataFrame:Representa uma coleção distribuída de dados agrupados em colunas nomeadas.pyspark.sql.Coluna:Ele representa uma expressão de coluna em um Quadro de dados. pyspark.sql.Row:Representa uma linha de dados em um Quadro de dados. pyspark.sql.GroupedData:Métodos de agregação, retornados por DataFrame.groupBy(). pyspark.sql.DataFrameNaFunções:Representa métodos para lidar com dados ausentes (valores nulos).pyspark.sql.DataFrameStatFunções:Representa métodos para funcionalidade estatística.pysark.sql.funções:Ele representa uma lista de funções integradas disponíveis para Quadro de dados. pyspark.sql.types:Ele representa uma lista de tipos de dados disponíveis.pyspark.sql.Window:É usado para trabalhar com funções do Windows.

Considere o seguinte exemplo de PySpark SQL.

 import findspark findspark.init() import pyspark # only run after findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = spark.sql('''select 'spark' as hello ''') df.show() 

Saída:

 +-----+ |hello| +-----+ |spark| +-----+ 

Explicação do código:

No código acima, importamos o encontrar parque módulo e chamado findpark.init() construtor; em seguida, importamos o módulo SparkSession para criar a sessão Spark.

de pyspark.sql importar SparkSession

Uma sessão spark pode ser usada para criar a API Dataset e DataFrame. Um SparkSession também pode ser usado para criar DataFrame, registrar DataFrame como uma tabela, executar SQL sobre tabelas, armazenar tabela em cache e ler arquivo parquet.

construtor de classe

É um construtor do Spark Session.

getOrCreate()

É usado para obter um existente SparkSession, ou se não existir, crie um novo com base nas opções definidas no construtor.

Alguns outros métodos

Alguns métodos de PySpark SQL são os seguintes:

1. nome do aplicativo (nome)

É usado para definir o nome do aplicativo, que será exibido na UI da web do Spark. O parâmetro nome aceita o nome do parâmetro.

2.config(chave=Nenhum, valor = Nenhum, conf = Nenhum)

É usado para definir uma opção de configuração. As opções definidas usando este método são propagadas automaticamente para ambos SparkConf e SparkSession configuração.

 from pyspark.conf import SparkConfSparkSession.builder.config(conf=SparkConf()) 

Parâmetros:

    chave-Uma sequência de nomes de chave de uma propriedade de configuração.valor-Representa o valor de uma propriedade de configuração.conf -Uma instância do SparkConf.

3. mestre(mestre)

Ele define o URL mestre do Spark para se conectar, como 'local' para executar localmente, 'local[4]' para executar localmente com 4 núcleos.

Parâmetros:

    mestre:uma URL para o Spark Master.

4. SparkSession.catalog

É uma interface que o usuário pode criar, eliminar, alterar ou consultar o banco de dados, tabelas, funções subjacentes, etc.

banda base vs banda larga

5. SparkSession.conf

É uma interface de configuração de tempo de execução para o Spark. Esta é a interface através da qual o usuário pode obter e definir todas as configurações do Spark e Hadoop que são relevantes para o Spark SQL.

classe pyspark.sql.DataFrame

É uma coleção distribuída de dados agrupados em colunas nomeadas. Um DataFrame é semelhante à tabela relacional no Spark SQL, pode ser criado usando várias funções no SQLContext.

 student = sqlContext.read.csv('...') 

Após a criação do dataframe, podemos manipulá-lo usando as diversas linguagens específicas de domínio (DSL) que são funções pré-definidas do DataFrame. Considere o seguinte exemplo.

 # To create DataFrame using SQLContext student = sqlContext.read.parquet('...') department = sqlContext.read.parquet('...') student.filter(marks > 55).join(department, student.student_Id == department.id)  .groupBy(student.name, 'gender').({'name': 'student_Id', 'mark': 'department'}) 

Vamos considerar o seguinte exemplo:

Consultando usando Spark SQL

No código a seguir, primeiro criamos um DataFrame e executamos as consultas SQL para recuperar os dados. Considere o seguinte código:

 from pyspark.sql import * #Create DataFrame songdf = spark.read.csv(r'C:UsersDEVANSH SHARMA	op50.csv', inferSchema = True, header = True) #Perform SQL queries songdf.select('Genre').show() songdf.filter(songdf['Genre']=='pop').show() 

Saída:

 +----------------+ | Genre| +----------------+ | canadian pop| | reggaeton flow| | dance pop| | pop| | dfw rap| | pop| | trap music| | pop| | country rap| | electropop| | reggaeton| | dance pop| | pop| | panamanian pop| |canadian hip hop| | dance pop| | latin| | dfw rap| |canadian hip hop| | escape room| +----------------+ only showing top 20 rows +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name| Artist.Name|Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 6|I Don't Care (wit...| Ed Sheeran| pop| 102| 68| 80| -5| 9| 84| 220| 9| 4| 84| | 8| How Do You Sleep?| Sam Smith| pop| 111| 68| 48| -5| 8| 35| 202| 15| 9| 90| | 13| Someone You Loved|Lewis Capaldi| pop| 110| 41| 50| -6| 11| 45| 182| 75| 3| 88| | 38|Antisocial (with ...| Ed Sheeran| pop| 152| 82| 72| -5| 36| 91| 162| 13| 5| 87| | 44| Talk| Khalid| pop| 136| 40| 90| -9| 6| 35| 198| 5| 13| 84| | 50|Cross Me (feat. C...| Ed Sheeran| pop| 95| 79| 75| -6| 7| 61| 206| 21| 12| 82| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ 

Usando a função groupBy()

A função groupBy() coleta dados de categorias semelhantes.

 songdf.groupBy('Genre').count().show() 

Saída:

 +----------------+-----+ | Genre|count| +----------------+-----+ | boy band| 1| | electropop| 2| | pop| 7| | brostep| 2| | big room| 1| | pop house| 1| | australian pop| 1| | edm| 3| | r&b en espanol| 1| | dance pop| 8| | reggaeton| 2| | canadian pop| 2| | trap music| 1| | escape room| 1| | reggaeton flow| 2| | panamanian pop| 2| | atl hip hop| 1| | country rap| 2| |canadian hip hop| 3| | dfw rap| 2| +----------------+-----+ 

distribuição(numpartições, *cols)

O distribuição() retorna um novo DataFrame que é uma expressão de particionamento. Esta função aceita dois parâmetros numpartições e *col. O numpartições parâmetro especifica o número alvo de colunas.

 song_spotify.repartition(10).rdd.getNumPartitions() data = song_spotify.union(song_spotify).repartition('Energy') data.show(5) 

Saída:

 +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name|Artist.Name| Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| | 17| LA CANCI?N| J Balvin| latin| 176| 65| 75| -6| 11| 43| 243| 15| 32| 90| | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ only showing top 5 rows