大数据建模与分析笔记

banner

前言

就在十几年前,计算机的储存空间对于普通消费用户来说还是一个相对稀缺的资源。还记得当年家里购买的第一台台式机,它一整台机器满打满算不过只有40多GB的空间,而去除XP系统后,空间更是所剩无几。因此,对于当年的我来说,像“暗黑破坏神2”这种占用空间高达1.1GB的游戏就已经十分令人震撼了。

随着这些年的硬件迭代,用户的储存需求也高速增长着,如今,我们买的一台新电脑如果没个TB级的储存空间,仿佛都不好意思和人打招呼。当然,这并不仅仅是参数的好看与否的问题,更重要的是,每个人所需的存储空间的确是指数级的上升了。比如随便下载一个3A水准的游戏、随便下载一个蓝光电影、或随便装点软件,所需的容量就要50+GB了。

对于普通用户尚且如此,那就更不用说企业级的数据储存需求了。事实上,对于企业、或是数据中心来说,TB都算不上是基本单位。在数据中心中,EB这样的夸张单位其实也就是属于基本操作。

好了,意识流的废话就先说这么多了,总而言之,在这个所谓的“大数据”的时代,过去的存储计算结构,比如将所有数据放在一台服务器上进行存储、运算,显然已经无法胜任当前的需求。这种背景下,便出现了各类与“大数据”相关的技术,以更高效地解决分布式计算的各种需求。为了追赶一下时代的步伐,本学渣也在这一学期选了“大数据建模与分析”这门课,而一如既往,为了防止课程内容量超出本人微薄的脑容量,我在这里开一篇笔记博文,以备未来不时之需。

目前课程还未结束,本文会随着课程进度,同步更新。


概述

简而言之,这门课程主要只学习三大块内容:大数据分析核心“MapReduce”、MapReduce的新一代实现Spark以及Python环境下的PySpark、最后是云存储Amazon Athena (SQL)。

映射规约-MapReduce

大数据分析中,最基础的部分被称为“MapReduce”。那么什么是“MapReduce”呢?官方一点的说法是一种软件架构,但对我而言,这就是一套标准化的工作流程范式,最初,谷歌在它发表的一篇论文中对其进行了描述。当然,谷歌并没有好心到把它内部使用的MapReduce源代码放出,而仅仅是从宏观层面“描述”了整个流程。当然只看宏观描述就能复现实验的大佬总是存在的,于是在这篇论文后,Hadoop团队利用Java实现了MapReduce并进行了开源,得以让大众从代码层面可以使用这一套流程,因此当我们说到MapReduce时,大多指的就是Hadoop团队实现的这一套开源代码。

见名知意,MapReduce是“Map”和“Reduce”两词的和,也就是说,这一工作流程被分为映射Map和规约Reduce。在Map阶段之前,首先我们会将要处理的文件拆分,然后分发给不同的子节点机器,每个机器便会按照Map中我们自定义的方法对数据进行处理并输出结果。而Reduce阶段则将前面的结果进行汇总,最终得到我们想要的数据。

当然,由于MapReduce是在业界进化出来的模式,自带反直觉性,也就是说,我们正常人一般来说是不会按它那套模式思考问题的,这就导致了这些描述性的文字对我们而言其实没有实感。所以我就不在这多写了,直接上案例,多做几个案例就能大概明白这一套玩法了。

案例—-词频统计

词频统计属于是MapReduce的入门案例。

需求很直观:统计一份文档中每个词出现的频次。

我们最直观的想法,当然是遍历文档中每一个词,然后将其放到哈希表中。当然,这种方法最大问题就是耗时长,且对于超出机器内存大小的文件无法处理。

而MapReduce则将过程分为“Map”和“Reduce”,在map阶段,将原数据处理为许多(word, 1)的格式,在reduce阶段,则将这些键值对整理为(word, count)的结果输出。


Spark

Spark是Apache下的大数据处理引擎,其建立在“弹性分布式数据集”(Resilient Distributed Dataset)之上,当然我们一般将这个数据集简称为“RDD”

这里不去讨论RDD的具体实现,也不过多探讨Spark的底层原理,简介可以在 这里 看到。总而言之,相较于之前的Hadoop MapReduce,Spark的中间数据都是储存在内存层面,而不是硬盘,因此其速度较Hadoop MapReduce要快许多。也就是说Spark是Hadoop MapReduce的继任者。


PySpark

Spark起初是写在Java语言之上,然而,由于Python相较java而言,对于大部分用户来说更简单易用,Spark也对于Python进行了适配,而这就是PySpark。

当我们在Python平台下使用Spark,由于其弱类语言的特性,我们无需考虑返回值的类型之类的东西,开发极其简单。可以说PySpark将大数据的门槛极大的拉低。

SparkContext对象

使用Spark时,我们首先要获取一个SparkContext对象来进行初始化。

RDD算子们

Spark处理大数据,其实和SQL很像,我们作为开发者,代码无需过多关注集群机器之间的交互,在开发时,只要使用RDD的算子,Spark会在运行时自动调度集群资源。只要当成是在同一台机器上运行进行开发即可。

RDD算子有很多,这里不一一阐述,就用一次作业来作为案例。

需求:我们现在有一份电影评分数据,格式为:<用户名user_id><,><电影名movie_id><,><评分rating>。我们想得到的结果有:

  1. 每个电影的平均分数,输出为:[movie_id_1] [average-rating-for-movie_id_1]
  2. 获得到5分的电影列表,输出为:[“rating-5”] [list-of-unique_movies-rated-as-5]
    P.S. 额外要求是过滤掉低于2的评分以及结果过滤掉均分小于2.5的结果。

做法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# 首先我们创建一个SparkContext对象,这是标准流程
from pyspark import SparkContext
sc = SparkContext('local', 'test')

# 通过textFile方法来从txt中读取数据
raw_rdd = sc.textFile("movies.txt")

# 按逗号切分每一行数据,然后将评分小于2的过滤
filtered_rdd = raw_rdd \
.map(lambda x: x.split(',')) \
.filter(lambda x: (len(x)==3) and (x[2]>='2'))

# 将数据转换为(“电影名”,评分)的格式
mapped_rdd = filtered_rdd \
.map(lambda x: (x[1], ord(x[2])-48))

# 这里做一下缓存,因为第二个需求还是要用到这个RDD
mapped_rdd.cache()

# 计算并输出评分均值结果
average_rdd = mapped_rdd.mapValues(lambda v: (v, 1)) \
.reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
.mapValues(lambda v: v[0]/v[1]) \
.filter(lambda x: x[1]>=2.5)

# 统计获得到五分的电影
rate_five_rdd = mapped_rdd.filter(lambda x: x[1]==5) \
.map(lambda x: ("rating-5", x[0])) \
.distinct() \
.groupByKey() \
.map(lambda x: (x[0], list(x[1])))

# 加入缓存区
rate_five_rdd.cache()
average_rdd.cache()

# 后续无需这个缓存,将其清出内存
mapped_rdd.unpersist()

# 将两个需求结果连接为一份RDD
result_rdd = average_rdd.union(rate_five_rdd)

# 输出结果
result_rdd.saveAsTextFile("result")

# 最终结果为:
# [('m1', 3.0),
# ('m2', 4.0),
# ('m3', 4.2),
# ('m4', 4.5),
# ('m5', 5.0),
# ('rating-5', ['m1', 'm3', 'm2', 'm4', 'm5'])]

还有一个作业,这里用到了csv文件,其实更好的做法是放在下文讲的PySpark SQL中来做,但这里还是先采用RDD算子的方式,以增加熟练度。

需求:我们现在有一份白宫的访客数据。我们想得到的有:

  1. 前十的访客名单,输出为:[visitor] [frequency],其中[visitor]中为(NAMELAST, NAMEFIRST)
  2. 前十的受访者名单,输出为:[visitor] [frequency],其中[visitor]中为(visitee_namelast, visitee_namefirst)
  3. 前十的访客-受访者列表,输出为:[visitor][-][visitee] [frequency]
  4. 丢弃(无效)的数据数,无效的判定标准为:NAMELAST或visitee_namelast为空

做法如下:

1
2
# 下载文件
wget https://obamawhitehouse.archives.gov/sites/default/files/disclosures/whitehouse_waves-2016_12.csv_.zip
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 解压文件
import zipfile
with zipfile.ZipFile("whitehouse_waves-2016_12.csv_.zip", "r") as zip_ref:
zip_ref.extractall()

# 创建SparkContext对象
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("white-house-log").getOrCreate()
sc = spark.sparkContext

# 读取文件
records_raw_rdd = sc.textFile("whitehouse_waves-2016_12.csv")

# 保存首行数据并在rdd中移除首行
header = records_raw_rdd.first()
recordsNoHeader_raw_rdd = records_raw_rdd.filter(lambda row: row != header)

# 由于首行数据存放在driver机器中,其他节点想要使用该数据,
# 则需向其网络请求,因此先将首行数据进行广播,以节约网络IO资源
header_list = header.split(",")
broadcast = sc.broadcast(header_list)

# 声明一个累加器,来记录抛弃的数据数
# 这里为什么不能直接声明一个变量count=0,然后使用count+=1?
# 和上面一样,因为普通变量只会存在driver机器中,
# 导致每次子节点使用,都会先请求该变量,
# 而driver永远都只会将count=0发出。
# 这里由于我们已经去除首行,所以初始化为1
acmlt = sc.accumulator(1)

# 声明一个函数,
# 用来过滤 'NAMELAST' 或 'visitee_namelast'为空的记录
def filter_function(line):
if(line[broadcast.value.index('NAMELAST')] == "" or line[broadcast.value.index('visitee_namelast')] == ""):
global acmlt
acmlt += 1
return False
else:
return True

# 获取过滤后的RDD并进行缓存
filtered_rdd = recordsNoHeader_raw_rdd\
.map(lambda line: line.split(","))\
.filter(filter_function)
filtered_rdd.cache()

# 实现需求一并缓存
vistor_rdd = filtered_rdd\
.map(lambda line: ((line[broadcast.value.index('NAMELAST')].upper(), line[broadcast.value.index('NAMEFIRST')].upper()), 1))\
.reduceByKey(lambda x, y: x + y)\
.sortBy(lambda x: x[1], False)
vistor_rdd.cache()

# 实现需求二并缓存
vistee_rdd = filtered_rdd\
.map(lambda line: ((line[broadcast.value.index('visitee_namelast')].upper(), line[broadcast.value.index('visitee_namefirst')].upper()), 1))\
.reduceByKey(lambda x, y: x + y)\
.sortBy(lambda x: x[1], False)
vistee_rdd.cache()

# 实现需求三并缓存
vistor_to_vistee_rdd = filtered_rdd\
.map(lambda line: ("<"+line[broadcast.value.index('NAMELAST')].upper() + "_" +\
line[broadcast.value.index('NAMEFIRST')].upper() + "> - <" +\
line[broadcast.value.index('visitee_namelast')].upper() + "_" +\
line[broadcast.value.index('visitee_namefirst')].upper()+">", 1))\
.reduceByKey(lambda x, y: x + y)\
.sortBy(lambda x: x[1], False)
vistor_to_vistee_rdd.cache()

PySpark SQL

要进行数据处理、分析,除了上述的文本文档格式外,处理结构化数据SQL是无法绕开的一环,毕竟大部分学大数据的人,本质是要给企业打工的,而在企业中,数据一般都是存储在数据库中。

对于SQL来说,Spark也支持直接对数据库进行访问,并可以SQL语句与代码混合运行,很方便。

SparkSQL中,数据存储在一种叫DataFrame的类中,当然这与Pandas的DataFrame并不是一个东西,不过他们两个的实现十分相似,不同点是Spark的数据表存储在分布式集合之中,而非本机。

当然,对于Jvm的开发,SparkSQL使用的数据抽象则是DataSet,其带有泛型特性,这里我们主要不讨论这个,因为python不支持泛型。

那么DataFrame与之前的RDD有什么区别?主要是在于其存储的数据结构不同,RDD中什么都可以存,而DataFrame则仅存二维结构化数据(其实就是我们常见的表格)。

SparkSession对象

在之前的使用中,我们通过SparkContext来获取初始化,而在SparkSQL中,我们则是用更新的SparkSession来创建初始化对象。

1
2
3
4
5
6
7
8
# 构建核心对象
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()

# 也可以通过其构造之前使用的SparkContext
sc = spark.sparkContext

之前的白宫访客问题,也可以使用DataFrame来处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# create a SparkSession object
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
spark = SparkSession.builder.appName("white-house-log").getOrCreate()

import zipfile
with zipfile.ZipFile("whitehouse_waves-2016_12.csv_.zip", "r") as zip_ref:
zip_ref.extractall()

# Header to be used as metadata
df = spark.read.option("header", "true").csv("whitehouse_waves-2016_12.csv")

df_visitor_n_visitee = df["namelast", "namefirst", "visitee_namelast", "visitee_namefirst"] \
.na.drop(subset=["namelast", "visitee_namelast"]) \
.toDF(*[c.lower() for c in ["namelast", "namefirst", "visitee_namelast", "visitee_namefirst"]]) \
.withColumn('namelast', f.lower(f.col('namelast'))) \
.withColumn('namefirst', f.lower(f.col('namefirst'))) \
.withColumn('visitee_namelast', f.lower(f.col('visitee_namelast'))) \
.withColumn('visitee_namefirst', f.lower(f.col('visitee_namefirst'))) \
.select(f.concat_ws('_',"namelast","namefirst").alias("visitor"),
f.concat_ws('_',"visitee_namelast","visitee_namefirst").alias("visitee")) \
.select("visitor",
"visitee",
f.concat_ws(' - ',"visitor","visitee").alias("visitor-visitee"))

df_visitor_count = df_visitor_n_visitee.groupBy("visitor") \
.count() \
.sort("count", ascending=False) \
.limit(10)

df_visitee_count = df_visitor_n_visitee.groupBy("visitee") \
.count() \
.sort("count", ascending=False) \
.limit(10)

df_visitor_n_visitee_count = df_visitor_n_visitee.groupBy("visitor-visitee") \
.count() \
.sort("count", ascending=False) \
.limit(10)

dropped_count = df.count() - df_visitor_n_visitee.count()

print("The 10 most frequent visitors to the White House are:")
df_visitor_count.show()
print("The 10 most frequently visited people in the White House are:")
df_visitee_count.show()
print("The 10 most frequent visitor-visitee combinations are:")
df_visitor_n_visitee_count.show()
print(f"The number of records dropped is: {dropped_count}")

# The 10 most frequent visitors to the White House are:
# +----------------+-----+
# | visitor|count|
# +----------------+-----+
# | kidwell_lauren| 222|
# | thomas_benjamin| 196|
# | haro_steven| 183|
# |berner_katherine| 177|
# | grant_patrick| 155|
# | haas_jordan| 152|
# | garza_steven| 127|
# | martin_kathryn| 122|
# | cohen_mandy| 122|
# | brown_jennifer| 117|
# +----------------+-----+

# The 10 most frequently visited people in the White House are:
# +--------------------+------+
# | visitee| count|
# +--------------------+------+
# | office_visitors|430881|
# |waves_visitorsoffice| 44129|
# | bryant_ruth| 13970|
# | oneil_olivia| 13155|
# | thompson_jared| 11618|
# | /_potus| 10900|
# | burton_collin| 9672|
# | megan_matthew| 7944|
# | mayerson_asher| 6886|
# | dessources_kalisha| 5289|
# +--------------------+------+

# The 10 most frequent visitor-visitee combinations are:
# +--------------------+-----+
# | visitor-visitee|count|
# +--------------------+-----+
# |kidwell_lauren - ...| 103|
# |haas_jordan - yud...| 90|
# |grant_patrick - y...| 89|
# |thomas_benjamin -...| 89|
# |haro_steven - yud...| 84|
# |cohen_mandy - lam...| 84|
# |berner_katherine ...| 82|
# |roche_shannon - y...| 70|
# |urizar_jennifer -...| 68|
# |martin_kathryn - ...| 61|
# +--------------------+-----+

# The number of records dropped is: 59255

利用不同API风格进行开发

pyspark sql 可以使用两种风格:SQL风格或DSL风格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# HelloWorld
df = spark.read.csv(xxx.csv)
df2 = df.toDF("id", "name", "score")
df2.printSchema()
df2.show()

# 使用SQL风格处理
df2.createTempView("score")

spark.sql("""
SELECT * FROM score WHERE name='aa' LIMIT 5
""").show()

# 使用DSL风格处理,这就和我们熟悉的pandas df很像了
df2.where("name='aa'").limit(5).show()

当然,pyspark的API还有很多很多,使用时查文档即可,这里就不多赘述了。


GraphFrames

简介

除了上述的结构化表数据外,还有一种数据结构会在很多地方用到:图Graph。

Graph简而言之就是用来描述关系的一类数据,如下图:

而Spark原生并未对图数据定义相应的数据抽象,为了对图进行处理,社区大佬们制作了GraphFrames这一插件,以便对图进行计算。

配置

Python 环境使用 GraphFrames

如果pySpark代码里用到了GraphFrame,则需要在提交程序文件时让spark加载相应版本的GraphFrame插件:

1
spark-submit --packages graphframes:graphframes:0.8.0-spark3.0-s_2.12 xxxx.py

Jupyter 环境使用 GraphFrames

很多时候我们希望用jupyter来进行程序的调试,然而GraphFrame并不能安装到python环境中,因此运行import时会提示无法找到GraphFrame包。为了解决这一问题,我们需要在启动pyspark时开启一个包含了GraphFrame的jupyter的服务器。

只需要在环境变量中添加两个变量即可:

1
2
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS=notebook

运行下述指令:

1
pyspark --packages graphframes:graphframes:0.8.0-spark3.0-s_2.12 --jars graphframes-0.8.0-spark3.0-s_2.12.jar

如果不加之前的环境变量,此时会直接进入pyspark环境,然而添加环境变量后,便会开启一个jupyter服务器,按提示连接便可。

这里要提一句,该方法只有在本地环境才能使用,如果你在用Colab一类的远程环境,就没法用GraphFrame了。

Motif Find

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from graphframes import *
from pyspark import *
from pyspark.sql import *
import pyspark.sql.functions as f
import sys

input_path = sys.argv[1]
spark = SparkSession.builder.master("local[*]").appName('fun').getOrCreate()
sc = spark.sparkContext

# read text as rdd
records_raw_rdd = sc.textFile(input_path)

# create edge df from rdd
edges = records_raw_rdd\
.map(lambda x: x.split(","))\
.toDF(['src', 'dst', 'type'])

# create vertices df from rdd
vertices = records_raw_rdd\
.map(lambda x: x.split(","))\
.flatMap(lambda x: [x[0], x[1]])\
.distinct()\
.map(lambda x: [x])\
.toDF(['id'])

# motif finding
graph = GraphFrame(vertices, edges)
motifs = graph.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)")
motifs[(motifs.a<motifs.b) & (motifs.b<motifs.c)].show()

Amazon Athena

简介

没啥高深的,就是一个文件服务器,用户可以用SQL语句访问。

使用

买了参考文档即可,要花钱,本人就没有很深入的研究。


后记


<br>

**未完待续...**