没有好的思路,其实就应该先从参考文章说起。
1.可用的restful框架 2.高并发的设计 针对java和php,都可以同时处理多个http请求,但是如果使用nodejs,它是单线程的,该如何处理并发的请求呢?关于nodejs高并发的说法,其实是在nodejs处理io的时候,会用异步的方式,直接返回,然后后台去处理io请求,然后通过回调函数,将结果返回,这样才实现了串行处理的并发操作。
3.多线程 刚开始看nodejs多线程相关的内容,我有点莫名其妙。创建一个线程使用new Worker,传递数据用postMessage(),问题是子线程如何接收数据呢?postMessage中可以传任何数据,假如我传递了postMessage("Hello World"),在子线程中如何接收这个字符串呢? 看了很多篇文章,都没人说到这个问题,是我太笨了吗?(一定是的)有说用workerData接收的,除了这个,还有其他的方法吗?
(1) 使用第一步,我就遇到了错误:
查看我的nodejs版本
发现了原来我的版本不支持啊( using-worker-threads-in-node-js )。
worker_threads became available in Node.js 10.5.0. Prior to Node.js 11.7.0, you could not access the module unless you started node with the --experimental-worker flag
那还有啥好说的,升级呗,反正我安装了nvm。
# List the Node.js versions that can be installed.
nvm list available
# Install a release in which worker_threads works without extra flags.
nvm install 12.13.1
# Switch the current shell to that release.
nvm use 12.13.1
# Confirm the active version.
node -v
一顿操作下来,nodejs版本就升级到了12.13.1(如果有全局安装的包,更换版本的时候,需要重新全局安装)。
(2) 经过尝试,我明白了,原来接收和发送数据都围绕同一个事件,那就是message事件:主线程使用worker.postMessage(value)发送数据,worker线程通过parentPort.on("message", function (value) {})接收发送来的value;同理,子线程通过parentPort.postMessage('Hello world!')发送的数据,主线程通过worker.on("message", callback)来接收。除了message事件,还有error事件、exit事件等。 主线程:index.js
// Main thread (index.js): start a worker, send it one message and print
// whatever the worker posts back.
const { Worker } = require("worker_threads");

const worker = new Worker(`${__dirname}/worker/kringin_worker.js`);

worker.postMessage("hk");

worker.on('message', (msg) => {
  console.log(msg);
});

// A Worker is an EventEmitter: without an 'error' listener, an exception
// thrown inside the worker would surface as an uncaught exception here.
worker.on('error', (err) => {
  console.error(err);
});
子线程:worker.js
// Worker thread: `parentPort` is this thread's channel to the thread that
// created it.
const { parentPort } = require("worker_threads");

// Receives every value the parent sends with worker.postMessage(value).
parentPort.on("message", (data) => {
  console.log(data);
});

// Delivered to the parent's worker.on('message', ...) listener.
parentPort.postMessage('Hello world!');
打印输出:
就是这么简单。
(3) 除了上面的方法,主线程还可以在创建worker时直接传入数据:
// Main thread: pass the initial payload at construction time through the
// `workerData` option instead of a later postMessage().
// NOTE(review): this spawns __filename, i.e. the current file. Unless the
// real file wraps this statement in an `isMainThread` check, each worker
// would execute it again and spawn another worker.
const worker = new Worker(__filename, { workerData: "data" });
相应的worker线程就变成了workerData 接收数据。
// Worker thread: `workerData` is a clone of the value that the creating
// thread supplied as the `workerData` option of `new Worker(...)`.
const { workerData } = require("worker_threads");
4.获取多线程的处理结果 现在的需求是这样的,一个数组,数组中的每一个元素都要执行一段复杂耗时的操作,如何使用多线程的方式,将数组元素分组后,分别进行线程运算,最后将所有的运算结果统一接收,然后传递给前台呢?这就类似于同时触发多个http请求,然后统一接收数据,是一个道理。 我也尝试过使用ES2017的async/await语法,理论上是可以同时并行执行,然后一起返回结果的,但是,由于我对于async理解的还不到位,所以始终写不出来像样的代码。最后呢,我是通过计数器实现的,原理就是在主线程中定义一个计数器,当子线程执行完返回之后,将计数器加一,最后判断计数器和分块的线程数是否相同,如果相同,则证明所有线程已经执行完毕,就可以调用 res.send() 向客户端发送数据了。
/**
 * POST /api/kriging
 *
 * Downloads the polygon features from GeoServer, splits them into chunks of
 * 20, runs every chunk in its own worker thread and answers with a single
 * FeatureCollection once all chunks have settled.
 *
 * Completion is tracked with a per-request counter: each worker that
 * finishes (successfully or not) increments it, and the response is sent
 * when the counter reaches the number of chunks. A chunk whose worker fails
 * is logged and contributes no features.
 *
 * @param {object} req  - incoming request (not read here)
 * @param {object} res  - response; `res.send` is called at most once
 * @param {function} next - called exactly once, after the response or with
 *                          an InternalError when the upstream request or the
 *                          setup fails
 */
