Apache Spark with Pipeline and LDA

最近入坑Apache Spark,这个分布式框架让我知道了什么叫做大数据,以及在处理大数据之中所碰到的一些问题。首先说明一下语言当然用Scala 虽然一开始让人感觉比较反人类,但是经过一段时间的摸索发现真的挺好用的,甚至喜欢上了它,前提是不报错。

说一下工作环境:

  • Scala IED
  • Scala 2.10.6
  • Apache Spark 1.6.1
  • Apache Zepprlin

这些版本要对应起来,要不然吃不了兜着走。这里集群的管理工具是ambari,这个工具可以让你轻松的进行图像可视化。

ambai

这里我们用的是 Spark on Yarn 模式,其中进行提交任务又有两种模式,这里简单的说一句,这两种模式分别为

  • yarn-cluster : driver运行在container之中,所有的日志本地看不到,只能看到running的信息。
  • yarn-client : drivier运行在client中,可以看到所有的日志,测试比较方便。

机器学习分为两个比较大的步骤: 收集预处理数据;进行学习

收集预处理数据

这里当然选择到处都看得到的数据了,比如法国政府的 一些数据,https://www.data.gouv.fr/fr/datasets/boamp/ 这里好多数据,这里挑选他们的招标信息。这里选择2016年的,进行处理和学习。文件是tgz格式的,利用命令解压,最后得到一堆xml文件,对于这些文件可以利用 https://github.com/databricks/spark-xml 进行处理,处理方式很简单。
命令方式引用外部库
$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.10:0.4.0
简单的例子
$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml

1
2
3
4
5
6
7
8
9
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read
.format("com.databricks.spark.xml")
.option("rowTag", "book")
.load("books.xml")
val selectedData = df.select("author", "_id")

这里了选择相应的表情进行选择,这里我选择“OBJET_COMPLET”和”LIBELLE”,当然路径可以是hdfs的路径,最后你会得到一个DataFrame的类型

就是一个数据表,这个数据表中包含着各种奇葩的数据,错别字等等都会再这里找到,还有null的情况,这些都是要处理掉然后进行学习,学习的目的是找到对应的TOPIC也就是主题。就直接上代码吧,建立一个scala的类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class LDAml() {
def lda(dataset: DataFrame, sc: SparkContext, inputCol: String,
numbTopic: Int, MaxIterations: Int,
vocabSize: Int) = {
val (documents, vocabArray, model) = preprocess(dataset, inputCol, sc, vocabSize)
val corpus = documents.cache() // use cache
val corpusSize = corpus.count()
/**
* Configure and run LDA
*/
val mbf = {
// add (1.0 / actualCorpusSize) to MiniBatchFraction be more robust on tiny datasets.
2.0 / MaxIterations + 1.0 / corpusSize
}
// running lda
val lda = new LDA()
.setK(numbTopic)
.setMaxIterations(MaxIterations)
.setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(math.min(1.0, mbf))) //add optimizer
.setDocConcentration(-1) // use default symmetric document-topic prior
.setTopicConcentration(-1) // use default symmetric topic-word prior
/**
* Print results.
*/
val startTime = System.nanoTime()
val ldaModel = lda.run(corpus)
val elapsed = (System.nanoTime() - startTime) / 1e9
ldaModel.save(sc, sc.getConf.get("spark.client.ldamodelPath"))
/************************************************************************
* Print results. for Zeppelin
************************************************************************/
// Print training time
println(s"Finished training LDA model. Summary:")
println(s"Training time (sec)\t$elapsed")
println(s"==========")
// Print the topics, showing the top-weighted terms for each topic.
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 5)
val topics = topicIndices.map {
case (terms, termWeights) =>
terms.map(vocabArray(_)).zip(termWeights)
}
println(s"$numbTopic topics:")
topics.zipWithIndex.foreach {
case (topic, i) =>
println(s"TOPIC $i")
topic.foreach { case (term, weight) => println(s"$term\t$weight") }
println(s"==========")
}
}
def preprocess(dataset: DataFrame, inputCol: String, sc: SparkContext, vocabSize: Int): (RDD[(Long, Vector)], Array[String], PipelineModel) = {
val stopWordText1 = sc.textFile(sc.getConf.get("spark.client.stopWordText")).collect().flatMap(_.stripMargin.split("\\s+"))
val stopWordText2 = sc.textFile(sc.getConf.get("spark.client.stopWordText2")).collect().flatMap(_.stripMargin.split("\\s+"))
val data = dataset.na.drop()
// ----------------Pipeline stages---------------------------------------------
// - tokenizer-->stopWordsRemover1-->stemmer-->stopWordRemover2-->AccentRemover
// 简单分词->删除无用词汇->词根->删除无用词汇->删除重音符号
// ----------------------------------------------------------------------------
val tokenizer = new RegexTokenizer()
.setInputCol(inputCol)
.setPattern("[a-z0-9éèêâîûùäüïëô]+")
.setGaps(false)
.setOutputCol("rawTokens")
val stopWordsRemover1 = new StopWordsRemover()
.setInputCol("rawTokens")
.setOutputCol("tokens")
stopWordsRemover1.setStopWords(stopWordsRemover1.getStopWords ++ stopWordText1)
val stemmer = new Stemmer()
.setInputCol("tokens")
.setOutputCol("stemmed")
.setLanguage("French")
val stopWordsRemover2 = new StopWordsRemover()
.setInputCol("stemmed")
.setOutputCol("tokens2")
stopWordsRemover2.setStopWords(stopWordsRemover2.getStopWords ++ stopWordText2)
val accentRemover = new AccentRemover()
.setInputCol("tokens2")
.setOutputCol("mot")
val countVectorizer = new CountVectorizer()
.setVocabSize(vocabSize)
.setInputCol("mot")
.setOutputCol("features")
//------------------------------------------------------
//stage 0,1,2,3,4,5
val pipeline = new Pipeline().setStages(Array(
tokenizer, //stage 0
stopWordsRemover1, //1
stemmer, //2
stopWordsRemover2, //3
accentRemover, //4
countVectorizer //stage 5
))
// creates the PipeLineModel to use for the dataset transformation
val model = pipeline.fit(data)
// countVectorizer stage ==> 5
val vocabArray = model.stages(5).asInstanceOf[CountVectorizerModel].vocabulary
sc.parallelize(vocabArray).saveAsTextFile(sc.getConf.get("spark.client.vocab"))
val documents = model.transform(data)
.select("features")
.rdd
.map { case Row(features: Vector) => features }
.zipWithIndex()
.map(_.swap)
(documents, vocabArray, model)
}
}

最重要的步骤就是tokenizer-->stopWordsRemover1-->stemmer-->stopWordRemover2-->AccentRemover 处理完成之后,需要transform成dataframe的类型。
这样就可以使用LDA里面的算法进行筛选TOPIC了,这里我选择100个TOPIC迭代100次,这很花时间的。
最后会打印出来。每个TOPIC都是不同的,LDAOptimizer 有两种算法,一种是em算法,一种是online算法,这里选择online算法,因em算法的问题导致之前结果的TOPIC是重复的。

这个结果就可以进行预测了,每个TOPIC都是不同的。