'use strict';

var EventEmitter = require('events'),
  inherits = require('util').inherits,
  MongoNetworkError = require('mongodb-core').MongoNetworkError;

// Options that are copied from the ChangeStream options onto the underlying cursor's options
var cursorOptionNames = ['maxAwaitTimeMS', 'collation', 'readPreference'];

/**
 * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
 * @class ChangeStream
 * @since 3.0.0
 * @param {Collection} collection The collection against which to create the change stream
 * @param {Array} pipeline An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
 * @param {object} [options=null] Optional settings
 * @param {string} [options.fullDocument='default'] Allowed values: 'default', 'updateLookup'. When set to 'updateLookup', the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
 * @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
 * @param {object} [options.resumeAfter=null] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
 * @param {number} [options.batchSize=null] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
 * @param {object} [options.collation=null] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
 * @param {ReadPreference} [options.readPreference=null] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
 * @fires ChangeStream#close
 * @fires ChangeStream#change
 * @fires ChangeStream#end
 * @fires ChangeStream#error
 * @throws {Error} if `collection` is not an instance of Collection
 * @return {ChangeStream} a ChangeStream instance.
 */
var ChangeStream = function(collection, pipeline, options) {
  // Required lazily, inside the constructor, rather than at module load time
  var Collection = require('./collection');

  // Ensure the provided collection is actually a collection
  if (!(collection instanceof Collection)) {
    throw new Error(
      'collection provided to ChangeStream constructor is not an instance of Collection'
    );
  }

  var self = this;
  self.pipeline = pipeline || [];
  self.options = options || {};
  self.promiseLibrary = collection.s.promiseLibrary;

  // Extract namespace and serverConfig from the collection
  self.namespace = {
    collection: collection.collectionName,
    database: collection.s.db.databaseName
  };
  self.serverConfig = collection.s.db.serverConfig;

  // Determine correct read preference
  self.options.readPreference = self.options.readPreference || collection.s.readPreference;

  // Create contained Change Stream cursor
  self.cursor = createChangeStreamCursor(self);

  // Listen for any `change` listeners being added to ChangeStream.
  // 'newListener' is emitted BEFORE the listener is registered, so a count of zero on the
  // ChangeStream itself means this is the first `change` listener. Only then is the cursor's
  // `data` event hooked up; hooking it once per listener would deliver every change N times.
  self.on('newListener', function(eventName) {
    if (eventName === 'change' && self.cursor && self.listenerCount('change') === 0) {
      self.cursor.on('data', function(change) {
        processNewChange(self, null, change);
      });
    }
  });

  // Listen for all `change` listeners being removed from ChangeStream.
  // 'removeListener' is emitted AFTER removal, so zero means no `change` listeners remain.
  self.on('removeListener', function(eventName) {
    if (eventName === 'change' && self.listenerCount('change') === 0 && self.cursor) {
      self.cursor.removeAllListeners('data');
    }
  });
};

inherits(ChangeStream, EventEmitter);

/**
 * Create a new change stream cursor based on self's configuration and forward its
 * `data`, `close`, `end` and `error` events to the ChangeStream.
 * @ignore
 * @param {ChangeStream} self The change stream the cursor is created for
 * @return {Cursor} the newly created cursor
 */
var createChangeStreamCursor = function(self) {
  // When a resume token has been cached, make the new cursor start after it
  if (self.resumeToken) {
    self.options.resumeAfter = self.resumeToken;
  }

  var changeStreamCursor = buildChangeStreamAggregationCommand(
    self.serverConfig,
    self.namespace,
    self.pipeline,
    self.resumeToken,
    self.options
  );

  /**
   * Fired for each new matching change in the specified namespace. Attaching a `change` event listener to a Change Stream will switch the stream into flowing mode. Data will then be passed as soon as it is available.
   *
   * @event ChangeStream#change
   * @type {object}
   */
  // Only relevant when the cursor is re-created while `change` listeners already exist
  if (self.listenerCount('change') > 0) {
    changeStreamCursor.on('data', function(change) {
      processNewChange(self, null, change);
    });
  }

  /**
   * Change stream close event
   *
   * @event ChangeStream#close
   * @type {null}
   */
  changeStreamCursor.on('close', function() {
    self.emit('close');
  });

  /**
   * Change stream end event
   *
   * @event ChangeStream#end
   * @type {null}
   */
  changeStreamCursor.on('end', function() {
    self.emit('end');
  });

  /**
   * Fired when the stream encounters an error.
   *
   * @event ChangeStream#error
   * @type {Error}
   */
  changeStreamCursor.on('error', function(error) {
    self.emit('error', error);
  });

  return changeStreamCursor;
};

