const debug = require("debug")("streamroller:RollingFileWriteStream");
const _ = require("lodash");
const async = require("async");
const fs = require("fs-extra");
const zlib = require("zlib");
const path = require("path");
const newNow = require("./now");
const format = require("date-format");
const { Writable } = require("stream");

const FILENAME_SEP = ".";
const ZIP_EXT = ".gz";

/**
 * Move `sourceFilePath` to `targetFilePath`, optionally gzipping it on the way.
 *
 * - If source and target are the same path, nothing is done.
 * - If the source is not readable/writable (e.g. it does not exist), nothing
 *   is done and `done` is called without an error.
 * - With `needCompress`, the source is streamed through gzip into the target
 *   and then deleted. Any stream error is reported to `done` exactly once.
 * - Without `needCompress`, the target is unlinked (errors ignored) and the
 *   source is renamed onto it.
 *
 * @param {string} sourceFilePath - file to move
 * @param {string} targetFilePath - destination path
 * @param {boolean} needCompress - gzip the file while moving it
 * @param {function(Error=)} done - completion callback
 */
const moveAndMaybeCompressFile = (
  sourceFilePath,
  targetFilePath,
  needCompress,
  done
) => {
  if (sourceFilePath === targetFilePath) {
    debug(
      `moveAndMaybeCompressFile: source and target are the same, not doing anything`
    );
    return done();
  }
  fs.access(sourceFilePath, fs.constants.W_OK | fs.constants.R_OK, e => {
    if (e) {
      debug(
        `moveAndMaybeCompressFile: source file path does not exist. not moving. sourceFilePath=${sourceFilePath}`
      );
      return done();
    }

    debug(
      `moveAndMaybeCompressFile: moving file from ${sourceFilePath} to ${targetFilePath} ${
        needCompress ? "with" : "without"
      } compress`
    );
    if (needCompress) {
      const readStream = fs.createReadStream(sourceFilePath);
      const gzipStream = zlib.createGzip();
      const writeStream = fs.createWriteStream(targetFilePath);

      // `done` must be called exactly once, whichever stream settles first.
      let settled = false;
      const settle = err => {
        if (settled) {
          return;
        }
        settled = true;
        done(err);
      };
      // pipe() does not forward errors nor close the destination when the
      // source fails, so without these handlers a failure would leave the
      // caller waiting forever.
      const fail = err => {
        debug(
          `moveAndMaybeCompressFile: error compressing ${sourceFilePath} to ${targetFilePath}: ${err}`
        );
        readStream.destroy();
        writeStream.destroy();
        settle(err);
      };

      readStream.on("error", fail);
      gzipStream.on("error", fail);
      writeStream.on("error", fail);
      writeStream.on("finish", () => {
        if (settled) {
          return;
        }
        debug(
          `moveAndMaybeCompressFile: finished compressing ${targetFilePath}, deleting ${sourceFilePath}`
        );
        fs.unlink(sourceFilePath, settle);
      });

      readStream.pipe(gzipStream).pipe(writeStream);
    } else {
      debug(
        `moveAndMaybeCompressFile: deleting file=${targetFilePath}, renaming ${sourceFilePath} to ${targetFilePath}`
      );
      // The unlink error is deliberately ignored: the target may not exist.
      fs.unlink(targetFilePath, () => {
        fs.rename(sourceFilePath, targetFilePath, done);
      });
    }
  });
};

/**
 * RollingFileWriteStream is mainly used when writing to a file rolling by date or size.
 * RollingFileWriteStream inherits from stream.Writable
 */
