流程控制模块可以简化业务代码的复杂度,比如在接受一个form表单时,你会去解析每个表单的含义,然后分门别类的对表单参数进行验证。这个时候如果可以写成流程的模式,应该是很好的。流程用的非常好的还是connect模块的use方法。中间件的使用,所有中间件都通过流程控制来实现功能。这样既优化了代码又做了相关的功能,中间件之间也比较独立,修改起来相当清晰
async
async是node流程控制模块。是异步编程的主要方式。这里主要对其使用方法和API进行简单的介绍。实现方式第一个例子后有提到
串行任务
比如有两个串行的任务,我需要两个都读取完毕之后再操作的话可以使用series。解决的问题就是回调金字塔。
var async = require('async'),
fs = require('fs');
async.series([
function(callback) {
fs.readFile('a.json', 'utf-8', callback);
},
function(callback) {
fs.readFile('b.txt', 'utf-8', callback);
}
],
function(err, results) {
console.log(results)
}
)
// 上面的方法等价于:
fs.readFile('a.json', 'utf-8', function(err, content) {
if (err) {
return callback(err);
}
fs.readFile('b.txt', 'utf-8', function(err, content) {
if (err) {
return callback(err);
}
callback(null, [content, data]);
})
});
这里的在读取文件方法readFile时的callback方法不是由使用者指定的。是有async通过高阶函数的方式注入的。其实是用于保存其运行结果的回调函数,其中一个报错就不再执行。
并行任务
并行任务可以使用parallel。
var async = require('async'),
fs = require('fs');
async.parallel([
function(callback) {
fs.readFile('a.json', 'utf-8', callback);
},
function(callback) {
fs.readFile('b.txt', 'utf-8', callback);
}
],
function(err, results) {
console.log(results)
}
)
上面的方法基本等价于并行读取两个文件,都读取完后执行回调。传入参数callback的使用也是高阶函数的一种应用方式。
串行执行的依赖处理
如果串行执行,但是要求后面的执行依赖前面的结果,这种情况下第一种方式就支持不了了。可以用到waterfall
var async = require('async'),
fs = require('fs');
async.waterfall([
function(callback) {
fs.readFile('a.json', 'utf-8', function(err, datas) {
callback(err, datas);
});
},
function(data1,callback) {
fs.readFile('b.txt', 'utf-8', function(err, datas) {
callback(err, datas);
});
}
],
function(err, results) {
console.log(results)
}
)
data1第二个方法接受的参数是上一个函数的callback传的参数,如果上一个方法没有err,则会掉用后面的function,并将上一个方法的参数传进去。依次下去。
混合依赖处理
但依赖不是依次进行的时候,比如A、B、C、D,C依赖A、B,D依赖C。下面的例子可以看出如何使用API完成这一方案。
var async = require('async'),
fs = require('fs');
async.auto({
A: function(callback) {
var err = null
console.log('l am A')
callback(err, 'a');
},
B: function(callback) {
var err = null
console.log('l am B')
callback(err, 'b');
},
C: ['A', 'B', function(callback, datas) {
var err = null
console.log(datas)
console.log('l am C')
callback(err, 'c');
}],
D: ['C', function(callback, datas) {
var err = null
console.log(datas)
console.log('l am D')
}]
})
// 执行得到下面的执行结果
// l am A
// l am B
// { A: 'a', B: 'b' }
// l am C
// { A: 'a', B: 'b', C: 'c' }
// l am D
限制并发数量
var async = require('async'),
fs = require('fs');
async.parallelLimit([
function(callback) {
fs.readFile('a.json', 'utf-8', callback);
},
function(callback) {
fs.readFile('b.txt', 'utf-8', callback);
}
], 1 ,
function(err, results) {
console.log(results)
}
)
和异步方法相比,这里会限制一步并发的数量,不会无限制的并发,内核数是固定的,能处理的并发是固定的。无限制的并发只会降低CPU的效率。
异步增加任务
var async = require('async'),
fs = require('fs');
var q = async.queue(function(file, callback){
fs.readFile(file, 'utf-8', callback);
}, 2);
q.drain = function(){
console.log('完成');
}
fs.readdirSync('.').forEach(function(file){
q.push(file, function(err, data){
console.log(data)
})
})
可以先创建一个公共处理类型的函数,然后动态向这个函数增加任务,增加任务这个过程的时候一定要是上个任务还没有执行结束。如果发现执行列表里面已经执行完了,就会调用一次drain。
step
step比async更轻量级,在API暴露方面也更具有一致性,因为只有一个接口Step。实现是内部有一个next方法。当前任务结束时继续调用下一个任务。并行任务其实中间又加了一个计数器,这个计数器用于控制并行的量。添加任务时计数器+1,计数器归零调用next
串行任务
var Step = require('step'),
fs = require('fs');
Step(
function() {
fs.readFile('a.json', 'utf-8', this);
},
function(err, aData) {
fs.readFile('b.txt', 'utf-8', this);
},
function(err, results) {
console.log(results)
}
);
将串行的任务都放进去,后一个方法能得到前一个方法的回调,参数能得到上一个方法的返回值。this其实是内部调用了next()方法,这个的使用和中间件类似。没有this将不会执行后续的操作。
并行任务
var Step = require('step'),
fs = require('fs');
Step(
function readFile() {
fs.readFile('a.json', 'utf-8', this.parallel());
fs.readFile('b.txt', 'utf-8', this.parallel());
fs.readFile('a.json', 'utf-8', this.parallel());
(function(callback){
process.nextTick(function(){
callback(null, 'a', 'b', 'c')
})
})(this.parallel())
},
function(err, results1, results2, results3, results4) {
console.log(err, results1, results2, results3, results4)
}
)
可以通过传入回调this.parallel()来告诉step需要等到所有任务完成时才能进行下一个任务。但是会有个问题,process.nextTick例子中,只会获取前两个参数,第一个参数是err。
结果分组
var Step = require('step'),
fs = require('fs');
Step(
function readFile() {
var group = this.group()
fs.readFile('a.json', 'utf-8', group());
fs.readFile('b.txt', 'utf-8', group());
fs.readFile('a.json', 'utf-8', group());
(function(callback){
process.nextTick(function(){
callback(null, 'a', 'b', 'c')
})
})(group())
},
function(err, results1, results2) {
console.log(err, results1, results2)
}
)
上面的方法其实就是将数据分组。现在除了results1其他都是undefined
bagpipe
bagpipe主要解决的是并发过高导致服务器过载的问题,用于限制并发量。实现上就是内部有一个列队,任务通过push方法传入列队,如果活跃调用小于并发,直接执行,如果大于则等空闲时再调用。使用方法如下:
var Bagpipe = require('bagpipe'),
fs = require('fs');
// 设定最大并发数为1
var bagpipe = new Bagpipe(2);
fs.readdirSync('.').forEach(function(file){
bagpipe.push(fs.readFile, file, 'utf-8', function(err, data) {
console.log(data)
});
})
还可以设置并发超限的值。大于限制时会有个等待列队,当等待列队长度大于100或者大于最大并发数的2倍时,Bagpipe对象将会触发它的full事件,该事件传递队列长度值。可以记一个日志,如果长期收到这个报警,可以考虑一下整体性能了。
var Bagpipe = require('bagpipe'),
fs = require('fs');
// 设定最大并发数为1
var bagpipe = new Bagpipe(2);
bagpipe.on('full', function (length) {
console.warn('底层系统处理不能及时完成,队列拥堵,目前队列长度为:' + length);
});
fs.readdirSync('.').forEach(function(file){
bagpipe.push(fs.readFile, file, 'utf-8', function(err, data) {
console.log(data)
});
})