server.post('/api/kriging', function (req, res, next) {
  const url = "http://120.27.147.231:8080/geoserver/proheng/ows?service=WFS&version=1.0.0&request=GetFeature&typeName=proheng%3Aall_factory_polygon&outputFormat=application%2Fjson";
  const size = 20;

  // Guarantees that the request is answered exactly once, no matter how many
  // workers fail or in which order the callbacks fire.
  let finished = false;
  const finish = (err, body) => {
    if (finished) {
      return undefined;
    }
    finished = true;
    if (err) {
      return next(err);
    }
    res.send(body);
    return next();
  };

  axios.get(url)
    .then(function (response) {
      const data = response.data;
      const featureCollection = data && Array.isArray(data.features) ? data.features : [];
      const resultFeatureCollec = {
        "type": "FeatureCollection",
        "features": []
      };

      const chunk = Math.ceil(featureCollection.length / size);
      if (chunk === 0) {
        // Nothing to interpolate: answer immediately instead of waiting for
        // workers that will never be started.
        finish(null, resultFeatureCollec);
        return;
      }

      const sart = Date.now();
      console.log("主线程计算开始:" + moment().format("YYYY-MM-DD HH:mm:ss"));

      // Both are local to this request so concurrent requests cannot mix
      // their results.
      let resultFeatures = [];
      let chunkId = 0;

      const onChunkSettled = () => {
        chunkId++;
        if (chunkId === chunk) {
          const endt = Date.now();
          console.log('总插值时间:' + (endt - sart) / 1000 + '秒');
          resultFeatureCollec.features = resultFeatures;
          finish(null, resultFeatureCollec);
        }
      };

      for (let i = 0; i < chunk; i++) {
        // slice() clamps the end index, so the last chunk may be shorter.
        const chunkData = featureCollection.slice(i * size, (i + 1) * size);
        const worker = new Worker(`${__dirname}/worker/kringin_worker.js`);

        // Each worker is counted once, whichever event arrives first.
        let settled = false;
        const settleOnce = () => {
          if (settled) {
            return;
          }
          settled = true;
          onChunkSettled();
        };

        worker.once('message', (result) => {
          console.log("子线程处理完成");
          if (!settled) {
            resultFeatures = resultFeatures.concat(result);
          }
          worker.terminate();
          settleOnce();
        });
        worker.once('error', (err) => {
          console.log(err);
          settleOnce();
        });
        // Covers a worker that stops without posting a result or an error.
        worker.once('exit', settleOnce);

        worker.postMessage(chunkData);
      }
    })
    .catch(function (error) {
      console.log(error);
      finish(new errs.InternalError("服务器发生错误!"));
    });
});
5.线程池 所谓线程池,就是把好几个线程都放到一个池子里,哪个空闲就用哪个计算,这样就避免了每一次新建线程所带来的时间耗费。关于线程池,我在nodejs中暂时没有找到好用的现成解决方案,于是从网上找了别人写的线程池的例子,但是对于我的克里金插值计算来说,并没有多大的效率提升。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 const path = require ('path' );const { Worker } = require ('worker_threads' );class WorkerPool { _workers = []; _activeWorkers = []; _queue = []; constructor (workerPath, numOfThreads ) { this .workerPath = workerPath; this .numOfThreads = numOfThreads; this .init (); } init ( ) { if (this .numOfThreads < 1 ) { throw new Error ('线程池最小线程数应为1' ); } for (let i = 0 ;i < this .numOfThreads ; i++) { const worker = new Worker (this .workerPath ); this ._workers [i] = worker; this ._activeWorkers [i] = false ; } } destroy ( ) { for (let i = 0 ; i < this .numOfThreads ; i++) { if (this ._activeWorkers [i]) { throw new Error (`${i} 号线程仍在工作中...` ); } this ._workers [i].terminate (); } } checkWorkers ( ) { for (let i = 0 ; i < this .numOfThreads ; i++) { if (!this ._activeWorkers [i]) { return i; } } return -1 ; } run (getData ) { return new Promise ((resolve, reject ) => { const restWorkerId = this .checkWorkers (); const queueItem = { getData, callback : (error, result ) => { if (error) { return reject (error); } return resolve (result); } } if (restWorkerId === -1 ) { this ._queue .push (queueItem); return null ; } this .runWorker (restWorkerId, queueItem); }) } async runWorker (workerId, queueItem ) { const worker = this ._workers [workerId]; this ._activeWorkers [workerId] = true ; const messageCallback = (result ) => { queueItem.callback (null , result); cleanUp (); }; const errorCallback = (error ) => { queueItem.callback (error); cleanUp (); }; const cleanUp = ( ) => { worker.removeAllListeners ('message' ); worker.removeAllListeners ('error' ); this ._activeWorkers [workerId] = false ; if (!this ._queue .length ) { return null ; } 
this .runWorker (workerId, this ._queue .shift ()); } worker.once ('message' , messageCallback); worker.once ('error' , errorCallback); worker.postMessage (queueItem.getData ); } } module .exports =WorkerPool ;
然后使用:
// Example: run 10 tasks on a pool of 4 workers and shut the pool down once
// every task has produced a result.
const pool = new WorkerPool(path.join(__dirname, 'worker.js'), 4);
const items = Array.from({ length: 10 }, () => null);

