锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

数据分析大数据面试题大杂烩02

时间:2023-04-27 01:07:01 pb2081转速变送器

Map端将处理输入数据并产生中间结果,该中间结果将写盘,每个磁盘Map输出将首先写入内存缓冲区。当写入的数据达到设定阈值时,系统将启动线程将缓冲区的数据写入磁盘。这个过程叫做spill(spill在写入之前,将进行二次排序,首先根据数据所属partition进行排序,然后每个partition再次按下中数据key来排序 . partition目的是将记录划分为不同的部分Reducer上去,以期实现负载平衡,未来Reducer就会根据partition读取相应的数据 . 接着运行combiner(如果设置了),combiner本质也是一个Reducer,目的是先处理要写在磁盘上的文件,这样写在磁盘上的数据量就会减少 . 最后,将数据写到本地磁盘产生的数据spill文件(spill文件保存在{mapred.local.dir}在指定的目录中,Map任务结束后将被删除)
Reducer复制数据时,只复制与自己对应的数据partition数据可以在中间 . 每个Reducer会处理一个或多个partition,接下来就是sort也成为了阶段merge因为这个阶段的主要工作是实施并购排序 . 从Map端拷贝到Reduce端数据有序(指阶段),因此非常适合并排序 . 最终在Reduce作为一个大文件,端生成一个大文件Reduce的输入 .
最后是Reduce过程,产生最终的输出结果将写到HDFS上

partition数量和返回值(决定去哪里)reducer)一个概念
默认分区数量key.hash%reducetask的个数

reduceTask的数量由job设置在提交时numreducretask决定 .

0的话就没有reducer


1map数量通常是由hadoop集群的DFS确定块的大小,即输入文件的总块数,正常map数量的并行规模大致是每个数量Node是10~100个,对于CPU可设置消耗较小的作业Map数量在300左右,但是因为hadoop初始化时没有任务需要一定的时间,因此,每一种情况都是合理的map执行时间至少超过1分钟 . 具体的数据分片是这样的,InputFormat默认情况下会有依据hadoop集群的DFS块大小分片,每个分片由一个分片组成map处理任务,当然,用户仍然可以通过参数mapred.min.split.size参数在操作提交客户端自定义 . 另一个重要参数是mapred.map.tasks,该参数设置map数量仅仅是一个提示,只有当InputFormat 决定了map任务数比mapred.map.tasks值小时起作用 .
2Map也可以使用任务的数量JobConf 的conf.setNumMapTasks(int num)手动设置方法 . 该方法可用于增加map任务数量,但不能设定的任务数量小于Hadoop通过分割输入数据获得的值 . 当然,为了提高集群的并发效率,可以设置默认情况map数量,当用户的map相对交通大学的默认值可以在数量较小或比自动分割值小的时间内使用,从而改善整体hadoop集群的效率 .
reduce在运行过程中,往往需要从相关的相关方面进行操作map端复制数据到reduce因此,与节点处理相比,map任务 . reduce节点资源相对缺乏,运行相对缓慢,正确的reduce任务数应为0.95或者1.75 (节点数 ×mapred.tasktracker.tasks.maximum参数值) . 如果任务数是节点数的0.95倍,那么一切reduce任务能够在 map任务输出传输结束后,同时开始运行 . 若任务数为节点数的1.75倍,所以高速节点将完成他们的第一批reduce计算任务后,开始计算第二批 reduce任务更有利于负载平衡 . 同时要注意增加reduce虽然数量会增加系统的资源成本,但可以改善负载均衡,减少任务失败的负面影响 . 同样,Reduce任务也可以和 map任务一样,通过设定JobConf 的conf.setNumReduceTasks(int num)增加任务数量的方法 .
reduce数量为0
有些作业不需要合同处理,可以设置reduce在这种情况下,用户的运行速度相对较高,map输出将直接写入 SetOutputPath(path)设置的输出目录不作为中间结果写在本地 . 同时Hadoop框架在写入文件系统之前没有排序 .

首先map task会从本地文件系统读取数据,转换成key-value正式键值对集,使用hadoop内置数据类型,如Text,Longwritable等 .
集合输入键值mapper进行业务处理过程,将其转化为所需的key-value再输出 .
之后会有一个partition分区操作,默认使用hashpartitioner,可以重写hashpartitioner的getPartition方法来自定义分区规则 .
之后会对key进行sort排序,grouping分组操作将相同key的value合并分组输出,在这里可以使用自定义的数据类型,重写WritableComparator的Comparator该方法来自定义排序规则,重写RawComparator的compara方法来自定义分组规则 .
然后做一个combiner合同操作是本地的reduce减少预处理shuffle,reducer的工作量 .
Reduce task会用网络收集每个数据reduce处理,最后保存或显示数据,结束整个过程job .

InputFormat会在map操作前对数据进行预处理 .
1.是getSplits,返回的是InputSplit数组,数据Split分片,每片交给map操作一次 .
2.是getRecordReader,返回的是RecordReader对象,对每一个Split分片转换为key-value键值向格式传递map,常用的InputFormat是TextInputFormat,使用的是LineRecordReader以行偏移量为键,转换每个分片的键值对,行内容作为值 .
自定义继承InputFormat接口,重写createRecordReader和isSplitable方法在createRecordReader可自定义分隔符 .

两者都使用mr并行计算模型,hadoop一个作业叫job,job里面分为map task和reduce task,每个task都是在自己的过程中运行的,当task当过程结束时,过程也会结束 .
Spark用户提交的任务称为application,一个application对应一个SparkContext,app中存在多个job,每触发一个action操作就会产生一个job(线程) .
这些job可可以并行或串行执行,每一个都可以并行执行job有多个stage,stage是shuffle过程中DAGSchaduler通过RDD依赖关系的划分job来,每一个stage里面有多个task,组成taskset有TaskSchaduler分发到各个executor中执行,executor生命周期是和application即使没有,也一样job操作也存在,所以task读取内存可以快速启动计算 .
Hadoop的job只有map和reduce操作,缺乏表达能力,mr读写会在这个过程中重复hdfs,造成大量的io操作,多个job需要管理自己的关系 .
Spark在内存中进行迭代计算,API中提供了大量的RDD操作join,groupby等,通过DAG好的容错图可以实现 .

为什么要用flume导入hdfs,hdfs架构是什么?
Flume数据可以实时导入到hdfs中,当hdfs当上述文件达到指定大小时,将形成文件,或在指定时间内形成文件 .
存储文件datanode上的,namenode存储着datanode元数据信息,而namenode元数据信息存在于内存中,因此当文件切片很小或很多时候会卡住 .

比如大部分作业都完成了,但总有几个reduce一直在运行 .
因为这些reduce处理的数据远远大于其他数据reduce,对键值对任务划分不均匀可能导致数据倾斜 .
在分区时,可以重新定义分区规则value数据很多的key可拆分 均匀分散或在map端的combiner数据预处理的操作 .

简单说一下hadoop和spark的shuffle过程
Hadoop:map端保存分片数据,通过网络收集reduce端 .
Spark:spark的shuffles是在DAGSchedular划分Stage发生时,TaskSchedular要分发task任务到各个worker的executor . 减少shuffle能提高性能 .

存的是和hdfs,hive是逻辑数据仓库,实际操作是hdfs上的文件,HQL就是用SQL语法来写的MR程序 .

