675 lines
21 KiB
JavaScript
675 lines
21 KiB
JavaScript
|
var net = require('net');
|
||
|
var util = require('util');
|
||
|
var EventEmitter = require('events').EventEmitter;
|
||
|
var Queue = require('double-ended-queue');
|
||
|
|
||
|
var PacketParser = require('./packet_parser.js');
|
||
|
var Packet = require('./packets/packet.js');
|
||
|
var Packets = require('./packets/index.js');
|
||
|
var Commands = require('./commands/index.js');
|
||
|
var SqlString = require('./sql_string.js');
|
||
|
var ConnectionConfig = require('./connection_config.js');
|
||
|
|
||
|
var _connectionId = 0;
|
||
|
<<<<<<< HEAD
|
||
|
var noop = function() {};
|
||
|
=======
|
||
|
var convertNamedParameters;
|
||
|
>>>>>>> allow to use named placeholders in .execute()
|
||
|
|
||
|
function Connection(opts)
|
||
|
{
|
||
|
EventEmitter.call(this);
|
||
|
this.config = opts.config;
|
||
|
|
||
|
// TODO: fill defaults
|
||
|
// if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX )
|
||
|
// if host is given, connect to host:3306
|
||
|
|
||
|
// TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath
|
||
|
// if there is no host/port and no socketPath parameters?
|
||
|
|
||
|
if (!opts.config.stream) {
|
||
|
if (opts.config.socketPath)
|
||
|
this.stream = net.connect(opts.config.socketPath);
|
||
|
else
|
||
|
this.stream = net.connect(opts.config.port, opts.config.host);
|
||
|
} else {
|
||
|
// if stream is a function, treat it as "stream agent / factory"
|
||
|
if (typeof opts.config.stream == 'function')
|
||
|
this.stream = opts.config.stream(opts);
|
||
|
else
|
||
|
this.stream = opts.config.stream;
|
||
|
}
|
||
|
this._internalId = _connectionId++;
|
||
|
|
||
|
this._commands = new Queue();
|
||
|
this._command = null;
|
||
|
|
||
|
this._paused = false;
|
||
|
this._paused_packets = new Queue();
|
||
|
|
||
|
this._statements = {};
|
||
|
|
||
|
// TODO: make it lru cache
|
||
|
// https://github.com/mercadolibre/node-simple-lru-cache
|
||
|
// or https://github.com/rsms/js-lru
|
||
|
// or https://github.com/monsur/jscache
|
||
|
// or https://github.com/isaacs/node-lru-cache
|
||
|
//
|
||
|
// key is field.name + ':' + field.columnType + ':' field.flags + '/'
|
||
|
this.textProtocolParsers = {};
|
||
|
|
||
|
// TODO: not sure if cache should be separate (same key as with textProtocolParsers)
|
||
|
// or part of prepared statements cache (key is sql query)
|
||
|
this.binaryProtocolParsers = {};
|
||
|
|
||
|
this.serverCapabilityFlags = 0;
|
||
|
this.authorized = false;
|
||
|
|
||
|
var connection = this;
|
||
|
this.sequenceId = 0;
|
||
|
|
||
|
this.threadId = null;
|
||
|
this._handshakePacket = null;
|
||
|
|
||
|
this.stream.on('error', function(err) {
|
||
|
connection.emit('error', err);
|
||
|
});
|
||
|
|
||
|
// big TODO: benchmark if it all worth using 'ondata' and onPacket callbacks directly
|
||
|
// compositing streams would be much more easier.
|
||
|
// also, look for existing length-prefixed streams to reuse instead of packet_parser
|
||
|
// https://github.com/squaremo/node-spb - currently only fixed 4 byte prefix
|
||
|
// ...?
|
||
|
|
||
|
// see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind
|
||
|
this.packetParser = new PacketParser(function(p) { connection.handlePacket(p) });
|
||
|
|
||
|
// TODO: this code used to be an optimized version of handler
|
||
|
// DOES NOT WORK IN NODE 11
|
||
|
// TODO: measure if we actually get something here
|
||
|
// if yes, re-enable for node 10
|
||
|
//if (this.stream instanceof net.Stream) {
|
||
|
// this.stream.ondata = function(data, start, end) {
|
||
|
// connection.packetParser.execute(data, start, end);
|
||
|
// };
|
||
|
//} else {
|
||
|
this.stream.on('data', function(data) {
|
||
|
connection.packetParser.execute(data);
|
||
|
});
|
||
|
//}
|
||
|
this._protocolError = null;
|
||
|
this.stream.on('end', function() {
|
||
|
// we need to set this flag everywhere where we want connection to close
|
||
|
if (connection._closing)
|
||
|
return;
|
||
|
|
||
|
if (!connection._protocolError) // no particular error message before disconnect
|
||
|
connection._protocolError = 'PROTOCOL_CONNECTION_LOST'
|
||
|
var err = new Error('Connection lost: The server closed the connection.');
|
||
|
err.fatal = true;
|
||
|
err.code = connection._protocolError;
|
||
|
|
||
|
var command;
|
||
|
if (connection._command && connection._command.onResult)
|
||
|
connection._command.onResult(err);
|
||
|
while (command = connection._commands.shift())
|
||
|
if (command.onResult)
|
||
|
command.onResult(err);
|
||
|
|
||
|
connection.emit('error', err);
|
||
|
|
||
|
});
|
||
|
var handshakeCommand;
|
||
|
if (!this.config.isServer) {
|
||
|
handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags);
|
||
|
handshakeCommand.on('error', function(e) { connection.emit('error', e); });
|
||
|
handshakeCommand.on('end', function() {
|
||
|
connection._handshakePacket = handshakeCommand.handshake;
|
||
|
connection.threadId = handshakeCommand.handshake.connectionId;
|
||
|
});
|
||
|
this.addCommand(handshakeCommand);
|
||
|
}
|
||
|
}
|
||
|
util.inherits(Connection, EventEmitter);
|
||
|
|
||
|
Connection.prototype.write = function(buffer) {
|
||
|
this.stream.write(buffer);
|
||
|
};
|
||
|
|
||
|
// TODO: replace function in runtime instead of having if() here
|
||
|
// Needs benchmark.
|
||
|
Connection.prototype.writePacket = function(packet) {
|
||
|
packet.writeHeader(this.sequenceId);
|
||
|
if (this.config.debug) {
|
||
|
console.log(this._internalId + ' ' + this.connectionId + ' <== ' + this._command._commandName + '#' + this._command.stateName() + '(' + [this.sequenceId, packet._name, packet.length()].join(',') + ')');
|
||
|
}
|
||
|
this.sequenceId++;
|
||
|
if (this.sequenceId == 256)
|
||
|
this.sequenceId = 0
|
||
|
if (!this.config.compress || !this.authorized) {
|
||
|
this.write(packet.buffer);
|
||
|
} else {
|
||
|
var packetLen = packet.length();
|
||
|
var compressHeader = new Buffer(7);
|
||
|
|
||
|
// TODO: currently all outgoing packets are sent uncompressed (header + deflated length=0 as uncompressed flag)
|
||
|
// Need to implement deflation of outgoing packet. Also need to decide when not to compress small packets
|
||
|
// http://dev.mysql.com/doc/internals/en/compression.html#uncompressed-payload suggest not to compress packets less than 50 bytes
|
||
|
|
||
|
// Write uncompressed packet
|
||
|
compressHeader.fill(0);
|
||
|
compressHeader.writeUInt8(packetLen & 0xff, 0);
|
||
|
compressHeader.writeUInt16LE(packetLen >> 8, 1);
|
||
|
this.write(compressHeader);
|
||
|
this.write(packet.buffer);
|
||
|
}
|
||
|
};
|
||
|
|
||
|
Connection.prototype.startTLS = function(onSecure) {
|
||
|
var connection = this;
|
||
|
var crypto = require('crypto');
|
||
|
var tls = require('tls');
|
||
|
var config = this.config;
|
||
|
var stream = this.stream;
|
||
|
|
||
|
// special case for Amazon RDS: use http://s3.amazonaws.com/rds-downloads/mysql-ssl-ca-cert.pem CA cert if ssl option set to "Amazon RDS"
|
||
|
if (config.ssl === 'Amazon RDS') {
|
||
|
var fs = require('fs');
|
||
|
var path = require('path');
|
||
|
fs.readFile(path.resolve(__dirname, '../fixtures/mysql-ssl-ca-cert.pem'), function(err, ca) {
|
||
|
if (err) throw err;
|
||
|
config.ssl = { ca: ca };
|
||
|
after();
|
||
|
});
|
||
|
} else
|
||
|
after();
|
||
|
|
||
|
function after() {
|
||
|
var credentials = crypto.createCredentials({
|
||
|
key: config.ssl.key,
|
||
|
cert: config.ssl.cert,
|
||
|
passphrase: config.ssl.passphrase,
|
||
|
ca: config.ssl.ca
|
||
|
});
|
||
|
var securePair = tls.createSecurePair(credentials, false);
|
||
|
if (stream.ondata)
|
||
|
stream.ondata = null;
|
||
|
stream.removeAllListeners('data');
|
||
|
stream.pipe(securePair.encrypted);
|
||
|
securePair.encrypted.pipe(stream);
|
||
|
securePair.cleartext.on('data', function(data) {
|
||
|
connection.packetParser.execute(data.parent, data.offset, data.offset + data.length);
|
||
|
});
|
||
|
connection.write = function(buffer) {
|
||
|
securePair.cleartext.write(buffer);
|
||
|
};
|
||
|
securePair.on('secure', onSecure);
|
||
|
}
|
||
|
};
|
||
|
|
||
|
// TODO: this does not work if uncompressed packet is split by compressed
|
||
|
// packet boundary.
|
||
|
// My assumption about compressedPacket to contain one or more complete
|
||
|
// compressed packets was wrong. It can wrap any chunk of data.
|
||
|
// This will be rmoved in favor of connection.startInflate
|
||
|
// currently Handshake command overwrites connection.handlePacket with handleCompressedPacket
|
||
|
// before expecting first compressed packet
|
||
|
var zlib = require('zlib');
|
||
|
Connection.prototype.handleCompressedPacket = function(packet) {
|
||
|
var connection = this;
|
||
|
var inflatedLength = packet.readInt24();
|
||
|
if (inflatedLength !== 0) {
|
||
|
var compressedBody = packet.readBuffer(packet.length() - 3);
|
||
|
zlib.inflate(compressedBody, function(err, packets) {
|
||
|
if (err)
|
||
|
return connection.emit('error', err);
|
||
|
var offset = packets.offset;
|
||
|
var end = offset + packets.length;
|
||
|
var buffer = packets.parent;
|
||
|
var len = 0;
|
||
|
var id = 0;
|
||
|
// single compressed packet can contain multiple uncompressed
|
||
|
while (offset < end) {
|
||
|
len = buffer.readUInt16LE(offset) + (buffer[offset+2] << 16);
|
||
|
id = buffer[offset+3];
|
||
|
connection.handlePacket(new Packet(id, buffer, offset + 4, offset + 4 + len));
|
||
|
offset += 4 + len;
|
||
|
}
|
||
|
});
|
||
|
} else {
|
||
|
inflatedLength = packet.readInt24();
|
||
|
var sequenceId = packet.readInt8();
|
||
|
connection.handlePacket(new Packet(sequenceId, packet.buffer, packet.offset, packet.offset + inflatedLength));
|
||
|
}
|
||
|
};
|
||
|
|
||
|
// TODO: consider using @creationix simple-streams
|
||
|
// https://gist.github.com/creationix/5498108
|
||
|
// https://github.com/creationix/min-stream-uv
|
||
|
// https://github.com/creationix/min-stream-helpers
|
||
|
|
||
|
|
||
|
// TODO: try with Stream2 streams
|
||
|
//
|
||
|
// changes stream -> packetParser to
|
||
|
// stream -> compressedPacketParser -> inflateStream -> packetParser
|
||
|
// note that in the caseof ssl this should become
|
||
|
// stream -> securePair.encrypted -> securePair.cleartext -> compressedPacketParser -> inflateStream -> packetParser
|
||
|
Connection.prototype.startInflate = function() {
|
||
|
var connection = this;
|
||
|
var zlib = require('zlib');
|
||
|
var inflateStream = zlib.createInflate();
|
||
|
var uncompressedPacketParser = connection.packetParser;
|
||
|
connection.packetParser = new PacketParser(function(packet) {
|
||
|
var inflatedLength = packet.readInt24();
|
||
|
if (inflatedLength !== 0) {
|
||
|
inflateStream.write(packet.readBuffer(packet.length() - 3));
|
||
|
} else {
|
||
|
uncompressedPacketParser.execute(packet.buffer, packet.offset, packet.end);
|
||
|
}
|
||
|
});
|
||
|
inflateStream.on('data', function(buff) {
|
||
|
uncompressedPacketParser.execute(buff.parent, buff.offset, buff.offset + buff.length);
|
||
|
});
|
||
|
if (this.stream.ondata)
|
||
|
this.stream.ondata = null;
|
||
|
this.stream.removeAllListeners('data');
|
||
|
this.pipe();
|
||
|
};
|
||
|
|
||
|
Connection.prototype.pipe = function() {
|
||
|
var connection = this;
|
||
|
if (this.stream instanceof net.Stream) {
|
||
|
this.stream.ondata = function(data, start, end) {
|
||
|
connection.packetParser.execute(data, start, end);
|
||
|
};
|
||
|
} else {
|
||
|
this.stream.on('data', function(data) {
|
||
|
connection.packetParser.execute(data.parent, data.offset, data.offset + data.length);
|
||
|
});
|
||
|
}
|
||
|
};
|
||
|
|
||
|
Connection.prototype.handlePacket = function(packet) {
|
||
|
|
||
|
if (this._paused)
|
||
|
{
|
||
|
this._paused_packets.push(packet);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
// TODO: check packet sequenceId here
|
||
|
if (packet)
|
||
|
this.sequenceId = packet.sequenceId + 1;
|
||
|
if (this.config.debug) {
|
||
|
if (packet) {
|
||
|
console.log(this._internalId + ' ' + this.connectionId + ' ==> ' + this._command._commandName + '#' + this._command.stateName() + '(' + [packet.sequenceId, packet.type(), packet.length()].join(',') + ')');
|
||
|
console.log(' raw: ' + packet.buffer.slice(packet.offset, packet.offset + packet.length).toString('hex'));
|
||
|
}
|
||
|
}
|
||
|
var done = this._command.execute(packet, this);
|
||
|
if (done) {
|
||
|
this.sequenceId = 0;
|
||
|
this._command = this._commands.shift();
|
||
|
if (this._command)
|
||
|
this.handlePacket();
|
||
|
}
|
||
|
};
|
||
|
|
||
|
Connection.prototype.addCommand = function(cmd) {
|
||
|
if (this.config.debug) {
|
||
|
console.log('Add command: ' + arguments.callee.caller.name);
|
||
|
cmd._commandName = arguments.callee.caller.name;
|
||
|
}
|
||
|
if (!this._command) {
|
||
|
this._command = cmd;
|
||
|
this.handlePacket();
|
||
|
} else {
|
||
|
this._commands.push(cmd);
|
||
|
}
|
||
|
return cmd;
|
||
|
};
|
||
|
|
||
|
Connection.prototype.format = function(sql, values) {
|
||
|
if (typeof this.config.queryFormat == "function") {
|
||
|
return this.config.queryFormat.call(this, sql, values, this.config.timezone);
|
||
|
}
|
||
|
return SqlString.format(sql, values, this.config.stringifyObjects, this.config.timezone);
|
||
|
};
|
||
|
|
||
|
Connection.prototype.escape = function(value) {
|
||
|
return SqlString.escape(value, false, this.config.timezone);
|
||
|
};
|
||
|
|
||
|
function _domainify(callback) {
|
||
|
var domain = process.domain;
|
||
|
if (domain && callback)
|
||
|
return process.domain.bind(callback);
|
||
|
else
|
||
|
return callback;
|
||
|
}
|
||
|
|
||
|
Connection.prototype.query = function query(sql, values, cb) {
|
||
|
// copy-paste from node-mysql/lib/Connection.js:createQuery
|
||
|
var options = {};
|
||
|
if (typeof sql === 'object') {
|
||
|
// query(options, cb)
|
||
|
options = sql;
|
||
|
if (typeof values === 'function') {
|
||
|
cb = values;
|
||
|
} else {
|
||
|
options.values = values;
|
||
|
}
|
||
|
} else if (typeof values === 'function') {
|
||
|
// query(sql, cb)
|
||
|
cb = values;
|
||
|
options.sql = sql;
|
||
|
options.values = undefined;
|
||
|
} else {
|
||
|
// query(sql, values, cb)
|
||
|
options.sql = sql;
|
||
|
options.values = values;
|
||
|
}
|
||
|
var rawSql = this.format(options.sql, options.values || []);
|
||
|
return this.addCommand(new Commands.Query(rawSql, options, _domainify(cb)));
|
||
|
};
|
||
|
|
||
|
Connection.prototype.pause = function pause() {
|
||
|
this._paused = true;
|
||
|
this.stream.pause();
|
||
|
};
|
||
|
|
||
|
Connection.prototype.resume= function resume() {
|
||
|
var packet;
|
||
|
this._paused = false;
|
||
|
while( packet = this._paused_packets.shift() ) {
|
||
|
this.handlePacket(packet);
|
||
|
// don't resume if packet hander paused connection
|
||
|
if (this._paused)
|
||
|
return;
|
||
|
}
|
||
|
this.stream.resume();
|
||
|
};
|
||
|
|
||
|
Connection.prototype.keyFromFields = function keyFromFields(fields, options) {
|
||
|
var res = (typeof options.nestTables) + '/' + options.nestTables + '/' + options.rowsAsHash;
|
||
|
for (var i=0; i < fields.length; ++i)
|
||
|
res += '/' + fields[i].name + ':' + fields[i].columnType + ':' + fields[i].flags;
|
||
|
return res;
|
||
|
}
|
||
|
|
||
|
function statementKey(options) {
|
||
|
return (typeof options.nestTables) +
|
||
|
'/' + options.nestTables + '/' + options.rowsAsHash
|
||
|
+ options.sql;
|
||
|
}
|
||
|
|
||
|
// TODO: named placeholders support
|
||
|
Connection.prototype.prepare = function prepare(options, cb) {
|
||
|
if (typeof options == 'string')
|
||
|
options = { sql: options };
|
||
|
return this.addCommand(new Commands.Prepare(options, _domainify(cb)));
|
||
|
};
|
||
|
|
||
|
Connection.prototype.unprepare = function execute(sql) {
|
||
|
var options = {};
|
||
|
if (typeof sql === 'object') {
|
||
|
options = sql;
|
||
|
} else
|
||
|
options.sql = sql;
|
||
|
var key = statementKey(options);
|
||
|
var stmt = this._statements[key];
|
||
|
if (stmt) {
|
||
|
this._statements[key] = null;
|
||
|
stmt.close();
|
||
|
}
|
||
|
return stmt;
|
||
|
}
|
||
|
|
||
|
Connection.prototype.execute = function execute(sql, values, cb) {
|
||
|
var options = {};
|
||
|
var unnamed;
|
||
|
if (typeof sql === 'object') {
|
||
|
// execute(options, cb)
|
||
|
options = sql;
|
||
|
if (typeof values === 'function') {
|
||
|
cb = values;
|
||
|
} else {
|
||
|
options.values = values;
|
||
|
}
|
||
|
} else if (typeof values === 'function') {
|
||
|
// execute(sql, cb)
|
||
|
cb = values;
|
||
|
options.sql = sql;
|
||
|
options.values = undefined;
|
||
|
} else {
|
||
|
// execute(sql, values, cb)
|
||
|
options.sql = sql;
|
||
|
options.values = values;
|
||
|
}
|
||
|
cb = _domainify(cb);
|
||
|
|
||
|
var connection = this;
|
||
|
var key = statementKey(options);
|
||
|
var statement = connection._statements[key];
|
||
|
|
||
|
options.statement = statement;
|
||
|
var executeCommand = new Commands.Execute(options, cb);
|
||
|
|
||
|
if (!statement) {
|
||
|
connection.prepare(options, function executeStatement(err, stmt) {
|
||
|
if (err) {
|
||
|
if (cb)
|
||
|
cb(err);
|
||
|
else
|
||
|
executeCommand.emit('error', err);
|
||
|
return
|
||
|
}
|
||
|
executeCommand.statement = stmt;
|
||
|
connection._statements[key] = stmt;
|
||
|
connection.addCommand(executeCommand);
|
||
|
});
|
||
|
} else {
|
||
|
connection.addCommand(executeCommand);
|
||
|
}
|
||
|
return executeCommand;
|
||
|
};
|
||
|
|
||
|
<<<<<<< HEAD
|
||
|
Connection.prototype.changeUser = function changeUser(options, callback) {
|
||
|
if (!callback && typeof options === 'function') {
|
||
|
callback = options;
|
||
|
options = {};
|
||
|
}
|
||
|
|
||
|
var charsetNumber = (options.charset)
|
||
|
? ConnectionConfig.getCharsetNumber(options.charset)
|
||
|
: this.config.charsetNumber;
|
||
|
|
||
|
return this.addCommand(new Commands.ChangeUser({
|
||
|
user : options.user || this.config.user,
|
||
|
password : options.password || this.config.password,
|
||
|
passwordSha1 : options.passwordSha1 || this.config.passwordSha1,
|
||
|
database : options.database || this.config.database,
|
||
|
timeout : options.timeout,
|
||
|
charsetNumber : charsetNumber,
|
||
|
currentConfig : this.config
|
||
|
}, _domainify(function(err) {
|
||
|
if (err)
|
||
|
err.fatal = true;
|
||
|
if (callback) callback(err);
|
||
|
})));
|
||
|
=======
|
||
|
if (this.config.namedParameters) {
|
||
|
if (!convertNamedParameters)
|
||
|
convertNamedParameters = require('named-placeholders')();
|
||
|
unnamed = convertNamedParameters(options.sql, options.values);
|
||
|
debugger;
|
||
|
options.sql = unnamed[0];
|
||
|
options.values = unnamed[1];
|
||
|
}
|
||
|
return this.addCommand(new Commands.Execute(options, _domainify(cb)));
|
||
|
>>>>>>> allow to use named placeholders in .execute()
|
||
|
};
|
||
|
|
||
|
// transaction helpers
|
||
|
Connection.prototype.beginTransaction = function(cb) {
|
||
|
return this.query('START TRANSACTION', cb);
|
||
|
}
|
||
|
|
||
|
Connection.prototype.commit = function(cb) {
|
||
|
return this.query('COMMIT', cb);
|
||
|
}
|
||
|
|
||
|
Connection.prototype.rollback = function(cb) {
|
||
|
return this.query('ROLLBACK', cb);
|
||
|
}
|
||
|
|
||
|
Connection.prototype.ping = function ping(cb) {
|
||
|
return this.addCommand(new Commands.Ping(_domainify(cb)));
|
||
|
};
|
||
|
|
||
|
Connection.prototype._registerSlave = function registerSlave(opts, cb) {
|
||
|
return this.addCommand(new Commands.RegisterSlave(opts, _domainify(cb)));
|
||
|
};
|
||
|
|
||
|
Connection.prototype._binlogDump = function binlogDump(opts, cb) {
|
||
|
return this.addCommand(new Commands.BinlogDump(opts, _domainify(cb)));
|
||
|
};
|
||
|
|
||
|
// currently just alias to close
|
||
|
Connection.prototype.destroy = function() {
|
||
|
this.close();
|
||
|
}
|
||
|
|
||
|
Connection.prototype.close = function() {
|
||
|
this._closing = true;
|
||
|
this.stream.end();
|
||
|
}
|
||
|
|
||
|
Connection.prototype.createBinlogStream = function(opts) {
|
||
|
// TODO: create proper stream class
|
||
|
// TODO: use through2
|
||
|
var test = 1;
|
||
|
var Readable = require('stream').Readable;
|
||
|
var stream = new Readable({objectMode: true});
|
||
|
stream._read = function() {
|
||
|
return {
|
||
|
data: test++
|
||
|
}
|
||
|
};
|
||
|
var connection = this;
|
||
|
connection._registerSlave(opts, function(err) {
|
||
|
var dumpCmd = connection._binlogDump(opts);
|
||
|
dumpCmd.on('event', function(ev) {
|
||
|
stream.push(ev);
|
||
|
});
|
||
|
dumpCmd.on('eof', function() {
|
||
|
stream.push(null);
|
||
|
// if non-blocking, then close stream to prevent errors
|
||
|
if (opts.flags && (opts.flags & 0x01)) {
|
||
|
connection.close();
|
||
|
}
|
||
|
});
|
||
|
// TODO: pipe errors as well
|
||
|
})
|
||
|
return stream;
|
||
|
}
|
||
|
|
||
|
Connection.prototype.connect = function(cb) {
|
||
|
if (!cb) return;
|
||
|
var connectCalled = 0;
|
||
|
|
||
|
// TODO domainify this callback as well. Note that domain has to be captured
|
||
|
// at the top of function due to nested callback
|
||
|
function callbackOnce(isErrorHandler) {
|
||
|
return function(param) {
|
||
|
if (!connectCalled) {
|
||
|
if (isErrorHandler)
|
||
|
cb(param);
|
||
|
else
|
||
|
cb(null, param);
|
||
|
}
|
||
|
connectCalled = 1;
|
||
|
};
|
||
|
}
|
||
|
this.once('error', callbackOnce(true) );
|
||
|
this.once('connect', callbackOnce(false));
|
||
|
};
|
||
|
|
||
|
// ===================================
|
||
|
// outgoing server connection methods
|
||
|
// ===================================
|
||
|
|
||
|
Connection.prototype.writeColumns = function(columns) {
|
||
|
var connection = this;
|
||
|
this.writePacket(Packets.ResultSetHeader.toPacket(columns.length));
|
||
|
columns.forEach(function(column) {
|
||
|
connection.writePacket(Packets.ColumnDefinition.toPacket(column));
|
||
|
});
|
||
|
this.writeEof();
|
||
|
};
|
||
|
|
||
|
// row is array of columns, not hash
|
||
|
Connection.prototype.writeTextRow = function(column) {
|
||
|
this.writePacket(Packets.TextRow.toPacket(column));
|
||
|
};
|
||
|
|
||
|
Connection.prototype.writeTextResult = function(rows, columns) {
|
||
|
var connection = this;
|
||
|
connection.writeColumns(columns);
|
||
|
rows.forEach(function(row) {
|
||
|
var arrayRow = new Array(columns.length);
|
||
|
columns.forEach(function(column) {
|
||
|
arrayRow.push(row[column.name]);
|
||
|
});
|
||
|
connection.writeTextRow(arrayRow);
|
||
|
});
|
||
|
connection.writeEof();
|
||
|
};
|
||
|
|
||
|
Connection.prototype.writeEof = function(warnings, statusFlags) {
|
||
|
this.writePacket(Packets.EOF.toPacket(warnings, statusFlags));
|
||
|
};
|
||
|
|
||
|
Connection.prototype.writeOk = function(args) {
|
||
|
if (!args)
|
||
|
args = { affectedRows: 0 };
|
||
|
this.writePacket(Packets.OK.toPacket(args));
|
||
|
};
|
||
|
|
||
|
Connection.prototype.writeError = function(args) {
|
||
|
this.writePacket(Packets.Error.toPacket(args));
|
||
|
};
|
||
|
|
||
|
Connection.prototype.serverHandshake = function serverHandshake(args) {
|
||
|
return this.addCommand(new Commands.ServerHandshake(args));
|
||
|
};
|
||
|
|
||
|
// ===============================================================
|
||
|
|
||
|
// TODO: domainify
|
||
|
Connection.prototype.end = function(callback) {
|
||
|
var connection = this;
|
||
|
// TODO: implement COM_QUIT command
|
||
|
var endCmd = { connection: this };
|
||
|
endCmd.execute = function() {
|
||
|
this.connection.close();
|
||
|
if (callback)
|
||
|
callback();
|
||
|
};
|
||
|
this.addCommand(endCmd);
|
||
|
// trigger error if more commands enqueued after end command
|
||
|
connection.addCommand = function() {
|
||
|
if (connection._closing) {
|
||
|
this.emit(new Error('addCommand() called on closing connection'));
|
||
|
}
|
||
|
};
|
||
|
return endCmd;
|
||
|
//return this.addCommand(new Commands.Quit(callback));
|
||
|
};
|
||
|
|
||
|
module.exports = Connection;
|