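import org.apache.spark.SparkContext
val sc = new SparkContext(...)
val file = sc.textFile("file.txt")
// create an Accumulator[Int] initialized to 0
// (Spark 1.x API; deprecated since Spark 2.0 in favor of sc.longAccumulator)
val blankLines = sc.accumulator(0)
val callSigns = file.flatMap(line => {
  if (line == "") {
    blankLines += 1 // Add to the accumulator (runs on the workers)
  }
  line.split(" ")
})
callSigns.saveAsTextFile("output.txt") // action: triggers the computation
println("Blank lines: " + blankLines.value) // read the total on the driver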
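Note that blankLines holds the correct count only after saveAsTextFile has been called: transformations such as flatMap are lazy, so the accumulator is not updated until an action forces the computation.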
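The effect of laziness can be reproduced outside Spark. The sketch below is an analogy only: it assumes a Python generator stands in for a lazy RDD transformation and a small hypothetical Counter class stands in for the accumulator, and it uses no Spark API. The counter stays at 0 until something consumes the generator, just as blankLines stays at 0 until an action runs.

```python
class Counter:
    """Hypothetical stand-in for an accumulator: tasks add, the driver reads."""
    def __init__(self):
        self.value = 0

    def add(self, n):
        self.value += n


def flat_map_counting_blanks(lines, blank_lines):
    """Lazy analogue of file.flatMap(...): the body runs only when iterated."""
    for line in lines:
        if line == "":
            blank_lines.add(1)  # side effect, like blankLines += 1
        yield from line.split(" ")


# Hypothetical input; "" marks a blank line.
lines = ["AB1CD EF2GH", "", "IJ3KL", ""]
blank_lines = Counter()
call_signs = flat_map_counting_blanks(lines, blank_lines)

print("Before action:", blank_lines.value)  # Before action: 0
words = list(call_signs)                     # consuming it plays the role of an action
print("After action:", blank_lines.value)   # After action: 2
```

Calling list() here plays the same role as saveAsTextFile in the Scala code: it forces the pipeline to run, and only then is the counter correct.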
An operation op is associative if (a op b) op c = a op (b op c) for all values a, b, and c.
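The definition can be tested mechanically on a handful of values. This is a minimal sketch; is_associative is a hypothetical helper written for illustration, and because it only checks the values it is given, it can disprove associativity but never prove it.

```python
import operator
from itertools import product


def is_associative(op, values):
    """Return True if (a op b) op c == a op (b op c) for every triple from values.

    A finite check can only find counterexamples; passing on a sample does
    not prove the property for all inputs.
    """
    return all(op(op(a, b), c) == op(a, op(b, c))
               for a, b, c in product(values, repeat=3))


sample = [-3, 0, 1, 2, 5]
print(is_associative(operator.add, sample))  # True
print(is_associative(max, sample))           # True
print(is_associative(operator.sub, sample))  # False, e.g. (1 - 2) - 5 != 1 - (2 - 5)
```

Addition passes, which matters for the reduceByKey calls in this article: Spark's API documentation describes the function passed to reduceByKey as an associative and commutative reduce function, because partial results from different partitions may be combined in any grouping.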
# Look up the locations of the call signs on the
# RDD contactCounts. We load a list of call sign
# prefixes to country code to support this lookup.
signPrefixes = loadCallSignTable()
def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes)
    count = sign_count[1]
    return (country, count)
# map() passes a single element, so bind signPrefixes in a lambda;
# the table is captured in the closure and shipped along with it.
countryContactCounts = (contactCounts
                        .map(lambda sign_count: processSignCount(sign_count, signPrefixes))
                        .reduceByKey(lambda x, y: x + y))
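In the code above, if signPrefixes is a large table, shipping it from the driver to every worker node as part of the function's closure is expensive. Moreover, if signPrefixes is used again in a later operation, it is sent to every node again. Making signPrefixes a broadcast variable solves this problem: the table is sent to each node once and read there through its .value field. For example: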
// Look up the countries for each call sign for the
// contactCounts RDD. We load an array of call sign
// prefixes to country code to support this lookup.
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map { case (sign, count) =>
  // Read the broadcast table on the worker via .value
  val country = lookupInArray(sign, signPrefixes.value)
  (country, count)
}.reduceByKey((x, y) => x + y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
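The cost argument for broadcasting can be put into rough numbers. This is a toy model written for this explanation, not a measurement of Spark: bytes_shipped and the table contents are hypothetical, pickle size stands in for serialized size, and the model simply encodes the article's claim that a captured table is re-sent to every node for each use, while a broadcast copy is sent once per node.

```python
import pickle


def bytes_shipped(table, num_nodes, num_uses, broadcast):
    """Toy cost model, not a description of Spark internals.

    Assumption: a table captured in a closure is sent to every node again for
    each operation that uses it, while a broadcast copy is sent to each node
    once and reused. Compression and protocol overhead are ignored.
    """
    size = len(pickle.dumps(table))  # bytes in one serialized copy
    copies_per_node = 1 if broadcast else num_uses
    return size * num_nodes * copies_per_node


# Hypothetical prefix table: 10,000 made-up prefixes mapped to made-up codes.
sign_prefixes = {f"P{i:05d}": f"C{i % 200:03d}" for i in range(10_000)}

nodes, uses = 20, 3
closure_cost = bytes_shipped(sign_prefixes, nodes, uses, broadcast=False)
broadcast_cost = bytes_shipped(sign_prefixes, nodes, uses, broadcast=True)
print(f"closure: {closure_cost:,} bytes, broadcast: {broadcast_cost:,} bytes")
print(closure_cost // broadcast_cost)  # 3, i.e. one extra copy per node for each reuse
```

Under these assumptions the gap grows linearly with the number of operations that reuse the table, which is the situation a broadcast variable is meant for.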
Source: http://www.cnblogs.com/wttttt/p/6844918.html