Hive与关系数据库的关系
没有关系,hive是数据仓库,不能像数据库一样实时进行CRUD操作 .
这是一个写入多读的操作,可以看作是ETL的工具 .

Flume什么是工作机制?
核心概念是agent,里面包括source,channel和sink三个组件 .
Source日志采集在日志采集节点运行,然后临时存储channel中,sink负责将channel数据发送到目的地 .
只有发送成功channel中的数据才会被删除 .
首先书写flume配置文件,定义agent source channel和sink然后组装执行flume-ng命令 .

Hbase行键列族的概念、物理模型表设计原则
行键:是hbase表自带,每个行键对应一个数据 .
列族:是创建表时指定的,为列的集合,每个列族作为一个文件单独存储,存储的数据都是字节数组,其中数据可以有很多,通过时间戳来区分 .
物理模型:整体hbase表会分为多个region,每个region将行键的起点记录在不同的节点上,并在查询时对每个节点进行并行查询region很大时使用.META表存各个region的起始点,-ROOT又可以存储.META的起始点 . 
Rowkey的设计原则:各个列族数据平衡,长度原则 相邻原则,创建表的时候设置表放入regionserver缓存中,避免自动增长和时间,使用字节数组代替string,最大长度64kb,最好16字节以内,按天分表,两个字节散列,四个字节存储时分毫秒 . 
列族的设计原则:尽可能少(按照列族进行存储,按照region进行读取,不必要的io操作),经常和不经常使用的两类数据放入不同列族中,列族名字尽可能短 . 

请列出正常的hadoop集群中hadoop都分别需要启动 哪些进程,他们的作用分别都是什么,请尽量列的详细一些 . 
namenode:负责管理hdfs中文件块的元数据,响应客户端请求,管理datanode上文件block的均衡,维持副本数量
Secondname:主要负责做checkpoint操作;也可以做冷备(对一定范围内数据做快照性备份)
Datanode:存储数据块,负责客户端对数据块的io请求
Jobtracker :管理任务,并将任务分配给 tasktracker . 
Tasktracker:执行JobTracker分配的任务 . 
Resourcemanager Nodemanager Journalnode Zookeeper Zkfc

读:
找到要读数据的region所在的RegionServer,然后按照以下顺序进行读取:先去BlockCache读取,若BlockCache没有,则到Memstore读取,若Memstore中没有,则到HFile中去读 . 
写:
找到要写数据的region所在的RegionServer,然后先将数据写到WAL(Write-Ahead Logging,预写日志系统)中,然后再将数据写到Memstore等待刷新,回复客户端写入完成 . 

HBase的特点是什么?
(1)hbase是一个分布式的基于列式存储的数据库,基于hadoop的HDFS存储,zookeeper进行管理 . 
(2)hbase适合存储半结构化或非结构化数据,对于数据结构字段不够确定或者杂乱无章很难按一个概念去抽取的数据 . 
(3)hbase为null的记录不会被存储 . 
(4)基于的表包括rowkey,时间戳和列族 . 新写入数据时,时间戳更新,同时可以查询到以前的版本 . 
(5)hbase是主从结构 . Hmaster作为主节点,hregionserver作为从节点 . 

请描述如何解决Hbase中region太小和region太大带来的结果 . 
Region过大会发生多次compaction,将数据读一遍并写一遍到hdfs上,占用io,region过小会造成多次split,region会下线,影响访问服务


如何确定map个数


搭建伪分布式hadoop开发环境
1 Linux环境
2 Jdk安装
3  关闭防火墙
4 配置hadoop
5 格式化namenode(不需要重复)
6 启动hdfs 守护进程
7 Web 访问界面  50070
8 配置YARN任务调度
9 启动hdfs YARA进程
10 检查YARN状态
11 向YARN提交任务

搭建hadoop完全分布式简单步骤
1 虚拟机装备
2 网络配置完好
3 JDK安装
4 Ssh 配置
5 同步服务器时间
6 Hadoop集群配置
A:环境变量
B:hadoop文件配置
7 启动hadoop集群
8 Web端口访问 . 

 hive的安装和使用


提前根据查询结果来组织数据 . 每种业务都是不同的,要想查询得快,就要提前分析场景,在数据入库时,就提前根据查询结果来组织数据 . 这也是微博等应用的做法,根据显示结果提前存储数据 . 

分钟级

Spark生态圈也称为BDAS(伯克利数据分析栈),是伯克利APMLab实验室打造的,力图在算法(Algorithms) 机器(Machines) 人(People)之间通过大规模集成来展现大数据应用的一个平台 . 伯克利AMPLab运用大数据 云计算 通信等各种资源以及各种灵活的技术方案,对海量不透明的数据进行甄别并转化为有用的信息,以供人们更好的理解世界 . 该生态圈已经涉及到机器学习 数据挖掘 数据库 信息检索 自然语言处理和语音识别等多个领域 . 
Spark生态圈以Spark Core为核心,从HDFS Amazon S3和HBase等持久层读取数据,以MESS YARN和自身携带的Standalone为资源管理器调度Job完成Spark应用程序的计算 .  这些应用程序可以来自于不同的组件,如Spark Shell/Spark Submit的批处理 Spark Streaming的实时处理应用 Spark SQL的即席查询 BlinkDB的权衡查询 MLlib/MLbase的机器学习 GraphX的图处理和SparkR的数学计算等 . 


Spark Core
Spark内核架构:
l  提供了有向无环图(DAG)的分布式并行计算框架,并提供Cache机制来支持多次迭代计算或者数据共享,大大减少迭代计算之间读取数据局的开销,这对于需要进行多次迭代的数据挖掘和分析性能有很大提升
2  在Spark中引入了RDD (Resilient Distributed Dataset) 的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,则可以根据"血统"对它们进行重建,保证了数据的高容错性;
3  移动计算而非移动数据,RDD Partition可以就近读取分布式文件系统中的数据块到各个节点内存中进行计算
4  使用多线程池模型来减少task启动开稍
5  采用容错的 高可伸缩性的akka作为通讯框架

SparkStreaming
SparkStreaming是一个对实时数据流进行高通量 容错处理的流式处理系统,可以对多种数据源(如Kdfka Flume Twitter Zero和TCP 套接字)进行类似Map Reduce和Join等复杂操作,并将结果保存到外部文件系统 数据库或应用到实时仪表盘 . 
Spark Streaming构架:
l计算流程:Spark Streaming是将流式计算分解成一系列短小的批处理作业 . 这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中 . 整个流式计算根据业务的需求可以对中间的结果进行叠加或者存储到外部设备 . 下图显示了Spark Streaming的整个流程 . 

容错性:对于流式计算来说,容错性至关重要 . 首先我们要明确一下Spark中RDD的容错机制 . 每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系(lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的 .   
对于Spark Streaming来说,其RDD的传承关系如下图所示,图中的每一个椭圆形表示一个RDD,椭圆形中的每个圆形代表一个RDD中的一个Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每一个Batch Size所产生的中间结果RDD . 我们可以看到图中的每一个RDD都是通过lineage相连接的,由于Spark Streaming输入数据可以来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每一个数据流拷贝两份到其他的机器)都能保证容错性,所以RDD中任意的Partition出错,都可以并行地在其他机器上将缺失的Partition计算出来 . 这个容错恢复方式比连续计算模型(如Storm)的效率更高 . 