/**
 * Build the `aggregate` command (with a leading `$changeStream` stage) and open a cursor for it.
 * @ignore
 * @param {object} serverConfig The topology object whose `cursor()` method creates the cursor
 * @param {object} namespace Object with `database` and `collection` name strings
 * @param {Array} pipeline User supplied stages appended after the `$changeStream` stage
 * @param {object} [resumeToken] Cached resume token; takes precedence over `options.resumeAfter`
 * @param {object} options The change stream options
 * @return {Cursor} the cursor returned by `serverConfig.cursor()`
 */
var buildChangeStreamAggregationCommand = function(
  serverConfig,
  namespace,
  pipeline,
  resumeToken,
  options
) {
  var changeStreamStageOptions = {};
  if (options.fullDocument) {
    changeStreamStageOptions.fullDocument = options.fullDocument;
  }

  if (resumeToken || options.resumeAfter) {
    changeStreamStageOptions.resumeAfter = resumeToken || options.resumeAfter;
  }

  // Map cursor options (only truthy values are forwarded)
  var cursorOptions = {};
  cursorOptionNames.forEach(function(optionName) {
    if (options[optionName]) {
      cursorOptions[optionName] = options[optionName];
    }
  });

  // The $changeStream stage always comes first, followed by the user's stages
  var changeStreamPipeline = [{ $changeStream: changeStreamStageOptions }];
  changeStreamPipeline = changeStreamPipeline.concat(pipeline);

  var command = {
    aggregate: namespace.collection,
    pipeline: changeStreamPipeline,
    readConcern: { level: 'majority' },
    cursor: {
      batchSize: options.batchSize || 1
    }
  };

  // Create and return the cursor
  return serverConfig.cursor(
    namespace.database + '.' + namespace.collection,
    command,
    cursorOptions
  );
};

/**
 * Check if there is any document still available in the Change Stream
 * @function ChangeStream.prototype.hasNext
 * @param {ChangeStream~resultCallback} [callback] The result callback.
 * @throws {MongoError}
 * @return {Promise} returns Promise if no callback passed
 */
ChangeStream.prototype.hasNext = function(callback) {
  return this.cursor.hasNext(callback);
};

/**
 * Get the next available document from the Change Stream, returns null if no more documents are available.
 * @function ChangeStream.prototype.next
 * @param {ChangeStream~resultCallback} [callback] The result callback.
 * @throws {MongoError}
 * @return {Promise} returns Promise if no callback passed
 */
ChangeStream.prototype.next = function(callback) {
  var self = this;
  if (this.isClosed()) {
    if (callback) return callback(new Error('Change Stream is not open.'), null);
    return self.promiseLibrary.reject(new Error('Change Stream is not open.'));
  }

  // Both outcomes are routed through processNewChange, which caches the resume token,
  // attempts a resume on network errors and dispatches to the callback / events / promise.
  return this.cursor
    .next()
    .then(function(change) {
      return processNewChange(self, null, change, callback);
    })
    .catch(function(err) {
      return processNewChange(self, err, null, callback);
    });
};

/**
 * Is the cursor closed
 * @method ChangeStream.prototype.isClosed
 * @return {boolean}
 */
ChangeStream.prototype.isClosed = function() {
  if (this.cursor) {
    return this.cursor.isClosed();
  }

  // No cursor (e.g. after close()) is reported as closed
  return true;
};

/**
 * Close the Change Stream
 * @method ChangeStream.prototype.close
 * @param {ChangeStream~resultCallback} [callback] The result callback.
 * @return {Promise} returns Promise if no callback passed
 */
ChangeStream.prototype.close = function(callback) {
  // Already closed: nothing to do
  if (!this.cursor) {
    if (callback) return callback();
    return this.promiseLibrary.resolve();
  }

  // Tidy up the existing cursor
  var cursor = this.cursor;
  delete this.cursor;
  return cursor.close(callback);
};

/**
 * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
 * @method
 * @param {Writable} destination The destination for writing data
 * @param {object} [options] {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options|Pipe options}
 * @return {null}
 */
ChangeStream.prototype.pipe = function(destination, options) {
  // Remember destinations so piping can be re-established after a resume
  if (!this.pipeDestinations) {
    this.pipeDestinations = [];
  }

  this.pipeDestinations.push(destination);
  return this.cursor.pipe(destination, options);
};

/**
 * This method will remove the hooks set up for a previous pipe() call.
 * @param {Writable} [destination] The destination for writing data
 * @return {null}
 */
