var net = require('net'); var util = require('util'); var EventEmitter = require('events').EventEmitter; var Queue = require('fastqueue'); var PacketParser = require('./packet_parser'); var Packet = require('./packets/packet'); var Packets = require('./packets/index.js'); var Commands = require('./commands/index.js'); var SqlString = require('./sql_string'); var _connectionId = 0; function Connection(opts) { EventEmitter.call(this); this.config = opts.config; // TODO: fill defaults // if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX ) // if host is given, connect to host:3306 // TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath // if there is no host/port and no socketPath parameters? if (!opts.config.stream) { if (opts.config.socketPath) this.stream = net.connect(opts.config.socketPath); else this.stream = net.connect(opts.config.port, opts.config.host); } else { this.stream = opts.config.stream; } this._internalId = _connectionId++; this._commands = new Queue(); this._command = null; this.statements = {}; // TODO: make it lru cache // https://github.com/mercadolibre/node-simple-lru-cache // or https://github.com/rsms/js-lru // or https://github.com/monsur/jscache // or https://github.com/isaacs/node-lru-cache // // key is field.name + ':' + field.columnType + ':' field.flags + '/' this.textProtocolParsers = {}; // TODO: not sure if cache should be separate (same key as with textProtocolParsers) // or part of prepared statements cache (key is sql query) this.binaryProtocolParsers = {}; this.serverCapabilityFlags = 0; this.authorized = false; var connection = this; this.sequenceId = 0; this.stream.on('error', function(err) { connection.emit('error', err); }); // big TODO: benchmark if it all worth using 'ondata' and onPacket callbacks directly // compositing streams would be much more easier. // also, look for existing length-prefixed streams to reuse instead of packet_parser // https://github.com/squaremo/node-spb - currently only fixed 4 byte prefix // ...? // see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind this.packetParser = new PacketParser(function(p) { connection.handlePacket(p) }); // TODO: this code used to be an optimized version of handler // DOES NOT WORK IN NODE 11 // TODO: measure if we actually get something here // if yes, re-enable for node 10 //if (this.stream instanceof net.Stream) { // this.stream.ondata = function(data, start, end) { // connection.packetParser.execute(data, start, end); // }; //} else { this.stream.on('data', function(data) { connection.packetParser.execute(data, 0, data.length); }); //} this._protocolError = null; this.stream.on('end', function() { // we need to set this flag everywhere where we want connaction to close if (connection._closing) return; if (!connection._protocolError) // no particular error message before disconnect connection._protocolError = 'PROTOCOL_CONNECTION_LOST' var err = new Error('Connection lost: The server closed the connection.'); err.fatal = true; err.code = connection._protocolError; connection.emit('error', err); }); var handshakeCommand; if (!this.config.isServer) { handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags); handshakeCommand.on('error', function(e) { connection.emit('error', e); }); this.addCommand(handshakeCommand); } } util.inherits(Connection, EventEmitter); Connection.prototype.write = function(buffer) { this.stream.write(buffer); }; // TODO: replace function in runtime instead of having if() here // Needs benchmark. Connection.prototype.writePacket = function(packet) { packet.writeHeader(this.sequenceId); if (this.config.debug) { console.log(this._internalId + ' ' + this.connectionId + ' <== ' + this._command._commandName + '#' + this._command.stateName() + '(' + [this.sequenceId, packet._name, packet.length()].join(',') + ')'); } this.sequenceId++; if (this.sequenceId == 256) this.sequenceId = 0 if (!this.config.compress || !this.authorized) { this.write(packet.buffer); } else { var packetLen = packet.length(); var compressHeader = new Buffer(7); // TODO: currently all outgoing packets are sent uncompressed (header + deflated length=0 as uncompressed flag) // Need to implement deflation of outgoing packet. Also need to decide when not to compress small packets // http://dev.mysql.com/doc/internals/en/compression.html#uncompressed-payload suggest not to compress packets less than 50 bytes // Write uncompressed packet compressHeader.fill(0); compressHeader.writeUInt8(packetLen & 0xff, 0); compressHeader.writeUInt16LE(packetLen >> 8, 1); this.write(compressHeader); this.write(packet.buffer); } }; Connection.prototype.startTLS = function(onSecure) { var connection = this; var crypto = require('crypto'); var tls = require('tls'); var config = this.config; var stream = this.stream; // special case for Amazon RDS: use http://s3.amazonaws.com/rds-downloads/mysql-ssl-ca-cert.pem CA cert if ssl option set to "Amazon RDS" if (config.ssl === 'Amazon RDS') { var fs = require('fs'); var path = require('path'); fs.readFile(path.resolve(__dirname, '../fixtures/mysql-ssl-ca-cert.pem'), function(err, ca) { if (err) throw err; config.ssl = { ca: ca }; after(); }); } else after(); function after() { var credentials = crypto.createCredentials({ key: config.ssl.key, cert: config.ssl.cert, passphrase: config.ssl.passphrase, ca: config.ssl.ca }); var securePair = tls.createSecurePair(credentials, false); if (stream.ondata) stream.ondata = null; stream.removeAllListeners('data'); stream.pipe(securePair.encrypted); securePair.encrypted.pipe(stream); securePair.cleartext.on('data', function(data) { connection.packetParser.execute(data.parent, data.offset, data.offset + data.length); }); connection.write = function(buffer) { securePair.cleartext.write(buffer); }; securePair.on('secure', onSecure); } }; // TODO: this does not work if uncompressed packet is split by compressed // packet boundary. // My assumption about compressedPacket to contain one or more complete // compressed packets was wrong. It can wrap any chunk of data. // This will be rmoved in favor of connection.startInflate // currently Handshake command overwrites connection.handlePacket with handleCompressedPacket // before expecting first compressed packet var zlib = require('zlib'); Connection.prototype.handleCompressedPacket = function(packet) { var connection = this; var inflatedLength = packet.readInt24(); if (inflatedLength !== 0) { var compressedBody = packet.readBuffer(packet.length() - 3); zlib.inflate(compressedBody, function(err, packets) { if (err) return connection.emit('error', err); var offset = packets.offset; var end = offset + packets.length; var buffer = packets.parent; var len = 0; var id = 0; // single compressed packet can contain multiple uncompressed while (offset < end) { len = buffer.readUInt16LE(offset) + (buffer[offset+2] << 16); id = buffer[offset+3]; connection.handlePacket(new Packet(id, buffer, offset + 4, offset + 4 + len)); offset += 4 + len; } }); } else { inflatedLength = packet.readInt24(); var sequenceId = packet.readInt8(); connection.handlePacket(new Packet(sequenceId, packet.buffer, packet.offset, packet.offset + inflatedLength)); } }; // TODO: consider using @creationix simple-streams // https://gist.github.com/creationix/5498108 // https://github.com/creationix/min-stream-uv // https://github.com/creationix/min-stream-helpers // TODO: try with Stream2 streams // // changes stream -> packetParser to // stream -> compressedPacketParser -> inflateStream -> packetParser // note that in the caseof ssl this should become // stream -> securePair.encrypted -> securePair.cleartext -> compressedPacketParser -> inflateStream -> packetParser Connection.prototype.startInflate = function() { var connection = this; var zlib = require('zlib'); var inflateStream = zlib.createInflate(); var uncompressedPacketParser = connection.packetParser; connection.packetParser = new PacketParser(function(compressedPacket) { var inflatedLength = packet.readInt24(); if (inflatedLength !== 0) { inflateStream.write(packet.readBuffer(packet.length() - 3)); } else { uncompressedPacketParser.execute(packet.buffer, packet.offset, packet.end); } }); inflateStream.on('data', function(buff) { uncompressedPacketParser.execute(buff.parent, buff.offset, buff.offset + buff.length); }); if (this.stream.ondata) this.stream.ondata = null; this.stream.removeAllListeners('data'); this.pipe(); }; Connection.prototype.pipe = function() { var connection = this; if (this.stream instanceof net.Stream) { this.stream.ondata = function(data, start, end) { connection.packetParser.execute(data, start, end); }; } else { this.stream.on('data', function(data) { connection.packetParser.execute(data.parent, data.offset, data.offset + data.length); }); } }; Connection.prototype.handlePacket = function(packet) { // TODO: check packet sequenceId here var packetType = ''; if (packet) this.sequenceId = packet.sequenceId + 1; if (this.config.debug) { if (packet) { console.log(this._internalId + ' ' + this.connectionId + ' ==> ' + this._command._commandName + '#' + this._command.stateName() + '(' + [packet.sequenceId, packet.type(), packet.length()].join(',') + ')'); } } var done = this._command.execute(packet, this); if (done) { this.sequenceId = 0; this._command = this._commands.shift(); if (this._command) this.handlePacket(); } }; Connection.prototype.addCommand = function(cmd) { if (this.config.debug) { console.log('Add command: ' + arguments.callee.caller.name); cmd._commandName = arguments.callee.caller.name; } if (!this._command) { this._command = cmd; this.handlePacket(); } else { this._commands.push(cmd); } return cmd; }; Connection.prototype.format = function(sql, values) { if (typeof this.config.queryFormat == "function") { return this.config.queryFormat.call(this, sql, values, this.config.timezone); } return SqlString.format(sql, values, this.config.timezone); }; Connection.prototype.escape = function(value) { return SqlString.escape(value, false, this.config.timezone); }; function _domainify(callback) { var domain = process.domain; if (domain && callback) return process.domain.bind(callback); else return callback; } Connection.prototype.query = function query(sql, values, cb) { // copy-paste from node-mysql/lib/Connection.js:createQuery var options = {}; if (typeof sql === 'object') { // query(options, cb) options = sql; if (typeof values === 'function') { cb = values; } else { options.values = values; } } else if (typeof values === 'function') { // query(sql, cb) cb = values; options.sql = sql; options.values = undefined; } else { // query(sql, values, cb) options.sql = sql; options.values = values; } var rawSql = this.format(options.sql, options.values || []); return this.addCommand(new Commands.Query(rawSql, options, _domainify(cb))); }; Connection.prototype.execute = function execute(sql, values, cb) { var options = {}; if (typeof sql === 'object') { // execute(options, cb) options = sql; if (typeof values === 'function') { cb = values; } else { options.values = values; } } else if (typeof values === 'function') { // execute(sql, cb) cb = values; options.sql = sql; options.values = undefined; } else { // execute(sql, values, cb) options.sql = sql; options.values = values; } return this.addCommand(new Commands.Execute(options, _domainify(cb))); }; // transaction helpers Connection.prototype.beginTransaction = function(cb) { return this.query('START TRANSACTION', cb); } Connection.prototype.commit = function(cb) { return this.query('COMMIT', cb); } Connection.prototype.rollback = function(cb) { return this.query('ROLLBACK', cb); } Connection.prototype.ping = function ping(cb) { return this.addCommand(new Commands.Ping(_domainify(cb))); }; Connection.prototype._registerSlave = function registerSlave(opts, cb) { return this.addCommand(new Commands.RegisterSlave(opts, _domainify(cb))); }; Connection.prototype._binlogDump = function binlogDump(opts, cb) { return this.addCommand(new Commands.BinlogDump(opts, _domainify(cb))); }; Connection.prototype.createBinlogStream = function(opts) { // TODO: create proper stream class // TODO: use through2 var test = 1; var Readable = require('stream').Readable; var stream = new Readable({objectMode: true}); stream._read = function() { return { data: test++ } }; var connection = this; connection._registerSlave(opts, function(err) { var dumpCmd = connection._binlogDump(opts); dumpCmd.on('event', function(ev) { stream.push(ev); }); dumpCmd.on('eof', function() { stream.push(null); // if non-blocking, then close stream to prevent errors if (opts.flags && (opts.flags & 0x01)) { connection._closing = true; connection.stream.end(); } }); // TODO: pipe errors as well }) return stream; } Connection.prototype.connect = function(cb) { if (!cb) return; var connectCalled = 0; // TODO domainify this callback as well. Note that domain has to be captured // at the top of function due to nested callback function callbackOnce(isErrorHandler) { return function(param) { if (!connectCalled) { if (isErrorHandler) cb(param); else cb(null, param); } connectCalled = 1; }; } this.once('error', callbackOnce(true) ); this.once('connect', callbackOnce(false)); }; // =================================== // outgoing server connection methods // =================================== Connection.prototype.writeColumns = function(columns) { var connection = this; this.writePacket(Packets.ResultSetHeader.toPacket(columns.length)); columns.forEach(function(column) { connection.writePacket(Packets.ColumnDefinition.toPacket(column)); }); this.writeEof(); }; // row is array of columns, not hash Connection.prototype.writeTextRow = function(column) { this.writePacket(Packets.TextRow.toPacket(column)); }; Connection.prototype.writeTextResult = function(rows, columns) { var connection = this; connection.writeColumns(columns); rows.forEach(function(row) { var arrayRow = new Array(columns.length); columns.forEach(function(column) { arrayRow.push(row[column.name]); }); connection.writeTextRow(arrayRow); }); connection.writeEof(); }; Connection.prototype.writeEof = function(warnings, statusFlags) { this.writePacket(Packets.EOF.toPacket(warnings, statusFlags)); }; Connection.prototype.writeOk = function(args) { if (!args) args = { affectedRows: 0 }; this.writePacket(Packets.OK.toPacket(args)); }; Connection.prototype.writeError = function(args) { this.writePacket(Packets.Error.toPacket(args)); }; Connection.prototype.serverHandshake = function serverHandshake(args) { return this.addCommand(new Commands.ServerHandshake(args)); }; // =============================================================== // TODO: domainify Connection.prototype.end = function(callback) { // TODO: implement COM_QUIT command var self = this; var endCmd = { connection: this }; endCmd.execute = function() { self._closing = true; this.connection.stream.end(); if (callback) callback(); }; return this.addCommand(endCmd); //return this.addCommand(new Commands.Quit(callback)); }; module.exports = Connection;