Let us take the same word count example that we used earlier with shell commands. Here, we consider the same example as a Spark application.
The following text is the input data, and the file is named in.txt.
people are not as beautiful as they look, as they walk or as they talk. they are only as beautiful as they love, as they care as they share.
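If you want to create this file from the terminal, one way to do it, assuming a Unix-like shell (this command is an illustrative addition, not part of the original steps), is:

```
$ cat > in.txt << 'EOF'
people are not as beautiful as they look, as they walk or as they talk.
they are only as beautiful as they love, as they care as they share.
EOF
```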
Look at the following program:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
   def main(args: Array[String]) {

      val sc = new SparkContext("local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
      /* local = master URL; Word Count = application name; */
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
      /* Map = variables to work nodes */

      /* Creating an inputRDD to read the text file (in.txt) through the Spark context */
      val input = sc.textFile("in.txt")

      /* Transform the inputRDD into countRDD */
      val count = input.flatMap(line => line.split(" "))
         .map(word => (word, 1))
         .reduceByKey(_ + _)

      /* saveAsTextFile is an action that materializes the RDD */
      count.saveAsTextFile("outfile")
      System.out.println("OK");
   }
}
```
Save the above program into a file named SparkWordCount.scala and place it in a user-defined directory named spark-application.
Note: While transforming the inputRDD into countRDD, we use flatMap() to tokenize the lines from the text file into words, map() to turn each word into a (word, 1) pair, and reduceByKey() to sum the counts for each word.
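To make the role of each transformation concrete, here is a minimal sketch that runs the same pipeline on an in-memory collection instead of a file. The local[*] master, the object name and the sample line are illustrative assumptions, not part of the tutorial's steps:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCountSketch {
   def main(args: Array[String]) {
      // Assumed local master and app name, chosen for illustration only.
      val conf = new SparkConf().setMaster("local[*]").setAppName("WordCountSketch")
      val sc = new SparkContext(conf)

      // A fragment of in.txt, held in memory instead of read from disk.
      val lines = sc.parallelize(Seq("people are not as beautiful as they look"))

      val words  = lines.flatMap(line => line.split(" "))   // tokenize: one element per word
      val pairs  = words.map(word => (word, 1))             // pair each word with a count of 1
      val counts = pairs.reduceByKey(_ + _)                 // sum the counts per distinct word

      counts.collect().foreach(println)                     // prints e.g. (as,2), (they,1), ...
      sc.stop()
   }
}
```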
Use the following steps to submit this application. Execute all of the steps from a terminal in the spark-application directory.
Spark core jar is required for compilation. Therefore, download spark-core_2.10-1.3.0.jar and move the downloaded jar file to the spark-application directory.
Compile the above program using the command given below. This command should be executed from the spark-application directory. Here, /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar is a Hadoop support jar taken from the Spark library.
```
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkWordCount.scala
```

Create a jar file of the application using the following command. Here, wordcount is the name of the jar file.

```
$ jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
```
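Optionally, you can confirm that the compiled classes were packaged by listing the archive contents with the jar tool's -tvf flag (a verification step added here as a suggestion, not part of the original procedure):

```
$ jar -tvf wordcount.jar
```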
Submit the Spark application using the following command:

```
spark-submit --class SparkWordCount --master local wordcount.jar
```
If it executes successfully, you will find output like the following. The OK in this output is printed by the last line of the program and is there for user identification. If you read the output carefully, you will notice several things, such as the services that were started, the stages that ran, and where the temporary files were written:

```
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
```
After successful execution, you will find a directory named outfile in the spark-application directory. The following commands open it and list the output files:

```
$ cd outfile
$ ls
part-00000 part-00001 _SUCCESS
```

The following command checks the output in the part-00000 file:

```
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they,7)
(look,1)
```

The following command checks the output in the part-00001 file:

```
$ cat part-00001
(walk,1)
(or,1)
(talk,1)
(only,1)
(love,1)
(care,1)
(share,1)
```
The general syntax of the spark-submit command is:

```
spark-submit [options] <app jar | python file> [app arguments]
```

The table below describes the options that spark-submit accepts.
S.No | Option | Description |
---|---|---|
1 | --master | spark://host:port, mesos://host:port, yarn, or local. |
2 | --deploy-mode | Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). |
3 | --class | Your application's main class (for Java/Scala apps). |
4 | --name | A name for your application. |
5 | --jars | Comma-separated list of local jars to include on the driver and executor classpaths. |
6 | --packages | Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths. |
7 | --repositories | Comma-separated list of additional remote repositories to search for the Maven coordinates given with --packages. |
8 | --py-files | Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. |
9 | --files | Comma-separated list of files to be placed in the working directory of each executor. |
10 | --conf (prop=val) | Arbitrary Spark configuration property. |
11 | --properties-file | Path to a file from which to load extra properties. If not specified, this looks for conf/spark-defaults. |
12 | --driver-memory | Memory for the driver (e.g. 1000M, 2G) (Default: 512M). |
13 | --driver-java-options | Extra Java options to pass to the driver. |
14 | --driver-library-path | Extra library path entries to pass to the driver. |
15 | --driver-class-path | Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. |
16 | --executor-memory | Memory per executor (e.g. 1000M, 2G) (Default: 1G). |
17 | --proxy-user | User to impersonate when submitting the application. |
18 | --help, -h | Show this help message and exit. |
19 | --verbose, -v | Print additional debug output. |
20 | --version | Print the version of the current Spark. |
21 | --driver-cores NUM | Cores for the driver (Default: 1). |
22 | --supervise | If given, restarts the driver on failure. |
23 | --kill | If given, kills the driver specified. |
24 | --status | If given, requests the status of the driver specified. |
25 | --total-executor-cores | Total cores for all executors. |
26 | --executor-cores | Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode.) |
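As an illustration of how these options combine, the wordcount.jar built above could, for example, be submitted to a standalone cluster as follows. The master URL, memory size and core count are placeholder values chosen for this sketch, not settings taken from the tutorial:

```
spark-submit \
  --class SparkWordCount \
  --name "Word Count" \
  --master spark://192.168.1.217:7077 \
  --deploy-mode client \
  --executor-memory 1G \
  --total-executor-cores 4 \
  wordcount.jar
```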