Nodejs服务端开发

标签: Nodejs 分类: Javascript 创建时间:2019-11-20 05:37:36 更新时间:2025-01-17 10:39:22

没有好的思路,其实就应该先从参考文章说起。

1.可用的restful框架

2.高并发的设计

针对java和php,都可以同时处理多个http请求,但是如果使用nodejs,它是单线程的,该如何处理并发的请求呢?关于nodejs高并发的说法,其实是在nodejs处理io的时候,会用异步的方式,直接返回,然后后台去处理io请求,然后通过回调函数,将结果返回,这样才实现了串行处理的并发操作。

3.多线程

刚开始看nodejs多线程相关的内容,我有点莫名奇妙。创建一个线程使用new Worker,传递数据用postMessage(),问题是子线程如何接收数据呢?postMessage中可以传任何数据,假如我传递了postMessage(“Hello Word”),在子线程中如何接收这个字符串呢?
看了很多篇文章,都没人说到这个问题,是我太笨了吗?(一定是的)有说用workdata接收的,除了这个,还有其他的方法吗?

(1) 使用第一步,我就遇到了错误:

查看我的nodejs版本

发现了原来我的版本不支持啊( using-worker-threads-in-node-js )。

worker_threads became available in Node.js 10.5.0. Prior to Node.js 11.7.0, you could not access the module unless you started node with the --experimental-worker flag

那还有啥好说的,升级呗,反正我安装了nvm。

1
2
3
4
5
6
7
nvm list available

nvm install 12.13.1

nvm use 12.13.1

node -v

一顿操作下来,nodejs版本就升级到了12.13.1(如果有全局安装的包,更换版本的时候,需要重新全局安装)。

(2) 经过尝试,我明白了,原来接收和发送数据,只有一个事件,那就是message事件,主线程使用postMessage(value)发送数据,worker线程通过parentPort.on(“message”,function(){})接收发送来的value,同理,子线程通过parentPort.postMessage(‘Hello world!’); 发送的数据,主线程通过worker.on(“message”)来接受。除了message事件,还有error事件,exit事件等。
主线程:index.js

1
2
3
4
5
const { Worker } = require("worker_threads");
//启动新的线程
const worker = new Worker(`${__dirname}/worker/kringin_worker.js`);
worker.postMessage("hk");
worker.on('message', (msg) => { console.log(msg); });

子线程:worker.js

1
2
3
4
5
6
7
const {parentPort } = require("worker_threads");

parentPort.on("message",data=>{
console.log(data);
});
parentPort.postMessage('Hello world!');

打印输出:

就是这么简单。

(3) 除了上面的方法,还有主进程在创建worker时,直接传入数据

1
2
3
const worker = new Worker(__filename, {
workerData: "data"
});

相应的worker线程就变成了workerData 接收数据。

1
2
3

const { workerData } = require("worker_threads");

4.获取多线程的处理结果

现在的需求是这样的,一个数组,数组中的每一个元素都要执行一段复杂耗时的操作,如何使用多线程的方式,将数组元素分组后,分别进行线程运算,最后将所有的运算结果统一接收,然后传递给前台呢?这就类似于同时处罚多个http响应,然后同时接受数据,是一个道理。
我也尝试过使用es6的async/awit语法,理论上是可以同时并行执行,然后一起返回结果的,但是,由于我对于async理解的还不到位,所以始终写不出来像样的代码。最后呢,我是通过计数器实现的,原理就是在主线程中顶一个全局计数器,当子线程执行完返回之后,将计数器加一,最后判断计数器和分块的线程数是否相同,如果相同,则证明所有线程已经执行完毕,就可以调用 req.send() 像客户端发送数据了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
server.post('/api/kriging', function (req, res, next) {
try{

let url="http://120.27.147.231:8080/geoserver/proheng/ows?service=WFS&version=1.0.0&request=GetFeature&typeName=proheng%3Aall_factory_polygon&outputFormat=application%2Fjson";
// 从服务器获取厂区数据
axios.get(url)
.then(function (response) {
var data=response.data;
if(data){
// 保存结果
let resultFeatureCollec= {
"type" : "FeatureCollection",
"features" : []
};
resultFeatures=resultFeatureCollec.features;

//计算开始时间
var sd = new Date();
var sart=sd.getTime();
console.log("主线程计算开始:"+moment().format("YYYY-MM-DD HH:mm:ss"));

// 将数据分块
let size=20;
let chunk=Math.ceil(data.features.length/size);
var featureCollection=data.features;
let chunkId=0; // 子块的标志位

let resultFun=[];
for(let i=0;i<chunk;i++){
let s=i*size;
let e=(i+1)*size;
if(e>=featureCollection.length){
e=featureCollection.length;
}

var chunkData=featureCollection.slice(s,e);
resultFun.push(chunkData);

//启动新的线程
const worker = new Worker(`${__dirname}/worker/kringin_worker.js`);
// console.log(data.features);
worker.postMessage(chunkData);
worker.on('message', (result) => {
console.log("子线程处理完成");
resultFeatures=[...resultFeatures,...result];
chunkId++;
if(chunkId == chunk){

// 计算结束时间
let endt = new Date().getTime();
console.log('总插值时间:' + (endt- sart) / 1000 + '秒');

resultFeatureCollec.features=resultFeatures;
res.send(resultFeatureCollec);
}
worker.terminate();
});
worker.on("error",err=>{
console.log(err);
chunkId++;
res.send(resultFeatureCollec);
})
}
}
})
.catch(function (error) {
console.log(error);
});

}catch(err){
console.log(err);
return next(new errs.InternalError("服务器发生错误!"));
}
return next();
});

