IO.js Cluster

2018-11-28 22:33 更新

穩(wěn)定度: 2 - 穩(wěn)定

單個(gè)的io.js實(shí)例運(yùn)行在單線程上。為了享受多核系統(tǒng)的優(yōu)勢(shì),用戶需要啟動(dòng)一個(gè)io.js集群來處理負(fù)載。

cluster模塊允許你方便地創(chuàng)建共享服務(wù)器端口的子進(jìn)程:

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
} else {
  // Workers can share any TCP connection
  // In this case its a HTTP server
  http.createServer(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(8000);
}

啟動(dòng)io.js將會(huì)在工作線程中共享8000端口:

% NODE_DEBUG=cluster iojs server.js
23521,Master Worker 23524 online
23521,Master Worker 23526 online
23521,Master Worker 23523 online
23521,Master Worker 23528 online

這個(gè)特性是最近才開發(fā)的,并且可能在未來有所改變。請(qǐng)?jiān)囉盟⑻峁┓答仭?/p>

注意,在Windows中,在工作進(jìn)程中建立命名管道服務(wù)器目前是不可行的。

工作原理

工作進(jìn)程通過child_process.fork方法被創(chuàng)建,所以它們可以與父進(jìn)程通過IPC管道溝通以及相互傳遞服務(wù)器句柄。

集群模式支持兩種分配傳入連接的方式。

第一種(并且是除了Windows平臺(tái)外默認(rèn)的方式)是循環(huán)式。主進(jìn)程監(jiān)聽一個(gè)端口,接受新連接,并且以輪流的方式分配給工作進(jìn)程,并且以一些內(nèi)建機(jī)制來避免一個(gè)工作進(jìn)程過載。

第二種方式是,主進(jìn)程建立監(jiān)聽socket并且將它發(fā)送給感興趣的工作進(jìn)程。工作進(jìn)程直接接受傳入的連接。

第二種方式理論上有最好的性能。但是在實(shí)踐中,操作系統(tǒng)的調(diào)度不可預(yù)測(cè),分配往往十分不平衡。負(fù)載曾被觀察到8個(gè)進(jìn)程中,超過70%的連接結(jié)束于其中的2個(gè)進(jìn)程。

因?yàn)?code>server.listen()將大部分工作交給了主進(jìn)程,所以一個(gè)普通的io.js進(jìn)程和一個(gè)集群工作進(jìn)程會(huì)在三種情況下有所區(qū)別:

  1. server.listen({fd: 7}) 因?yàn)橄⒈粋鬟f給了主進(jìn)程,主進(jìn)程的文件描述符7會(huì)被監(jiān)聽,并且句柄會(huì)被傳遞給工作進(jìn)程而不是監(jiān)聽工作進(jìn)程中文件描述符為7的東西。

  2. server.listen(handle) 明確地監(jiān)聽句柄,會(huì)讓工作進(jìn)程使用給定的句柄,而不是與主進(jìn)程通信。如果工作進(jìn)程已經(jīng)有了此句柄,那么將假設(shè)你知道你在做什么。

  3. server.listen(0) 通常,這會(huì)導(dǎo)致服務(wù)器監(jiān)聽一個(gè)隨機(jī)端口。但是,在集群中,每次調(diào)用listen(0)時(shí),每一個(gè)工作進(jìn)程會(huì)收到同樣的“隨機(jī)”端口。也就是說,端口只是在第一次方法被調(diào)用時(shí)是隨機(jī)的,但在之后是可預(yù)知的。如果你想監(jiān)聽特定的端口,則根據(jù)工作進(jìn)程的PID來生成端口號(hào)。

由于在io.js或你的程序中的工作進(jìn)程間沒有路由邏輯也沒有共享的狀態(tài)。所以,請(qǐng)不要為你的程序設(shè)計(jì)成依賴太重的內(nèi)存數(shù)據(jù)對(duì)象,如設(shè)計(jì)會(huì)話和登陸時(shí)。

因?yàn)楣ぷ鬟M(jìn)程都是獨(dú)立的進(jìn)程,它們可以根據(jù)你程序的需要被殺死或被創(chuàng)建,并且并不會(huì)影響到其他工作進(jìn)程。只要有活躍的工作進(jìn)程,那么服務(wù)器就會(huì)繼續(xù)接收連接。但是io.js不會(huì)自動(dòng)地為你管理工作進(jìn)程數(shù)。所以根據(jù)你的應(yīng)用需求來管理工作進(jìn)程池是你的責(zé)任。

cluster.schedulingPolicy

