2018年1月9日星期二

spark sql and data frame

Overview
spark sql 是处理结构化数据的Spark模块,与spark的基础RDD模块不同,这个模块提供了更多结构化的信息,与spark sql交互的方式包括SQL和Dataset api.

以下的所有例子都可以在spark-shell shell,pyspark shell和sparkR shell中使用。

SQL
使用spark sql的一种方式是执行SQL查询,Spark SQL也可以用来从现有的Hive安装中读取数据,关于更多的信息,请参考 Hive Tables 部分。 当在另一种编程语言中调用sql时,结果返回的类型会是Dataset/DataFrame类型。你也可以使用 command-line 和 JDBC/ODBC与sql进行交互。

Datasets 和 DataFrames
一个Dataset就是一个数据分布的集合。Dataset是在spark 1.6后添加的新的接口,它提供了RDDs的优点(强壮的类型和使用匿名函数)和Spark sql优化执行引擎的优点,Dataset api在Scala和java中是可以获取的。但是由于python的动态特性,许多Dataset API的优点也已经可用了。R语言的情况也是相似的。

一个DataFrame是一个组织成命名列的DataSet,它的概念等同于一个关系数据库的表或者R/Python中的Data frame, DataFrame 可以从多种资源构造:结构化的数据文件,Hive中的表,扩展的数据库或者从已经存在的RDDs。DataFrame的API对Scala,Java,Python和R都公开。

Getting Started

Starting Point: SparkSession
在spark中,所有函数的入口点是SparkSession类。去创建一个基本的SparkSession,请使用
SparkSession.builder:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
源码可以去 这里

Creating DataFrames
在创建SparkSession的前提下,应用可以创建DataFrames从现存的RDD,Hive table或者Spark 数据资源。
举一个例子,创建一个DataFrame使用一个JSON文件。

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
源码可以去 这里

Untyped Dataset Operations(aka DataFrame Operations)
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+
更完整的DataFrame操作可以查看 API文档 笔者看完API文档,会将比较常用的API写在下面。

由于笔者主要看的是data frame,接下来要去看API文档,下面不再翻译

更具体的内容你可以参考 这里


————————————————我是分割线———————————————————

DataFrame的API常用操作
  一个DataFrame就相当于Spark Sql中的一个关系数据库,他可以通过SQLContext中的很多函数创建,例如:
people = sqlContext.read.parquet("...")
一旦创建,就可以用很多方法去操作它:
ageCol = people.age
一个更复杂的例子:
# To create DataFrame using SQLContext
people = sqlContext.read.parquet("...")
department = sqlContext.read.parquet("...")

people.filter(people.age > 30).join(department, people.deptId == department.id) \
  .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
上述例子的意思就是先做一个people表的筛选,再取people表和department的union, 然后根据department里面的name和gender属性对表进行一个分组,然后在分好的组里取salary的平均和年龄的最大值。


没有评论:

发表评论

leetcode 17

17.   Letter Combinations of a Phone Number Medium Given a string containing digits from   2-9   inclusive, return all possible l...