class RollingFileWriteStream extends Writable {
  /**
   * Create a RollingFileWriteStream
   * @constructor
   * @param {string} filePath - The file path to write.
   * @param {object} options - The extra options
   * @param {number} options.numToKeep - The max numbers of files to keep.
   * @param {number} options.maxSize - The maxSize one file can reach. Unit is Byte.
   *                                   This should be more than 1024. The default is Number.MAX_SAFE_INTEGER.
   * @param {string} options.mode - The mode of the files. The default is '0644'. Refer to stream.writable for more.
   * @param {string} options.flags - The default is 'a'. Refer to stream.flags for more.
   * @param {boolean} options.compress - Whether to compress backup files.
   * @param {boolean} options.keepFileExt - Whether to keep the file extension.
   * @param {string} options.pattern - The date string pattern in the file name.
   * @param {boolean} options.alwaysIncludePattern - Whether to add date to the name of the first file.
   */
  constructor(filePath, options) {
    debug(`creating RollingFileWriteStream. path=${filePath}`);
    super(options);
    this.options = this._parseOption(options);
    this.fileObject = path.parse(filePath);
    if (this.fileObject.dir === "") {
      // A bare file name is resolved against the current working directory.
      this.fileObject = path.parse(path.join(process.cwd(), filePath));
    }
    this.justTheFile = this._formatFileName({ isHotFile: true });
    this.filename = path.join(this.fileObject.dir, this.justTheFile);
    this.state = {
      currentSize: 0
    };

    if (this.options.pattern) {
      this.state.currentDate = format(this.options.pattern, newNow());
    }

    if (this.options.flags === "a") {
      // Appending: pick up the size (and date) of an already existing file.
      this._setExistingSizeAndDate();
    }

    debug(
      `create new file with no hot file. name=${
        this.justTheFile
      }, state=${JSON.stringify(this.state)}`
    );
    this._renewWriteStream();
  }

  /**
   * Initialise `state.currentSize` (and `state.currentDate` when a pattern is
   * configured) from the existing hot file, if there is one.
   */
  _setExistingSizeAndDate() {
    try {
      const stats = fs.statSync(this.filename);
      this.state.currentSize = stats.size;
      if (this.options.pattern) {
        this.state.currentDate = format(this.options.pattern, stats.birthtime);
      }
    } catch (e) {
      //file does not exist, that's fine - move along
      return;
    }
  }

  /**
   * Merge the caller's options over the defaults and validate them.
   * @param {object} rawOptions - options as passed to the constructor
   * @returns {object} the effective options
   * @throws {Error} when maxSize or numToKeep is <= 0
   */
  _parseOption(rawOptions) {
    const defaultOptions = {
      maxSize: Number.MAX_SAFE_INTEGER,
      numToKeep: Number.MAX_SAFE_INTEGER,
      encoding: "utf8",
      mode: parseInt("0644", 8),
      flags: "a",
      compress: false,
      keepFileExt: false,
      alwaysIncludePattern: false
    };
    const options = _.defaults({}, rawOptions, defaultOptions);
    if (options.maxSize <= 0) {
      throw new Error(`options.maxSize (${options.maxSize}) should be > 0`);
    }
    if (options.numToKeep <= 0) {
      throw new Error(`options.numToKeep (${options.numToKeep}) should be > 0`);
    }
    debug(`creating stream with option=${JSON.stringify(options)}`);
    return options;
  }

  /**
   * Roll the file if the date period changed or the size limit was reached,
   * then invoke `callback`. When no roll is needed `callback` runs immediately.
   * @param {function} callback - called once any rolling has completed
   */
  _shouldRoll(callback) {
    if (this.state.currentDate) {
      const nowDate = format(this.options.pattern, newNow());
      if (this.state.currentDate !== nowDate) {
        debug(
          `_shouldRoll: rolling by date because ${
            this.state.currentDate
          } !== ${nowDate}`
        );
        this._roll({ isNextPeriod: true }, callback);
        return;
      }
    }
    if (this.state.currentSize >= this.options.maxSize) {
      debug(
        `_shouldRoll: rolling by size because ${this.state.currentSize} >= ${this.options.maxSize}`
      );
      this._roll({ isNextPeriod: false }, callback);
      return;
    }
    callback();
  }