調(diào)度策略,選擇cluster.SCHED_RR來使用循環(huán)式,或選擇cluster.SCHED_NONE來由操作系統(tǒng)處理。這是一個(gè)全局設(shè)定,并且在你第一次啟動(dòng)了一個(gè)工作進(jìn)程或調(diào)用cluster.setupMaster()方法后就不可再更改。

SCHED_RR是除了Windows外其他操作系統(tǒng)中的默認(rèn)值。一旦libuv能夠有效地分配IOCP句柄并且沒有巨大的性能損失,那么Windows下的默認(rèn)值也會(huì)變?yōu)樗?/p>

cluster.schedulingPolicy也可以通過環(huán)境變量NODE_CLUSTER_SCHED_POLICY來設(shè)定。合法值為rrnone

cluster.settings

  • Object

  • execArgv Array 傳遞給io.js執(zhí)行的字符串參數(shù)(默認(rèn)為process.execArgv
  • exec String 工作進(jìn)程文件的路徑(默認(rèn)為process.argv[1]
  • args Array 傳遞給工作進(jìn)程的字符串參數(shù)(默認(rèn)為process.argv.slice(2)
  • silent Boolean 是否將工作進(jìn)程的輸出傳遞給父進(jìn)程的stdio(默認(rèn)為false
  • uid Number 設(shè)置用戶進(jìn)程的ID
  • gid Number 設(shè)置進(jìn)程組的ID

在調(diào)用.setupMaster()(或.fork())方法之后,這個(gè)settings對(duì)象會(huì)存放方法的配置,包括默認(rèn)值。

因?yàn)?code>.setupMaster()僅能被調(diào)用一次,所以這個(gè)對(duì)象被設(shè)置后便不可更改。

這個(gè)對(duì)象不應(yīng)由你來手工更改或設(shè)置。

cluster.isMaster

  • Boolean

如果進(jìn)程是主進(jìn)程則返回true。這由process.env.NODE_UNIQUE_ID決定。如果process.env.NODE_UNIQUE_IDundefined,那么就返回true

cluster.isWorker

  • Boolean

如果進(jìn)程不是主進(jìn)程則返回true。

Event: 'fork'

  • worker Worker object

當(dāng)一個(gè)新的工作進(jìn)程由cluster模塊所開啟時(shí)會(huì)觸發(fā)fork事件。這能被用來記錄工作進(jìn)程活動(dòng)日志,或創(chuàng)建自定義的超時(shí)。

var timeouts = [];
function errorMsg() {
  console.error("Something must be wrong with the connection ...");
}

cluster.on('fork', function(worker) {
  timeouts[worker.id] = setTimeout(errorMsg, 2000);
});
cluster.on('listening', function(worker, address) {
  clearTimeout(timeouts[worker.id]);
});
cluster.on('exit', function(worker, code, signal) {
  clearTimeout(timeouts[worker.id]);
  errorMsg();
});

Event: 'online'

  • worker Worker object

當(dāng)創(chuàng)建了一個(gè)新的工作線程后,工作線程必須響應(yīng)一個(gè)在線信息。當(dāng)主進(jìn)程接收到在線信息后它會(huì)觸發(fā)這個(gè)事件。forkonline事件的區(qū)別在于:fork是主進(jìn)程創(chuàng)建了工作進(jìn)程后觸發(fā),online是工作進(jìn)程開始運(yùn)行時(shí)觸發(fā)。

cluster.on('online', function(worker) {
  console.log("Yay, the worker responded after it was forked");
});

Event: 'listening'

  • worker Worker object
  • address Object

當(dāng)工作進(jìn)程調(diào)用listen()方法。服務(wù)器會(huì)觸發(fā)listening事件,集群中的主進(jìn)程也會(huì)觸發(fā)一個(gè)listening事件。

這個(gè)事件的回調(diào)函數(shù)包含兩個(gè)參數(shù),第一個(gè)worker是一個(gè)包含工作進(jìn)程的對(duì)象,address對(duì)象是一個(gè)包含以下屬性的對(duì)象:addressportaddressType。當(dāng)工作進(jìn)程監(jiān)聽多個(gè)地址時(shí),這非常有用。

cluster.on('listening', function(worker, address) {
  console.log("A worker is now connected to " + address.address + ":" + address.port);
});

addressType是以下中的一個(gè):

  • 4 (TCPv4)
  • 6 (TCPv6)
  • -1 (unix domain socket)
  • "udp4" 或 "udp6" (UDP v4 或 v6)

Event: 'disconnect'

  • worker Worker object

當(dāng)工作進(jìn)程的IPC信道斷開連接時(shí)觸發(fā)。這個(gè)事件當(dāng)工作進(jìn)程優(yōu)雅地退出,被殺死,或手工斷開連接(如調(diào)用worker.disconnect())后觸發(fā)。

disconnectexit事件之間可能存在延遲。這兩個(gè)事件可以用來偵測(cè)是否進(jìn)程在清理的過程中被阻塞,或者是否存在長(zhǎng)連接。

cluster.on('disconnect', function(worker) {
  console.log('The worker #' + worker.id + ' has disconnected');
});

Event: 'exit'

  • worker Worker object
  • code Number 如果正常退出,則為退出碼
  • signal String 導(dǎo)致進(jìn)程被殺死的信號(hào)的信號(hào)名(如'SIGHUP')

當(dāng)任何一個(gè)工作進(jìn)程結(jié)束時(shí),cluster模塊會(huì)觸發(fā)一個(gè)exit事件。

這可以被用來通過再次調(diào)用.fork()方法重啟服務(wù)器。

cluster.on('exit', function(worker, code, signal) {
  console.log('worker %d died (%s). restarting...',
    worker.process.pid, signal || code);
  cluster.fork();
});

參閱child_process事件:exit。

Event: 'setup'

  • settings Object

每次.setupMaster()方法被調(diào)用時(shí)觸發(fā)。

這個(gè)settings對(duì)象與.setupMaster()被調(diào)用時(shí)cluster.settings對(duì)象相同,并且僅供查詢,因?yàn)?code>.setupMaster()可能在一次事件循環(huán)里被調(diào)用多次。

如果保持精確十分重要,請(qǐng)使用cluster.settings

cluster.setupMaster([settings])

  • settings Object

  • exec String 工作進(jìn)程文件的路徑(默認(rèn)為process.argv[1]
  • args Array 傳遞給工作進(jìn)程的參數(shù)字符串(默認(rèn)為process.argv.slice(2)
  • silent Boolean 是否將工作進(jìn)程的輸出傳遞給父進(jìn)程的stdio(默認(rèn)為false)

setupMaster方法被用來改變默認(rèn)的fork行為。一旦被調(diào)用,settings參數(shù)將被表現(xiàn)為cluster.settings。

注意:

  • 任何settings的改變僅影響之后的.fork()調(diào)用,而不影響已經(jīng)運(yùn)行中的工作進(jìn)程
  • 工作進(jìn)程中唯一不同通過.setupMaster()來設(shè)置的屬性是傳遞給.fork()方法的env參數(shù)
  • 上文中的參數(shù)的默認(rèn)值僅在第一次調(diào)用時(shí)被應(yīng)用,之后的調(diào)用的默認(rèn)值是當(dāng)前cluster.setupMaster()被調(diào)用時(shí)的值。

例子:

var cluster = require('cluster');
cluster.setupMaster({
  exec: 'worker.js',
  args: ['--use', 'https'],
  silent: true
});
cluster.fork(); // https worker
cluster.setupMaster({
  args: ['--use', 'http']
});
cluster.fork(); // http worker

這只能被主進(jìn)程調(diào)用。

cluster.fork([env])

  • env Object 將添加到工作進(jìn)程環(huán)境變量的鍵值對(duì)
  • return Worker object

創(chuàng)建一個(gè)新的工作進(jìn)程。

這只能被主進(jìn)程調(diào)用。

cluster.disconnect([callback])

  • callback Function 當(dāng)所有的工作進(jìn)程斷開連接并且所有句柄關(guān)閉后調(diào)用

cluster.workers中的每一個(gè)工作進(jìn)程中調(diào)用.disconnect()。

當(dāng)所有進(jìn)程斷開連接,所有內(nèi)部的句柄都將關(guān)閉,如果沒有其他的事件處于等待,將允許主進(jìn)程優(yōu)雅地退出。

這個(gè)方法接受一個(gè)可選的將會(huì)在結(jié)束時(shí)觸發(fā)的回調(diào)函數(shù)參數(shù)。

這只能被主進(jìn)程調(diào)用。

cluster.worker

  • Object

當(dāng)前工作進(jìn)程對(duì)象的引用。對(duì)于主進(jìn)程不可用。

var cluster = require('cluster');

if (cluster.isMaster) {
  console.log('I am master');
  cluster.fork();
  cluster.fork();
} else if (cluster.isWorker) {
  console.log('I am worker #' + cluster.worker.id);
}

cluster.workers

  • Object

一個(gè)儲(chǔ)存了所有活躍的工作進(jìn)程對(duì)象的哈希表,以id字段為主鍵。這使得遍歷所有工作進(jìn)程變得容易。僅在主進(jìn)程中可用。

當(dāng)工作進(jìn)程斷開連接或退出時(shí),它會(huì)從cluster.workers中移除。這個(gè)兩個(gè)事件的觸發(fā)順序不能被提前決定。但是,能保證的是,從cluster.workers移除一定發(fā)生在這兩個(gè)事件觸發(fā)之后。

// Go through all workers
function eachWorker(callback) {
  for (var id in cluster.workers) {
    callback(cluster.workers[id]);
  }
}
eachWorker(function(worker) {
  worker.send('big announcement to all workers');
});

想要跨越通信信道來得到一個(gè)工作進(jìn)程的引用時(shí),使用工作進(jìn)程的唯一id能簡(jiǎn)單找到工作進(jìn)程。

socket.on('data', function(id) {
  var worker = cluster.workers[id];
});

Class: Worker

Worker對(duì)象包含了一個(gè)工作進(jìn)程所有的公開信息和方法。在主進(jìn)程中它可以通過cluster.workers取得。在工作進(jìn)程中它可以通過cluster.worker取得。

worker.id

  • String

每一個(gè)新的工作進(jìn)程都被給予一個(gè)獨(dú)一無二的id,這個(gè)id被存儲(chǔ)在此id屬性中。

當(dāng)一個(gè)工作進(jìn)程活躍時(shí),這是它被索引在cluster.workers中的主鍵。

worker.process

  • ChildProcess object

所有的工作進(jìn)程都通過child_process.fork()被創(chuàng)建,返回的對(duì)象被作為.process屬性存儲(chǔ)。在一個(gè)工作進(jìn)程中,全局的process被存儲(chǔ)。

參閱Child Process module

注意,如果在進(jìn)程中disconnect事件觸發(fā)并且.suicide屬性不為true,那么進(jìn)程會(huì)調(diào)用process.exit(0)。這防止了意外的斷開連接。

worker.suicide

  • Boolean

通過調(diào)用.kill().disconnect()設(shè)置,在這之前他為undefined。

布爾值worker.suicide使你可以區(qū)別自發(fā)和意外的退出,主進(jìn)程可以通過這個(gè)值來決定使用重新創(chuàng)建一個(gè)工作進(jìn)程。

cluster.on('exit', function(worker, code, signal) {
  if (worker.suicide === true) {
    console.log('Oh, it was just suicide\' – no need to worry').
  }
});

// kill worker
worker.kill();

worker.send(message[, sendHandle])

  • message Object
  • sendHandle Handle object

給工作進(jìn)程或主進(jìn)程發(fā)生一個(gè)信息,可選得添加一個(gè)句柄。

在主進(jìn)程中它將給特定的工作進(jìn)程發(fā)送一個(gè)信息。它指向child.send()。

在工作進(jìn)程中它將給主進(jìn)程發(fā)送一個(gè)信息。它指向process.send()。

下面的例子將來自主進(jìn)程的所有信息返回:

if (cluster.isMaster) {
  var worker = cluster.fork();
  worker.send('hi there');

} else if (cluster.isWorker) {
  process.on('message', function(msg) {
    process.send(msg);
  });
}

worker.kill([signal='SIGTERM'])

  • signal String 傳遞給工作進(jìn)程的結(jié)束信號(hào)名

這個(gè)函數(shù)將會(huì)殺死工作進(jìn)程。在主進(jìn)程中,它通過斷開worker.process做到,并且一旦斷開,使用signal殺死進(jìn)程。在工作進(jìn)程中,它通過斷開信道做到,然后使用退出碼0退出。

會(huì)導(dǎo)致.suicide被設(shè)置。

為了向后兼任,這個(gè)方法的別名是worker.destroy()。

注意在工作進(jìn)程中,process.kill()存在,但它不是這個(gè)函數(shù)。是process.kill(pid[, signal])。

worker.disconnect()

在工作進(jìn)程中,這個(gè)函數(shù)會(huì)關(guān)閉所有的服務(wù)器,等待這些服務(wù)器上的close事件,然后斷開IPC信道。

在主進(jìn)程中,一個(gè)內(nèi)部信息會(huì)被傳遞給工作進(jìn)程,至使它們自行調(diào)用.disconnect()。

會(huì)導(dǎo)致.suicide被設(shè)置。

注意在一個(gè)服務(wù)器被關(guān)閉后,它將不會(huì)再接受新連接,但是連接可能被其他正在監(jiān)聽的工作進(jìn)程所接收。已存在的連接將會(huì)被允許向往常一樣退出。當(dāng)沒有更多的連接存在時(shí),工作進(jìn)程的IPC信道會(huì)關(guān)閉并使之優(yōu)雅地退出,參閱server.close()。

以上說明僅應(yīng)用于服務(wù)器連接,客戶端連接將不會(huì)自動(dòng)由工作進(jìn)程關(guān)閉,并且在退出前,不會(huì)等到連接退出。

注意在工作進(jìn)程中,process.disconnect存在,但它不是這個(gè)函數(shù)。是child.disconnect()。

由于長(zhǎng)連接可能會(huì)阻塞工作進(jìn)程的退出,這時(shí)傳遞一個(gè)動(dòng)作信息非常有用,應(yīng)用來根據(jù)信息指定的動(dòng)作來關(guān)閉它們。超時(shí)機(jī)制是上述的有用實(shí)現(xiàn),在disconnect事件在指定時(shí)長(zhǎng)后沒有觸發(fā)時(shí),殺死工作進(jìn)程。

if (cluster.isMaster) {
  var worker = cluster.fork();
  var timeout;

  worker.on('listening', function(address) {
    worker.send('shutdown');
    worker.disconnect();
    timeout = setTimeout(function() {
      worker.kill();
    }, 2000);
  });

  worker.on('disconnect', function() {
    clearTimeout(timeout);
  });

} else if (cluster.isWorker) {
  var net = require('net');
  var server = net.createServer(function(socket) {
    // connections never end
  });

  server.listen(8000);

  process.on('message', function(msg) {
    if(msg === 'shutdown') {
      // initiate graceful close of any connections to server
    }
  });
}

worker.isDead()

如果工作進(jìn)程已經(jīng)被關(guān)閉,則返回true。

worker.isConnected()

如果工作進(jìn)程通過它的IPC信道連接到主進(jìn)程,則返回true。一個(gè)工作進(jìn)程在被創(chuàng)建后連接到它的主進(jìn)程。在disconnect事件觸發(fā)后它會(huì)斷開連接。

Event: 'message'

  • message Object

這個(gè)事件與child_process.fork()所提供的事件完全相同。

在工作進(jìn)程中你也可以使用process.on('message')。

例子,這里有一個(gè)集群,使用消息系統(tǒng)在主進(jìn)程中統(tǒng)計(jì)請(qǐng)求的數(shù)量:

var cluster = require('cluster');
var http = require('http');

if (cluster.isMaster) {

  // Keep track of http requests
  var numReqs = 0;
  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);

  // Count requestes
  function messageHandler(msg) {
    if (msg.cmd && msg.cmd == 'notifyRequest') {
      numReqs += 1;
    }
  }

  // Start workers and listen for messages containing notifyRequest
  var numCPUs = require('os').cpus().length;
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  Object.keys(cluster.workers).forEach(function(id) {
    cluster.workers[id].on('message', messageHandler);
  });

} else {

  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");

    // notify master about the request
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}

Event: 'online'

cluster.on('online')事件相似,但指向了特定的工作進(jìn)程。

cluster.fork().on('online', function() {
  // Worker is online
});

這不是在工作進(jìn)程中觸發(fā)的。

Event: 'listening'

  • address Object

cluster.on('listening')事件相似,但指向了特定的工作進(jìn)程。

cluster.fork().on('listening', function(address) {
  // Worker is listening
});

這不是在工作進(jìn)程中觸發(fā)的。

Event: 'disconnect'

cluster.on('disconnect')事件相似,但指向了特定的工作進(jìn)程。

cluster.fork().on('disconnect', function() {
  // Worker has disconnected
});

Event: 'exit'

  • code Number 如果正常退出,則為退出碼
  • signal String 導(dǎo)致進(jìn)程被殺死的信號(hào)名(如'SIGHUP'

cluster.on('exit')事件相似,但指向了特定的工作進(jìn)程。

var worker = cluster.fork();
worker.on('exit', function(code, signal) {
  if( signal ) {
    console.log("worker was killed by signal: "+signal);
  } else if( code !== 0 ) {
    console.log("worker exited with error code: "+code);
  } else {
    console.log("worker success!");
  }
});

Event: 'error'

這個(gè)事件與child_process.fork()所提供的事件完全相同。

在工作進(jìn)程中你也可以使用process.on('error')

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)