使用Jetty和DWR创建伸缩性Comet程序
异步服务器端事件驱动的Ajax程序很难实现,也很难获得伸缩性,Plilip McCarthy展示了一个有效的方式:
Comet模式允许您push数据到客户端,而且Jetty 6的Continuations API让您的Comet程序对大量客户端获得高可伸缩性。您可以方便的同DWR 2使用Comet和Continuations。
随着Ajax在Web程序开发技术里建立了牢固的位置,出现了几种常见的Ajax使用模式。例如,Ajax通常用于响应用户输入来使用新数据修改局部页面。但有时候,Web程序的用户界面需要根据偶尔的异步服务器端事件来更新,而不需要用户动作 -- 例如,在Ajax聊天程序里显示其他用户输入的一条新消息。由于Web浏览器和服务器间的HTTP连接只能由浏览器建立,服务器不能"推"更改数据到浏览器。
Ajax程序有两个解决该问题的基本方式:浏览器每隔几秒请求服务器来获得更改,或者服务器维持与浏览器的连接并且传递数据。长连接技术称为Comet。展示了怎样使用Jetty服务器引擎和DWR来实现简单而高效的Comet Web程序。
为什么要Comet?
然而,对Comet策略的呼唤来自它可感知的高效。客户端不会产生轮询方式特有的传输浪费,一旦事件发生,就会被发布到客户端。但是维持长连接也消耗了服务器资源。当servlet位置持久的请求在等候状态时,servlet独占一个线程。这样传统的servlet引擎就限制了Comet的伸缩性,因为客户端的数量会迅速超过服务器栈可以有效处理的线程的数量。
Jetty 6有什么不同
为了让它更简单,我将限制Jetty servlet引擎为一个单一的请求处理线程。列表1显示了相关的jetty.xml配置。事实上我需要允许在ThreadPool里总共有3个线程:Jetty服务器本身使用一个,HTTP连接器使用一个来监听进来的请求,最后剩一个线程来执行servlet代码。
代码
|
下一步,为了模仿异步事件,列表2显示了BlockingServlet的service()方法,它简单的使用Thread.sleep()调用来在完成前暂停2,000毫秒。同时它也在执行开始和结束时输出系统时间。为了帮助区分不同请求的输出,它也把一个请求参数作为标识符记录到日志。
列表2. BlockingServlet
代码 public class BlockingServlet extends HttpServlet { public void service(HttpServletRequest req, HttpServletResponse res) throws java.io.IOException { String reqId = req.getParameter("id"); res.setContentType("text/plain"); try { res.getWriter().println("Request: " + reqId + ""tend:"t" + new Date()); |
|||
现在您可以观察几个同步请求下servlet的行为。列表3显示了使用lynx的5个并行请求时控制台的输出。命令行简单的启动5个lynx进程,加上一个标识符序数到请求的URL。
代码 $ for i in 'seq 1 5' ; do lynx -dump localhost:8080/blocking?id=$i & done Request: 2 start: Sun Jul 01 12:32:31 BST 2007 Request: 3 start: Sun Jul 01 12:32:33 BST 2007 Request: 4 start: Sun Jul 01 12:32:35 BST 2007 Request: 5 start: Sun Jul 01 12:32:37 BST 2007 |
列表3的输出并不惊奇。由于Jetty只有一个线程来执行servlet的service()方法,Jetty将每个请求列队并按顺序服务。时间戳显示了在一个应答分派给一个请求(以及end消息)后,servlet开始处理下一个请求(下一个start消息)。所以即使所有的5个请求是同时发出的,最后的那个请求必须等待8秒才能得到处理。
现在,看看Jetty 6的Continuations特性在这种情形下是多么的有用。列表4显示了列表2的BlockingServlet使用Continuations API重写后的样子。我将在后面解释代码。
代码 public class ContinuationServlet extends HttpServlet { public void service(HttpServletRequest req, HttpServletResponse res) throws java.io.IOException { String reqId = req.getParameter("id"); Continuation cc = ContinuationSupport.getContinuation(req, null); res.setContentType("text/plain"); cc.suspend(2000); res.getWriter().println("Request: " + reqId + ""tend:"t" + new Date()); |
列表5显示了对ContinuationServlet作5个并发请求时的输出,可以和列表3比较一下。
列表5. 到ContinuationServlet的几个并发请求的输出
代码 $ for i in 'seq 1 5' ; do lynx -dump localhost:8080/continuation?id=$i & done Request: 3 start: Sun Jul 01 12:37:37 BST 2007 Request: 2 start: Sun Jul 01 12:37:37 BST 2007 Request: 5 start: Sun Jul 01 12:37:37 BST 2007 Request: 4 start: Sun Jul 01 12:37:37 BST 2007 |
在列表5里有两件重要的事情值得注意。首先,每个start消息出现了两次,暂时不要担心这点。其次,更重要的是,现在请求是并发处理的,没有排队。注意所有的start和end消息时间戳是一样的。因此,没有哪个请求耗时超多两秒,即使只有单一的servlet线程在运行。
深入Jetty的Continuations机制
暂停的请求停留在未决 Continuations队列里直到指定的过期时间,或者在它的Continuation上调用resume()方法。当任何一个条件触发时,请求会重新提交给servlet(通过filter链)。这样,整个请求被"重播"直到RetryRequest异常不再抛出,然后继续按正常情况执行。
列表5里的输出现在应该能理解了。对每个请求,按顺序进入到servlet的service()方法,start消息发送给应答,然后Continuation的suspend()方法保留servlet,然后释放线程来开始下一请求。所有的5个请求迅速运行service()方法的第一部分并马上进入暂停状态,所有的start消息在几毫秒内输出。两秒后,suspend()过期,第一个请求从未决队列里重新得到并重新提交给ContinuationServlet。start消息第二次输出,对suspend()方法的第二次调用立即返回,然后end消息被发送给应答。然后servlet代码执行下一个队列请求,以此类推。
所以,在BlockingServlet和ContinuationServlet两种情况下,请求被排入队列来访问单一的servlet线程。尽管如此,在BlockingServlet里的两秒钟暂停在servlet线程里执行时,ContinuationServlet的暂停发生于servlet外面的SelectChannelConnector里。ContinuationServlet全部的吞吐量会更高,因为servlet线程不会在sleep()调用时阻碍大多数时间。|||
让Continuations变得有用
一个resume()方法和一个suspend()方法配对。您可以认为它们是标准的Object wait()/notify()机制的Continuations等价物。即,suspend()维持一个Continuation直到过期或者另一个线程调用resume()。suspend()/resume()方法是使用Continuations实现真实的Comet风格服务的关键所在。基本的模式是从当前请求维持Continuation,调用suspend(),然后等待直到您的异步时间到达,然后调用resume()并生成应答。
但是,不像编程语言里真实的语言级continuations,如Scheme,或Java语言里的wait()/notify(),在Jetty Continuation上调用resume()并不意味着代码执行于它停止的确切位置。您已经看到,真正发生的是与Continuation相关的请求被重播。这导致两个问题:列表4的ContinuationServlet里代码不合需要的重新执行,以及丢失状态 -- 暂停时作用域里的任何东西都丢失了。
第一个问题的解决方案是isPending()方法,如果isPending()方法的返回值为true,这意味着suspend()在前面已经被调用过了,并且二次请求的执行不会再次接触suspend()方法。换句话说,给您的suspend()调用前的代码加上isPending()条件可以确保它只被执行一次。Continuation也提供了一个简单的机制来保持状态:putObject(Object)和getObject()方法。使用它们来维持一个context对象,这样当Continuation暂停时任何您需要维持的状态都可以得到保护。您也可以使用该机制作为一种在线程之间传递事件数据的方法,后面您将看到。
写一个基于Continuations的程序
图1. 显示GPS跟踪程序主要组件的类图
首先,该程序需要生成坐标的一些东西,这是RandomWalkGenerator的工作。从一个初始坐标开始,每次对它的私有方法generateNextCoord()的调用都从该位置随机走一步并返回一个GpsCoord对象。当初始化时,RandomWalkGenerator创建一个线程,该线程在随机间隔内调用generateNextCoorld()方法并发送生成的坐标给任何使用addListener()注册自己的CoordListener实例。
编辑推荐:
温馨提示:因考试政策、内容不断变化与调整,长理培训网站提供的以上信息仅供参考,如有异议,请考生以权威部门公布的内容为准! (责任编辑:长理培训)
点击加载更多评论>>