node 采用了事件驱动机制,而EventEmitter 就是node实现事件驱动的基础。在EventEmitter的基础上,node 几乎所有的模块都继承了这个类,以实现异步事件驱动架构。继承了EventEmitter的模块,拥有了自己的事件,可以绑定/触发监听器,实现了异步操作。EventEmitter是node事件模型的根基,由EventEmitter为基础构建的事件驱动架构处处体现着异步编程的思想,因此,我们在构建node程序时也要遵循这种思想。EventEmitter实现的原理是观察者模式,这也是实现事件驱动的基本模式。本文将围绕EventEmitter,从中探讨它的原理观察者模式、体现的异步编程思想以及应用。

正文

events模块的EventEmitter类

node 的events模块只提供了一个EventEmitter类,这个类实现了node异步事件驱动架构的基本模式——观察者模式,提供了绑定事件和触发事件等事件监听器模式一般都会提供的API:

1
2
3
4
5
6
7
8
9
10
11
const EventEmitter = require('events')

class MyEmitter extends EventEmitter {}
const myEmitter = new MyEmitter()

function callback() {
console.log('触发了event事件!')
}
myEmitter.on('event', callback)
myEmitter.emit('event')
myEmitter.removeListener('event', callback);

只要继承EventEmitter类就可以拥有事件、触发事件等,所有能触发事件的对象都是 EventEmitter 类的实例。

而观察者模式(事件发布/订阅模式)就是实现EventEmitter类的基本原理,也是事件驱动机制基本模式。

事件驱动原理:观察者模式

在事件驱动系统里,事件是如何产生的?一个事件发生为什么能”自动”调用回调函数?我们先看看观察者模式。

观察者(Observer)模式是一种设计模式,应用场景是当一个对象的变化需要通知其他多个对象而且这些对象之间需要松散耦合时。在这种模式中,被观察者(主体)维护着一组其他对象派来(注册)的观察者,有新的对象对主体感兴趣就注册观察者,不感兴趣就取消订阅,主体有更新的话就依次通知观察者们。说猿话就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
function Subject() {
this.listeners = {}
}

Subject.prototype = {
// 增加事件监听器
addListener: function(eventName, callback) {
if(typeof callback !== 'function')
throw new TypeError('"listener" argument must be a function')

if(typeof this.listeners[eventName] === 'undefined') {
this.listeners[eventName] = []
}
this.listeners[eventName].push(callback) // 放到观察者对象中
},
// 取消监听某个回调
removeListener: function(eventName, callback) {
if(typeof callback !== 'function')
throw new TypeError('"listener" argument must be a function')
if(Array.isArray(this.listeners[eventName]) && this.listeners[eventName].length !== 0) {
var callbackList = this.listeners[eventName]
for (var i = 0, len=callbackList.length; i < len; i++) {
if(callbackList[i] === callback) {
this.listeners[eventName].splice(i,1) // 找到监听器并从观察者对象中删除
}
}

}
},
// 触发事件:在观察者对象里找到这个事件对应的回调函数队列,依次执行
triggerEvent: function(eventName,...args) {
if(this.listeners[eventName]) {
for(var i=0, len=this.listeners[eventName].length; i<len; i++){
this.listeners[eventName][i](...args)
}
}
}
}

OK,我们现在来添加监听器和发送事件:

1
2
3
4
5
6
7
8
var event = new Subject()
function hello() {
console.log('hello, there')
}
event.addListener('hello', hello)
event.triggerEvent('hello') // 输出 hello, there
event.removeListener('hello', hello) // 取消监听
setTimeout(() => event.triggerEvent('hello'),1000) // 过了一秒什么也没输出

在观察者模式中,注册的回调函数即事件监听器,触发事件调用各个回调函数即是发布消息。

你可以看到,观察者模式只不过维护一个信号对应函数的列表,可以存,可以除,你只要给它信号(索引),它就按照这个信号执行对应的函数,也就相当于间接调用了。那直接调用函数不就行了,干嘛写的那么拐弯抹角?刚才也说了,这是因为观察者模式能够解耦对象之间的关系,实现了表示层和数据逻辑层的分离,并定义了稳定的更新消息传递机制。

