本文介绍了Spark中三种api中的关系和选用方式,原文在此

对于程序员来说,最有吸引力的API应该是易用的,可以提高开发者开发效率,易理解而且表现力抢的。Spark的魅力之一就是给开发者提供了易用的操作大数据集的API,而且这种API事跨语言的,可以用在Scala,Java,Python和R上。

本文研究了Spark2.2(和更新的版本)中的三个API集——RDDs,DataFrames和Datasets,并将解答下面几个问题。

  1. 该如何从中选择合适的API?
  2. 每一种API的性能和优化效益?
  3. 列举使用DF和DS代替RDD的场景。

鉴于Spark2.0以后将DF和DS的API进行了统一,文章将主要关注DF和DS。

统一这两种API的主要原因是为了限制Spark中概念的数量和提供对结构化数据的处理方法,从而达到简化Spark的目的。 通过结构,Spark可以提供更高层次的抽象和API作为DSL的基础。

RDD(Resilient Distributed Dataset)

RDD随Spark一同出现,是面向用户的最基础的API。在Spark内部RDD是一个被分配到集群中不同节点上的不可变得分布式数据集合,RDD可以通过低级API进行并行操作,RDD支持的操作包括转化和行动。

何时使用RDD?

下面这些情况下可以考虑使用RDD:

  1. 你需要在你的数据集上使用底层的转换和行动和控制
  2. 你的数据是非结构化的,比如媒体流和文本流
  3. 你想要使用函数式编程操作你的数据,而不是特定领域的表达。
  4. 你通过属性名称或者列来访问数据时,不在乎结构数据(比如列的格式)
  5. 你可以放弃结构化数据DF和半结构化数据DS的优化和性能优势。

在Spark2中RDD有哪些变化??

你可能会问,RDD被降级为二等公民了么?它被弃用了么?答案是否定的,此外,你可以通过简单地API调用从DF或DS中无缝的迁移到RDD,

DataFrame

像RDD一样,DF也是一个分布式的不可变数据集合。和RDD不同的是,DF将数据存放到命名的列中,这和关系型数据库中的表类似。DF通过允许开发者在数据集上加入结构信息从而提供高层次的数据抽象,使大数据集处理变得更为简单。DF提供领域特定语言的API(SQL),这使得Spark的受众更广,不再只是特定的数据工程师。

我们在Spark2.0的会议和博客中提到DF的API和DS的API会在Spark2.0中合并,统一不同库中的数据处理能力。这次统一不但减少开发者需要学习的概念,也提供了单一的高层次类型安全的API,也就是Dataset。

spark unify structure

Dataset

从Spark 2.0开始,Dataset包括两个单独的特征:强类型API和无类型API(如下表所示)。DF可以认为是Dataset[Row]的别名,这里的Row是一个是一个无类型的JVM对象。而Dataset是强类型JVM对象,是Scala中的case class或者Java中的class。

Language Main Abstraction
Scala Dataset[T] & DataFrame(alias for Dataset[Row])
Java Dataset[T]
Python* DataFrame
R* DataFrame

* 由于Python和R不支持编译时类型安全,所以只提供无类型API

Dataset API的好处

作为一个Spark开发者,你可以从Spark2.0的统一API中获得如下好处

  1. 静态类型和运行时类型安全: 将静态类型和运行时安全作为一个谱,SQL的限制最少,Dataset的限制最多。例如,Spark SQL查询的错误只有在运行时才能发现而使用DataFrame和Dataset可以在编译时就能发现错误(这可以节省开发时间和花费)。话句话说,编译器可以轻易发现你调用了一个DF中不提供的API。不过,DF不能在编译时发现不存在的列。

在谱的最远端是Dataset, 他的限制最严格。由于Dataset的API都是使用lambda表达式和有类型的JVM对象,所有参数类型不匹配都能在编译时被发现。当然,你的分析错误也能在编译时被发现,这进一步提升了开发效率。

所有这些选项中,Dataset是最严格也最高效的选项。

type-safety-spectrum

  1. 结构化、半结构化数据的高阶抽象和自定义视图

DF作为一个Datasets[Row]的集合,对半结构化数据生成了一个结构化的自定义视图。例如你有一个巨大的JSON格式的IoT设备行为数据。因为JSON是半结构化格式,它非常适合被转化为特定类型的强类型数据集Dataset[DeviceIotData]。

{"device_id": 198164, "device_name": "sensor-pad-198164owomcJZ", "ip": "80.55.20.25", "cca2": "PL", "cca3": "POL", "cn": "Poland", "latitude": 53.080000, "longitude": 18.620000, "scale": "Celsius", "temp": 21, "humidity": 65, "battery_level": 8, "c02_level": 1408, "lcd": "red", "timestamp" :1458081226051}

