Hey guys,
Has anyone ever used Q server for R to connect to a kdb load balancer from an R process? It seems to me that two things are missing from this interface: a command to send async and a command to receive data on the handle (both required for mserve). Most other interfaces seem to have these functions out of the box. Would I have to write custom R (and/or C) functions? Any info/tips would be appreciated.
Terry
Yes, those two features are missing. If you check the source code, the call to kdb is always synchronous and there is no way to send an asynchronous message. Also, the call is always a string, and a deferred call is missing in the Q server. And yes, you would need to write your own custom C code.
Kim
Thanks Kim, it’s as I suspected. Strange that it wouldn’t be included by default given how little extra work would have been involved.
I don’t suppose anyone has done this already and could share function definitions??
Terry
The modification below to kx_r_execute in qserver.c adds async support. Example usage:
> execute(-h, "myv:99")
NULL
> execute(h, "myv")
[1] 99
where h is the value returned from open_connection(…)
SEXP kx_r_execute(SEXP connection, SEXP query)
{
    K result;
    SEXP s;
    kx_connection = INTEGER_VALUE(connection);
    result = k(kx_connection, (char*) CHARACTER_VALUE(query), (K)0);
    if (0 == result) {
        error("Error: not connected to kdb+ server\n");
    }
    // support for async IPC: with a negative handle there is no K object to convert
    else if (kx_connection < 0) {
        return R_NilValue;
    }
    else if (-128 == result->t) {
        // copy the error text before releasing the K object that holds it
        char *e = calloc(strlen(result->s) + 1, 1);
        strcpy(e, result->s);
        r0(result);
        error("Error from kdb+: `%s\n", e);
    }
    s = from_any_kobject(result);
    r0(result);
    return s;
}
James
Great, thanks James!
I’ve used that and also written the following “receive” function in qserver.c to listen for a response on the connection:
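SEXP kx_r_receive(SEXP connection)
{
    K result;
    SEXP s;
    kx_connection = INTEGER_VALUE(connection);
    // blocks until the next message arrives on this handle
    result = k(kx_connection, (S)0);
    if (0 == result) {
        error("Error: not connected to kdb+ server\n");
    }
    s = from_any_kobject(result);
    r0(result); // release the K object, as kx_r_execute does
    return s;
}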
Then in qserver.R I define:
receive <- function(connection) {
    .Call("kx_r_receive", as.integer(connection))
}
The required call to the mserve master can then be executed from R as:
execute(-h, query); res <- receive(h)
Works!
Terry
Yes, I was not precise when replying to this thread. Of course you can use a negative handle, and this turns the call into an asynchronous one.
However, you need at least a deferred call to handle incoming messages. But a deferred call will block your R instance, and that is something you don't want, in my opinion.
Btw, has anyone thought about rewriting the rserver using this library http://www.rcpp.org/ ?
Kim
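On the blocking concern: one possible way to soften it, sketched here under the assumption that the kdb+ handle is an ordinary socket descriptor, is to poll the handle with select() and only call k(h, (S)0) once data is pending. The helper name wait_readable and its timeout parameter are hypothetical and not part of qserver.c; this is a rough sketch, not a tested change to the interface.

```c
#include <stddef.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Hypothetical helper (not part of qserver.c).
 * Returns 1 if fd has data ready to read within timeout_ms,
 * 0 if the timeout expired first, and -1 on error.
 */
int wait_readable(int fd, int timeout_ms)
{
    fd_set readfds;
    struct timeval tv;
    int rc;

    /* select() can only watch descriptors below FD_SETSIZE */
    if (fd < 0 || fd >= FD_SETSIZE) {
        return -1;
    }

    FD_ZERO(&readfds);
    FD_SET(fd, &readfds);
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;

    rc = select(fd + 1, &readfds, NULL, NULL, &tv);
    if (rc < 0) {
        return -1;
    }
    return rc > 0 ? 1 : 0;
}
```

A receive function could then call wait_readable(h, some_timeout) first and return NULL to R on a timeout instead of blocking indefinitely, so the R session gets control back between polls.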
Previously there was also an issue with segfaults when using a negative handle, because kx_r_execute always expected a K object (or 0) to be returned, so that was worth fixing in itself.
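To illustrate why the order of the checks matters, here is a stand-alone sketch of the same decision logic. It uses a stand-in struct rather than the real K type from k.h, so FakeK, Outcome and classify are all hypothetical names, and it assumes (as described in this thread) that an async send returns something non-zero that must not be treated as a K object.

```c
#include <stddef.h>

/* Stand-in for the K object from k.h; only the fields used here. */
typedef struct {
    signed char t;   /* type code; -128 marks an error */
    const char *s;   /* error text when t == -128 */
} FakeK;

typedef enum { NOT_CONNECTED, ASYNC_SENT, KDB_ERROR, HAS_VALUE } Outcome;

/*
 * Mirrors the order of checks in the modified kx_r_execute:
 * the handle sign is tested before result is ever dereferenced.
 */
Outcome classify(int handle, const FakeK *result)
{
    if (result == NULL) {
        return NOT_CONNECTED;
    }
    if (handle < 0) {
        return ASYNC_SENT;   /* result is not dereferenced on this path */
    }
    if (result->t == -128) {
        return KDB_ERROR;
    }
    return HAS_VALUE;
}
```

If the error check came before the handle check, the async path would read result->t from a value that is not a real object, which matches the segfault described above.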