实时性:对于实时性的讨论,会牵涉到流式处理框架的应用场景 . Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程 . 对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),所以Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景 . 
扩展性与吞吐量:Spark目前在EC2上已能够线性扩展到100个节点(每个节点4Core),可以以数秒的延迟处理6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm高2~5倍,Berkeley利用WordCount和Grep两个用例所做的测试,在Grep这个测试中,Spark Streaming中的每个节点的吞吐量是670k records/s,而Storm是115k records/s . 
2010 ,Spark正式对外开源

Spark SQL
Shark是SparkSQL的前身,那个时候Hive可以说是SQL on Hadoop的唯一选择,负责将SQL编译成可扩展的MapReduce作业,鉴于Hive的性能以及与Spark的兼容,Shark项目由此而生 . 
Shark即Hive on Spark,本质上是通过Hive的HQL解析,把HQL翻译成Spark上的RDD操作,然后通过Hive的metadata获取数据库里的表信息,实际HDFS上的数据和文件,会由Shark获取并放到Spark上运算 . Shark的最大特性就是快和与Hive的完全兼容,且可以在shell模式下使用rdd2sql()这样的API,把HQL得到的结果集,继续在scala环境下运算,支持自己编写简单的机器学习或简单分析处理函数,对HQL结果进一步分析计算 . 
在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上 . Databricks表示,Spark SQL将涵盖Shark的所有特性,用户可以从Shark 0.9进行无缝的升级 . 在会议上,Databricks表示,Shark更多是对Hive的改造,替换了Hive的物理执行引擎,因此会有一个很快的速度 . 然而,不容忽视的是,Shark继承了大量的Hive代码,因此给优化和维护带来了大量的麻烦 . 随着性能优化和先进分析整合的进一步加深,基于MapReduce设计的部分无疑成为了整个项目的瓶颈 . 因此,为了更好的发展,给用户提供一个更好的体验,Databricks宣布终止Shark项目,从而将更多的精力放到Spark SQL上 . 
Spark SQL允许开发人员直接处理RDD,同时也可查询例如在 Apache Hive上存在的外部数据 . Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询,同时进行更复杂的数据分析 . 除了Spark SQL外,Michael还谈到Catalyst优化框架,它允许Spark SQL自动修改查询方案,使SQL更有效地执行 . 
还有Shark的作者是来自中国的博士生辛湜(Reynold Xin),也是Spark的核心成员,具体信息可以看他的专访 http://www.csdn.net/article/2013-04-26/2815057-Spark-Reynold
Spark SQL的特点:
l引入了新的RDD类型SchemaRDD,可以象传统数据库定义表一样来定义SchemaRDD,SchemaRDD由定义了列数据类型的行对象构成 . SchemaRDD可以从RDD转换过来,也可以从Parquet文件读入,也可以使用HiveQL从Hive中获取 . 
2内嵌了Catalyst查询优化框架,在把SQL解析成逻辑执行计划之后,利用Catalyst包里的一些类和接口,执行了一些简单的执行计划优化,最后变成RDD的计算
3在应用程序中可以混合使用不同来源的数据,如可以将来自HiveQL的数据和来自SQL的数据进行Join操作 . 

Shark的出现使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高,  那么,摆脱了Hive的限制,SparkSQL的性能又有怎么样的表现呢?虽然没有Shark相对于Hive那样瞩目地性能提升,但也表现得非常优异 . 
为什么sparkSQL的性能会得到怎么大的提升呢?主要sparkSQL在下面几点做了优化:
1. 内存列存储(In-Memory Columnar Storage) sparkSQL的表数据在内存中存储不是采用原生态的JVM对象存储方式,而是采用内存列存储;
2. 字节码生成技术(Bytecode Generation) Spark1.1.0在Catalyst模块的expressions增加了codegen模块,使用动态字节码生成技术,对匹配的表达式采用特定的代码动态编译 . 另外对SQL表达式都作了GC优化, GC优化的实现主要还是依靠Scala2.10的运行时放射机制(runtime reflection);
3. Scala代码优化 SparkSQL在使用Scala编写代码的时候,尽量避免低效的 容易GC的代码

BlinkDB
BlinkDB 是一个用于在海量数据上运行交互式 SQL 查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度被控制在允许的误差范围内 . 为了达到这个目标,BlinkDB 使用两个核心思想:
l一个自适应优化框架,从原始数据随着时间的推移建立并维护一组多锐单本;
2一个动态样本选择策略,选择一个适当大小的示例基于查询的准确性和(或)响应时间需求 . 
和传统关系型数据库不同,BlinkDB是一个很有意思的交互式查询系统,就像一个跷跷板,用户需要在查询精度和查询时间上做一权衡;如果用户想更快地获取查询结果,那么将牺牲查询结果的精度;同样的,用户如果想获取更高精度的查询结果,就需要牺牲查询响应时间 . 用户可以在查询的时候定义一个失误边界 . 


MLBase/MLlib
MLBase是Spark生态圈的一部分专注于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用MLbase . MLBase分为四部分:MLlib MLI ML Optimizer和MLRuntime . 
l  ML Optimizer会选择它认为最适合的已经在内部实现好了的机器学习算法和相关参数,来处理用户输入的数据,并返回模型或别的帮助分析的结果;
2  MLI 是一个进行特征抽取和高级ML编程抽象的算法实现的API或平台;
3  MLlib是Spark实现一些常见的机器学习算法和实用程序,包括分类 回归 聚类 协同过滤 降维以及底层优化,该算法可以进行可扩充; MLRuntime 基于Spark计算框架,将Spark的分布式计算应用到机器学习领域 . 
总的来说,MLBase的核心是他的优化器,把声明式的Task转化成复杂的学习计划,产出最优的模型和计算结果 . 与其他机器学习Weka和Mahout不同的是:
l  MLBase是分布式的,Weka是一个单机的系统;
2  MLBase是自动化的,Weka和Mahout都需要使用者具备机器学习技能,来选择自己想要的算法和参数来做处理;
3  MLBase提供了不同抽象程度的接口,让算法可以扩充
4  MLBase基于Spark这个平台

GraphX
GraphX是Spark中用于图(e.g., Web-Graphs and Social Networks)和图并行计算(e.g., PageRank and Collaborative Filtering)的API,可以认为是GraphLab(C++)和Pregel(C++)在Spark(Scala)上的重写及优化,跟其他分布式图计算框架相比,GraphX最大的贡献是,在Spark之上提供一栈式数据解决方案,可以方便且高效地完成图计算的一整套流水作业 . GraphX最先是伯克利AMPLAB的一个分布式图计算框架项目,后来整合到Spark中成为一个核心组件 . 
GraphX的核心抽象是Resilient Distributed Property Graph,一种点和边都带属性的有向多重图 . 它扩展了Spark RDD的抽象,有Table和Graph两种视图,而只需要一份物理存储 . 两种视图都有自己独有的操作符,从而获得了灵活操作和执行效率 . 如同Spark,GraphX的代码非常简洁 . GraphX的核心代码只有3千多行,而在此之上实现的Pregel模型,只要短短的20多行 . GraphX的代码结构整体下图所示,其中大部分的实现,都是围绕Partition的优化进行的 . 这在某种程度上说明了点分割的存储和相应的计算优化的确是图计算框架的重点和难点 . 