回到开始的问题,事件是如何产生又“自动”被调用的?是像上面那样当调用event.triggerEvent的时侯产生的吗?并不是,调用event.triggerEvent就相当于调用了回调函数,是事件执行过程,而事件产生过程则更多由底层来产生并通知给node的。我们拿node的全局变量 process来举例,process是EventEmitter的实例:

1
2
3
process.on('exit', (code) => {
console.log(`About to exit with code: ${code}`);
});

node执行时会在process的exit事件上绑定你指定的回调,相当于调用了上面的addListener,而当你退出进程时,你会发现你指定的函数被执行了,但是你没有手动调用触发exit事件的方法,也就是上面的triggerEvent,这是因为node底层帮你调用了——操作系统底层使这个进程退出了,node会得到这个信息,然后触发事先定义好的触发方法,回调函数就因此依次执行了。像这样的内置事件是node模块事先写好并开放出来的,使用时直接绑定回调函数即可,如果要自定义事件,那就得自己发送信号了。

上面代码实现了最基本的观察者模式,node 源码中EventEmitter的实现原理跟这差不多,除了这些还加入了其他有用的特性,而且各种实现都尽可能使用性能最好的方式(node源码真是处处反映着智慧的光芒)。

node中众多模块都继承了EventEmitter,比如文件模块系统下的FSWatcher

1
2
3
4
5
6
7
8
9
const EventEmitter = require('events')
const util = require('util')
...

function FSWatcher() {
EventEmitter.call(this);// 调用构造函数
...
}
util.inherits(FSWatcher, EventEmitter); // 继承 EventEmitter

其他模块也是如此。它们一同组成了node的异步事件驱动架构。

异步编程范式

可以看到,由于采用事件模型和异步I/O,node中大量模块的API采用了异步回调函数的方式,底层也处处体现了异步编程的方式。虽然异步也带来了很多问题——理解困难、回调嵌套过深、错误难以捕捉、多线程编程困难等,不过相比于异步带来的高性能,加上这些问题都有比较好的解决方案,异步编程范式还是很值得尝试的,尤其对于利用node构建应用程序的时候。

从最基本的回调函数开始

回调函数是异步编程的体现,而回调函数的实现离不开高阶函数。得益于javascript语言的灵活性,函数作为参数或返回值,而将函数作为参数或返回值的函数就是高阶函数:

1
2
3
4
5
6
7
8
9
10
function foo(x,bar) {
return bar(x)
}// 对于相同的foo,传进去不同的bar就有不同的操作结果

var arr = [2,3,4,5]
arr.forEach(function(item,index){
// do something for every item
}) // 数组的高阶函数

event.addListener('hello', hello) // 还有上面观察者模式实现的addListener

基于高阶函数的特性,就可以实现回调函数的模式。实际上,正式因为javascript函数用法非常灵活,才有高阶函数和众多设计模式。

采用事件发布/订阅模式(观察者模式)

单纯地使用高阶函数特性不足以构建简单、灵活、强大的异步编程模式的应用程序,我们需要从其他语言借鉴一些设计模式。就像上面提到的,node的events模块实现了事件发布/订阅模式,这是一种广泛用于异步编程的模式。它将回调函数事件化,将事件与各回调函数相关联,注册回调函数就是添加事件监听器,这些事件监听器可以很方便的添加、删除、被执行,使得事件和处理逻辑(注册的回调函数)之间轻松实现关联和解耦——事件发布者无需关注监听器是如何实现业务逻辑的,也不用关注有多少个事件监听器,只需按照消息执行即可,而且数据通过这种消息的方式可以灵活的传递。

不仅如此,这种模式还可以实现像类一样的对功能进行封装:将不变的逻辑封装在内部,将需要自定义、容易变化的部分通过事件暴露给外部定义。Node中很多对象大多都有这样黑盒子的特点,通过事件钩子,可以使使用者不用关注这个对象是如何启动的,只需关注自己关注的事件即可。

像大多数node核心模块一样,继承EventEmitter,我们就可以使用这种模式,帮助我们以异步编程方式构建node程序。

利用Promise

Promise是CommonJs发布的一个规范,它的出现给异步编程带来了方便。Promise所作的只是封装了异步调用、嵌套回调,使得原本复杂嵌套逻辑不清的回调变得优雅和容易理解。有了Promise的封装,你可以这样写异步调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
function fn1(resolve, reject) {
setTimeout(function() {
console.log('步骤一:执行');
resolve('1');
},500);
}

