第十周,大數(shù)據(jù)建模學(xué)習(xí)應(yīng)用于模具行業(yè)的培訓(xùn)班
早上, 很早到書城, 今天最后一天培訓(xùn), 還是在 SPARK 復(fù)習(xí)。
開發(fā) SPARK 使用 scala 、java 、 python 、 api 以及 shell 語言開發(fā)的 搜索引擎。
缺點: 不適合 web服務(wù) , dao層 、web爬蟲
優(yōu)點: 伯克利數(shù)據(jù)分析 生態(tài)圈 , 機器學(xué)習(xí) , 數(shù)據(jù)挖掘, 數(shù)據(jù)庫, 信息檢索, 自然語言處理, 語音識別
以 spark core 為核心 , 從 hdfs , amazon s3 hbase 等持久讀取數(shù)據(jù)
SPARK SQL 基于內(nèi)存的,
查看數(shù)據(jù)庫
這個在前端也能看到,點擊進去, 詳細說明。
HIVE 不支持事物的, 增刪改查。 ACRD 使用
在那一臺機器完成的, 可以看看
提交的人物, 由系統(tǒng)分配節(jié)點去跑。
HIVE 主要做 數(shù)據(jù)倉庫 DW ,
插入數(shù)據(jù)完成, 做一下 查詢數(shù)據(jù)
將 HIVE 的內(nèi)核, 改成 SPARK 以后, 提升速度 80倍
上面是 HOVE 的插入和查詢的過程。
接著跑 SPARK SQL 的過程
所有的 數(shù)據(jù)庫和表, 都是存儲在 HDFS 上的。
現(xiàn)在要用 SPARK SQL 來執(zhí)行。
show database
use
select
這個是 SPARK的管理頁面
DAG 的有向性的結(jié)合圖
DAG Scheduler
RDD 經(jīng)過 4個 聚合步驟后, 形成一個 DAG , 多個 DAG 組合編程 DAG計劃表
下面是 scala 寫的
要統(tǒng)計下面文件中的個數(shù)
上述雖然是 一行代碼, 但是也顯示的淋漓盡致
運行的狀況
管理頁面的結(jié)果
環(huán)境配置
SPARK 單詞統(tǒng)計的 DEMO , 接下來再看看
SPARK STREM 消費kaFka 的數(shù)據(jù), 管理者確定是 ZooKeeper
下午講解 Klin 做一些報表的功能, SPARK
要搭建不同的集群, 由原來的 store 的程序, 遷移到 spark 上來, Flink 是把交互式查詢和實時計算合在一起了。
Flink 也是用 Scala 來寫。
1、客戶端提交作業(yè)后,啟動Driver,Driver是Spark作業(yè)的Master(也就是通過Driver來啟動Receiver,定時去啟動任務(wù)的處理,
注意的是,驅(qū)動啟動任務(wù)會受前一個任務(wù)執(zhí)行的影響。也就是前一個任務(wù)沒有執(zhí)行完成后,是不會啟動后邊的任務(wù)的。
所以,注意你的streaming的執(zhí)行時間,絕對不要超過Recive數(shù)據(jù)的時間)
2、每個作業(yè)包含多個Executor,每個Executor以線程的方式運行task,Spark Streaming至少包含一個Receiver task。
(一個Executor就是一個spark進程,在yarn中就是一個container,這個大家應(yīng)該知道。
然后Receiver task是在driver中創(chuàng)建的,我理解一個Receiver是運行在一個Executor中的。
然后如果想要創(chuàng)建多個Receiver,那么需要大概這樣做(1 to 10).map(_.createStream…),這樣就能創(chuàng)建10個receiver task啦。
注意這個數(shù)量當然不能超過你的結(jié)點數(shù)量啦。
還有個問題,通常使用kafka比較合適,因為kafka是stream向kafka來poll數(shù)據(jù)。
而他媽的flume默認只支持pull,如果想支持poll,那需要定制sink,那真是太惡心了。)
3、Receiver接收數(shù)據(jù)后生成Block,并把BlockId匯報給Driver,然后備份到另外一個Executor上。
(默認情況下接受數(shù)據(jù)是200毫秒生成一個block,我理解一個block應(yīng)該是一個partition?
這個還不確定,需要對照源代碼看一下;
然后會把生成的Block隨機扔到不同的Executor,同時,driver去派發(fā)任務(wù)時,也會找到就近的Executor。
我理解,節(jié)點中的所有executor都應(yīng)該會有數(shù)據(jù)才對)
4、ReceiverTracker維護Receiver匯報的BlockId。
(這個ReceiverTracker應(yīng)該是維護在Driver中,Driver會根據(jù)維護的這些數(shù)據(jù)塊進行任務(wù)的派發(fā))
5、Driver定時生成JobGenerator,根據(jù)DStream的關(guān)系生成邏輯RDD,然后創(chuàng)建Jobset,交給JobScheduler。
6、JobScheduler負責(zé)調(diào)度Jobset,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,
每個stage包含一到多個task。(我記得DAGScheduler會對任務(wù)做一層優(yōu)化)
7、TaskScheduler負責(zé)把task調(diào)度到Executor上,并維護task的運行狀態(tài)。
8、當tasks,stages,jobset完成后,單個batch(批處理)才算完成。
具體流程:
客戶端提交作業(yè)后啟動Driver,Driver是spark作業(yè)的Master。
每個作業(yè)包含多個Executor,每個Executor以線程的方式運行task,Spark Streaming至少包含一個receiver task。
Receiver接收數(shù)據(jù)后生成Block,并把BlockId匯報給Driver,然后備份到另外一個Executor上。
ReceiverTracker維護Reciver匯報的BlockId。
Driver定時啟動JobGenerator,根據(jù)Dstream的關(guān)系生成邏輯RDD,然后創(chuàng)建Jobset,交給JobScheduler。
JobScheduler負責(zé)調(diào)度Jobset,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,每個stage包含一到多個task。
TaskScheduler負責(zé)把task調(diào)度到Executor上,并維護task的運行狀態(tài)。
當tasks,stages,jobset完成后,單個batch才算完成。
--------------------------------------------------- 第十周 盧老師 講課筆記
Spark Streaming原理:
Spark Streaming是基于spark的流式批處理引擎,其基本原理是把輸入的數(shù)據(jù)以某一時間間隔批量的處理,當批處理縮短到秒級時,便可以用于處理實時數(shù)據(jù)流。
kafka
flume HDFS
hdfs -------> spark streaming --------> DataBase
zeromq | Dashboards
|
twitter |
|
input data streaming -->sparkstreaming-->batches of input data--->spark engine --->batches of processed data
live input stream --> spark Streaming --->t t t --->spark job --->results
sparkstreaming架構(gòu)
spark streaming作業(yè)流程:
Driver:
receivertracker
jobGenerator
JobScheduler
DagScheduler
TaskScheduler
BlockManagerMaster
Executor:
Receiver
BlockManagerSlave
Executor:
Task
BlockManagerSlave
運行機制:
1、客戶端提交作業(yè)后啟動一個Driver,Driver是spark作業(yè)的Master。
2、每個作業(yè)包含多個Executor,每個Executor以線程的方式運行Task,Spark Streaing至少包含一個receiver task。
3、Receiver接收數(shù)據(jù)后生成Block,并把Blockid匯報給Driver,然后備份到另外一個Executor上
4、ReceiverTracker維護Recivver匯報的BlockId。
5、Driver定時啟動JobGenerator,根據(jù)Dstream的關(guān)系生成邏輯RDD,然后創(chuàng)建JobSet,交給JobScheduler.
6、JobScheduler負責(zé)調(diào)度JobSet,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,每個Stages包含一個到多個Task。
7、TaskScheduler負責(zé)把task調(diào)度到Executor上,并維護task的運行狀態(tài)。
8、當tasks、stages、jobset完成后,單個batch才算完成。
Spark Streaming消費Kafka數(shù)據(jù):
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
Spark統(tǒng)計單詞次數(shù):
text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
SparkSQL數(shù)據(jù)查詢:
// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
val url =
"jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
val df = sqlContext
.read
.format("jdbc")
.option("url", url)
.option("dbtable", "people")
.load()
// Looks the schema of this DataFrame.
df.printSchema()
// Counts people by age
val countsByAge = df.groupBy("age").count()
countsByAge.show()
// Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
Spark機器學(xué)習(xí)實現(xiàn)邏輯回歸算法:
// Every record of this DataFrame contains the label and
// features represented by a vector.
val df = sqlContext.createDataFrame(data).toDF("label", "features")
// Set parameters for the algorithm.
// Here, we limit the number of iterations to 10.
val lr = new LogisticRegression().setMaxIter(10)
// Fit the model to the data.
val model = lr.fit(df)
// Inspect the model: get the feature weights.
val weights = model.weights
// Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
-------------------------------------------------------------------------------------------------------
最后做一次復(fù)習(xí) , 有始有終。
-------在最下面增加 2行代碼。
如果有 幾千臺服務(wù)器, 必須配置免登錄, 生成的密鑰, 追加到這個文件中去。
hadoop 的官方網(wǎng)站, 有非常詳細的內(nèi)容。
多種方式都可以查看是不是啟動成功
50070端口, HDFS的端口
搭建完成后,
下面是 Yarn 的配置信息
需要 shuffle 的, 需要增加 各類 shuffle .
FIFO FAIR 先進先出 公平調(diào)度
上面是 Yarn 的配置文件, 這是第三周講的
第四周講 Hbase , 是 google 的大表實現(xiàn)
單機版和集群版的不同
第5周 、 第六周
一切階是對像
以上是 5周 Scala