  /**
   * stream.Writable implementation: roll if needed, then forward the chunk to
   * the current file stream and account for its length.
   */
  _write(chunk, encoding, callback) {
    this._shouldRoll(() => {
      debug(
        `writing chunk. ` +
          `file=${this.currentFileStream.path} ` +
          `state=${JSON.stringify(this.state)} ` +
          `chunk=${chunk}`
      );
      this.currentFileStream.write(chunk, encoding, e => {
        this.state.currentSize += chunk.length;
        callback(e);
      });
    });
  }

  /**
   * List the files in the log directory that belong to this stream.
   * Sorted from the oldest to the latest.
   * A directory read failure is logged and treated as "no files".
   * @param {function(null, object[])} cb - receives the parsed file details
   */
  _getExistingFiles(cb) {
    fs.readdir(this.fileObject.dir, (e, files) => {
      if (e) {
        debug(
          `_getExistingFiles: could not read ${this.fileObject.dir}: ${e}`
        );
      }
      debug(`_getExistingFiles: files=${files}`);
      const existingFileDetails = _.compact(
        _.map(files, n => {
          const parseResult = this._parseFileName(n);
          debug(`_getExistingFiles: parsed ${n} as `, parseResult);
          if (!parseResult) {
            return;
          }
          return _.assign({ fileName: n }, parseResult);
        })
      );
      cb(
        null,
        _.sortBy(
          existingFileDetails,
          // Within one date, a higher index means an older file.
          n =>
            (n.date
              ? format.parse(this.options.pattern, n.date).valueOf()
              : newNow().valueOf()) - n.index
        )
      );
    });
  }

  /**
   * Parse a file name (not an absolute path) produced by this stream.
   * @param {string} fileName - name of a file in the log directory
   * @returns {{index: number, date: (string|undefined), isCompressed: boolean}|undefined}
   *   the parsed details, or undefined when the file does not belong to this stream
   */
  _parseFileName(fileName) {
    let isCompressed = false;
    let baseName = fileName;
    if (baseName.endsWith(ZIP_EXT)) {
      baseName = baseName.slice(0, -1 * ZIP_EXT.length);
      isCompressed = true;
    }
    let metaStr;
    if (this.options.keepFileExt) {
      const prefix = this.fileObject.name + FILENAME_SEP;
      const suffix = this.fileObject.ext;
      if (baseName === this.fileObject.base) {
        // The hot file itself (also covers extension-less file names, which
        // never start with `name + "."`).
        metaStr = "";
      } else {
        if (!baseName.startsWith(prefix) || !baseName.endsWith(suffix)) {
          return;
        }
        // Use an explicit end offset: with an empty suffix a negative end
        // index would be -0, which slice() treats as 0.
        metaStr = baseName.slice(
          prefix.length,
          baseName.length - suffix.length
        );
      }
      debug(
        `metaStr=${metaStr}, fileName=${baseName}, prefix=${prefix}, suffix=${suffix}`
      );
    } else {
      const prefix = this.fileObject.base;
      // Only the hot file itself or `<base>.<meta>` belong to this stream;
      // a bare startsWith() would also match unrelated siblings.
      if (
        baseName !== prefix &&
        !baseName.startsWith(prefix + FILENAME_SEP)
      ) {
        return;
      }
      metaStr = baseName.slice(prefix.length + 1);
      debug(`metaStr=${metaStr}, fileName=${baseName}, prefix=${prefix}`);
    }
    if (!metaStr) {
      return {
        index: 0,
        isCompressed
      };
    }
    if (this.options.pattern) {
      const items = _.split(metaStr, FILENAME_SEP);
      const indexStr = items[items.length - 1];
      debug("items: ", items, ", indexStr: ", indexStr);
      if (indexStr !== undefined && indexStr.match(/^\d+$/)) {
        const dateStr = metaStr.slice(0, -1 * (indexStr.length + 1));
        debug(`dateStr is ${dateStr}`);
        if (dateStr) {
          return {
            index: parseInt(indexStr, 10),
            date: dateStr,
            isCompressed
          };
        }
      }
      debug(`metaStr is ${metaStr}`);
      return {
        index: 0,
        date: metaStr,
        isCompressed
      };
    } else {
      if (metaStr.match(/^\d+$/)) {
        return {
          index: parseInt(metaStr, 10),
          isCompressed
        };
      }
    }
    return;
  }