你可以把每条JSON数据当做一个DeviceIoTData类型的数据,这里的DeviceIotData可以是一个Scala case class。

case class DeviceIoTData (battery_level: Long, c02_level: Long, cca2: String, cca3: String, cn: String, device_id: Long, device_name: String, humidity: Long, ip: String, latitude: Double, lcd: String, longitude: Double, scale:String, temp: Long, timestamp: Long)

然后我们可以将数据作为JSON文件读入。

// read the json file and create the dataset from the 
// case class DeviceIoTData
// ds is now a collection of JVM Scala objects DeviceIoTData
val ds = spark.read.json(“/databricks-public-datasets/data/iot/iot_devices.json”).as[DeviceIoTData]

上面的代码做了三件事情:

1. Spark读入JSON,推断结构,并创建一个DF。
2. 此时spark还不知道数据的具体结构,所以会将数据转换成DF(Dataset[Row])。
3. Spark 将数据从Dataset[Row]装换成Dataset[DeviceIoTData],一个指定类型的Scala JVM对象。

大多数处理过强结构类型数据的人习惯于按列处理数据或访问对象的特定参数。上面生成的强类型数据集可以无缝的提供编译时检查和自定义视图。而从中生成的新的强类型数据也可以轻松地被显示或用高阶方法处理。

  1. 易于使用的结构化API

虽然结构可能会限制spark对数据的操作,但它能引入丰富的语义和一组简单地特定领域的操作,这些操作可以表示为高阶概念。大部分计算都可以通过Dataset的高阶API来万恒。例如,对DeviceIotData进行agg,select,sum,avg,map,filter或groupby等操作比对RDD的row做相同操作容易得多。

使用特定领域的API来表达计算过程比在RDD上使用关系代数要容易得多。例如,下面带啊会filter和map数据并生成一个新的数据集。

// Use filter(), map(), groupBy() country, and compute avg() 
// for temperatures and humidity. This operation results in 
// another immutable Dataset. The query is simpler to read, 
// and expressive

val dsAvgTmp = ds.filter(d => {d.temp > 25}).map(d => (d.temp, d.humidity, d.cca3)).groupBy($"_3").avg()

//display the resulting dataset
display(dsAvgTmp)
  1. 性能和优化

除了上述的优势之外,因为下面两点原因,你也不能忽略使用DF和DS带来的空间效率和性能提升。

首先,因为DF和DS API是建立在Spark SQL引擎上的,它使用Catalyst来优化查询逻辑以提升性能。 所有语言(R,Java,Scala,Python)的DF、DS API 的关系类型的查询都是用相同的代码优化器来提升空间和时间效率。 Dataset[T]的强类型API针对数据工程优化,无数据类型的DatasetRow速度更快,更适合交互式分析。

其次,既然Spark编译器可以理解强类型的Dataset,它会使用编码器把特定类型的JVM对象映射成Tungsten内部的内存表示。这使得Tungsten编码器能高效而序列化、反序列化数据并且声生成凑字节码,这可以提升代码执行速度。

我应该在什么时候使用DF或DS?

- 如果你想要丰富的语义,高层次抽象,特定领域的API,使用DF或DS

  • 如果你的处理需要高阶表达式,filter,map,aggregation,average,sum,SQL queries,按列访问或在半结构化数据上使用lambda表达式,使用DF或者DS。
  • 如果你想要更好的编译时类型安全,想要强类型JVM对象,利用Catalyst优化和Tungsten的高效代码生成功能,使用Dataset。
  • 如果你想要在不同的Spark库之间使用统一且简洁的API,使用DF或者DS。
  • 如果你使用R,使用DataFrame
  • 如果你使用Python, 使用DataFrame,并在需要更多控制权是切换到RDD。

注意你可以使用.rdd无缝的将你的DF和DS转换成RDD.例如,

// select specific fields from the Dataset, apply a predicate
// using the where() method, convert to an RDD, and show first 10
// RDD rows
val deviceEventsDS = ds.select($"device_name", $"cca3", $"c02_level").where($"c02_level" > 1300)
// convert to RDDs and take the first 10 rows
val eventsRDD = deviceEventsDS.rdd.take(10)

总结

总的来说,如何选择何时使用RDD,DF或DS是显而易见的。RDD提供底层方法和更多控制权,DF和DS允许你使用自定义视图和结构,提供高层次的特定领域的操作,节约空间并且速度更快。

在我们研究了从早期版本的Spark中学到的经验–怎样简化Spark的使用,怎么优化性能–我们决定将底层RDD API整合成先DF和DS一样的高层次的抽象,并在Catalyst和Tungsten之上建立统一的跨库的统一数据抽象。

从三者中选择一个满足你需求和应用场景的选项。如果你和大多数开发者一样选择结构化或半结构化数据结构,我不会感到惊讶。