背景
本文给出了一个简单的计算图中每一个点的N度关系点集合的算法,也就是N跳关系。
之前通过学习和理解了一下GraphX的计算接口。
N度关系
目标:
在N轮里。找到某一个点的N度关系的点集合。实现思路:
1. 准备好边数据集。即”1 3”, “4, 1” 这种点关系。使用GraphLoader 的接口load成Graph
2. 初始化每一个Vertice的属性为空Map 3. 使用aggregateMessages把VerticeID和totalRounds传播出度点上,出度点把收集到的信息合成一个大Map 4. 更新后的Vertice与原图进行”Join”,更新图中的变化过的点属性 5. 反复步骤3和4,最后输出更新了N轮之后的有关系的Verticespark-shell下可运行的代码:
import org.apache.spark._import org.apache.spark.graphx._import org.apache.spark.rdd.RDDval friendsGraph = GraphLoader.edgeListFile(sc, "data/friends.txt")val totalRounds: Int = 3 // total N roundvar targetVerticeID: Long = 6 // target vertice// round onevar roundGraph = friendsGraph.mapVertices((id, vd) => Map())var roundVertices = roundGraph.aggregateMessages[Map[Long, Integer]]( ctx => { if (targetVerticeID == ctx.srcId) { // only the edge has target vertice should send msg ctx.sendToDst(Map(ctx.srcId -> totalRounds)) } }, _ ++ _)for (i <- 2 to totalRounds) { val thisRoundGraph = roundGraph.outerJoinVertices(roundVertices){ (vid, data, opt) => opt.getOrElse(Map[Long, Integer]()) } roundVertices = thisRoundGraph.aggregateMessages[Map[Long, Integer]]( ctx => { val iterator = ctx.srcAttr.iterator while (iterator.hasNext) { val (k, v) = iterator.next if (v > 1) { val newV = v - 1 ctx.sendToDst(Map(k -> newV)) ctx.srcAttr.updated(k, newV) } else { // do output and remove this entry } } }, (newAttr, oldAttr) => { if (oldAttr.contains(newAttr.head._1)) { // optimization to reduce msg oldAttr.updated(newAttr.head._1, 1) // stop sending this ever } else { oldAttr ++ newAttr } } )}val result = roundVertices.map(_._1).collect
数据和输出
2 14 11 26 37 37 66 73 74 31 66 1
Array(6, 1, 3, 7)
总结
实现的比較naive。还有很多能够优化的地方。
全文完 :)