进程进阶

博客分类: 原创

进程进阶

国庆节玩了一圈回来完全没有心思学习了怎么办,网络安全的后面的章节看的不是很仔细,只有看完再回头回去看了,现在整理好心绪。开始看看进程吧。目前的项目性能还是一个很大的问题。包括日志收集和数据分析。虽然这个是一个离线计算,但是还是觉得非常慢呢。

前面可以了解到,node是运行在单个进程的单个线程上。它带来的好处是:程序状态单一,没有多线程的锁和线程同步问题,也没有切换上下文的开销。可以更好的提高CPU的使用率,但是当面临多核服务器的时候,只能利用单核其实是非常浪费的。

进程和线程

首先想解释一下,自己一直没有搞懂的一个概念,线程与进程。

进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位,

线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。

一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行。

相对进程而言,线程是一个更加接近于执行体的概念,它可以与同进程中的其他线程共享数据,但拥有自己的栈空间,拥有独立的执行序列。

在串行程序基础上引入线程和进程是为了提高程序的并发度,从而提高程序运行效率和响应时间。

线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程正相反。同时,线程适合于在SMP机器上运行,而进程则可以跨机器迁移。

区别:

  • 简而言之,一个程序至少有一个进程,一个进程至少有一个线程。
  • 线程的划分尺度小于进程,使得多线程程序的并发性高。
  • 另外,进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。
  • 线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
  • 从逻辑角度来看,多线程的意义在于一个应用程序中,有多个执行部分可以同时执行。但操作系统并没有将多个线程看做多个独立的应用,来实现进程的调度和管理以及资源分配。这就是进程和线程的重要区别。

node创建子进程

child_process提供了4个方法创建子进程。

  • spawn():启动一个子进程来执行命令。
  • exec():启动一个子进程来执行命令,与spawn()不同的是其接口不同,他有一个回调函数可以知道子进程的状况。
  • execFile():启动一个子进程来执行可执行文件。
  • fork():和spawn()类似,不同点在于它创建Node的子进程只需要指定要执行的javascript文件模块即可。

前面三个可以设置创建进程的超时设置。exec()适合执行已有的命令,而execFile()适合执行文件

进程间的通信

子进程对象通过send()方法实现主进程向子进程发送数据。message事件实现收听子进程发来的数据。下面是一个子进程与主进程之间通信的实例

// 主进程
var cp = require('child_process'),
    n = cp.fork('xxx.js');

n.on('message', function(msg){
    console.log('aaaaa' + msg);
})

n.send({a:1});

// 子进程
console.log('pid in worker:', process.pid);

process.on('message', function(msg) {
  console.log('3:', msg);
});

process.send('bbb');

// 启动主进程
pid in worker: 11730
3: { a: 1 }
aaaaabbb

应当注意的是,fork的子进程必须要是一个node进程,不然是不能进行通信的。而且这个通信是双向的通信。

可以通过进程间的通信做负载均衡。同一个端口

// 主进程
var cp = require('child_process'),
    n = cp.fork('xxx.js'),
    server = require('net').createServer();

server.on('connection', function(socket){
    socket.end('parent')
})

server.listen(3333, function(){
    n.send('server', server)
})

// 子进程
process.on('message', function(msg, server) {
    if(msg === 'server'){
        server.on('connection', function(socket){
            socket.end('child')
        })
    }
});

// 启动主进程
parent
child
child
parent

随机出现,访问同样的端口,父进程和子进程都有可能对请求进行处理。这个时候如果我们的主进程只用来调度,而真正执行的是子进程,就可以达到做负载均行的效果。

// 主进程
var cp = require('child_process'),
    n1 = cp.fork('xxx.js'),
    n2 = cp.fork('xxx.js'),
    server = require('net').createServer();

server.listen(3333, function(){
    n1.send('server', server);
    n2.send('server', server);
    server.close();
})

// 子进程
var http = require('http');

var server = http.createServer(function(req, res){
    res.writeHead(200)
    res.end('pid is' + process.pid)
})

process.on('message', function(msg, tcp) {
  if(msg === 'server'){
    tcp.on('connection', function(socket){
        server.emit('connection', socket)
    })
  }
});

// 启动主进程,并访问页面
pid is13360
pid is13359