  /**
   * Build a file name (not a path) for the hot file or for a backup.
   * @param {object} opts
   * @param {string} [opts.date] - formatted date; defaults to the current state date or now
   * @param {number} [opts.index] - backup index
   * @param {boolean} [opts.isHotFile] - true to build the name of the file being written
   * @returns {string} the file name
   */
  _formatFileName({ date, index, isHotFile }) {
    debug(
      `_formatFileName: date=${date}, index=${index}, isHotFile=${isHotFile}`
    );
    const dateStr =
      date ||
      _.get(this, "state.currentDate") ||
      format(this.options.pattern, newNow());
    const indexOpt = index || _.get(this, "state.currentIndex");
    const oriFileName = this.fileObject.base;
    if (isHotFile) {
      debug(
        `_formatFileName: includePattern? ${this.options.alwaysIncludePattern}, pattern: ${this.options.pattern}`
      );
      if (this.options.alwaysIncludePattern && this.options.pattern) {
        debug(
          `_formatFileName: is hot file, and include pattern, so: ${oriFileName +
            FILENAME_SEP +
            dateStr}`
        );
        return this.options.keepFileExt
          ? this.fileObject.name + FILENAME_SEP + dateStr + this.fileObject.ext
          : oriFileName + FILENAME_SEP + dateStr;
      }
      debug(`_formatFileName: is hot file so, filename: ${oriFileName}`);
      return oriFileName;
    }
    const fileNameExtraItems = [];
    if (this.options.pattern) {
      fileNameExtraItems.push(dateStr);
    }
    // The index is only part of the name when rolling by size is enabled.
    if (indexOpt && this.options.maxSize < Number.MAX_SAFE_INTEGER) {
      fileNameExtraItems.push(indexOpt);
    }
    let fileName;
    if (this.options.keepFileExt) {
      const baseFileName =
        this.fileObject.name +
        FILENAME_SEP +
        fileNameExtraItems.join(FILENAME_SEP);
      fileName = baseFileName + this.fileObject.ext;
    } else {
      fileName =
        oriFileName + FILENAME_SEP + fileNameExtraItems.join(FILENAME_SEP);
    }
    if (this.options.compress) {
      fileName += ZIP_EXT;
    }
    debug(`_formatFileName: ${fileName}`);
    return fileName;
  }

  /**
   * Shift every existing backup up by one index, move the (already closed)
   * hot file to index 1, then open a fresh hot file and clean up old backups.
   * Errors while moving files are logged; rolling carries on regardless.
   * @param {boolean} isNextPeriod - true when rolling because the date changed
   * @param {function} cb - called when rolling and clean-up are finished
   */
  _moveOldFiles(isNextPeriod, cb) {
    const currentFilePath = this.currentFileStream.path;

    debug(`numToKeep = ${this.options.numToKeep}`);
    const finishedRolling = err => {
      if (err) {
        debug(`_moveOldFiles: error while moving files: ${err}`);
      }
      if (isNextPeriod) {
        this.state.currentSize = 0;
        this.state.currentDate = format(this.options.pattern, newNow());
        debug(`rolling for next period. state=${JSON.stringify(this.state)}`);
      } else {
        this.state.currentSize = 0;
        debug(
          `rolling during the same period. state=${JSON.stringify(this.state)}`
        );
      }
      this._renewWriteStream();
      // wait for the file to be open before cleaning up old ones,
      // otherwise the daysToKeep calculations can be off
      this.currentFileStream.write("", "utf8", () => this._clean(cb));
    };

    this._getExistingFiles((e, files) => {
      const filesToMove = [];
      const todaysFiles = this.state.currentDate
        ? files.filter(f => f.date === this.state.currentDate)
        : files;
      // Highest index first, so no move overwrites a file that still has to
      // be moved; index 0 stands for the hot file.
      for (let i = todaysFiles.length; i >= 0; i--) {
        debug(`i = ${i}`);
        const sourceFilePath =
          i === 0
            ? currentFilePath
            : path.format({
                dir: this.fileObject.dir,
                base: this._formatFileName({
                  date: this.state.currentDate,
                  index: i
                })
              });

        const targetFilePath = path.format({
          dir: this.fileObject.dir,
          base: this._formatFileName({
            date: this.state.currentDate,
            index: i + 1
          })
        });

        filesToMove.push({ sourceFilePath, targetFilePath });
      }
      debug(`filesToMove = `, filesToMove);
      async.eachOfSeries(
        filesToMove,
        (move, idx, cb1) => {
          debug(
            `src=${move.sourceFilePath}, tgt=${
              move.targetFilePath
            }, idx=${idx}, pos=${filesToMove.length - 1 - idx}`
          );
          moveAndMaybeCompressFile(
            move.sourceFilePath,
            move.targetFilePath,
            // Only the hot file (the last move) needs compressing; older
            // backups already carry their final form.
            this.options.compress && filesToMove.length - 1 - idx === 0,
            cb1
          );
        },
        finishedRolling
      );
    });
  }

  /**
   * Close the current file stream and rotate the files.
   * @param {object} opts
   * @param {boolean} opts.isNextPeriod - true when rolling because the date changed
   * @param {function} cb - called when rolling is finished
   */
  _roll({ isNextPeriod }, cb) {
    debug(`rolling, isNextPeriod ? ${isNextPeriod}`);
    debug(`_roll: closing the current stream`);
    this.currentFileStream.end("", this.options.encoding, () => {
      this._moveOldFiles(isNextPeriod, cb);
    });
  }

  /**
   * Open a new write stream on the hot file (creating the directory when
   * needed) and re-emit its errors on this stream.
   */
  _renewWriteStream() {
    fs.ensureDirSync(this.fileObject.dir);
    this.justTheFile = this._formatFileName({
      date: this.state.currentDate,
      index: 0,
      isHotFile: true
    });
    const filePath = path.format({
      dir: this.fileObject.dir,
      base: this.justTheFile
    });
    const ops = _.pick(this.options, ["flags", "encoding", "mode"]);
    this.currentFileStream = fs.createWriteStream(filePath, ops);
    this.currentFileStream.on("error", e => {
      this.emit("error", e);
    });
  }

  /**
   * Delete the oldest files when more than `numToKeep` files exist.
   * @param {function} cb - called when clean-up is finished
   */
  _clean(cb) {
    this._getExistingFiles((e, existingFileDetails) => {
      debug(
        `numToKeep = ${this.options.numToKeep}, existingFiles = ${existingFileDetails.length}`
      );
      debug("existing files are: ", existingFileDetails);
      if (
        this.options.numToKeep > 0 &&
        existingFileDetails.length > this.options.numToKeep
      ) {
        const fileNamesToRemove = _.slice(
          existingFileDetails.map(f => f.fileName),
          0,
          existingFileDetails.length - this.options.numToKeep - 1
        );
        this._deleteFiles(fileNamesToRemove, cb);
        return;
      }
      cb();
    });
  }

  /**
   * Delete the given files from the log directory.
   * @param {string[]} fileNames - file names (not paths) to remove
   * @param {function(Error=)} done - called when all deletions have completed
   */
  _deleteFiles(fileNames, done) {
    debug(`files to delete: ${fileNames}`);
    async.each(
      _.map(fileNames, f =>
        path.format({
          dir: this.fileObject.dir,
          base: f
        })
      ),
      fs.unlink,
      done
    );
    return;
  }
}

module.exports = RollingFileWriteStream;