流程控制模块

博客分类: 原创

流程控制模块

流程控制模块可以简化业务代码的复杂度,比如在接受一个form表单时,你会去解析每个表单的含义,然后分门别类的对表单参数进行验证。这个时候如果可以写成流程的模式,应该是很好的。流程用的非常好的还是connect模块的use方法。中间件的使用,所有中间件都通过流程控制来实现功能。这样既优化了代码又做了相关的功能,中间件之间也比较独立,修改起来相当清晰

async

asyncnode流程控制模块。是异步编程的主要方式。这里主要对其使用方法和API进行简单的介绍。实现方式第一个例子后有提到

串行任务

比如有两个串行的任务,我需要两个都读取完毕之后再操作的话可以使用series。解决的问题就是回调金字塔。

var async = require('async'),
    fs = require('fs');

async.series([
        function(callback) {
            fs.readFile('a.json', 'utf-8', callback);
        },
        function(callback) {
            fs.readFile('b.txt', 'utf-8', callback);
        }
    ],
    function(err, results) {
        console.log(results)
    }
)
// 上面的方法等价于:
fs.readFile('a.json', 'utf-8', function(err, content) {
    if (err) {
        return callback(err);
    }
    fs.readFile('b.txt', 'utf-8', function(err, content) {
        if (err) {
            return callback(err);
        }
        callback(null, [content, data]);
    })
});

这里的在读取文件方法readFile时的callback方法不是由使用者指定的。是有async通过高阶函数的方式注入的。其实是用于保存其运行结果的回调函数,其中一个报错就不再执行。

并行任务

并行任务可以使用parallel

var async = require('async'),
    fs = require('fs');

async.parallel([
        function(callback) {
            fs.readFile('a.json', 'utf-8', callback);
        },
        function(callback) {
            fs.readFile('b.txt', 'utf-8', callback);
        }
    ],
    function(err, results) {
        console.log(results)
    }
)

上面的方法基本等价于并行读取两个文件,都读取完后执行回调。传入参数callback的使用也是高阶函数的一种应用方式。

串行执行的依赖处理

如果串行执行,但是要求后面的执行依赖前面的结果,这种情况下第一种方式就支持不了了。可以用到waterfall

var async = require('async'),
    fs = require('fs');

async.waterfall([
        function(callback) {
            fs.readFile('a.json', 'utf-8', function(err, datas) {
                callback(err, datas);
            });
        },
        function(data1,callback) {
            fs.readFile('b.txt', 'utf-8', function(err, datas) {
                callback(err, datas);
            });
        }
    ],
    function(err, results) {
        console.log(results)
    }
)

data1第二个方法接受的参数是上一个函数的callback传的参数,如果上一个方法没有err,则会掉用后面的function,并将上一个方法的参数传进去。依次下去。

混合依赖处理

但依赖不是依次进行的时候,比如A、B、C、DC依赖A、BD依赖C。下面的例子可以看出如何使用API完成这一方案。

var async = require('async'),
    fs = require('fs');

async.auto({
    A: function(callback) {
        var err = null
        console.log('l am A')
        callback(err, 'a');
    },
    B: function(callback) {
        var err = null
        console.log('l am B')
        callback(err, 'b');
    },
    C: ['A', 'B', function(callback, datas) {
        var err = null
        console.log(datas)
        console.log('l am C')
        callback(err, 'c');
    }],
    D: ['C', function(callback, datas) {
        var err = null
        console.log(datas)
        console.log('l am D')
    }]
})
// 执行得到下面的执行结果
// l am A
// l am B
// { A: 'a', B: 'b' }
// l am C
// { A: 'a', B: 'b', C: 'c' }
// l am D

限制并发数量

var async = require('async'),
    fs = require('fs');

async.parallelLimit([
        function(callback) {
            fs.readFile('a.json', 'utf-8', callback);
        },
        function(callback) {
            fs.readFile('b.txt', 'utf-8', callback);
        }
    ], 1 ,
    function(err, results) {
        console.log(results)
    }
)