function fn2(resolve, reject) {
setTimeout(function() {
console.log('步骤二:执行');
resolve('2');
},100);
}

new Promise(fn1).then(function(val){
console.log(val);
return new Promise(fn2);
}).then(function(val){
console.log(val);
return 33;
}).then(function(val){
console.log(val);
});

那Promise是如何封装的呢?

首先,Promise经常用于处理异步、延时操作,为了放在then里面的”接下来要做的事“以正确的顺序被执行,Promise被设计为状态机,状态变化为pending => resolve(成功)、pending => reject(失败),而且,Promise还维护成功或失败时要执行的函数List,List中的回调正是Promise处在pending状态时将then中注册的回调push进去的;Promise内部有一个resolve和reject函数,分别在成功/失败时执行函数List,并且这两个函数会传递给回调函数,由用户决定什么时候resolve/reject;为了实现链式调用,then中返回的是promise:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
function getUserId() {
return new Promise(function(resolve, i) {
//异步请求
setTimeout(function(){
console.log('异步操作成功,下一步执行promise的'+i+'的resolve')
resolve('Fuck you Promise!', i)
},1000)
}, 1)
}

getUserId().then(function(words) {
console.log(words)
})

// 实现
function Promise(fn, i) {
var i = i
var state = 'pending'
var result = null
var promises = []
console.log('Promise' + i + 'constructing')

this.then = function(onFulfilled) {
console.log('then被调用')
return new Promise(function(resolve) {
console.log('返回一个promise')
handle({
onFulfilled: onFulfilled || null,
resolve: function(ret, i) {resolve(ret,i)}
})
},2)
}

function handle(promise) {
if(state === 'pending') {
console.log('promise' + i + '还在pending中')
promises.push(promise)
console.log('注册回调')
return
}

if(!promise.onFulfilled) {
console.log('回调为空,resolve结果')
promise.resolve(result, i)
return
}
console.log('执行回调')
var ret = promise.onFulfilled(result)
console.log('处理回调返回的值(可能是另一个promise)')
promise.resolve(ret, 2)

}

function resolve (newResult, i) {
console.log('执行promise' + i + '的resolve')
if(newResult && (typeof newResult === 'object' || typeof newResult === 'function')) {
console.log('then中注册的回调返回了promise')
var then = newResult.then
if(typeof then === 'function') {
console.log('调用then')
then.call(newResult, resolve)
}
}
console.log('设置promise' + i + '的状态为fulfilled')
state = 'fulfilled'
result = newResult
setTimeout(function(){
console.log('遍历promise' + i + '注册的回调执行')
console.log(promises[0])
promises.forEach(function(promise) {
handle(promise)
});
},0)

}
console.log('传resolve到promise' + i + '函数参数')
fn(resolve, i)
}

注意,这是Promise/A+规范的简单实现,还有reject原理一样的。我在这里为了更好的理解promise,不至于弄混乱,加入了标号,方便理解,Promise/A+规范里并没有。

实际上,node高版本已经支持promise了,可以直接使用,但不如Bluebird这类三方库快,而且Bluebird扩展了很多Promise/A+没有的方法。

使用第三方库Async/Step

async是著名的流程控制库,经常被npm install,它提供了20多个方法帮助我们处理异步协作模式。比如:

  • series ——异步任务的串行执行,就像Promise一样,只不过形式不同
  • parallel——异步任务并行执行,相当于Promise.all
  • waterfall——处理具有依赖关系的异步调用,比如前一个结果是后一个输入
  • auto——自动分析异步调用的依赖关系,参数是一个依赖关系对象

Step比async更轻量、更简单,只有一个接口Step, 在接口里可以调用Step提供的方法,功能与async差不多。

异步编程范式远不止这么多,还有很多重要的思想、设计模式,还有一些需要在实践中去发现、总结。

总结

EventEmitter提供的接口非常简单,但是它背后体现的思想贯穿了Node整个架构。Node不是第一个使用异步编程的平台,但异步架构在Node中处处体现,是Node设计的基本思想。在学习node时,透过现象看本质、深入浅出,是一个明智的方法,对待任何事物也是如此。

参考文献: