嗯,记录大数据基本概念,以及核心软件Hadoop, Spark核心概念。

作为程序员入门大数据的记录。(写了近一天……我知道为啥现在自己写技术文章很少了)

谷歌大数据三大论文

也是大数据三大核心技术,该技术原本的设计目的是:Google在整个互联网上提供实时的搜索结果,现成为大数据的基石。

– MapReduce 超大集群上的简单数据处理

它是一个编程模型,用来处理和产生大数据集的相关实现。

用户指定一个Map函数来处理一个key/value对,从而产生中间的key/value对的集合,然后指定一个reduce函数合并其中所有具有相同中间key的中间value。

– GFS google file system可扩展的分布式文件系统

用于大型的,分布式的,大量数据进行访问的文件系统。它包括如下特点:

错误不视为异常,视为常见情况处理

对巨大文件可以进行高效管理,小型文件也可以常规处理

文件更新是通过添加新数据完成的,而不修改数据,不存在文件随机操作。一旦写完,文件即仅可读

自定义的读写操作

– BigTable 一套压缩高效高伸缩的大数据存储系统

底层每个table都是一个多维稀疏图,由行列组成

Table会被分割为tablets,一块100-200MB,每个机器存储100个左右的tablets

使用了高强度的压缩技术,压缩比率不是最高,但处理速度快的Zippy压缩算法

Hadoop

模块

Hadoop包括四个部分: Common, Yarn, HDFS和MapReduce

  • Common是个基础模块,用来支持其他模块的,基本不用管。

  • Yarn是一个进行作业任务调度和资源管理的框架。

  • HDFS是分布式文件系统,类似FAT32,NTFS,是一种磁盘文件格式,底层的东西。它把硬盘分割为64MB一块的硬盘块,文件存储时也会被分割为64MB一块一块存储。

  • Hive和Hbase的数据存储在Hadoop的HDFS上,它为Hbase,Hive提供高可靠的底层存储支持

HDFS分布式原理

  • HDFS配置时要求设置一个元数据节点(NameNode),其他机器就视为数据节点(DataNode)。

  • 元数据节点里面记录了一些数据,这些数据就类似C++的虚函数指针,记录了整个HDFS集群中有哪些文件,文件大小和文件位置(这个文件哪一部分记录在哪个DataNode的哪个块)

  • 数据节点才是真正的数据块存储位置。

  • 数据节点会定期的向元数据节点通知自己的数据块状态。

MapReduce介绍

我在写后面的spark shuffle时不得不回来说下map-reduce。首先这个概念是google三大论文之一提出的,挺难理解的但又很重要,我英语不好,难以理解’map’和’reduce’的精确定义,但我有自己的一套理解方式。那就是:

  • 首先大数据的操作,都是分布式的。即不在一个物理机上的,包括数据的存储,以及数据的计算。

  • 那么假设我们要数一个国家的图书馆有多少本书,怎么办?我们很容易想到的方法是。

  • step1: 让每个图书馆的管理员数自己图书馆里的书。 这就是map

  • step2: 把每个图书馆里的书总数量汇报上来,做加总。 这就是reduce

然后我没啥可说的,最开始转行大数据的时候被这玩意儿吓的不轻,感觉高大上的很…………而且代码里成吨的map(),reduceByKey(),酷炫无比的样子,就是纸老虎。

值得注意的是, hadoop的MR(MapReduce)过程中产生的中间数据(例如每个图书馆里书的数量),都被写入硬盘,所以效率很低;而spark出现的一个核心原因就是,它将中间数据写到内存中,于是就大幅度提升了效率。

HBase

  • 就是Hadoop Database,就是Hadoop数据库。主要特点:

适合非结构化数据存储

基于列存储,而不是传统的基于行存储

FK土话说明:它就是一个一列一列存储的一种数据库,没有索引,但每行有一个row_key可理解类似于sql的索引。它列存取比较快,行scan代价很大,但根据row_key进行select效率还不错。

  • 它是基于Google的Bigtable的开源实现。

Google bigtable使用GFS做文件系统,Hbase使用HDFS做文件系统。

  • 和其他模块的关系

Hadoop MapReduce为HBase提供高性能的计算能力

Hadoop HDFS为HBase提供可靠的底层存储支持

