spark sql 是处理结构化数据的Spark模块,与spark的基础RDD模块不同,这个模块提供了更多结构化的信息,与spark sql交互的方式包括SQL和Dataset api.
以下的所有例子都可以在spark-shell shell,pyspark shell和sparkR shell中使用。
SQL
使用spark sql的一种方式是执行SQL查询,Spark SQL也可以用来从现有的Hive安装中读取数据,关于更多的信息,请参考 Hive Tables 部分。 当在另一种编程语言中调用sql时,结果返回的类型会是Dataset/DataFrame类型。你也可以使用 command-line 和 JDBC/ODBC与sql进行交互。
Datasets 和 DataFrames
一个Dataset就是一个数据分布的集合。Dataset是在spark 1.6后添加的新的接口,它提供了RDDs的优点(强壮的类型和使用匿名函数)和Spark sql优化执行引擎的优点,Dataset api在Scala和java中是可以获取的。但是由于python的动态特性,许多Dataset API的优点也已经可用了。R语言的情况也是相似的。
一个DataFrame是一个组织成命名列的DataSet,它的概念等同于一个关系数据库的表或者R/Python中的Data frame, DataFrame 可以从多种资源构造:结构化的数据文件,Hive中的表,扩展的数据库或者从已经存在的RDDs。DataFrame的API对Scala,Java,Python和R都公开。
Getting Started
Starting Point: SparkSession
在spark中,所有函数的入口点是SparkSession类。去创建一个基本的SparkSession,请使用
SparkSession.builder:
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate()源码可以去 这里 看
Creating DataFrames
在创建SparkSession的前提下,应用可以创建DataFrames从现存的RDD,Hive table或者Spark 数据资源。
举一个例子,创建一个DataFrame使用一个JSON文件。
# spark is an existing SparkSession df = spark.read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame to stdout df.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+源码可以去 这里 看
Untyped Dataset Operations(aka DataFrame Operations)
# spark, df are from the previous example # Print the schema in a tree format df.printSchema() # root # |-- age: long (nullable = true) # |-- name: string (nullable = true) # Select only the "name" column df.select("name").show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+ # Select everybody, but increment the age by 1 df.select(df['name'], df['age'] + 1).show() # +-------+---------+ # | name|(age + 1)| # +-------+---------+ # |Michael| null| # | Andy| 31| # | Justin| 20| # +-------+---------+ # Select people older than 21 df.filter(df['age'] > 21).show() # +---+----+ # |age|name| # +---+----+ # | 30|Andy| # +---+----+ # Count people by age df.groupBy("age").count().show() # +----+-----+ # | age|count| # +----+-----+ # | 19| 1| # |null| 1| # | 30| 1| # +----+-----+更完整的DataFrame操作可以查看 API文档 笔者看完API文档,会将比较常用的API写在下面。
由于笔者主要看的是data frame,接下来要去看API文档,下面不再翻译
更具体的内容你可以参考 这里
————————————————我是分割线———————————————————
DataFrame的API常用操作
一个DataFrame就相当于Spark Sql中的一个关系数据库,他可以通过SQLContext中的很多函数创建,例如:
一旦创建,就可以用很多方法去操作它:
一个更复杂的例子:
上述例子的意思就是先做一个people表的筛选,再取people表和department的union, 然后根据department里面的name和gender属性对表进行一个分组,然后在分好的组里取salary的平均和年龄的最大值。
没有评论:
发表评论