和异步方法相比,这里会限制一步并发的数量,不会无限制的并发,内核数是固定的,能处理的并发是固定的。无限制的并发只会降低CPU的效率。

异步增加任务

var async = require('async'),
    fs = require('fs');

var q = async.queue(function(file, callback){
    fs.readFile(file, 'utf-8', callback);
}, 2);

q.drain = function(){
    console.log('完成');
}

fs.readdirSync('.').forEach(function(file){
    q.push(file, function(err, data){
        console.log(data)
    })
})

可以先创建一个公共处理类型的函数,然后动态向这个函数增加任务,增加任务这个过程的时候一定要是上个任务还没有执行结束。如果发现执行列表里面已经执行完了,就会调用一次drain

step

stepasync更轻量级,在API暴露方面也更具有一致性,因为只有一个接口Step。实现是内部有一个next方法。当前任务结束时继续调用下一个任务。并行任务其实中间又加了一个计数器,这个计数器用于控制并行的量。添加任务时计数器+1,计数器归零调用next

串行任务

var Step = require('step'),
    fs = require('fs');

Step(
    function() {
        fs.readFile('a.json', 'utf-8', this);
    },
    function(err, aData) {
        fs.readFile('b.txt', 'utf-8', this);
    },
    function(err, results) {
        console.log(results)
    }
);

将串行的任务都放进去,后一个方法能得到前一个方法的回调,参数能得到上一个方法的返回值。this其实是内部调用了next()方法,这个的使用和中间件类似。没有this将不会执行后续的操作。

并行任务

var Step = require('step'),
    fs = require('fs');

Step(
    function readFile() {
        fs.readFile('a.json', 'utf-8', this.parallel());
        fs.readFile('b.txt', 'utf-8', this.parallel());
        fs.readFile('a.json', 'utf-8', this.parallel());
        (function(callback){
            process.nextTick(function(){
                callback(null, 'a', 'b', 'c')
            })
        })(this.parallel())
    },
    function(err, results1, results2, results3, results4) {
        console.log(err, results1, results2, results3, results4)
    }
)

可以通过传入回调this.parallel()来告诉step需要等到所有任务完成时才能进行下一个任务。但是会有个问题,process.nextTick例子中,只会获取前两个参数,第一个参数是err

结果分组

var Step = require('step'),
    fs = require('fs');

Step(
    function readFile() {
        var group = this.group()
        fs.readFile('a.json', 'utf-8', group());
        fs.readFile('b.txt', 'utf-8', group());
        fs.readFile('a.json', 'utf-8', group());
        (function(callback){
            process.nextTick(function(){
                callback(null, 'a', 'b', 'c')
            })
        })(group())
    },
    function(err, results1, results2) {
        console.log(err, results1, results2)
    }
)

上面的方法其实就是将数据分组。现在除了results1其他都是undefined

bagpipe

bagpipe主要解决的是并发过高导致服务器过载的问题,用于限制并发量。实现上就是内部有一个列队,任务通过push方法传入列队,如果活跃调用小于并发,直接执行,如果大于则等空闲时再调用。使用方法如下:

var Bagpipe = require('bagpipe'),
    fs = require('fs');

// 设定最大并发数为1
var bagpipe = new Bagpipe(2);
fs.readdirSync('.').forEach(function(file){
    bagpipe.push(fs.readFile, file, 'utf-8', function(err, data) {
        console.log(data)
    });
})

还可以设置并发超限的值。大于限制时会有个等待列队,当等待列队长度大于100或者大于最大并发数的2倍时,Bagpipe对象将会触发它的full事件,该事件传递队列长度值。可以记一个日志,如果长期收到这个报警,可以考虑一下整体性能了。

var Bagpipe = require('bagpipe'),
    fs = require('fs');

// 设定最大并发数为1
var bagpipe = new Bagpipe(2);
bagpipe.on('full', function (length) {
    console.warn('底层系统处理不能及时完成,队列拥堵,目前队列长度为:' + length);
});
fs.readdirSync('.').forEach(function(file){
    bagpipe.push(fs.readFile, file, 'utf-8', function(err, data) {
        console.log(data)
    });
})