Zookeeper为HBase提供稳定服务和FailOver机制

Pig和Hive为HBase提供了高层语言支持,使其使用简单

Sqoop则为HBase提供了关系型数据库数据导入功能,使得传统数据库数据和HBase之间的迁移变的简单方便

FK土话说明:一般我们会用Sqoop进行关系数据库的数据导入。在HBase表设计时候要稍微注意有所不同,到底是使用宽表,还是关联表,会和常规数据库设计思路有所不同,时刻注意它是列存储的。它的基本语法和SQL类似,不会感到困难。

Hive 和 HBase 的区别

  • Hive是Hadoop的数据仓库,不是数据库。只有表单结构(逻辑表),里面不做数据存储,更类似一个编程接口

  • HBase利用HDFS进行存储,自身保存物理表,适合存放非结构化的数据

  • Hive内它将结构化的数据文件映射为一个数据库表,提供SQL查询功能,最终将SQL语句转换为MapReduce任务运行,MapReduce是基于行处理的

  • HBase处理数据是基于列,而非基于行的,适合海量数据的随机访问

  • Hive使用Hadoop来分析处理数据,而Hadoop是批处理系统,不保证处理的延迟

  • HBase则接近实时系统,支持实时查询

  • FK土话说明:首先,HDFS我们理解成Windows下的磁盘分区格式就好了,类似FAT32,NTFS这些。然后HBase我们认为是一种奇怪的SQL数据库就好了,里面的确有具体数据的。Hive我们理解成一个翻译器好了,就是把SQL语句翻译成map-reduce语句,自己带了一个小的表,但不是数据,数据还是在HDFS上面的。如果我们要全部数据的清洗整理扫描,那就是用Hive,它执行map-reduce进行全部数据的修改;如果我们要根据row-key( SQL中每行的唯一ID )去找某一条或部分数据,那就是用HBase,然后HBase从Hadoop的HDFS里面找数据。

大数据数据流

FK土话说明:首先我要先说下大数据干点啥,大数据就是把数据进行汇集起来,做一些清理去杂质,汇总计算,得到一些分析的结果,例如我们去年总计多少访问量,总收入啦,预测明年的访问量啦这些,再把这些数据丢给前端做成图表。OK,这就是大数据主要工作。

所以,我们会发现大数据核心工作就几个:数据拉取,数据清洗汇总分析(称之为ETL),数据展示(称之为可视化)。

因为处理方式不同,大数据分为及时的,和历史的处理。

及时的意思就是说,根据近期数据马上给出现反馈。例如工厂监视机器状态,要实时获取机器状态然后做分析预测,判断这机器三分钟后可能就要坏了。此时得到的输入数据是近期的,反馈也要迅速,不能说等几个小时后才给预测吧,那时候事情可能已经发生了。

而历史的处理一般是事后处理,即上面的例子,去年总收入怎么样,明年我们大约能收入多少,这些参考数据是之前的,允许间隔几小时几天甚至几个月,计算过程也可以慢慢来,处理几天都可以等的起的。

因为大数据的数据量量很大,所以及时的和历史的数据处理方式有许多不同。部分情况下,及时的还会被分为高实时(以毫秒级处理和响应)和准实时的(允许等待几分钟产生结果)。

好,接下来我们说大数据的数据流向。

首先数据源可能是日志Log,Http/Https消息,常规数据库,分别可以spark, kafka, sqoop/spark 等方式拉到大数据存储。

大数据内一般用HDFS存储,如果数据调取的及时性要求比较高,则存储到HBase里,如果数据存储量大,则存储Hive/HDFS里,特别及时的处理可以用Spark stream或者Flink之类的做处理后,部分进行HBase存储,部分输出。

一般来说,数据进到HDFS里属于原始数据,这些数据需要被清洗(去除垃圾数据,重新调整数据结构),处理完的数据存储到Hive中,可以称为数据仓库。数据清洗处理一般称为ETL,可以用python或者scala等配合spark开发为jar包作为定时任务让spark执行。

Hive中的数据,通过针对业务的聚合统计之后,得到的新的数据通常存储到HBase或者SQL里,等待外界调用进行可视化。

然后用thirft等机制开发一些API(我个人喜欢用Golang RESTful框架或者Python Djiango, Flask做)