raphX的底层设计有以下几个关键点 . 
1.对Graph视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成 . 这样对一个图的计算,最终在逻辑上,等价于一系列RDD的转换过程 . 因此,Graph最终具备了RDD的3个关键特性:Immutable Distributed和Fault-Tolerant . 其中最关键的是Immutable(不变性) . 逻辑上,所有图的转换和操作都产生了一个新图;物理上,GraphX会有一定程度的不变顶点和边的复用优化,对用户透明 . 
2.两种视图底层共用的物理数据,由RDD[Vertex-Partition]和RDD[EdgePartition]这两个RDD组成 . 点和边实际都不是以表Collection[tuple]的形式存储的,而是由VertexPartition/EdgePartition在内部存储一个带索引结构的分片数据块,以加速不同视图下的遍历速度 . 不变的索引结构在RDD转换过程中是共用的,降低了计算和存储开销 . 
3.图的分布式存储采用点分割模式,而且使用partitionBy方法,由用户指定不同的划分策略(PartitionStrategy) . 划分策略会将边分配到各个EdgePartition,顶点Master分配到各个VertexPartition,EdgePartition也会缓存本地边关联点的Ghost副本 . 划分策略的不同会影响到所需要缓存的Ghost副本数量,以及每个EdgePartition分配的边的均衡程度,需要根据图的结构特征选取最佳策略 . 目前有EdgePartition2d EdgePartition1d RandomVertexCut和CanonicalRandomVertexCut这四种策略 . 在淘宝大部分场景下,EdgePartition2d效果最好 . 

SparkR
SparkR是AMPLab发布的一个R开发包,使得R摆脱单机运行的命运,可以作为Spark的job运行在集群上,极大得扩展了R的数据处理能力 . 
SparkR的几个特性:
l  提供了Spark中弹性分布式数据集(RDD)的API,用户可以在集群上通过R shell交互性的运行Spark job . 
2  支持序化闭包功能,可以将用户定义函数中所引用到的变量自动序化发送到集群中其他的机器上 . 
3  SparkR还可以很容易地调用R开发包,只需要在集群上执行操作前用includePackage读取R开发包就可以了,当然集群上要安装R开发包 . 

Tachyon
Tachyon是一个高容错的分布式文件系统,允许文件以内存的速度在集群框架中进行可靠的共享,就像Spark和 MapReduce那样 . 通过利用信息继承,内存侵入,Tachyon获得了高性能 . Tachyon工作集文件缓存在内存中,并且让不同的 Jobs/Queries以及框架都能内存的速度来访问缓存文件" . 因此,Tachyon可以减少那些需要经常使用的数据集通过访问磁盘来获得的次数 . Tachyon兼容Hadoop,现有的Spark和MR程序不需要任何修改而运行 . 
在2013年4月,AMPLab共享了其Tachyon 0.2.0 Alpha版本的Tachyon,其宣称性能为HDFS的300倍,继而受到了极大的关注 . Tachyon的几个特性如下:
lJAVA-Like File API
Tachyon提供类似JAVA File类的API,
2兼容性
Tachyon实现了HDFS接口,所以Spark和MR程序不需要任何修改即可运行 . 
3可插拔的底层文件系统
Tachyon是一个可插拔的底层文件系统,提供容错功能 . tachyon将内存数据记录在底层文件系统 . 它有一个通用的接口,使得可以很容易的插入到不同的底层文件系统 . 目前支持HDFS,S3,GlusterFS和单节点的本地文件系统,以后将支持更多的文件系统 . 

hadoop dfsadmin -report 
用这个命令可以快速定位出哪些节点down掉了,HDFS的容量以及使用了多少,以及每个节点的硬盘使用情况,但是并不能定位HDFS损坏块 . 

hadoop为各个守护进程(namenode,secondarynamenode,jobtracker,datanode,tasktracker)统一分配的内存在hadoop-env.sh中设置,参数为HADOOP_HEAPSIZE,默认为1000M . 

Hadoop Yarn 的三种资源调度器详解
1) 默认调度器FIFO
hadoop中默认的调度器,采用先进先出的原则
2) 计算能力调度器Capacity Scheduler
选择占用资源小,优先级高的先执行
3) 公平调度器Fair Scheduler
同一队列中的作业公平共享队列中所有资源

Hive中的元数据通常包括:表的名字,表的列和分区及其属性,表的属性(内部表和 外部表),表的数据所在目录 . Hive元存储(Metastore)管理参数默认储存在自带的Derby数据库中 . 缺点就是不适合多用户操作,并且数据存储目录不固定 . 数据库和Hive所在的机器绑定,极度不方便管理 . 通常将元数据存储在我们自己创建的MySQL数据库(本地或远程)当中 . 
元数据存储两种方法:
(1)derby数据库
默认内存数据库,一次只能打开一个会话,会话关闭metastore数据消失
(2)MySQL数据库
外部metastore存储引擎,可以让多个会话使用
即:
自带的Derby数据库
本地RDBMS数据库,即关系数据库管理系统(Relational Database Management System), 如MySQL
远程MySQL

flume采集日志时中间停了,怎么记录之前的日志?
当节点出现故障时,日志能够被传送到其他节点上而不会丢失
事件在通道中进行,该通道管理从故障中恢复 . Flume支持一个由本地文件系统支持的持久文件通道 . 还有一个内存通道,它只是将事件存储在内存中的队列中,这更快,但是当代理进程死亡时仍然留在内存通道中的任何事件都无法恢复 . 
a)Flume提供了三种级别的可靠性保障,从强到弱依次分别为
i.end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送)
ii.Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送)
iii.Best effort(数据发送到接收方后,不会进行确认)

在实际应用中还存在这样一个问题,比如导入数据的时候,Map Task 执行失败, 那么该 Map 任务会转移到另外一个节点执行重新运行,这时候之前导入的数据又要重新导入一份,造成数据重复导入 .  因为 Map Task 没有回滚策略,一旦运行失败,已经导入数据库中的数据就无法恢复 . 
Sqoop export 提供了一种机制能保证原子性, 使用--staging-table 选项指定临时导入的表 . 
Sqoop export 导出数据的时候会分为两步:
第一步,将数据导入数据库中的临时表,如果导入期间 Map Task 失败,会删除临时表数据重新导入;
第二步,确认所有 Map Task 任务成功后,会将临时表名称为指定的表名称 . 
sqoop export \--connect jdbc:mysql://db.dajiangtai.net:3306/djtdb_hadoop \--username sqoop \--password sqoop \--table user \--staging-table staging_user