ChangeStream.prototype.unpipe = function(destination) {
  if (this.pipeDestinations && this.pipeDestinations.indexOf(destination) > -1) {
    this.pipeDestinations.splice(this.pipeDestinations.indexOf(destination), 1);
  }

  return this.cursor.unpipe(destination);
};

/**
 * This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
 * @return {null}
 */
ChangeStream.prototype.pause = function() {
  return this.cursor.pause();
};

/**
 * This method will cause the readable stream to resume emitting data events.
 * @return {null}
 */
ChangeStream.prototype.resume = function() {
  return this.cursor.resume();
};

/**
 * Return a modified Readable stream including a possible transform method.
 * @method
 * @param {object} [options=null] Optional settings.
 * @param {function} [options.transform=null] A transformation method applied to each document emitted by the stream.
 * @return {Cursor}
 */
ChangeStream.prototype.stream = function(options) {
  // Remembered so the same options can be applied to a re-created cursor after a resume
  this.streamOptions = options;
  return this.cursor.stream(options);
};

/**
 * Replace the change stream's cursor with a freshly created one and re-attach every
 * destination previously registered through pipe().
 * @ignore
 * @param {ChangeStream} self The change stream being resumed
 */
var reestablishCursor = function(self) {
  // Establish a new cursor
  self.cursor = createChangeStreamCursor(self);

  // Attempt to reconfigure piping. Iterate the destinations themselves: a `for...in`
  // loop over the array would yield index strings instead of the writable streams.
  if (self.pipeDestinations) {
    var cursorStream = self.cursor.stream(self.streamOptions);
    self.pipeDestinations.forEach(function(pipeDestination) {
      cursorStream.pipe(pipeDestination);
    });
  }
};

/**
 * Handle new change events. This method brings together the routes from the callback,
 * event emitter, and promise ways of using ChangeStream.
 *
 * On a MongoNetworkError a single resume is attempted: the current cursor is closed, a new
 * one is created (after the cached resume token, if any) and next() is retried. Any other
 * error, or a second network error before a change has been received, is reported.
 * @ignore
 * @param {ChangeStream} self The change stream the change or error belongs to
 * @param {Error} [err] The error that occurred, if any
 * @param {object} [change] The change document received, if any
 * @param {ChangeStream~resultCallback} [callback] The result callback, when used in callback mode
 * @return {*} the callback's / emit's return value, or a Promise when neither a callback nor a matching listener exists
 */
var processNewChange = function(self, err, change, callback) {
  // Handle errors
  if (err) {
    // Handle resumable MongoNetworkErrors
    if (err instanceof MongoNetworkError && !self.attemptingResume) {
      self.attemptingResume = true;

      if (typeof callback === 'function') {
        return self.cursor.close(function(closeErr) {
          // Report the original network error, not the failure to close
          if (closeErr) return callback(err, null);

          reestablishCursor(self);

          // Attempt the next() operation again
          return self.next(callback);
        });
      }

      // Promise mode: chain on close() so that the caller's promise settles with the
      // outcome of the retried next() rather than with the return value of close().
      return self.cursor.close().then(
        function() {
          reestablishCursor(self);

          // Attempt the next() operation again
          return self.next();
        },
        function() {
          // Report the original network error, not the failure to close
          return self.promiseLibrary.reject(err);
        }
      );
    }

    if (typeof callback === 'function') return callback(err, null);
    if (self.listenerCount('error')) return self.emit('error', err);
    return self.promiseLibrary.reject(err);
  }

  // A document arrived, so a later network error may trigger a fresh resume attempt
  self.attemptingResume = false;

  // Cache the resume token if it is present. If it is not present return an error.
  if (!change || !change._id) {
    var noResumeTokenError = new Error(
      'A change stream document has been received that lacks a resume token (_id).'
    );

    if (typeof callback === 'function') return callback(noResumeTokenError, null);
    if (self.listenerCount('error')) return self.emit('error', noResumeTokenError);
    return self.promiseLibrary.reject(noResumeTokenError);
  }

  self.resumeToken = change._id;

  // Return the change
  if (typeof callback === 'function') return callback(err, change);
  if (self.listenerCount('change')) return self.emit('change', change);
  return self.promiseLibrary.resolve(change);
};

/**
 * The callback format for results
 * @callback ChangeStream~resultCallback
 * @param {MongoError} error An error instance representing the error during the execution.
 * @param {(object|null)} result The result object if the command was executed successfully.
 */

module.exports = ChangeStream;