Source: lib/mongos.js

  1. "use strict";
  2. var EventEmitter = require('events').EventEmitter
  3. , inherits = require('util').inherits
  4. , f = require('util').format
  5. , ServerCapabilities = require('./topology_base').ServerCapabilities
  6. , MongoError = require('mongodb-core').MongoError
  7. , CMongos = require('mongodb-core').Mongos
  8. , Cursor = require('./cursor')
  9. , AggregationCursor = require('./aggregation_cursor')
  10. , CommandCursor = require('./command_cursor')
  11. , Define = require('./metadata')
  12. , Server = require('./server')
  13. , Store = require('./topology_base').Store
  14. , MAX_JS_INT = require('./utils').MAX_JS_INT
  15. , translateOptions = require('./utils').translateOptions
  16. , filterOptions = require('./utils').filterOptions
  17. , mergeOptions = require('./utils').mergeOptions
  18. , getReadPreference = require('./utils').getReadPreference
  19. , os = require('os');
  20. // Get package.json variable
  21. var driverVersion = require('../package.json').version;
  22. var nodejsversion = f('Node.js %s, %s', process.version, os.endianness());
  23. var type = os.type();
  24. var name = process.platform;
  25. var architecture = process.arch;
  26. var release = os.release();
  27. /**
  28. * @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is
  29. * used to construct connections.
  30. *
  31. * **Mongos Should not be used, use MongoClient.connect**
  32. * @example
  33. * var Db = require('mongodb').Db,
  34. * Mongos = require('mongodb').Mongos,
  35. * Server = require('mongodb').Server,
  36. * test = require('assert');
  37. * // Connect using Mongos
  38. * var server = new Server('localhost', 27017);
  39. * var db = new Db('test', new Mongos([server]));
  40. * db.open(function(err, db) {
  41. * // Get an additional db
  42. * db.close();
  43. * });
  44. */
  45. // Allowed parameters
  46. var legalOptionNames = ['ha', 'haInterval', 'acceptableLatencyMS'
  47. , 'poolSize', 'ssl', 'checkServerIdentity', 'sslValidate'
  48. , 'sslCA', 'sslCRL', 'sslCert', 'sslKey', 'sslPass', 'socketOptions', 'bufferMaxEntries'
  49. , 'store', 'auto_reconnect', 'autoReconnect', 'emitError'
  50. , 'keepAlive', 'noDelay', 'connectTimeoutMS', 'socketTimeoutMS'
  51. , 'loggerLevel', 'logger', 'reconnectTries', 'appname', 'domainsEnabled'
  52. , 'servername', 'promoteLongs', 'promoteValues', 'promoteBuffers'];
  53. /**
  54. * Creates a new Mongos instance
  55. * @class
  56. * @deprecated
  57. * @param {Server[]} servers A seedlist of servers participating in the replicaset.
  58. * @param {object} [options=null] Optional settings.
  59. * @param {booelan} [options.ha=true] Turn on high availability monitoring.
  60. * @param {number} [options.haInterval=5000] Time between each replicaset status check.
  61. * @param {number} [options.poolSize=5] Number of connections in the connection pool for each server instance, set to 5 as default for legacy reasons.
  62. * @param {number} [options.acceptableLatencyMS=15] Cutoff latency point in MS for MongoS proxy selection
  63. * @param {boolean} [options.ssl=false] Use ssl connection (needs to have a mongod server with ssl support)
  64. * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
  65. * @param {object} [options.sslValidate=true] Validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher)
  66. * @param {array} [options.sslCA=null] Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher)
  67. * @param {array} [options.sslCRL=null] Array of revocation certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher)
  68. * @param {(Buffer|string)} [options.sslCert=null] String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
  69. * @param {(Buffer|string)} [options.sslKey=null] String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
  70. * @param {(Buffer|string)} [options.sslPass=null] String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher)
  71. * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
  72. * @param {object} [options.socketOptions=null] Socket options
  73. * @param {boolean} [options.socketOptions.noDelay=true] TCP Socket NoDelay option.
  74. * @param {number} [options.socketOptions.keepAlive=0] TCP KeepAlive on the socket with a X ms delay before start.
  75. * @param {number} [options.socketOptions.connectTimeoutMS=0] TCP Connection timeout setting
  76. * @param {number} [options.socketOptions.socketTimeoutMS=0] TCP Socket timeout setting
  77. * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
  78. * @fires Mongos#connect
  79. * @fires Mongos#ha
  80. * @fires Mongos#joined
  81. * @fires Mongos#left
  82. * @fires Mongos#fullsetup
  83. * @fires Mongos#open
  84. * @fires Mongos#close
  85. * @fires Mongos#error
  86. * @fires Mongos#timeout
  87. * @fires Mongos#parseError
  88. * @property {string} parserType the parser type used (c++ or js).
  89. * @return {Mongos} a Mongos instance.
  90. */
  91. var Mongos = function(servers, options) {
  92. if(!(this instanceof Mongos)) return new Mongos(servers, options);
  93. options = options || {};
  94. var self = this;
  95. // Filter the options
  96. options = filterOptions(options, legalOptionNames);
  97. // Ensure all the instances are Server
  98. for(var i = 0; i < servers.length; i++) {
  99. if(!(servers[i] instanceof Server)) {
  100. throw MongoError.create({message: "all seed list instances must be of the Server type", driver:true});
  101. }
  102. }
  103. // Stored options
  104. var storeOptions = {
  105. force: false
  106. , bufferMaxEntries: typeof options.bufferMaxEntries == 'number' ? options.bufferMaxEntries : MAX_JS_INT
  107. }
  108. // Shared global store
  109. var store = options.store || new Store(self, storeOptions);
  110. // Set up event emitter
  111. EventEmitter.call(this);
  112. // Build seed list
  113. var seedlist = servers.map(function(x) {
  114. return {host: x.host, port: x.port}
  115. });
  116. // Get the reconnect option
  117. var reconnect = typeof options.auto_reconnect == 'boolean' ? options.auto_reconnect : true;
  118. reconnect = typeof options.autoReconnect == 'boolean' ? options.autoReconnect : reconnect;
  119. // Clone options
  120. var clonedOptions = mergeOptions({}, {
  121. disconnectHandler: store,
  122. cursorFactory: Cursor,
  123. reconnect: reconnect,
  124. emitError: typeof options.emitError == 'boolean' ? options.emitError : true,
  125. size: typeof options.poolSize == 'number' ? options.poolSize : 5
  126. });
  127. // Translate any SSL options and other connectivity options
  128. clonedOptions = translateOptions(clonedOptions, options);
  129. // Socket options
  130. var socketOptions = options.socketOptions && Object.keys(options.socketOptions).length > 0
  131. ? options.socketOptions : options;
  132. // Translate all the options to the mongodb-core ones
  133. clonedOptions = translateOptions(clonedOptions, socketOptions);
  134. if(typeof clonedOptions.keepAlive == 'number') {
  135. clonedOptions.keepAliveInitialDelay = clonedOptions.keepAlive;
  136. clonedOptions.keepAlive = clonedOptions.keepAlive > 0;
  137. }
  138. // Build default client information
  139. this.clientInfo = {
  140. driver: {
  141. name: "nodejs",
  142. version: driverVersion
  143. },
  144. os: {
  145. type: type,
  146. name: name,
  147. architecture: architecture,
  148. version: release
  149. },
  150. platform: nodejsversion
  151. }
  152. // Build default client information
  153. clonedOptions.clientInfo = this.clientInfo;
  154. // Do we have an application specific string
  155. if(options.appname) {
  156. clonedOptions.clientInfo.application = { name: options.appname };
  157. }
  158. // Create the Mongos
  159. var mongos = new CMongos(seedlist, clonedOptions)
  160. // Server capabilities
  161. var sCapabilities = null;
  162. // Internal state
  163. this.s = {
  164. // Create the Mongos
  165. mongos: mongos
  166. // Server capabilities
  167. , sCapabilities: sCapabilities
  168. // Debug turned on
  169. , debug: clonedOptions.debug
  170. // Store option defaults
  171. , storeOptions: storeOptions
  172. // Cloned options
  173. , clonedOptions: clonedOptions
  174. // Actual store of callbacks
  175. , store: store
  176. // Options
  177. , options: options
  178. }
  179. }
  180. var define = Mongos.define = new Define('Mongos', Mongos, false);
  181. /**
  182. * @ignore
  183. */
  184. inherits(Mongos, EventEmitter);
  185. // Last ismaster
  186. Object.defineProperty(Mongos.prototype, 'isMasterDoc', {
  187. enumerable:true, get: function() { return this.s.mongos.lastIsMaster(); }
  188. });
  189. Object.defineProperty(Mongos.prototype, 'parserType', {
  190. enumerable:true, get: function() {
  191. return this.s.mongos.parserType;
  192. }
  193. });
  194. // BSON property
  195. Object.defineProperty(Mongos.prototype, 'bson', {
  196. enumerable: true, get: function() {
  197. return this.s.mongos.s.bson;
  198. }
  199. });
  200. Object.defineProperty(Mongos.prototype, 'haInterval', {
  201. enumerable:true, get: function() { return this.s.mongos.s.haInterval; }
  202. });
  203. // Connect
  204. Mongos.prototype.connect = function(db, _options, callback) {
  205. var self = this;
  206. if('function' === typeof _options) callback = _options, _options = {};
  207. if(_options == null) _options = {};
  208. if(!('function' === typeof callback)) callback = null;
  209. self.s.options = _options;
  210. // Update bufferMaxEntries
  211. self.s.storeOptions.bufferMaxEntries = db.bufferMaxEntries;
  212. // Error handler
  213. var connectErrorHandler = function() {
  214. return function(err) {
  215. // Remove all event handlers
  216. var events = ['timeout', 'error', 'close'];
  217. events.forEach(function(e) {
  218. self.removeListener(e, connectErrorHandler);
  219. });
  220. self.s.mongos.removeListener('connect', connectErrorHandler);
  221. // Try to callback
  222. try {
  223. callback(err);
  224. } catch(err) {
  225. process.nextTick(function() { throw err; })
  226. }
  227. }
  228. }
  229. // Actual handler
  230. var errorHandler = function(event) {
  231. return function(err) {
  232. if(event != 'error') {
  233. self.emit(event, err);
  234. }
  235. }
  236. }
  237. // Error handler
  238. var reconnectHandler = function() {
  239. self.emit('reconnect');
  240. self.s.store.execute();
  241. }
  242. // relay the event
  243. var relay = function(event) {
  244. return function(t, server) {
  245. self.emit(event, t, server);
  246. }
  247. }
  248. // Connect handler
  249. var connectHandler = function() {
  250. // Clear out all the current handlers left over
  251. var events = ["timeout", "error", "close", 'fullsetup'];
  252. events.forEach(function(e) {
  253. self.s.mongos.removeAllListeners(e);
  254. });
  255. // Set up listeners
  256. self.s.mongos.once('timeout', errorHandler('timeout'));
  257. self.s.mongos.once('error', errorHandler('error'));
  258. self.s.mongos.once('close', errorHandler('close'));
  259. // Set up serverConfig listeners
  260. self.s.mongos.on('fullsetup', function() { self.emit('fullsetup', self); });
  261. // Emit open event
  262. self.emit('open', null, self);
  263. // Return correctly
  264. try {
  265. callback(null, self);
  266. } catch(err) {
  267. process.nextTick(function() { throw err; })
  268. }
  269. }
  270. // Clear out all the current handlers left over
  271. var events = ["timeout", "error", "close", 'serverOpening', 'serverDescriptionChanged', 'serverHeartbeatStarted',
  272. 'serverHeartbeatSucceeded', 'serverHeartbeatFailed', 'serverClosed', 'topologyOpening',
  273. 'topologyClosed', 'topologyDescriptionChanged'];
  274. events.forEach(function(e) {
  275. self.s.mongos.removeAllListeners(e);
  276. });
  277. // Set up SDAM listeners
  278. self.s.mongos.on('serverDescriptionChanged', relay('serverDescriptionChanged'));
  279. self.s.mongos.on('serverHeartbeatStarted', relay('serverHeartbeatStarted'));
  280. self.s.mongos.on('serverHeartbeatSucceeded', relay('serverHeartbeatSucceeded'));
  281. self.s.mongos.on('serverHeartbeatFailed', relay('serverHeartbeatFailed'));
  282. self.s.mongos.on('serverOpening', relay('serverOpening'));
  283. self.s.mongos.on('serverClosed', relay('serverClosed'));
  284. self.s.mongos.on('topologyOpening', relay('topologyOpening'));
  285. self.s.mongos.on('topologyClosed', relay('topologyClosed'));
  286. self.s.mongos.on('topologyDescriptionChanged', relay('topologyDescriptionChanged'));
  287. // Set up listeners
  288. self.s.mongos.once('timeout', connectErrorHandler('timeout'));
  289. self.s.mongos.once('error', connectErrorHandler('error'));
  290. self.s.mongos.once('close', connectErrorHandler('close'));
  291. self.s.mongos.once('connect', connectHandler);
  292. // Join and leave events
  293. self.s.mongos.on('joined', relay('joined'));
  294. self.s.mongos.on('left', relay('left'));
  295. // Reconnect server
  296. self.s.mongos.on('reconnect', reconnectHandler);
  297. // Start connection
  298. self.s.mongos.connect(_options);
  299. }
  300. // Server capabilities
  301. Mongos.prototype.capabilities = function() {
  302. if(this.s.sCapabilities) return this.s.sCapabilities;
  303. if(this.s.mongos.lastIsMaster() == null) return null;
  304. this.s.sCapabilities = new ServerCapabilities(this.s.mongos.lastIsMaster());
  305. return this.s.sCapabilities;
  306. }
  307. define.classMethod('capabilities', {callback: false, promise:false, returns: [ServerCapabilities]});
  308. // Command
  309. Mongos.prototype.command = function(ns, cmd, options, callback) {
  310. this.s.mongos.command(ns, cmd, getReadPreference(options), callback);
  311. }
  312. define.classMethod('command', {callback: true, promise:false});
  313. // Insert
  314. Mongos.prototype.insert = function(ns, ops, options, callback) {
  315. this.s.mongos.insert(ns, ops, options, function(e, m) {
  316. callback(e, m)
  317. });
  318. }
  319. define.classMethod('insert', {callback: true, promise:false});
  320. // Update
  321. Mongos.prototype.update = function(ns, ops, options, callback) {
  322. this.s.mongos.update(ns, ops, options, callback);
  323. }
  324. define.classMethod('update', {callback: true, promise:false});
  325. // Remove
  326. Mongos.prototype.remove = function(ns, ops, options, callback) {
  327. this.s.mongos.remove(ns, ops, options, callback);
  328. }
  329. define.classMethod('remove', {callback: true, promise:false});
  330. // Destroyed
  331. Mongos.prototype.isDestroyed = function() {
  332. return this.s.mongos.isDestroyed();
  333. }
  334. // IsConnected
  335. Mongos.prototype.isConnected = function() {
  336. return this.s.mongos.isConnected();
  337. }
  338. define.classMethod('isConnected', {callback: false, promise:false, returns: [Boolean]});
  339. // Insert
  340. Mongos.prototype.cursor = function(ns, cmd, options) {
  341. options.disconnectHandler = this.s.store;
  342. return this.s.mongos.cursor(ns, cmd, options);
  343. }
  344. define.classMethod('cursor', {callback: false, promise:false, returns: [Cursor, AggregationCursor, CommandCursor]});
  345. Mongos.prototype.lastIsMaster = function() {
  346. return this.s.mongos.lastIsMaster();
  347. }
  348. /**
  349. * Unref all sockets
  350. * @method
  351. */
  352. Mongos.prototype.unref = function () {
  353. return this.s.mongos.unref();
  354. }
  355. Mongos.prototype.close = function(forceClosed) {
  356. this.s.mongos.destroy({
  357. force: typeof forceClosed == 'boolean' ? forceClosed : false,
  358. });
  359. // We need to wash out all stored processes
  360. if(forceClosed == true) {
  361. this.s.storeOptions.force = forceClosed;
  362. this.s.store.flush();
  363. }
  364. }
  365. define.classMethod('close', {callback: false, promise:false});
  366. Mongos.prototype.auth = function() {
  367. var args = Array.prototype.slice.call(arguments, 0);
  368. this.s.mongos.auth.apply(this.s.mongos, args);
  369. }
  370. define.classMethod('auth', {callback: true, promise:false});
  371. Mongos.prototype.logout = function() {
  372. var args = Array.prototype.slice.call(arguments, 0);
  373. this.s.mongos.logout.apply(this.s.mongos, args);
  374. }
  375. define.classMethod('logout', {callback: true, promise:false});
  376. /**
  377. * All raw connections
  378. * @method
  379. * @return {array}
  380. */
  381. Mongos.prototype.connections = function() {
  382. return this.s.mongos.connections();
  383. }
  384. define.classMethod('connections', {callback: false, promise:false, returns:[Array]});
  385. /**
  386. * A mongos connect event, used to verify that the connection is up and running
  387. *
  388. * @event Mongos#connect
  389. * @type {Mongos}
  390. */
  391. /**
  392. * The mongos high availability event
  393. *
  394. * @event Mongos#ha
  395. * @type {function}
  396. * @param {string} type The stage in the high availability event (start|end)
  397. * @param {boolean} data.norepeat This is a repeating high availability process or a single execution only
  398. * @param {number} data.id The id for this high availability request
  399. * @param {object} data.state An object containing the information about the current replicaset
  400. */
  401. /**
  402. * A server member left the mongos set
  403. *
  404. * @event Mongos#left
  405. * @type {function}
  406. * @param {string} type The type of member that left (primary|secondary|arbiter)
  407. * @param {Server} server The server object that left
  408. */
  409. /**
  410. * A server member joined the mongos set
  411. *
  412. * @event Mongos#joined
  413. * @type {function}
  414. * @param {string} type The type of member that joined (primary|secondary|arbiter)
  415. * @param {Server} server The server object that joined
  416. */
  417. /**
  418. * Mongos fullsetup event, emitted when all proxies in the topology have been connected to.
  419. *
  420. * @event Mongos#fullsetup
  421. * @type {Mongos}
  422. */
  423. /**
  424. * Mongos open event, emitted when mongos can start processing commands.
  425. *
  426. * @event Mongos#open
  427. * @type {Mongos}
  428. */
  429. /**
  430. * Mongos close event
  431. *
  432. * @event Mongos#close
  433. * @type {object}
  434. */
  435. /**
  436. * Mongos error event, emitted if there is an error listener.
  437. *
  438. * @event Mongos#error
  439. * @type {MongoError}
  440. */
  441. /**
  442. * Mongos timeout event
  443. *
  444. * @event Mongos#timeout
  445. * @type {object}
  446. */
  447. /**
  448. * Mongos parseError event
  449. *
  450. * @event Mongos#parseError
  451. * @type {object}
  452. */
  453. module.exports = Mongos;