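Reposted from: https://yq.aliyun.com/articles/60194
Abstract: This article aims to help you understand how the Spark UI is put together and to give you what you need to customize and extend it. Along the way, you will also get a good feel for the appeal of the Scala language.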
_ui = if (conf.getBoolean("spark.ui.enabled", true)) {
  Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
    _env.securityManager, appName, startTime = startTime))
} else {
  // For tests, do not enable the UI
  None
}
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
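The snippet above keeps the UI in an Option, so every later use (such as bind) is silently skipped when the UI is disabled. Below is a minimal sketch of that pattern in plain Scala. ToyUI and ToyContext are hypothetical stand-ins introduced only for illustration; they are not Spark classes, and the configuration is assumed to be a simple Map.

```scala
// Hypothetical stand-in for a UI server; this is NOT Spark's SparkUI.
final class ToyUI(val appName: String) {
  private var bound = false
  def bind(): Unit = { bound = true }
  def isBound: Boolean = bound
}

object ToyContext {
  // Same shape as the snippet above: Some(ui) when enabled, None otherwise.
  // The Map-based configuration is an assumption made for this sketch.
  def createUI(conf: Map[String, String], appName: String): Option[ToyUI] = {
    val enabled = conf.getOrElse("spark.ui.enabled", "true").toBoolean
    if (enabled) Some(new ToyUI(appName)) else None
  }
}
```

Calling ToyContext.createUI(conf, "demo").foreach(_.bind()) binds the UI only when it exists, so no explicit null or flag check is needed at the call site.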
SparkUI -> WebUITab -> WebUIPage
During SparkContext initialization, SparkUI starts a Jetty server. The bridge between Jetty and WebUIPage is the class org.apache.spark.ui.WebUI, which has the following field:
protected val handlers = ArrayBuffer[ServletContextHandler]()
org.eclipse.jetty.servlet.ServletContextHandler is the standard handler type of the Jetty container, while the following field maps each WebUIPage to its handlers:
protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
val renderHandler = createServletHandler(
  pagePath,
  (request: HttpServletRequest) => page.render(request),
  securityManager,
  basePath)
val renderJsonHandler = createServletHandler(
  pagePath.stripSuffix("/") + "/json",
  (request: HttpServletRequest) => page.renderJson(request),
  securityManager,
  basePath)
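The two createServletHandler calls above register one handler for the page's HTML and a second one under the same path with "/json" appended. The sketch below models that bookkeeping in plain Scala so the path logic can be tried without Jetty. ToyHandler, ToyPage and ToyWebUI are hypothetical simplifications (a "handler" here is just a path plus a function), and the assumption that the page path is "/" followed by the page prefix is made for this sketch only.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Hypothetical simplification: a path plus a response function,
// not a Jetty ServletContextHandler.
final case class ToyHandler(path: String, respond: () => String)

trait ToyPage {
  def prefix: String
  def render(): String
  def renderJson(): String = "{}"
}

final class ToyWebUI {
  val handlers = ArrayBuffer[ToyHandler]()
  val pageToHandlers = new HashMap[ToyPage, ArrayBuffer[ToyHandler]]

  // Registers two handlers per page: HTML at pagePath, JSON at pagePath + "/json".
  def attachPage(page: ToyPage): Unit = {
    val pagePath = "/" + page.prefix // assumption made for this sketch
    val renderHandler = ToyHandler(pagePath, () => page.render())
    val renderJsonHandler =
      ToyHandler(pagePath.stripSuffix("/") + "/json", () => page.renderJson())
    handlers += renderHandler
    handlers += renderJsonHandler
    pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ToyHandler]()) ++=
      Seq(renderHandler, renderJsonHandler)
  }

  // The page-to-handlers map makes it possible to remove exactly
  // the handlers that belong to one page.
  def detachPage(page: ToyPage): Unit =
    pageToHandlers.remove(page).foreach(hs => handlers --= hs)
}
```

The stripSuffix("/") call matters for a page with an empty prefix: its HTML path is "/", and without stripping the JSON path would become "//json" instead of "/json".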
private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors")
ExecutorsTab is displayed as a tab on the Spark home page. Next, define an ExecutorsPage to serve as the content of that tab, and attach it with
attachPage(new ExecutorsPage(this, threadDumpEnabled))
private[ui] class ExecutorsPage(parent: ExecutorsTab, threadDumpEnabled: Boolean)
  extends WebUIPage("")
Implement the ExecutorsPage.render method:
def render(request: HttpServletRequest): Seq[Node]
The last step is to call
SparkUIUtils.headerSparkPage("Executors (" + execInfo.size + ")", content, parent)
val execTable = <table class={UIUtils.TABLE_CLASS_STRIPED}>
<thead>
<th>Executor ID</th>
<th>Address</th>
<th>RDD Blocks</th>
<th><span data-toggle="tooltip" title={ToolTips.STORAGE_MEMORY}>Storage Memory</span>
</th>
<th>Disk Used</th>
<th>Active Tasks</th>
def initialize() {
attachTab(new JobsTab(this))
attachTab(stagesTab)
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
attachTab(new ExecutorsTab(this))
So what about a tab we add ourselves? That is simple too: get the SparkUI object from the SparkContext and call its attachTab method, as follows:
sc.ui.getOrElse { throw new SparkException("Parent SparkUI to attach this tab to not found!")}
.attachTab(new ExecutorsTab)
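The call above does two things: it fails loudly when the UI is disabled, and otherwise it hands the tab to the UI, which takes over the tab's pages. The following plain-Scala sketch models that flow under simplifying assumptions. MiniPage, MiniTab, MiniUI, MiniContext and MiniTabs are hypothetical names introduced for illustration, and the path layout "/<tab prefix>/<page prefix>" is an assumption of this sketch rather than a statement about Spark's internals.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-ins; Spark's real classes carry much more state.
final class MiniPage(val prefix: String)

class MiniTab(val prefix: String) {
  val pages = ArrayBuffer[MiniPage]()
  // Display name derived from the prefix, e.g. "streaming2" -> "Streaming2".
  val name: String = prefix.capitalize
  def attachPage(page: MiniPage): Unit = pages += page
}

final class MiniUI {
  val tabs = ArrayBuffer[MiniTab]()
  val paths = ArrayBuffer[String]()

  // Attaching a tab registers each of its pages under "/<tab prefix>/<page prefix>".
  def attachTab(tab: MiniTab): Unit = {
    tab.pages.foreach { page =>
      paths += ("/" + tab.prefix + "/" + page.prefix).stripSuffix("/")
    }
    tabs += tab
  }
}

final class MiniContext(uiEnabled: Boolean) {
  val ui: Option[MiniUI] = if (uiEnabled) Some(new MiniUI) else None
}

object MiniTabs {
  // Same shape as the snippet above: throw if there is no UI to attach to.
  def attach(ctx: MiniContext, tab: MiniTab): MiniUI = {
    val ui = ctx.ui.getOrElse {
      throw new IllegalStateException("Parent UI to attach this tab to not found!")
    }
    ui.attachTab(tab)
    ui
  }
}
```

Note that pages are attached to the tab before the tab is attached to the UI; in this sketch a page added afterwards would not be registered.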
If you are working in spark-streaming, the following code is all it takes to add your page:
ssc.start()
new KKTab(ssc).attach()
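ssc.awaitTermination()
Adding a new tab may fail with an error, and the errors Scala reports can be rather confusing. If that happens, try adding the following dependency: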
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-servlet</artifactId>
  <version>9.3.6.v20151106</version>
</dependency>
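The article uses Maven. For readers whose project is built with sbt, the same coordinates could be declared roughly as follows; this is an assumption about your build setup, and the version should match the Jetty version your Spark release was built against.

```scala
// build.sbt fragment (assumes an sbt build; the coordinates are the ones from the Maven snippet above)
libraryDependencies += "org.eclipse.jetty" % "jetty-servlet" % "9.3.6.v20151106"
```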
package org.apache.spark.streaming.ui2

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.ui2.KKTab._
import org.apache.spark.ui.{SparkUI, SparkUITab}
import org.apache.spark.{Logging, SparkException}

/**
 * 1/1/16 WilliamZhu(allwefantasy@gmail.com)
 */
class KKTab(val ssc: StreamingContext)
  extends SparkUITab(getSparkUI(ssc), "streaming2") with Logging {

  private val STATIC_RESOURCE_DIR = "org/apache/spark/streaming/ui/static"

  attachPage(new TTPage(this))

  def attach() {
    getSparkUI(ssc).attachTab(this)
    getSparkUI(ssc).addStaticHandler(STATIC_RESOURCE_DIR, "/static/streaming")
  }

  def detach() {
    getSparkUI(ssc).detachTab(this)
    getSparkUI(ssc).removeStaticHandler("/static/streaming")
  }
}

private[spark] object KKTab {
  def getSparkUI(ssc: StreamingContext): SparkUI = {
    ssc.sc.ui.getOrElse {
      throw new SparkException("Parent SparkUI to attach this tab to not found!")
    }
  }
}
org.apache.spark.streaming.ui2.TTPage is as follows:
package org.apache.spark.streaming.ui2

import javax.servlet.http.HttpServletRequest

import org.apache.spark.Logging
import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
import org.json4s.JsonAST.{JNothing, JValue}

import scala.xml.Node

/**
 * 1/1/16 WilliamZhu(allwefantasy@gmail.com)
 */
private[spark] class TTPage(parent: KKTab)
  extends WebUIPage("") with Logging {

  // Renders the HTML body of the tab inside the standard Spark page frame.
  override def render(request: HttpServletRequest): Seq[Node] = {
    val content = <p>TTPAGE</p>
    SparkUIUtils.headerSparkPage("TT", content, parent, Some(5000))
  }

  // No JSON representation is provided for this page.
  override def renderJson(request: HttpServletRequest): JValue = JNothing
}
Remember to add the Jetty dependency mentioned above.
Original: https://www.cnblogs.com/itboys/p/9153130.html