5.线程池

所谓线程池,就是好几个线程都放到一个池子里,那个空闲就用那个计算,就避免了每一次新建一个线程所进行的时间耗费了。关于线程池,nodejs好像也没有找到好用的解决方案。从网上找了人家写的线程池的例子,但是对于我的计算克里金插值来说,没有多大的效率提升。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// main.js
const path = require('path');
const { Worker } = require('worker_threads');

class WorkerPool {
_workers = []; // 线程引用数组
_activeWorkers = []; // 激活的线程数组
_queue = []; // 任务队列

constructor(workerPath, numOfThreads) {
this.workerPath = workerPath;
this.numOfThreads = numOfThreads;
this.init();
}

// 初始化多线程
init() {
if (this.numOfThreads < 1) {
throw new Error('线程池最小线程数应为1');
}

for (let i = 0;i < this.numOfThreads; i++) {
const worker = new Worker(this.workerPath);

this._workers[i] = worker;
this._activeWorkers[i] = false;
}
}

// 结束线程池中所有线程
destroy() {
for (let i = 0; i < this.numOfThreads; i++) {
if (this._activeWorkers[i]) {
throw new Error(`${i}号线程仍在工作中...`);
}
this._workers[i].terminate();
}
}

// 检查是否有空闲worker
checkWorkers() {
for (let i = 0; i < this.numOfThreads; i++) {
if (!this._activeWorkers[i]) {
return i;
}
}

return -1;
}

run(getData) {
return new Promise((resolve, reject) => {
const restWorkerId = this.checkWorkers();

const queueItem = {
getData,
callback: (error, result) => {
if (error) {
return reject(error);
}
return resolve(result);
}
}

// 线程池已满,将任务加入任务队列
if (restWorkerId === -1) {
this._queue.push(queueItem);
return null;
}

// 空闲线程执行任务
this.runWorker(restWorkerId, queueItem);
})
}

async runWorker(workerId, queueItem) {
const worker = this._workers[workerId];
this._activeWorkers[workerId] = true;

// 线程结果回调
const messageCallback = (result) => {
queueItem.callback(null, result);
cleanUp();
};

// 线程错误回调
const errorCallback = (error) => {
queueItem.callback(error);
cleanUp();
};

// 任务结束消除旧监听器,若还有待完成任务,继续完成
const cleanUp = () => {
worker.removeAllListeners('message');
worker.removeAllListeners('error');

this._activeWorkers[workerId] = false;

if (!this._queue.length) {
return null;
}

this.runWorker(workerId, this._queue.shift());
}

// 线程创建监听结果/错误回调
worker.once('message', messageCallback);
worker.once('error', errorCallback);
// 向子线程传递初始data
worker.postMessage(queueItem.getData);
}
}
module.exports=WorkerPool;

然后使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
const pool = new WorkerPool(path.join(__dirname, 'worker.js'), 4);

const items = [...new Array(10)].fill(null);