十个海量数据处理方法大总结
一 Bloom filter
适用范围:可以用来实现数据字典,进行数据的判重,或者集合求交集
基本原理及要点:
对于原理来说很简单,位数组+k个独立hash函数 . 将hash函数对应的值的位数组置1,查找时如果发现所有hash函数对应位都是1说明存在,很明显这个过程并不保证查找的结果是100%正确的 . 同时也不支持删除一个已经插入的关键字,因为该关键字对应的位会牵动到其他的关键字 . 所以一个简单的改进就是 counting Bloom filter,用一个counter数组代替位数组,就可以支持删除了 . 
还有一个比较重要的问题,如何根据输入元素个数n,确定位数组m的大小及hash函数个数 . 当hash函数个数k=(ln2)(m/n)时错误率最小 . 在错误率不大于E的情况下,m至少要等于nlg(1/E)才能表示任意n个元素的集合 . 但m还应该更大些,因为还要保证bit数组里至少一半为0,则m应该>=nlg(1/E)lge 大概就是nlg(1/E)1.44倍(lg表示以2为底的对数) . 
举个例子我们假设错误率为0.01,则此时m应大概是n的13倍 . 这样k大概是8个 . 
注意这里m与n的单位不同,m是bit为单位,而n则是以元素个数为单位(准确的说是不同元素的个数) . 通常单个元素的长度都是有很多bit的 . 所以使用bloom filter内存上通常都是节省的 . 
扩展:
Bloom filter将集合中的元素映射到位数组中,用k(k为哈希函数个数)个映射位是否全1表示元素在不在这个集合中 . Counting bloom filter(CBF)将位数组中的每一位扩展为一个counter,从而支持了元素的删除操作 . Spectral Bloom Filter(SBF)将其与集合元素的出现次数关联 . SBF采用counter中的最小值来近似表示元素的出现频率 . 
问题实例:给你A,B两个文件,各存放50亿条URL,每条URL占用64字节,内存限制是4G,让你找出A,B文件共同的URL . 如果是三个乃至n个文件呢?
根据这个问题我们来计算下内存的占用,4G=2^32大概是40亿8大概是340亿,n=50亿,如果按出错率0.01算需要的大概是650亿个bit . 现在可用的是340亿,相差并不多,这样可能会使出错率上升些 . 另外如果这些urlip是一一对应的,就可以转换成ip,则大大简单了 . 
二 Hashing
适用范围:快速查找,删除的基本数据结构,通常需要总数据量可以放入内存
基本原理及要点:
hash函数选择,针对字符串,整数,排列,具体相应的hash方法 . 
碰撞处理,一种是open hashing,也称为拉链法;另一种就是closed hashing,也称开地址法,opened addressing . 
扩展:
d-left hashing中的d是多个的意思,我们先简化这个问题,看一看2-left hashing . 2-left hashing指的是将一个哈希表分成长度相等的两半,分别叫做T1和T2,给T1和T2分别配备一个哈希函数,h1和h2 . 在存储一个新的key时,同时用两个哈希函数进行计算,得出两个地址h1[key]和h2[key] . 这时需要检查T1中的h1[key]位置和T2中的h2[key]位置,哪一个位置已经存储的(有碰撞的)key比较多,然后将新key存储在负载少的位置 . 如果两边一样多,比如两个位置都为空或者都存储了一个key,就把新key存储在左边的T1子表中,2-left也由此而来 . 在查找一个key时,必须进行两次hash,同时查找两个位置 . 
问题实例:
1).海量日志数据,提取出某日访问百度次数最多的那个IP . 
IP的数目还是有限的,最多2^32个,所以可以考虑使用hash将ip直接存入内存,然后进行统计 . 
三 bit-map
适用范围:可进行数据的快速查找,判重,删除,一般来说数据范围是int的10倍以下
基本原理及要点:使用bit数组来表示某些元素是否存在,比如8位电话号码
扩展:bloom filter可以看做是对bit-map的扩展
问题实例:
1)已知某个文件内包含一些电话号码,每个号码为8位数字,统计不同号码的个数 . 
8位最多99 999 999,大概需要99m个bit,大概10几m字节的内存即可 . 
2)2.5亿个整数中找出不重复的整数的个数,内存空间不足以容纳这2.5亿个整数 . 
将bit-map扩展一下,用2bit表示一个数即可,0表示未出现,1表示出现一次,2表示出现2次及以上 . 或者我们不用2bit来进行表示,我们用两个bit-map即可模拟实现这个2bit-map . 
四 堆
适用范围:海量数据前n大,并且n比较小,堆可以放入内存
基本原理及要点:最大堆求前n小,最小堆求前n大 . 方法,比如求前n小,我们比较当前元素与最大堆里的最大元素,如果它小于最大元素,则应该替换那个最大元素 . 这样最后得到的n个元素就是最小的n个 . 适合大数据量,求前n小,n的大小比较小的情况,这样可以扫描一遍即可得到所有的前n元素,效率很高 . 
扩展:双堆,一个最大堆与一个最小堆结合,可以用来维护中位数 . 
问题实例:
1)100w个数中找最大的前100个数 . 
用一个100个元素大小的最小堆即可 . 
五 双层桶划分----其实本质上就是[分而治之]的思想,重在分的技巧上!
适用范围:第k大,中位数,不重复或重复的数字
基本原理及要点:因为元素范围很大,不能利用直接寻址表,所以通过多次划分,逐步确定范围,然后最后在一个可以接受的范围内进行 . 可以通过多次缩小,双层只是一个例子 . 
扩展:
问题实例:
1).2.5亿个整数中找出不重复的整数的个数,内存空间不足以容纳这2.5亿个整数 . 
有点像鸽巢原理,整数个数为2^32,也就是,我们可以将这2^32个数,划分为2^8个区域(比如用单个文件代表一个区域),然后将数据分离到不同的区域,然后不同的区域在利用bitmap就可以直接解决了 . 也就是说只要有足够的磁盘空间,就可以很方便的解决 . 
2).5亿个int找它们的中位数 . 
这个例子比上面那个更明显 . 首先我们将int划分为2^16个区域,然后读取数据统计落到各个区域里的数的个数,之后我们根据统计结果就可以判断中位数落到那个区域,同时知道这个区域中的第几大数刚好是中位数 . 然后第二次扫描我们只统计落在这个区域中的那些数就可以了 . 
实际上,如果不是int是int64,我们可以经过3次这样的划分即可降低到可以接受的程度 . 即可以先将int64分成2^24个区域,然后确定区域的第几大数,在将该区域分成2^20个子区域,然后确定是子区域的第几大数,然后子区域里的数的个数只有2^20,就可以直接利用direct addr table进行统计了 . 
六 数据库索引
适用范围:大数据量的增删改查
基本原理及要点:利用数据的设计实现方法,对海量数据的增删改查进行处理 . 
七 倒排索引(Inverted index)
适用范围:搜索引擎,关键字查询
基本原理及要点:为何叫倒排索引?一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射 . 
以英文为例,下面是要被索引的文本:T0 = "it is what it is" T1 = "what is it" T2 = "it is a banana"
我们就能得到下面的反向文件索引:
"a":{2} "banana":{2} "is":{0, 1, 2} "it":{0, 1, 2} "what":{0, 1}
检索的条件"what","is"和"it"将对应集合的交集 . 
正向索引开发出来用来存储每个文档的单词的列表 . 正向索引的查询往往满足每个文档有序频繁的全文查询和每个单词在校验文档中的验证这样的查询 . 在正向索引中,文档占据了中心的位置,每个文档指向了一个它所包含的索引项的序列 . 也就是说文档指向了它包含的那些单词,而反向索引则是单词指向了包含它的文档,很容易看到这个反向的关系 . 
扩展:
问题实例:文档检索系统,查询那些文件包含了某单词,比如常见的学术论文的关键字搜索 . 
八 外排序
适用范围:大数据的排序,去重
基本原理及要点:外排序的归并方法,置换选择败者树原理,最优归并树
扩展:
问题实例:
1).有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16个字节,内存限制大小是1M . 返回频数最高的100个词 . 
这个数据具有很明显的特点,词的大小为16个字节,但是内存只有1m做hash有些不够,所以可以用来排序 . 内存可以当输入缓冲区使用 . 
九 trie树
适用范围:数据量大,重复多,但是数据种类小可以放入内存
基本原理及要点:实现方式,节点孩子的表示方式
扩展:压缩实现 . 
问题实例:
1).有10个文件,每个文件1G,每个文件的每一行都存放的是用户的query,每个文件的query都可能重复 . 要你按照query的频度排序 . 
2).1000万字符串,其中有些是相同的(重复),需要把重复的全部去掉,保留没有重复的字符串 . 请问怎么设计和实现?
3).寻找热门查询:查询串的重复度比较高,虽然总数是1千万,但如果除去重复后,不超过3百万个,每个不超过255字节 . 
十 分布式处理 mapreduce
适用范围:数据量大,但是数据种类小可以放入内存
基本原理及要点:将数据交给不同的机器去处理,数据划分,结果归约 . 
扩展:
问题实例:
1).The canonical example application of MapReduce is a process to count the appearances ofeach different word in a set of documents:
2).海量数据分布在100台电脑中,想个办法高效统计出这批数据的TOP10 . 
3).一共有N个机器,每个机器上有N个数 . 每个机器最多存O(N)个数并对它们操作 . 如何找到N^2个数的中数(median)?
经典问题分析
上千万or亿数据(有重复),统计其中出现次数最多的前N个数据,分两种情况:可一次读入内存,不可一次读入 . 
可用思路:trie树+堆,数据库索引,划分子集分别统计,hash,分布式计算,近似统计,外排序
所谓的是否能一次读入内存,实际上应该指去除重复后的数据量 . 如果去重后数据可以放入内存,我们可以为数据建立字典,比如通过 map,hashmap,trie,然后直接进行统计即可 . 当然在更新每条数据的出现次数的时候,我们可以利用一个堆来维护出现次数最多的前N个数据,当然这样导致维护次数增加,不如完全统计后在求前N大效率高 . 
如果数据无法放入内存 . 一方面我们可以考虑上面的字典方法能否被改进以适应这种情形,可以做的改变就是将字典存放到硬盘上,而不是内存,这可以参考数据库的存储方法 . 
当然还有更好的方法,就是可以采用分布式计算,基本上就是map-reduce过程,首先可以根据数据值或者把数据hash(md5)后的值,将数据按照范围划分到不同的机子,最好可以让数据划分后可以一次读入内存,这样不同的机子负责处理各种的数值范围,实际上就是map . 得到结果后,各个机子只需拿出各自的出现次数最多的前N个数据,然后汇总,选出所有的数据中出现次数最多的前N个数据,这实际上就是reduce过程 . 
实际上可能想直接将数据均分到不同的机子上进行处理,这样是无法得到正确的解的 . 因为一个数据可能被均分到不同的机子上,而另一个则可能完全聚集到一个机子上,同时还可能存在具有相同数目的数据 . 比如我们要找出现次数最多的前100个,我们将1000万的数据分布到10台机器上,找到每台出现次数最多的前 100个,归并之后这样不能保证找到真正的第100个,因为比如出现次数最多的第100个可能有1万个,但是它被分到了10台机子,这样在每台上只有1千个,假设这些机子排名在1000个之前的那些都是单独分布在一台机子上的,比如有1001个,这样本来具有1万个的这个就会被淘汰,即使我们让每台机子选出出现次数最多的1000个再归并,仍然会出错,因为可能存在大量个数为1001个的发生聚集 . 因此不能将数据随便均分到不同机子上,而是要根据hash 后的值将它们映射到不同的机子上处理,让不同的机器处理一个数值范围 . 
而外排序的方法会消耗大量的IO,效率不会很高 . 而上面的分布式方法,也可以用于单机版本,也就是将总的数据根据值的范围,划分成多个不同的子文件,然后逐个处理 . 处理完毕之后再对这些单词的及其出现频率进行一个归并 . 实际上就可以利用一个外排序的归并过程 . 
另外还可以考虑近似计算,也就是我们可以通过结合自然语言属性,只将那些真正实际中出现最多的那些词作为一个字典,使得这个规模可以放入内存 . 

