首页技术文章正文

怎样用编程的方法定义Schema?

更新时间:2021-09-08 来源:黑马程序员 浏览量:

IT培训班

当case类不能提前定义的时候,就需要采用编程方式定义Schema信息,定义DataFrame主要包含3个步骤,具体如下:

(1)创建一个Row对象结构的RDD;

(2)基于StructType类型创建Schema;

(3)通过SparkSession提供的createDataFrame()方法来拼接Schema。

根据上述步骤,创建SparkSqlSchema. scala文件,使用编程方式定义Schema信息的具体代码如文件4-3所示。

文件4-3 SparkSqlSchema.scala

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sq1.types.
{IntegerType,StringType,StructField,StructType}
import org.apache.spark.sql.(DataFrame,Row,Sparkession)
object SparkSqlSchema {
def main(args: Array[string]): Unit=(
//1.创建SparkSession
val spark: sparkSession=Sparksession.bullder()
.appName ("SparkSq1Schema")
.master ("1oca1[2]")
.getOrCreate ()
//2.获取sparkConttext对象
val sc: SparkContext=spark.sparkContext
//设置日志打印级别
sc.setLogLevel ( "WARN")
//3.加载数据
val dataRDD:RDD[String]=sc.textFile("D://spark//person.txt")
//4.切分每一行
val dataArrayRDD:RDD[ Array[string]]=dataRDD.map( .split(" "))
//5.加载数据到Row对象中
val personRDD:RDD[Row]=
dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
//6.创建Schema
val schema:StructType=StructType(Seq(
StructField("id",IntegerType,false),
StructField("name",StringType,false),
StructField("age", IntegerType, false)
))
//7.利用personRDD与Schema创建DataFrame
val personDF:DataFrame=spark.createDataFrame(personRDD,schema)
//8.DSL操作显示DataFrame的数据结果
personDF . show ()
//9.将DataFrame注册成表
personDF.createOrReplaceTempView ("t_person")
//10.sq1语句操作
spark.sq1 ("select¥from t_ person") .show()
//11.关闭资源
sc.stop()
spark.stop ()

在文件4-3中,第9~23行代码表示将文件转换成为RDD的基本步骤,第25~29行代码即为编程方式定义Schema的核心代码,Spark SQL提供了Class StructType( val fields:Array[StructField])类来表示模式信息,生成一个StructType对象,需要提供fields作为输入参数,fields是个集合类型,StructField(name,dataTypenullable)参数分别表示为字段名称、字段数据类型、字段值是否允许为空值,根据person.txt文本数据文件分别设置id、name、age字段作为Schema,第31行代码表示通过调用spark.createDataFrame()方法将RDD和Schema进行合并转换为DataFrame,第33~40行代码即为操作DataFrame进行数据查询。





猜你喜欢:

什么是Schema约束?有什么优势?

怎样引入XML Schema文档?

怎样创建Dataset对象?

黑马程序员python大数据培训

分享到:
在线咨询 我要报名
和我们在线交谈!