Promise.all(items.map(async (item, i) => {
  const res = await pool.run(item);
  console.log(`任务${i}完成结果:`, res);
}))
  .then(() => {
    console.log('所有任务完成 !');
    pool.destroy();
  })
  .catch((err) => {
    // Promise.all rejects as soon as one task fails; other tasks may still
    // be running, so the pool is not destroyed here.
    console.error(err);
  });
至于worker中长什么样,就不贴了,每个人不一样,反正就是有parentPort.on和parentPort.postMessage就对了。
const { parentPort } = require("worker_threads");
const krigingUtils = require("../utils/KrigingUtils");
const moment = require("moment");

// Number of random sample points generated for each feature.
const SAMPLE_POINT_COUNT = 10;
// Break values handed to krigingUtils.constructShape.
// NOTE(review): the sample "level" values are drawn from [0, 100) while
// these breaks only span 0..1 — confirm that this is intended.
const BREAKS = [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0];

/**
 * Bounding box of a ring of [x, y] positions.
 *
 * @param {number[][]} ring - positions; index 0 is x, index 1 is y
 * @returns {number[]} [minX, minY, maxX, maxY]
 */
function computeExtent(ring) {
  // Seeded with +/-Infinity so that any finite coordinate replaces the seed.
  const extent = [Infinity, Infinity, -Infinity, -Infinity];
  for (const position of ring) {
    if (extent[0] > position[0]) extent[0] = position[0];
    if (extent[1] > position[1]) extent[1] = position[1];
    if (extent[2] < position[0]) extent[2] = position[0];
    if (extent[3] < position[1]) extent[3] = position[1];
  }
  return extent;
}

/**
 * Builds a FeatureCollection of random points inside `extent`, each with a
 * random "level" property in [0, 100).
 *
 * @param {number[]} extent - [minX, minY, maxX, maxY]
 * @returns {{type: string, features: object[]}}
 */
function createRandomDataset(extent) {
  const dataset = {
    "type": "FeatureCollection",
    "features": []
  };
  for (let i = 0; i < SAMPLE_POINT_COUNT; i++) {
    dataset.features.push({
      "type": "Feature",
      "properties": {
        "level": Math.random() * 100
      },
      "geometry": {
        "type": "Point",
        "coordinates": [
          extent[0] + Math.random() * (extent[2] - extent[0]),
          extent[1] + Math.random() * (extent[3] - extent[1])
        ]
      }
    });
  }
  return dataset;
}

// For every feature received from the parent: derive the bounding box of its
// geometry, generate random sample points inside it, let krigingUtils build
// the shapes, and finally post all resulting features back in one message.
parentPort.on("message", data => {
  console.log("子线程计算开始:" + moment().format("YYYY-MM-DD HH:mm:ss"));
  const startTime = Date.now();

  const resultFeatures = [];
  for (const featureData of data) {
    // Only coordinates[0][0] is read.
    // NOTE(review): presumably the first ring of the first polygon of a
    // MultiPolygon — confirm against the WFS layer's geometry type.
    const ring = featureData.geometry.coordinates[0][0];
    const extent = computeExtent(ring);
    const dataset = createRandomDataset(extent);

    // A fresh copy of the breaks per call, in case constructShape mutates it.
    const isobands = krigingUtils.constructShape(dataset, "level", BREAKS.slice(), extent, featureData.geometry);

    // Appending in place avoids re-copying the accumulated array per feature.
    for (const feature of isobands.features) {
      resultFeatures.push(feature);
    }
  }

  const endTime = Date.now();
  console.log('子线程执行时间:' + (endTime - startTime) / 1000 + '秒');
  parentPort.postMessage(resultFeatures);
});
6.pm2部署nodejs pm2提供了以集群的方式启动nodejs应用的能力,而且多个应用实例之间共享监听端口,简直不能再方便了。 (1) 安装:npm install pm2 -g (2) 启动:pm2 start index.js -i max