kafka的message包括哪些信息
一个Kafka的Message由一个固定长度的header和一个变长的消息体body组成
header部分由一个字节的magic(文件格式)和四个字节的CRC32(用于判断body消息体是否正常)构成 . 当magic的值为1的时候,会在magic和crc32之间多一个字节的数据:attributes(保存一些相关属性,比如是否压缩 压缩格式等等);如果magic的值为0,那么不存在attributes属性
body是由N个字节构成的一个消息体,包含了具体的key/value消息

怎么查看kafka的offset
0.9版本以上,可以用最新的Consumer client 客户端,有consumer.seekToEnd() / consumer.position() 可以用于得到当前最新的offset:

hadoop的shuffle过程
一 Map端的shuffle
  Map端会处理输入数据并产生中间结果,这个中间结果会写到本地磁盘,而不是HDFS . 每个Map的输出会先写到内存缓冲区中,当写入的数据达到设定的阈值时,系统将会启动一个线程将缓冲区的数据写到磁盘,这个过程叫做spill . 
  在spill写入之前,会先进行二次排序,首先根据数据所属的partition进行排序,然后每个partition中的数据再按key来排序 . partition的目是将记录划分到不同的Reducer上去,以期望能够达到负载均衡,以后的Reducer就会根据partition来读取自己对应的数据 . 接着运行combiner(如果设置了的话),combiner的本质也是一个Reducer,其目的是对将要写入到磁盘上的文件先进行一次处理,这样,写入到磁盘的数据量就会减少 . 最后将数据写到本地磁盘产生spill文件(spill文件保存在{mapred.local.dir}指定的目录中,Map任务结束后就会被删除) . 
  最后,每个Map任务可能产生多个spill文件,在每个Map任务完成前,会通过多路归并算法将这些spill文件归并成一个文件 . 至此,Map的shuffle过程就结束了 . 
二 Reduce端的shuffle
  Reduce端的shuffle主要包括两个阶段,copy sort(merge) . 
  首先要将Map端产生的输出文件拷贝到Reduce端,但每个Reducer如何知道自己应该处理哪些数据呢?因为Map端进行partition的时候,实际上就相当于指定了每个Reducer要处理的数据(partition就对应了Reducer),所以Reducer在拷贝数据的时候只需拷贝与自己对应的partition中的数据即可 . 每个Reducer会处理一个或者多个partition,但需要先将自己对应的partition中的数据从每个Map的输出结果中拷贝过来 . 
  接下来就是sort阶段,也成为merge阶段,因为这个阶段的主要工作是执行了归并排序 . 从Map端拷贝到Reduce端的数据都是有序的,所以很适合归并排序 . 最终在Reduce端生成一个较大的文件作为Reduce的输入 . 

spark集群运算的模式
Spark 有很多种模式,最简单就是单机本地模式,还有单机伪分布式模式,复杂的则运行在集群中,目前能很好的运行在 Yarn和 Mesos 中,当然 Spark 还有自带的 Standalone 模式,对于大多数情况 Standalone 模式就足够了,如果企业已经有 Yarn 或者 Mesos 环境,也是很方便部署的 . 
standalone(集群模式):典型的Mater/slave模式,不过也能看出Master是有单点故障的;Spark支持ZooKeeper来实现 HA
on yarn(集群模式):运行在 yarn 资源管理器框架之上,由 yarn 负责资源管理,Spark 负责任务调度和计算
on mesos(集群模式):运行在 mesos 资源管理器框架之上,由 mesos 负责资源管理,Spark 负责任务调度和计算
on cloud(集群模式):比如 AWS 的 EC2,使用这个模式能很方便的访问 Amazon的 S3;Spark 支持多种分布式存储系统:HDFS 和 S3