这样可以实现共同监听一个端口,将接受到的请求分发给子进程,这里的解决方案不在于解决高并发,在高并发的时候,node的事件机制就已经可以缓解高并发了,因为没有等待时间,为什么是缓解,因为node始终是单进程的,如果服务器是单核的,这样其实已经做到node的极限了,但是如果服务器是多核的,我们可以通过主进程用于分发任务,子进程执行请求的方式,可以更高的利用多核服务器。

进程事件

除了上述的父子进程通信事件,还有下面这些事件:

  • error:当子进程无法被复制创建、无法被杀死、无法发送消息时会触发该事件。
  • exit:子进程退出时触发该时间,子进程如果是正常退出,这个事件的第一个参数是退出码,否则为 null,如果进程是通过 kill() 方法被杀死的,会得到第二个参数,它表示杀死进程时的信号。
  • close:在子进程的标注输入输出流中止时触发该事件,参数与 exit 相同。
  • disconnect:在父进程或子进程中调用 disconnect() 方法时触发该事件,在调用该方法时将关闭监听 IPC 通道。

自动重启

这个也可以做进程守护,当其中一个子进程被kill时,自动重启。当主进程退出时kill掉所有的进程。下面是写的一个主进程守护子进程的模式。

var fork = require('child_process').fork,
    cpus = require('os').cpus();

var server = require('net').createServer();
server.listen(3333);

var workers = {},
    createWorker = function(){
        var worker = fork('./xxx.js');
        worker.on('exit', function(){
            console.log('worker' + worker.pid + 'exit');
            delete workers[worker.pid];
            createWorker();
        });
        worker.send('server', server);
        worker[worker.pid] = worker;
        console.log('create worker pid' + worker.pid);
    }

for(var i=cpus.length; i--;){
    createWorker();
}

process.on('exit', function(){
    for(var pid in workers){
        workers[pid].kill();
    }
})

// 运行结果
create worker pid17091
create worker pid17092
create worker pid17093
create worker pid17094
create worker pid17095
create worker pid17096
create worker pid17097
create worker pid17098
17091
17092
17096
17098
17097
17093
17094
17095
// 强杀掉一个进程
sudo kill -9 17094
// 进程重启
worker17094exit
create worker pid17167
17167

进程超时和限量重启

一个连接过长和无限次的重启都使得程序的茁壮型得不到保证。

var fork = require('child_process').fork,
    cpus = require('os').cpus();

// 限制重启次数
var limit = 10,
    during = 60000,
    restart = [],
    server = require('net').createServer();
server.listen(3333);

function isTooFrequently(){
    var time = Date.now(),
        length = restart.push(time);
    if(length > limit){
        restart = restart.slice(limit * -1);
    }
    return restart.length >= limit && restart[restart.length - 1] - restart[0] < during;
}

var workers = {},
    createWorker = function(){
        if(isTooFrequently()){
            process.emit('giveup', length, during);
            return;
        }

        var worker = fork('./xxx.js');
        worker.on('exit', function(){
            console.log('worker' + worker.pid + 'exit');
            delete workers[worker.pid];
            createWorker();
        });
        worker.send('server', server);
        worker[worker.pid] = worker;
        console.log('create worker pid' + worker.pid);
    }

for(var i=cpus.length; i--;){
    createWorker();
}

process.on('exit', function(){
    for(var pid in workers){
        workers[pid].kill();
    }
})

process.on('uncaughtException', function(err){
    // 没有捕获异常就会退出
    setTimeout(function(){
        process.exit(1)
    }, 5000)
})

cluster

前面对进程的各方面的操作都有了一定的了解,通用性这么强的东西怎么能没有被模块化呢,上述对于进程的操作有一个非常好的模块叫进程模块clustercluster是一个nodejs内置的模块,用于nodejs多核处理。cluster模块,可以帮助我们简化多进程并行化程序的开发难度,轻松构建一个用于负载均衡的集群。具体参考cluster

一个主进程监听管理多个子进程

var cluster = require('cluster'),
    http = require('http'),
    numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log("master start...");

    // Fork workers.
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork('./xxx.js');
    }

    cluster.on('listening', function(worker, address) {
        console.log('listening: worker ' + worker.process.pid + ', Address: ' + address.address + ":" + address.port);
    });

    cluster.on('exit', function(worker, code, signal) {
        console.log('worker ' + worker.process.pid + ' died');
    });
} else {
    http.createServer(function(req, res) {
        res.writeHead(200);
        res.end("hello world\n");
    }).listen(0);
}
// 运行结果
master start...
listening: worker 1595, Address: null:52371
listening: worker 1602, Address: null:52371
listening: worker 1599, Address: null:52371
listening: worker 1600, Address: null:52371
listening: worker 1597, Address: null:52371
listening: worker 1596, Address: null:52371
listening: worker 1601, Address: null:52371
listening: worker 1598, Address: null:52371

