#include #include #include #include #include #include #define LOG(msg) printf("%s\n",msg); #define TRACE(msg) //printf("%s\n", msg); #if PG_VERSION_NUM >= 90000 #define ESCAPE_SUPPORTED #endif #if PG_VERSION_NUM >= 90200 #define SINGLE_ROW_SUPPORTED #endif #define THROW(msg) NanThrowError(msg); NanReturnUndefined(); using namespace v8; using namespace node; class Connection : public ObjectWrap { public: //creates the V8 objects & attaches them to the module (target) static void Init (Handle target) { NanScope(); Local t = NanNew(New); t->InstanceTemplate()->SetInternalFieldCount(1); t->SetClassName(NanNew("Connection")); NanSetPrototypeTemplate(t, "connect", NanNew(Connect)); #ifdef ESCAPE_SUPPORTED NanSetPrototypeTemplate(t, "escapeIdentifier", NanNew(EscapeIdentifier)); NanSetPrototypeTemplate(t, "escapeLiteral", NanNew(EscapeLiteral)); #endif NanSetPrototypeTemplate(t, "_sendQuery", NanNew(SendQuery)); NanSetPrototypeTemplate(t, "_sendQueryWithParams", NanNew(SendQueryWithParams)); NanSetPrototypeTemplate(t, "_sendPrepare", NanNew(SendPrepare)); NanSetPrototypeTemplate(t, "_sendQueryPrepared", NanNew(SendQueryPrepared)); NanSetPrototypeTemplate(t, "cancel", NanNew(Cancel)); NanSetPrototypeTemplate(t, "end", NanNew(End)); NanSetPrototypeTemplate(t, "_sendCopyFromChunk", NanNew(SendCopyFromChunk)); NanSetPrototypeTemplate(t, "_endCopyFrom", NanNew(EndCopyFrom)); target->Set(NanNew("Connection"), t->GetFunction()); TRACE("created class"); } //static function called by libuv as callback entrypoint static void io_event(uv_poll_t* w, int status, int revents) { TRACE("Received IO event"); if(status == -1) { TRACE("Connection error. -1 status from lib_uv_poll"); } Connection *connection = static_cast(w->data); connection->HandleIOEvent(revents); } //v8 entry point into Connection#connect static NAN_METHOD(Connect) { NanScope(); Connection *self = ObjectWrap::Unwrap(args.This()); if(args.Length() == 0 || !args[0]->IsString()) { THROW("Must include connection string as only argument to connect"); } String::Utf8Value conninfo(args[0]->ToString()); bool success = self->Connect(*conninfo); if(!success) { self -> EmitLastError(); self -> DestroyConnection(); } NanReturnUndefined(); } //v8 entry point into Connection#cancel static NAN_METHOD(Cancel) { NanScope(); Connection *self = ObjectWrap::Unwrap(args.This()); bool success = self->Cancel(); if(!success) { self -> EmitLastError(); self -> DestroyConnection(); } NanReturnUndefined(); } #ifdef ESCAPE_SUPPORTED //v8 entry point into Connection#escapeIdentifier static NAN_METHOD(EscapeIdentifier) { NanScope(); Connection *self = ObjectWrap::Unwrap(args.This()); char* inputStr = MallocCString(args[0]); if(!inputStr) { THROW("Unable to allocate memory for a string in EscapeIdentifier.") } char* escapedStr = self->EscapeIdentifier(inputStr); free(inputStr); if(escapedStr == NULL) { THROW(self->GetLastError()); } Local jsStr = NanNew(escapedStr, strlen(escapedStr)); PQfreemem(escapedStr); NanReturnValue(jsStr); } //v8 entry point into Connection#escapeLiteral static NAN_METHOD(EscapeLiteral) { NanScope(); Connection *self = ObjectWrap::Unwrap(args.This()); char* inputStr = MallocCString(args[0]); if(!inputStr) { THROW("Unable to allocate memory for a string in EscapeIdentifier.") } char* escapedStr = self->EscapeLiteral(inputStr); free(inputStr); if(escapedStr == NULL) { THROW(self->GetLastError()); } Local jsStr = NanNew(escapedStr, strlen(escapedStr)); PQfreemem(escapedStr); NanReturnValue(jsStr); } #endif //v8 entry point into Connection#_sendQuery static NAN_METHOD(SendQuery) { NanScope(); Connection *self = ObjectWrap::Unwrap(args.This()); const char *lastErrorMessage; if(!args[0]->IsString()) { THROW("First parameter must be a string query"); } char* queryText = MallocCString(args[0]); bool singleRowMode = (bool)args[1]->Int32Value(); int result = self->Send(queryText, singleRowMode); free(queryText); if(result == 0) { lastErrorMessage = self->GetLastError(); THROW(lastErrorMessage); } //TODO should we flush before throw? self->Flush(); NanReturnUndefined(); } //v8 entry point into Connection#_sendQueryWithParams static NAN_METHOD(SendQueryWithParams) { NanScope(); //dispatch non-prepared parameterized query DispatchParameterizedQuery(args, false); NanReturnUndefined(); } //v8 entry point into Connection#_sendPrepare(string queryName, string queryText, int nParams) static NAN_METHOD(SendPrepare) { NanScope(); Connection *self = ObjectWrap::Unwrap(args.This()); String::Utf8Value queryName(args[0]); String::Utf8Value queryText(args[1]); int length = args[2]->Int32Value(); bool singleRowMode = (bool)args[3]->Int32Value(); self->SendPrepare(*queryName, *queryText, length, singleRowMode); NanReturnUndefined(); } //v8 entry point into Connection#_sendQueryPrepared(string queryName, string[] paramValues) static NAN_METHOD(SendQueryPrepared) { NanScope(); //dispatch prepared parameterized query DispatchParameterizedQuery(args, true); NanReturnUndefined(); } static void DispatchParameterizedQuery(_NAN_METHOD_ARGS, bool isPrepared) { NanScope(); Connection *self = ObjectWrap::Unwrap(args.This()); String::Utf8Value queryName(args[0]); //TODO this is much copy/pasta code if(!args[0]->IsString()) { NanThrowError("First parameter must be a string"); return; } if(!args[1]->IsArray()) { NanThrowError("Values must be an array"); return; } Local jsParams = Local::Cast(args[1]); int len = jsParams->Length(); char** paramValues = ArgToCStringArray(jsParams); if(!paramValues) { NanThrowError("Unable to allocate char **paramValues from Local of v8 params"); return; } char* queryText = MallocCString(args[0]); bool singleRowMode = (bool)args[2]->Int32Value(); int result = 0; if(isPrepared) { result = self->SendPreparedQuery(queryText, len, paramValues, singleRowMode); } else { result = self->SendQueryParams(queryText, len, paramValues, singleRowMode); } free(queryText); ReleaseCStringArray(paramValues, len); if(result == 1) { return; } self->EmitLastError(); NanThrowError("Postgres returned non-1 result from query dispatch."); } //v8 entry point into Connection#end static NAN_METHOD(End) { NanScope(); Connection *self = ObjectWrap::Unwrap(args.This()); self->End(); NanReturnUndefined(); } uv_poll_t read_watcher_; uv_poll_t write_watcher_; PGconn *connection_; bool connecting_; bool ioInitialized_; bool copyOutMode_; bool copyInMode_; bool reading_; bool writing_; bool ended_; Connection () : ObjectWrap () { connection_ = NULL; connecting_ = false; ioInitialized_ = false; copyOutMode_ = false; copyInMode_ = false; reading_ = false; writing_ = false; ended_ = false; TRACE("Initializing ev watchers"); read_watcher_.data = this; write_watcher_.data = this; } ~Connection () { } static NAN_METHOD(SendCopyFromChunk) { NanScope(); Connection *self = ObjectWrap::Unwrap(args.This()); //TODO handle errors in some way if (args.Length() < 1 && !Buffer::HasInstance(args[0])) { THROW("SendCopyFromChunk requires 1 Buffer argument"); } self->SendCopyFromChunk(args[0]->ToObject()); NanReturnUndefined(); } static NAN_METHOD(EndCopyFrom) { NanScope(); Connection *self = ObjectWrap::Unwrap(args.This()); char * error_msg = NULL; if (args[0]->IsString()) { error_msg = MallocCString(args[0]); } //TODO handle errors in some way self->EndCopyFrom(error_msg); free(error_msg); NanReturnUndefined(); } protected: //v8 entry point to constructor static NAN_METHOD(New) { NanScope(); Connection *connection = new Connection(); connection->Wrap(args.This()); NanReturnValue(args.This()); } #ifdef ESCAPE_SUPPORTED char * EscapeIdentifier(const char *str) { TRACE("js::EscapeIdentifier") return PQescapeIdentifier(connection_, str, strlen(str)); } char * EscapeLiteral(const char *str) { TRACE("js::EscapeLiteral") return PQescapeLiteral(connection_, str, strlen(str)); } #endif void enableSingleRowMode(bool enable) { #ifdef SINGLE_ROW_SUPPORTED if(enable == true) { int mode = PQsetSingleRowMode(connection_); if(mode == 1) { TRACE("PQsetSingleRowMode enabled") } else { TRACE("PQsetSingleRowMode disabled") } } else { TRACE("PQsetSingleRowMode disabled") } #endif } int Send(const char *queryText, bool singleRowMode) { TRACE("js::Send") int rv = PQsendQuery(connection_, queryText); enableSingleRowMode(singleRowMode); StartWrite(); return rv; } int SendQueryParams(const char *command, const int nParams, const char * const *paramValues, bool singleRowMode) { TRACE("js::SendQueryParams") int rv = PQsendQueryParams(connection_, command, nParams, NULL, paramValues, NULL, NULL, 0); enableSingleRowMode(singleRowMode); StartWrite(); return rv; } int SendPrepare(const char *name, const char *command, const int nParams, bool singleRowMode) { TRACE("js::SendPrepare") int rv = PQsendPrepare(connection_, name, command, nParams, NULL); enableSingleRowMode(singleRowMode); StartWrite(); return rv; } int SendPreparedQuery(const char *name, int nParams, const char * const *paramValues, bool singleRowMode) { int rv = PQsendQueryPrepared(connection_, name, nParams, paramValues, NULL, NULL, 0); enableSingleRowMode(singleRowMode); StartWrite(); return rv; } bool Cancel() { PGcancel* pgCancel = PQgetCancel(connection_); char errbuf[256]; int result = PQcancel(pgCancel, errbuf, 256); StartWrite(); PQfreeCancel(pgCancel); return result; } //flushes socket void Flush() { if(PQflush(connection_) == 1) { TRACE("Flushing"); uv_poll_start(&write_watcher_, UV_WRITABLE, io_event); } } //safely destroys the connection at most 1 time void DestroyConnection() { if(connection_ != NULL) { PQfinish(connection_); connection_ = NULL; } } //initializes initial async connection to postgres via libpq //and hands off control to libev bool Connect(const char* conninfo) { if(ended_) return true; connection_ = PQconnectStart(conninfo); if (!connection_) { LOG("Connection couldn't be created"); } ConnStatusType status = PQstatus(connection_); if(CONNECTION_BAD == status) { return false; } if (PQsetnonblocking(connection_, 1) == -1) { LOG("Unable to set connection to non-blocking"); return false; } int fd = PQsocket(connection_); if(fd < 0) { LOG("socket fd was negative. error"); return false; } assert(PQisnonblocking(connection_)); PQsetNoticeProcessor(connection_, NoticeReceiver, this); TRACE("Setting watchers to socket"); uv_poll_init(uv_default_loop(), &read_watcher_, fd); uv_poll_init(uv_default_loop(), &write_watcher_, fd); ioInitialized_ = true; connecting_ = true; StartWrite(); Ref(); return true; } static void NoticeReceiver(void *arg, const char *message) { Connection *self = (Connection*)arg; self->HandleNotice(message); } void HandleNotice(const char *message) { NanScope(); Handle notice = NanNew(message); Emit("notice", ¬ice); } //called to process io_events from libuv void HandleIOEvent(int revents) { if(connecting_) { TRACE("Processing connecting_ io"); HandleConnectionIO(); return; } if(revents & UV_READABLE) { TRACE("revents & UV_READABLE"); TRACE("about to consume input"); if(PQconsumeInput(connection_) == 0) { TRACE("could not read, terminating"); End(); EmitLastError(); //LOG("Something happened, consume input is 0"); return; } TRACE("Consumed"); //declare handlescope as this method is entered via a libuv callback //and not part of the public v8 interface NanScope(); if (this->copyOutMode_) { this->HandleCopyOut(); } if (!this->copyInMode_ && !this->copyOutMode_ && PQisBusy(connection_) == 0) { PGresult *result; bool didHandleResult = false; TRACE("PQgetResult"); while ((result = PQgetResult(connection_))) { TRACE("HandleResult"); didHandleResult = HandleResult(result); TRACE("PQClear"); PQclear(result); if(!didHandleResult) { //this means that we are in copy in or copy out mode //in this situation PQgetResult will return same //result untill all data will be read (copy out) or //until data end notification (copy in) //and because of this, we need to break cycle break; } } //might have fired from notification if(didHandleResult) { Emit("_readyForQuery"); } } PGnotify *notify; TRACE("PQnotifies"); while ((notify = PQnotifies(connection_))) { Local result = NanNew(); result->Set(NanNew("channel"), NanNew(notify->relname)); result->Set(NanNew("payload"), NanNew(notify->extra)); Handle res = (Handle)result; Emit("notification", &res); PQfreemem(notify); } } if(revents & UV_WRITABLE) { TRACE("revents & UV_WRITABLE"); if (PQflush(connection_) == 0) { //nothing left to write, poll the socket for more to read StartRead(); } } } bool HandleCopyOut () { char * buffer = NULL; int copied; copied = PQgetCopyData(connection_, &buffer, 1); while (copied > 0) { Local node_chunk = NanNewBufferHandle(buffer, copied); Emit("copyData", &node_chunk); PQfreemem(buffer); copied = PQgetCopyData(connection_, &buffer, 1); } if (copied == 0) { //wait for next read ready //result was not handled completely return false; } else if (copied == -1) { this->copyOutMode_ = false; return true; } else if (copied == -2) { this->copyOutMode_ = false; return true; } return false; } //maps the postgres tuple results to v8 objects //and emits row events //TODO look at emitting fewer events because the back & forth between //javascript & c++ might introduce overhead (requires benchmarking) void EmitRowDescription(const PGresult* result) { NanScope(); Local row = NanNew(); int fieldCount = PQnfields(result); for(int fieldNumber = 0; fieldNumber < fieldCount; fieldNumber++) { Local field = NanNew(); //name of field char* fieldName = PQfname(result, fieldNumber); field->Set(NanNew("name"), NanNew(fieldName)); //oid of type of field int fieldType = PQftype(result, fieldNumber); field->Set(NanNew("dataTypeID"), NanNew(fieldType)); row->Set(NanNew(fieldNumber), field); } Handle e = (Handle)row; Emit("_rowDescription", &e); } bool HandleResult(PGresult* result) { TRACE("PQresultStatus"); ExecStatusType status = PQresultStatus(result); switch(status) { case PGRES_TUPLES_OK: #ifdef SINGLE_ROW_SUPPORTED case PGRES_SINGLE_TUPLE: #endif { EmitRowDescription(result); HandleTuplesResult(result); EmitCommandMetaData(result); return true; } break; case PGRES_FATAL_ERROR: { TRACE("HandleErrorResult"); HandleErrorResult(result); return true; } break; case PGRES_COMMAND_OK: case PGRES_EMPTY_QUERY: { EmitCommandMetaData(result); return true; } break; case PGRES_COPY_IN: { this->copyInMode_ = true; Emit("copyInResponse"); return false; } break; case PGRES_COPY_OUT: { this->copyOutMode_ = true; Emit("copyOutResponse"); return this->HandleCopyOut(); } break; default: printf("YOU SHOULD NEVER SEE THIS! PLEASE OPEN AN ISSUE ON GITHUB! Unrecogized query status: %s\n", PQresStatus(status)); break; } return true; } void EmitCommandMetaData(PGresult* result) { NanScope(); Local info = NanNew(); info->Set(NanNew("command"), NanNew(PQcmdStatus(result))); info->Set(NanNew("value"), NanNew(PQcmdTuples(result))); Handle e = (Handle)info; Emit("_cmdStatus", &e); } //maps the postgres tuple results to v8 objects //and emits row events //TODO look at emitting fewer events because the back & forth between //javascript & c++ might introduce overhead (requires benchmarking) void HandleTuplesResult(const PGresult* result) { NanScope(); int rowCount = PQntuples(result); for(int rowNumber = 0; rowNumber < rowCount; rowNumber++) { //create result object for this row Local row = NanNew(); int fieldCount = PQnfields(result); for(int fieldNumber = 0; fieldNumber < fieldCount; fieldNumber++) { //value of field if(PQgetisnull(result, rowNumber, fieldNumber)) { row->Set(NanNew(fieldNumber), NanNull()); } else { char* fieldValue = PQgetvalue(result, rowNumber, fieldNumber); row->Set(NanNew(fieldNumber), NanNew(fieldValue)); } } Handle e = (Handle)row; Emit("_row", &e); } } void HandleErrorResult(const PGresult* result) { NanScope(); //instantiate the return object as an Error with the summary Postgres message TRACE("ReadResultField"); const char* errorMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY); if(!errorMessage) { //there is no error, it has already been consumed in the last //read-loop callback return; } Local msg = Local::Cast(NanError(errorMessage)); TRACE("AttachErrorFields"); //add the other information returned by Postgres to the error object AttachErrorField(result, msg, NanNew("severity"), PG_DIAG_SEVERITY); AttachErrorField(result, msg, NanNew("code"), PG_DIAG_SQLSTATE); AttachErrorField(result, msg, NanNew("detail"), PG_DIAG_MESSAGE_DETAIL); AttachErrorField(result, msg, NanNew("hint"), PG_DIAG_MESSAGE_HINT); AttachErrorField(result, msg, NanNew("position"), PG_DIAG_STATEMENT_POSITION); AttachErrorField(result, msg, NanNew("internalPosition"), PG_DIAG_INTERNAL_POSITION); AttachErrorField(result, msg, NanNew("internalQuery"), PG_DIAG_INTERNAL_QUERY); AttachErrorField(result, msg, NanNew("where"), PG_DIAG_CONTEXT); AttachErrorField(result, msg, NanNew("file"), PG_DIAG_SOURCE_FILE); AttachErrorField(result, msg, NanNew("line"), PG_DIAG_SOURCE_LINE); AttachErrorField(result, msg, NanNew("routine"), PG_DIAG_SOURCE_FUNCTION); Handle m = msg; TRACE("EmitError"); Emit("_error", &m); } void AttachErrorField(const PGresult *result, const Local msg, const Local symbol, int fieldcode) { NanScope(); char *val = PQresultErrorField(result, fieldcode); if(val) { msg->Set(symbol, NanNew(val)); } } void End() { TRACE("stopping read & write"); StopRead(); StopWrite(); DestroyConnection(); Emit("_end"); ended_ = true; } private: //EventEmitter was removed from c++ in node v0.5.x void Emit(const char* message) { NanScope(); Handle args[1] = { NanNew(message) }; Emit(1, args); } void Emit(const char* message, Handle* arg) { NanScope(); Handle args[2] = { NanNew(message), *arg }; Emit(2, args); } void Emit(int length, Handle *args) { NanScope(); Local emit_v = NanObjectWrapHandle(this)->Get(NanNew("emit")); assert(emit_v->IsFunction()); Local emit_f = emit_v.As(); TryCatch tc; emit_f->Call(NanObjectWrapHandle(this), length, args); if(tc.HasCaught()) { FatalException(tc); } } void HandleConnectionIO() { PostgresPollingStatusType status = PQconnectPoll(connection_); switch(status) { case PGRES_POLLING_READING: TRACE("Polled: PGRES_POLLING_READING"); StartRead(); break; case PGRES_POLLING_WRITING: TRACE("Polled: PGRES_POLLING_WRITING"); StartWrite(); break; case PGRES_POLLING_FAILED: StopRead(); StopWrite(); TRACE("Polled: PGRES_POLLING_FAILED"); EmitLastError(); break; case PGRES_POLLING_OK: TRACE("Polled: PGRES_POLLING_OK"); connecting_ = false; StartRead(); Emit("connect"); default: //printf("Unknown polling status: %d\n", status); break; } } void EmitError(const char *message) { NanScope(); Local exception = NanError(message); Emit("_error", &exception); } void EmitLastError() { EmitError(PQerrorMessage(connection_)); } const char *GetLastError() { return PQerrorMessage(connection_); } void StopWrite() { TRACE("write STOP"); if(ioInitialized_ && writing_) { uv_poll_stop(&write_watcher_); writing_ = false; } } void StartWrite() { TRACE("write START"); if(reading_) { TRACE("stop READ to start WRITE"); StopRead(); } uv_poll_start(&write_watcher_, UV_WRITABLE, io_event); writing_ = true; } void StopRead() { TRACE("read STOP"); if(ioInitialized_ && reading_) { uv_poll_stop(&read_watcher_); reading_ = false; } } void StartRead() { TRACE("read START"); if(writing_) { TRACE("stop WRITE to start READ"); StopWrite(); } uv_poll_start(&read_watcher_, UV_READABLE, io_event); reading_ = true; } //Converts a v8 array to an array of cstrings //the result char** array must be free() when it is no longer needed //if for any reason the array cannot be created, returns 0 static char** ArgToCStringArray(Local params) { int len = params->Length(); char** paramValues = new char*[len]; for(int i = 0; i < len; i++) { Handle val = params->Get(i); if(val->IsString()) { char* cString = MallocCString(val); //will be 0 if could not malloc if(!cString) { LOG("ArgToCStringArray: OUT OF MEMORY OR SOMETHING BAD!"); ReleaseCStringArray(paramValues, i-1); return 0; } paramValues[i] = cString; } else if(val->IsNull()) { paramValues[i] = NULL; } else if(val->IsObject() && Buffer::HasInstance(val)) { char *cHexString = MallocCHexString(val->ToObject()); if(!cHexString) { LOG("ArgToCStringArray: OUT OF MEMORY OR SOMETHING BAD!"); ReleaseCStringArray(paramValues, i-1); return 0; } paramValues[i] = cHexString; } else { //a paramter was not a string LOG("Parameter not a string or buffer"); ReleaseCStringArray(paramValues, i-1); return 0; } } return paramValues; } //helper function to release cString arrays static void ReleaseCStringArray(char **strArray, int len) { for(int i = 0; i < len; i++) { free(strArray[i]); } delete [] strArray; } //helper function to malloc new string from v8string static char* MallocCString(v8::Handle v8String) { String::Utf8Value utf8String(v8String->ToString()); char *cString = (char *) malloc(strlen(*utf8String) + 1); if(!cString) { return cString; } strcpy(cString, *utf8String); return cString; } //helper function to Malloc a Bytea encoded Hex string from a buffer static char* MallocCHexString(v8::Handle buf) { char* bufferData = Buffer::Data(buf); size_t hexStringLen = Buffer::Length(buf)*2 + 3; char *cHexString = (char *) malloc(hexStringLen); if(!cHexString) { return cHexString; } strcpy(cHexString, "\\x"); for (uint32_t i = 0, k = 2; k < hexStringLen; i += 1, k += 2) { static const char hex[] = "0123456789abcdef"; uint8_t val = static_cast(bufferData[i]); cHexString[k + 0] = hex[val >> 4]; cHexString[k + 1] = hex[val & 15]; } cHexString[hexStringLen-1] = 0; return cHexString; } void SendCopyFromChunk(Handle chunk) { PQputCopyData(connection_, Buffer::Data(chunk), Buffer::Length(chunk)); } void EndCopyFrom(char * error_msg) { PQputCopyEnd(connection_, error_msg); this->copyInMode_ = false; } }; extern "C" void init (Handle target) { NanScope(); Connection::Init(target); } NODE_MODULE(binding, init)