Yarn的基本思想是拆分资源管理的功能,作业调度/监控到单独的守护进程 . 
ResourceManager是全局的,负责对于系统中的所有资源有最高的支配权 . 
ApplicationMaster 每一个job有一个ApplicationMaster  . 
NodeManager,NodeManager是基本的计算框架 . 

spark2.0的了解
更简单:ANSI SQL与更合理的API
速度更快:用Spark作为编译器
更智能:Structured Streaming

rdd 怎么分区宽依赖和窄依赖
宽依赖:父RDD的分区被子RDD的多个分区使用   例如 groupByKey reduceByKey sortByKey等操作会产生宽依赖,会产生shuffle
窄依赖:父RDD的每个分区都只被子RDD的一个分区使用  例如map filter union等操作会产生窄依赖

spark streaming 读取kafka数据的两种方式
这两种方式分别是:
Receiver-base
使用Kafka的高层次Consumer API来实现 . receiver从Kafka中获取的数据都存储在Spark Executor的内存中,然后Spark Streaming启动的job会去处理那些数据 . 然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据 . 如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL) . 该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中 . 所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复 . 
Direct
Spark1.3中引入Direct方式,用来替代掉使用Receiver接收数据,这种方式会周期性地查询Kafka,获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围 . 当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据 . 

kafka的数据存在内存还是磁盘
Kafka最核心的思想是使用磁盘,而不是使用内存 . 
而且Linux对于磁盘的读写优化也比较多,包括read-ahead和write-behind,磁盘缓存等 . 如果在内存做这些操作的时候,一个是JAVA对象的内存开销很大,另一个是随着堆内存数据的增多,JAVA的GC时间会变得很长,使用磁盘操作有以下几个好处:
磁盘缓存由Linux系统维护,减少了程序员的不少工作 . 
磁盘顺序读写速度超过内存随机读写 . 
JVM的GC效率低,内存占用大 . 使用磁盘可以避免这一问题 . 
系统冷启动后,磁盘缓存依然可用 . 

怎么解决kafka的数据丢失
producer端:
宏观上看保证数据的可靠安全性,肯定是依据分区数做好数据备份,设立副本数 . 
broker端:
topic设置多分区,分区自适应所在机器,为了让各分区均匀分布在所在的broker中,分区数要大于broker数 . 
分区是kafka进行并行读写的单位,是提升kafka速度的关键 . 
Consumer端
consumer端丢失消息的情形比较简单:如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失 . 由于Kafka consumer默认是自动提交位移的,所以在后台提交位移前一定要保证消息被正常处理了,因此不建议采用很重的处理逻辑,如果处理耗时很长,则建议把逻辑放到另一个线程中去做 . 为了避免数据丢失,现给出两点建议:
enable.auto.commit=false  关闭自动提交位移
在消息被完整处理之后再手动提交位移

zookeeper 在 kafka 中起到什么作用
Controller 选举
Kafka集群中的其中一个 Broker 会被选举为Controller, 其负责维护所有 Partition 的 leader/follower 关系(Partition 管理和副本状态管理) . 当有 partition 的 leader 挂掉之后,controller 会从 follower 中选出一个 leader,也会执行类似于重分配 Partition 之类的管理任务 . 
==Zookeeper 负责从 Broker 中选举出一个作为 Controller, 并确保其唯一性 .  同时, 当Controller 宕机时, 选举一个新的 . ==
集群 membership
==记录集群中都有哪些活跃着的Broker . ==
Topic 配置
==记录有哪些Topic, Topic 都有哪些 Partition,Replica 存放在哪里, Leader 是谁 . ==
==在 consumer group 发生变化时进行 rebalance . ==
配额(0.9.0+)
记录每个客户能够读写的数据量 . 
ACLs(0.9.0+)
记录对Topic 的读写控制 . 

kafka 在 zookeeper 上创建的目录结构