Promise.all(items.map(async (_, i) => {
const res = await pool.run(_);

console.log(`任务${i}完成结果:`, res);
})).then(() => {
console.log('所有任务完成 !');
// 销毁线程池
pool.destroy();
});

至于worker中长什么样,就不贴了,每个人不一样,反正就是有parentPort.on和parentPort.postMessage就对了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
const {parentPort } = require("worker_threads");
const krigingUtils=require("../utils/KrigingUtils");
const moment=require("moment");

parentPort.on("message",data=>{

console.log("子线程计算开始:"+moment().format("YYYY-MM-DD HH:mm:ss"))
let s=new Date().getTime();

let count=data.length;
let resultFeatures=[];
for(let fi=0;fi<count;fi++){
let featureData=data[fi];

let extent=[100000000,100000000,-100000000,-100000000];
// 数据点范围边界
var coodinates=featureData.geometry.coordinates[0][0];
let cout=coodinates.length;
for(let i=0;i<cout;i++){
if(extent[0]>coodinates[i][0])
extent[0]=coodinates[i][0];
if(extent[1]>coodinates[i][1])
extent[1]=coodinates[i][1];
if(extent[2]<coodinates[i][0])
extent[2]=coodinates[i][0];
if(extent[3]<coodinates[i][1])
extent[3]=coodinates[i][1];
}
//随机生成一些离散点
let dataset = {
"type" : "FeatureCollection",
"features" : []
};

//创建10个位置随机、属性值随机的特征点
for (let i = 0; i < 10; i++) {
let feature={
"type" : "Feature",
"properties" : {
"level" : Math.random()*100
},
"geometry" : {
"type" : "Point",
"coordinates" : [ extent[0] + Math.random() * (extent[2] - extent[0]),extent[1]+Math.random()*(extent[3]-extent[1])]
}
};
dataset.features.push(feature);
}
var breaks = [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0];
// 计算插值
let isobands=krigingUtils.constructShape(dataset,"level",breaks,extent,featureData.geometry);
resultFeatures=[...resultFeatures,...isobands.features];
}
let e=new Date().getTime();
console.log('子线程执行时间:' + (e - s) / 1000 + '秒');

parentPort.postMessage(resultFeatures);
});

6.pm2部署nodejs

pm2提供了集群的方式启动nodejs应用,而且多个应用之间共享监听端口,简直不能在方便了。
(1) 安装:npm install pm2 -g
(2) 启动:pm2 start index.js -i max

小额赞助
本人提供免费与付费咨询服务,感谢您的支持!赞助请发邮件通知,方便公布您的善意!
**光 3.01 元
Sun 3.00 元
bibichuan 3.00 元
微信公众号
广告位
诚心邀请广大金主爸爸洽谈合作
每日一省
isNaN 和 Number.isNaN 函数的区别?

1.函数 isNaN 接收参数后,会尝试将这个参数转换为数值,任何不能被转换为数值的的值都会返回 true,因此非数字值传入也会返回 true ,会影响 NaN 的判断。

2.函数 Number.isNaN 会首先判断传入参数是否为数字,如果是数字再继续判断是否为 NaN ,不会进行数据类型的转换,这种方法对于 NaN 的判断更为准确。

每日二省
为什么0.1+0.2 ! == 0.3,如何让其相等?

一个直接的解决方法就是设置一个误差范围,通常称为“机器精度”。对JavaScript来说,这个值通常为2-52,在ES6中,提供了Number.EPSILON属性,而它的值就是2-52,只要判断0.1+0.2-0.3是否小于Number.EPSILON,如果小于,就可以判断为0.1+0.2 ===0.3。

每日三省
== 操作符的强制类型转换规则?

1.首先会判断两者类型是否**相同,**相同的话就比较两者的大小。

2.类型不相同的话,就会进行类型转换。

3.会先判断是否在对比 null 和 undefined,是的话就会返回 true。

4.判断两者类型是否为 string 和 number,是的话就会将字符串转换为 number。

5.判断其中一方是否为 boolean,是的话就会把 boolean 转为 number 再进行判断。

6.判断其中一方是否为 object 且另一方为 string、number 或者 symbol,是的话就会把 object 转为原始类型再进行判断。

每日英语
Happiness is time precipitation, smile is the lonely sad.
幸福是年华的沉淀,微笑是寂寞的悲伤。