API提供给前端可视化展示(我个人喜欢JS开发,使用HighChart或者E-Chart)

Spark

基本概念

Spark最开始就是基于内存的Map reduce。它分为

  • Core 基本功能模块
  • Spark SQL 类似SQL的语法用以封装MapReduce
  • Spark streaming 为提供流式计算
  • MLlib 机器学习库
  • GraphX 图计算使用,处理例如关系链之类的事务

RDD,DataFrame, DataSet的关系

我们根据上面的了解会发现,从数据源的拉取,数据清洗计算,到数据存储这整个过程中spark是非常活跃的。而进行Spark开发过程,我们会频繁接触以下数据结构 RDD, DataFrame, DataSet。

我们的数据清洗过程,基本上都是将A这个RDD加载进来,然后DataFrame转换,再进行一些聚合清理转换为B这个RDD,然后存储起来。它是数据清洗操作的核心逻辑对象,所以我们非常有必要知道这几个的区别。

首先,RDD的英文我不想说了,因为它让人更混乱。我只强调几点:

  • 1:它是分布式的,即我们在对一个RDD进行操作的时候,实际上是发出一个指令,对多个物理机器上的许多数据块进行操作,一个RDD对象对应的是很多机器上的数据。这点必须清楚,因为之后性能开销优化,减少shuffle全靠这个了。

  • 2:它是一种java对象,即它的任意一条数据格式我们可以想象成一个json,我们把这些json文件分割到很多机器中存储了。这些json格式的对象组合我们称为一个RDD。

  • 3:RDD的一条数据是一个json,虽然json可以有属性名,但spark并不知道,它只认为自己存了一个list,具体object里面是什么,spark不清楚,所以在一些数据变换操作的时候,会发现我们不能用SQL类似的”select a where param = 3”这样的方式进行处理,因为spark不知道object里面有没有param这个属性的存在。

  • 4:RDD是不可写的。意思就是你对RDD做的修改,其实不是修改了RDD数据,而是创建了一个新的修改后的RDD,原本的RDD还是没变的,所以内存开销会很大,在性能优化时候需要考虑到这一点。

  • 然后,DataFrame也是个分布的东西,但是它比较容易理解,认为是数据库或者Excel表就好了。

    • 1:它和RDD的区别是,它有schema,即excel的列名,我们就可以通过SQL类似的”select a where param = 3”来处理了,因为spark知道param列的存在。

    当然,DataFrame也是分布式的

    先说这两者,其实就我当前的开发情况看来,大部分数据都是结构化的,而且DataFrame提供的操作功能都多于RDD,所以我个人非常不喜欢用RDD,我并不想讲任何RDD的好话。不过我们要知道DataFrame底层是RDD实现的,另外它们可以很容易的互相转换也就差不多了。

    只有在极其特殊的情况下,(例如读了一个小说文本,这个文本在Spark中就是RDD,因为默认是没有列名这个概念存在的),只有这个情况才用RDD。(但其实,我还是可以用并会用DataFrame做啊啊啊啊,列名叫 _1, _2 不就行了。。。都这么干的)

    最后,DataSet,这东西也让人比较迷惑,首先它是新出来的,目的似乎是想把RDD和DataFrame之间的混乱不清的情况统一化,简单来说,以后都用DataSet,也别用什么RDD什么DataFrame了。但现在还没有彻底完成这个更替,所以这货的出现只让人更加迷惑。

    • 首先,dataSet也是分布式的。

    • 它和DataFrame基本一致,两者底层都是RDD。

    • 它就是个DataFrame加强版。一旦是无类型的DataSet(我们表示为DataSet[Row])它就等同于DataFrame。一种是指定类型的DataSet(我们表示为DataSet[T]).

    就我现在个人看来,DataSet非常舒服,有编译期类型安全检查,有Spark SQL支持,有大量API接口,有优化机制,我会尽可能考虑丢弃RDD和DataFrame。

    但考虑到历史原因,很多代码还是使用RDD和DataFrame做的,所以我这里顺道备份下三种格式的转换代码:

    • DataFrame -> RDD

    var rdd = df.rdd

    • DataSet -> RDD

    var rdd = ds.rdd

    • RDD -> DataFrame
        import spark.implicits._
        val df = rdd.map {line=>
          (line._1,line._2)
        }.toDF("col1","col2") // 指定Schema
    
    • RDD -> DataSet
        import spark.implicits._
        case class FKObjectName(col1:String,col2:Int)extends Serializable //定义字段名和类型
        val ds = rdd.map {line=>
          FKObjectName(line._1,line._2)
        }.toDS
    
    • DataSet -> DataFrame
        import spark.implicits._
        val df = ds.toDF
    
    • DataFrame -> DataSet
        import spark.implicits._
        case class FKObjectName(col1:String,col2:Int)extends Serializable //定义字段名和类型
        val ds = df.as[FKObjectName]
    

    Spark的惰性机制

    我们上面提到了,ETL(数据清洗整理统计)的过程就是加载,数据计算转换,写入这个过程,所以它通常以链式编码开发。例如经常 A.load().add().sum().write() 这样的代码。

    但和常规代码开发不同,它的代码执行过程并不是逐步执行的,传统代码可能是

    A = [1, 2]
    print(A.add())  // 打印 [2,3]
    print(A.sum())  // 打印 5
    

    但spark中对DataFrame和RDD的操作不是这样,它将操作函数分为两类

    • 一种称为Transform函数,输入的是RDD,返回的也是RDD。例如map(),groupby()这种。这种函数不会立即执行,只会被记录,等待Action函数执行时再进行真正处理。

    • 一种称为Action函数,输入的是RDD,但是返回的则不是RDD。例如count()输出的是一个值,collect()show()等没有实际返回值的函数,这种函数才会真正的引起操作的执行。

    这种等待Action函数才真正执行的延迟操作,我们称为“惰性机制”。

    我们继续看上面的例子,流程就成了

    A = [1, 2]
    A.add()  // 此时A没变,因为输出的还是一个RDD
    A.add()  // 此时A依然没变,因为输出的还是一个RDD
    print(A) // 依然输出[1,2]
    A.sum()  // sum操作才真正启动了操作的执行
    print(A) // 这时候A就是[3,4]
    

    这样做的原因是,数据是分散的,如果能批量进行执行,会比多次单次执行更有效率,而且其实spark代码在底层会有优化器,它会对我们的操作代码进行分析优化(然而我没看源码,不知道优化机制,但大致可以脑补上面代码的两次add()就可能被优化成add(2),这样一次操作就OK了)。

    分析优化的另外一个目的是,解决数据的依赖顺序。因为大数据的数据处理和我们平时的简单逻辑不同,不是毫秒级处理完成的。一行代码可能要被执行数分钟甚至数小时,如果源数据之间的依赖关系没有理清并做优化,那么在实际处理时会等待过长的时间。

    被分析后的操作会被组成一个DAG(有向无环图),这个图就一个特点:

    • 不可能回头。即数据指向是单向的,不可能回环。一个有向图,从任一节点出去经历任意边,都不可能回到该节点。

    DAG就是spark底层真正的数据处理流,而不一定是我们的编码表示的顺序。

    Spark的宽依赖,窄依赖和shuffle

    我们刚说了,我们的代码被Spark解析后生成DAG,然后我们的业务需求会按照DAG拆成不同的小任务(称之为stage)分发下去给各个节点先做map处理,然后统计起来到一个或多个中心机器做reduce,然后再将数据返回给我们。

    其中各个子节点在map完毕后,生成的那些结果数据,有可能仅被一个RDD使用,也有可能被多个RDD使用。

    如果map后的数据仅被一个子RDD使用,那么我们称为窄依赖。如果一个节点map后的数据被多个子RDD使用,我们就称为宽依赖。

    而宽依赖情况下,为了下一步RDD的计算,我们就必须将节点先前的父RDD数据进行拆分,再将不同的块分发到子RDD处理的节点机器上。

    这个从各个节点将数据拆分,再进行分发的过程就叫shuffle。

    因为shuffle过程有网络IO开销(数据从子节点到中心节点),这些shuffle数据会写入磁盘(为了安全回滚)所以会有硬盘IO开销,所以shuffle性能耗损很大,一般要尽量避免或减少。

    窄依赖不会产生shuffle,因为父RDD无需拆分给不同的node.


    今天就先到这里吧

    作者的图片

    FreeKnight.Wong

    Ever tried, ever failed, fail better.

    Game software engineer

    Makati