注册的节点如下:
==consumers admin config controller brokers controller_epoch==
topic 注册信息
/brokers/topics/[topic]:存储某个 topic 的 partitions 所有分配信息
partition状态信息
/brokers/topics/[topic]/partitions/[0...N]  其中[0..N]表示partition索引号
/brokers/topics/[topic]/partitions/[partitionId]/state
Broker注册信息
/brokers/ids/[0...N]
每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),此节点为临时znode(EPHEMERAL)
Controller epoch
/controller_epoch -> int (epoch)
此值为一个数字,kafka集群中第一个broker第一次启动时为1,以后只要集群中center controller中央控制器所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1
Controller注册信息
/controller -> int (broker id of the controller)  存储center     controller中央控制器所在kafka broker的信息 . 这个值默认是 1,当 controller 节点挂掉后重新选举 controller 后,值会 +1
Consumer注册信息
每个consumer都有一个唯一的ID(consumerId可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息
/consumers/[groupId]/ids/[consumerIdString]
Consumer owner
/consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId索引编号

kafka consumer 均衡算法
当一个group中,有consumer加入或者离开时,会触发partitions均衡 . 均衡的最终目的,是提升topic的并发消费能力 . 

1假如 topic1 具有如下 partitions:P0,P1,P2,P3
2假如 group 中有如下 consumer:C0,C1
3首先根据 partition 索引号对 partitions 排序:P0,P1,P2,P3
4根据(consumer.id + '-'+ thread序号)排序:C0,C1
5计算倍数:M = [P0,P1,P2,P3].size / [C0,C1].size,本例值 M = 2 (向上取整)
6然后依次分配 partitions:C0 = [P0,P1],C1=[P2,P3],即 Ci = [P(i  M),P((i + 1)  M -1)]

kafka 数据高可用的原理是什么
一致性定义:若某条消息对Consumer可见,那么即使Leader宕机了,在新Leader上数据依然可以被读到
HighWaterMark简称HW:Partition的高水位,取一个partition对应的ISR中最小的LEO作为HW,消费者最多只能消费到HW所在的位置,另外每个replica都有highWatermark,leader和follower各自负责更新自己的highWatermark状态,highWatermark <= leader. LogEndOffset
对于Leader新写入的msg,Consumer不能立刻消费,Leader会等待该消息被所有ISR中的replica同步后,更新HW,此时该消息才能被Consumer消费,即Consumer最多只能消费到HW位置
这样就保证了如果Leader Broker失效,该消息仍然可以从新选举的Leader中获取 . 对于来自内部Broker的读取请求,没有HW的限制 . 同时,Follower也会维护一份自己的HW,Followr.HW = min(Leader.HW, Follower.offset)

kafka 的数据可靠性保证
当Producer向Leader发送数据时,可以通过acks参数设置数据可靠性的级别
0:不论写入是否成功,server不需要给Producer发送Response,如果发生异常,server会终止连接,触发Producer更新meta数据;
1:Leader写入成功后即发送Response,此种情况如果Leader fail,会丢失数据
-1:等待所有ISR接收到消息后再给Producer发送Response,这是最强保证
仅设置acks=-1也不能保证数据不丢失,当Isr列表中只有Leader时,同样有可能造成数据丢失 . 要保证数据不丢除了设置acks=-1, 还要保证ISR的大小大于等于2,具体参数设置:
1.request.required.acks:设置为-1 等待所有ISR列表中的Replica接收到消息后采算写成功;
2.min.insync.replicas:设置为大于等于2,保证ISR中至少有两个Replica
注意:Producer要在吞吐率和数据可靠性之间做一个权衡

kafka partition 分区的策略是什么
消息发送到哪个分区上,有两种基本的策略,一是采用 Key Hash 算法,一是采用 Round Robin 算法 . 另外创建分区时,最好是 broker 数量的整数倍,这样才能让一个 Topic 的分区均匀的分布在整个 Kafka 集群中 . 
默认情况下,Kafka 根据传递消息的 key 来进行分区的分配,即 hash(key) % numPartitions . 
如果发送消息时没有指定key,那么 Producer 将会把这条消息发送给随机的一个 Partition . 但是代码层面的逻辑并不完全是这样 . 首先看看Kafka有没有缓存的现成的分区Id,如果有的话直接使用这个分区Id . 如果没有的话,找出所有可用分区的leader所在的broker,从中随机挑一个并放到缓存中,下次就直接从缓存中拿这个 partition id . 注意这个缓存是每隔一段时间就会被清空的 . 这么做的目的是为了减少服务器端的sockets数 . 

Kafka Producer是如何动态感知Topic分区数变化
问题是,如果在 Kafka Producer 往 Kafka 的 Broker 发送消息的时候用户通过命令修改了改主题的分区数,Kafka Producer 能动态感知吗?答案是可以的 . 那是立刻就感知吗?不是,是过一定的时间(topic.metadata.refresh.interval.ms参数决定)才知道分区数改变的 . 

Spark 任务提交过程源码
Driver 程序的代码运行到 action 操作,触发了 SparkContext 的 runJob 方法
SparkContext 调用 DAGScheduler 的 runJob 函数,内部调用 DAGScheduler 的 submitJob 方法,返回一个 JobWaiter 对象 . 接着向 EventProcessLoop 的阻塞队列中 put 一个 JobSubmitted 事件 . 
这时候 DAGScheduler 的 onReceive 方法被调用,模式匹配,调用 handleJobSubmitted 方法,用来切分 stage . stage 的划分过程是递归调用,从前往后的划分 stage . 
根据 final stage 递归找到第一个 stage,然后将第一个 stage 提交 . 
由于 stage 的类型不同,这里会有两种不同类型的 task,ShuffleMapTask 和 ResultTask . 把 task 封装到 taskSet 里,把 Tasks 交给 TaskScheduler . (RPC 调用,向 Executor 提交 task)
Executor 将 task 封装到 TaskRunner 对象中,将 taskRunner 放入到 Executor 中的线程池中 . 
最后会调用 ShuffleMapTask 或 ResultTask 的 runTask 方法,执行业务逻辑 . 

Spark 部署的三种模式介绍
standalone模式,即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统 . 目前 Spark 在 standalone 模式下是没有任何单点故障问题的,这是借助zookeeper实现的,思想类似于Hbase master单点故障解决方案 . 
Spark On Mesos模式 . 这是很多公司采用的模式,官方推荐这种模式(当然,原因之一是血缘关系) . 正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然 . 目前在Spark On Mesos环境中,用户可选择两种调度模式之一运行自己的应用程序(可参考Andrew Xia的"Mesos Scheduling Mode on Spark"):
粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个"slot") . 应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源 . 
细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配 . 与粗粒度模式一样,应用程序启动时,先会启动executor,但每个executor占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos会为每个executor动态分配资源,每分配一些,便可以运行一个新任务,单个Task运行完之后可以马上释放对应的资源 . 每个Task会汇报状态给Mesos slave和Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于MapReduce调度模式,每个Task完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大 . 
Spark On YARN模式 . 这是一种最有前景的部署模式 . 但限于YARN自身的发展,目前仅支持粗粒度模式(Coarse-grained Mode) . 这是由于YARN上的Container资源是不可以动态伸缩的,一旦Container启动之后,可使用的资源不能再发生变化,不过这个已经在YARN计划中了 . 
spark on yarn 的支持两种模式:
1. yarn-cluster:适用于生产环境;
2. yarn-client:适用于交互 调试,希望立即看到app的输出
yarn-cluster 和 yarn-client 的区别在于 yarn appMaster,每个 yarn app 实例有一个 appMaster 进程,是为 app 启动的第一个 container;负责从 ResourceManager 请求资源,获取到资源后,告诉 NodeManager 为其启动 container . 
yarn-cluster和yarn-client模式的区别其实就是Application Master进程的区别,yarn-cluster模式下,driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况 . 当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行 . 然而yarn-cluster模式不适合运行交互类型的作业 . 而yarn-client模式下,Application Master仅仅向YARN请求executor,client会和请求的container通信来调度他们工作,也就是说Client不能离开 . 

如果集群是spark专用的,可以用standalone,是和别的应用共享的建议用yarn

会产生 shuffle 的算子
combineByKey reduceByKey groupByKey cogroup join leftOutJoin rightOutJoin

Spark Streaming 和 Storm 的区别
处理模型,延迟
虽然这两个框架都提供可扩展性和容错性,它们根本的区别在于他们的处理模型 . 而 Storm 处理的是每次传入的一个事件,而 Spark Streaming 是处理某个时间段窗口内的事件流 . 因此,Storm 处理一个事件可以达到秒内的延迟,而 Spark Streaming 则有几秒钟的延迟 . 
容错 数据保证
在容错数据保证方面的权衡是,Spark Streaming 提供了更好的支持容错状态计算 . 在 Storm 中,每个单独的记录当它通过系统时必须被跟踪,所以 Storm 能够至少保证每个记录将被处理一次,但是在从错误中恢复过来时候允许出现重复记录 . 这意味着可变状态可能不正确地被更新两次 . 
另一方面,Spark Streaming 只需要在批级别进行跟踪处理,因此可以有效地保证每个 mini-batch 将完全被处理一次,即便一个节点发生故障 . (实际上 Storm 的 Trident library 库也提供了完全一次处理 . 但是,它依赖于事务更新状态,这比较慢,通常必须由用户实现) . 
总结
简而言之,如果你需要秒内的延迟,Storm 是一个不错的选择,而且没有数据丢失 . 如果你需要有状态的计算,而且要完全保证每个事件只被处理一次,Spark Streaming 则更好 . Spark Streaming 编程逻辑也可能更容易,因为它类似于批处理程序(Hadoop),特别是在你使用批次(尽管是很小的)时 . 

Tungsten-sort 实现了内存的自主管理,管理方式模拟了操作系统的方式,通过Page可以使得大量的record被顺序存储在内存,整个shuffle write 排序的过程只需要对指针进行运算(二进制排序),并且无需反序列化,整个过程非常高效,对于减少GC,提高内存访问效率,提高CPU使用效率确实带来了明显的提升 . 

spark shuffle 有:hash,sort,tungsten-sort . 
Spark的Shuffle总体而言就包含两个基本的过程:Shuffle write和Shuffle read . ShuffleMapTask的整个执行过程就是Shuffle write . hash-based机制就是在Shuffle的过程

锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章