进程间的通信

var cluster = require('cluster'),
    http = require('http'),
    numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log('[master] ' + "start master...");

    for (var i = 0; i < numCPUs; i++) {
        var wk = cluster.fork();
        wk.send('[master] ' + 'hi worker' + wk.id);
    }

    cluster.on('fork', function(worker) {
        console.log('[master] ' + 'fork: worker' + worker.id);
    });

    cluster.on('online', function(worker) {
        console.log('[master] ' + 'online: worker' + worker.id);
    });

    cluster.on('listening', function(worker, address) {
        console.log('[master] ' + 'listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
    });

    cluster.on('disconnect', function(worker) {
        console.log('[master] ' + 'disconnect: worker' + worker.id);
    });

    cluster.on('exit', function(worker, code, signal) {
        console.log('[master] ' + 'exit worker' + worker.id + ' died');
    });

    function eachWorker(callback) {
        for (var id in cluster.workers) {
            callback(cluster.workers[id]);
        }
    }

    setTimeout(function() {
        eachWorker(function(worker) {
            worker.send('[master] ' + 'send message to worker' + worker.id);
        });
    }, 3000);

    Object.keys(cluster.workers).forEach(function(id) {
        cluster.workers[id].on('message', function(msg) {
            console.log('[master] ' + 'message ' + msg);
        });
    });

} else if (cluster.isWorker) {
    console.log('[worker] ' + "start worker ..." + cluster.worker.id);

    process.on('message', function(msg) {
        console.log('[worker] ' + msg);
        process.send('[worker] worker' + cluster.worker.id + ' received!');
    });

    http.createServer(function(req, res) {
        res.writeHead(200, {
            "content-type": "text/html"
        });
        res.end('worker' + cluster.worker.id + ',PID:' + process.pid);
    }).listen(3000);
}

负载均衡

var cluster = require('cluster'),
    http = require('http'),
    numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log('[master] ' + "start master...");

    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    cluster.on('listening', function(worker, address) {
        console.log('[master] ' + 'listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
    });

} else if (cluster.isWorker) {
    console.log('[worker] ' + "start worker ..." + cluster.worker.id);
    http.createServer(function(req, res) {
        console.log('worker' + cluster.worker.id);
        res.end('worker' + cluster.worker.id + ',PID:' + process.pid);
    }).listen(3000);
}

API

cluster对象

属性:

  • cluster.setttings:配置集群参数对象
  • cluster.isMaster:判断是不是master节点
  • cluster.isWorker:判断是不是worker节点

事件:

  • Event: ‘fork’:监听创建worker进程事件
  • Event: ‘online’:监听worker创建成功事件
  • Event: ‘listening’:监听worker向master状态事件
  • Event: ‘disconnect’:监听worker断线事件
  • Event: ‘exit’:监听worker退出事件
  • Event: ‘setup’:监听setupMaster事件

方法:

  • cluster.setupMaster([settings]):设置集群参数
  • cluster.fork([env]):创建worker进程
  • cluster.disconnect([callback]):关闭worket进程
  • cluster.worker:获得当前的worker对象
  • cluster.workers:获得集群中所有存活的worker对象

worker对象

属性:

  • worker.id: 进程ID号
  • worker.process: ChildProcess对象
  • worker.suicide: 在disconnect()后,判断worker是否自杀

事件:

  • Event: ‘message’: 监听master和worker的message事件
  • Event: ‘online’: 监听指定的worker创建成功事件
  • Event: ‘listening’: 监听master向worker状态事件
  • Event: ‘disconnect’: 监听worker断线事件
  • Event: ‘exit’: 监听worker退出事件

方法:

  • worker.send(message, [sendHandle]): master给worker发送消息。注:worker给发master发送消息要用process.send(message)
  • worker.kill([signal=’SIGTERM’]): 杀死指定的worker,别名destory()
  • worker.disconnect(): 断开worker连接,让worker自杀