Query Routing / Load Balancing / Asynchronous Callbacks

https://learninghub.kx.com/forums/topic/query-routing-load-balancing-asynchronous-callbacks

Hi all,

I've had a frustrating experience attempting to set up a load balancer and query router on a collection of q processes, started both independently and with Developer. I wondered if anyone can see the schoolboy error or has any insight to share.
I've tried:
1) using the base load balancing server script:
https://github.com/KxSystems/kdb/blob/master/e/mserve.q
Process:
start 4 q processes on ports 5001 to 5004
start a 5th process with this script loaded on start.
try loading an HDB and querying directly on that 5th process or by sending queries to that 5th process with a Developer session set up as a client.
I get nothing back
2) using the full query router
https://code.kx.com/q/wp/query-routing/
Process:
start up 3 processes on ports 5001 to 5003 with the loadBalancer, service and gateway scripts loaded respectively (the start order I use is load balancer, then service, then gateway). The only changes I made to the scripts were to add the text from the white paper as comments to aid my understanding, and to initialise the SEQ variable in the gateway script with a randomly generated number. The service script actually just creates dummy tables, but I wanted to make as few changes as possible to get the POC running.
In any case, when I try connecting to the gateway with Developer as a client, I have no success. I have added the scripts in the post below, as I can't attach more than one file at a time and it appears the system is clever enough to thwart my attempt to get around the restriction on attaching zips.
From an earlier post to the Community, I understand that Developer doesn't actually support the use of peach with separate processes rather than threads. This has me thinking that one of the below is true:
  • I am screwing up the code implementation
  • I am screwing up the code implementation AND can't use developer with these multiprocess architectures
  • the code as shown in the above links contains errors
  • there is a bug in the q version available separately from the platform offering that means you can't do asynchronous callbacks on either Developer or a raw q process.
I hope minds greater than my own might be able to cast light on which of these 4 options is true and what the solution might be. In summary, I have yet to successfully execute an asynchronous callback.
Regards,
Simon
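One quick way to separate the four possibilities above is to test a bare asynchronous callback between two plain q sessions, with none of the router scripts loaded. The sketch below is minimal and rests on a few assumptions. Port 5010 is hypothetical. The server is a bare `q -p 5010` session that relies on the default `.z.ps`, which evaluates each incoming async message. The client sends a function asynchronously, and the server runs it and replies on `.z.w`. The client then blocks on `h[]` until the reply arrives, which is the same pattern the gateway client uses later in this thread.

```q
/ Session 1 (server): start a bare process with `q -p 5010`; no script is needed,
/ because the default .z.ps evaluates each incoming async message.

/ Session 2 (client):
h:hopen `::5010;                  / handle to the server (hypothetical port)
neg[h]({neg[.z.w] x*x};7);        / async: the server computes 7*7 and sends it back async
r:h[];                            / block until the next message arrives on h
show r;                           / expect 49 if async callbacks work
hclose h;
```

If `r` comes back as 49, async callbacks are working on that q install. In that case the problem is more likely in the scripts or the client set-up than in q itself.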

Gateway

////////////////////////
// Gateway
////////////////////////
// https://code.kx.com/q/wp/query-routing/#
////////////////////////
// loading order:
// * load balancer: port 5001
// * service: port 5002
// * gateway: port 5003
// * client
////////////////////////
// When a connection is opened to the Load Balancer, the handle is set to the variable LB, which will be referenced throughout this paper.
// As asynchronous messages are used throughout this framework, we also create the variable NLB,
// which is assigned with the negative handle to the load balancer.
// \p 5555
\p 5003
manageConn:{@[{NLB::neg LB::hopen x};`:localhost:5001;{show x}]};
registerGWFunc:{addResource LB(`registerGW;`)};
// The gateway connects to the Load Balancer and retrieves the addresses of all service resources, establishing a connection to each.
// This is the only time the gateway uses synchronous IPC communication to ensure it has all of the details it requires before accepting
// user queries. After the gateway registers itself as a subscriber for any new resources that come available, all future communication is
// sent via asynchronous messages.
resources:([address:()] source:();sh:());
addResource:{ `resources upsert `address xkey update sh:{hopen first x}each address from x };
// The gateway process creates and maintains an empty query table. The complexity of this table is at the developer's discretion. In this example we'll record:
// * Unique sequence number per query (sq)
// * Handle from user process (uh)
// * Timestamps for when the query was received, when the query got sent to an available resource, and when the query results are sent back
//   to the user (rec, snt, ret respectively)
// * The user ID (usr)
// * The service handle (sh)
// * The service requested by user (serv)
// * The user's query
queryTable:([sq:`int$()] uh:`int$(); rec:`timestamp$(); snt:`timestamp$(); ret:`timestamp$(); usr:`$(); sh:`int$(); serv:`$(); query:() );
// This table could be extended to include more information by making small changes to the code in this paper.
// These fields could include the status of a query, error messages received from service or the total time a query took from start to end.
// As mentioned previously, users make requests by calling the userQuery function on the gateway. This function takes a two-item list argument: (Service;Query).
// The gateway will validate the existence of a service matching the name passed to userQuery and send an error if no such resource exists.
// We are setting outside the scope of this paper any further request validation, including access permissioning.
// For further details on access control, please refer to the technical white paper "Permissions with kdb+".
// When a user sends her query via the userQuery function, we assign the query a unique sequence number and publish an asynchronous request to the
// Load Balancer to be assigned an available resource.
// initialise the query id generator.
SEQ: first 1?0;
userQuery:{
  $[(serv:x 0) in exec distinct source from resources; // valid service?
    [queryTable,:(SEQ+:1;.z.w;.z.p;0Np;0Np;.z.u;0N;serv;x 1);
      NLB(`requestService;SEQ;serv)];
    (neg .z.w)(`$"Service Unavailable")]
  };
// The addResource function defined earlier is used to add new service instances to the plant, while the serviceAlloc function is used to
// pass back an allocated resource for a given query sequence number. The query is retrieved by its sequence number from queryTable and
// sent to the allocated service resource. If the user has since disconnected from the gateway before a resource could be provided, the gateway
// informs the Load Balancer to make this resource free again by executing the returnService function in the Load Balancer. After each event,
// the timestamp fields are updated within the queryTable.
serviceAlloc:{[sq;addr]
  $[null queryTable[sq;`uh];
    // Check if user is still waiting on results
    NLB(`returnService;sq);
    // Service no longer required
    [(neg sh:resources[addr;`sh]) (`queryService;(sq;queryTable[sq;`query]));
    // Send query to allocated resource, update queryTable
      queryTable[sq;`snt`sh]:(.z.p;sh)]]
  };
// When a service returns results to the gateway, the results arrive tagged with the same sequence number sent in the original query. This
// incoming message packet executes the returnRes function, which uses the sequence number to identify the user handle and return the results.
// If the user has disconnected before the results can be returned then the user handle field uh will be set to null (through the .z.pc trigger)
// causing nothing further to be done.
returnRes:{[res]
  uh:first exec uh from queryTable where sq=(res 0);
  // (res 0) is the sequence number
  if[not null uh;(neg uh)(res 1)];
  // (res 1) is the result
  queryTable[(res 0);`ret]:.z.p
  };
// In the situation where a process disconnects from the gateway, .z.pc establishes what actions to take. As mentioned, a disconnected user
// will cause queryTable to be updated with a null user handle. If the user currently has no outstanding queries, the gateway has nothing to
// do. If a service disconnects from the gateway whilst processing an outstanding user request, then all users that have outstanding
// requests to this database are informed and the database is purged from the available resources table.
// If our Load Balancer connection has dropped, all users with queued queries will be informed. All connections are disconnected and purged
// from the resources table. This ensures that all new queries will be returned directly to users as the Load Balancer is unavailable to
// respond to their request. A timer is set to attempt to reconnect to the Load Balancer. On reconnection, the gateway will re-register
// itself, pull all available resources and establish new connections. The .z.ts trigger is executed once, on script startup,
// to initialize and register the process.
.z.pc:{[handle]
  // if handle is for a user process, set the query handle (uh) as null
  update uh:0N from `queryTable where uh=handle;
  // if handle is for a resource process, remove from resources
  delete from `resources where sh=handle;
  // if any user query is currently being processed on the service which
  // disconnected, send message to user
  if[count sq:exec distinct sq from queryTable where sh=handle,null ret;
    returnRes'[sq cross `$"Service Disconnect"]];
  if[handle~LB; // if handle is Load Balancer
    // Send message to each connected user, which has not received results
    (neg exec uh from queryTable where not null uh,null snt)@\:`$"Service Unavailable";
    // Close handle to all resources and clear resources table
    hclose each (0!resources)`sh;
    delete from `resources;
    // update queryTable to close outstanding user queries
    update snt:.z.p,ret:.z.p from `queryTable where not null uh,null snt;
    // reset LB handle and set timer of 10 seconds
    // to try and reconnect to Load Balancer process
    LB::0; NLB::0; value"\\t 10000"]
  };
.z.ts:{ manageConn[]; if[0<LB;@[registerGWFunc;`;{show x}];value"\\t 0"] };
.z.ts[];
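As written, gateway.q only arms the retry timer from `.z.pc`, and it never initialises `LB` (service.q sets `LB:0`, gateway.q does not). The load balancer may not yet be listening when the gateway starts, which can happen when all processes are launched at once with `&`. In that case the first `.z.ts[]` call would fail on `0<LB`, and nothing would retry.

The sketch below is one possible replacement for the `.z.ts` definition and the `.z.ts[]` call at the end of gateway.q. It assumes the earlier definitions (`addResource` and the tables) are unchanged. It is not the white paper's code.

```q
LB:0; NLB:0;     / define the handles before any connection attempt, so `0<LB` is safe
manageConn:{@[{NLB::neg LB::hopen x};`:localhost:5001;{show x}]};
registerGWFunc:{addResource LB(`registerGW;`)};
/ retry until the load balancer answers, then register and switch the timer off
.z.ts:{ manageConn[]; if[0<LB;@[registerGWFunc;`;{show x}];system"t 0"] };
system"t 10000";   / arm the 10-second retry timer before the first attempt
.z.ts[];           / first attempt immediately
```

If the load balancer is down, the gateway prints the connection error and tries again every 10 seconds. Once it is registered, the timer is switched off, as in the original.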

Load Balancer

////////////////////////
// LoadBalancer
////////////////////////
// https://code.kx.com/q/wp/query-routing/#
////////////////////////
// loading order:
// * load balancer: port 5001
// * service: port 5002
// * gateway: port 5003
// * client
////////////////////////
// Within our Load Balancer there are two tables and a list:
// \p 1234
\p 5001
services:([handle:`int$()] address:`$(); source:`$(); gwHandle:`int$(); sq:`int$(); udt:`timestamp$() );
serviceQueue:([gwHandle:`int$();sq:`int$()] source:`$(); time:`timestamp$() );
gateways:();
// The service table maintains all available instances/resources of services registered and the gateways currently using each service resource.
// The serviceQueue maintains a list of requests waiting on resources. A list is also maintained, called gateways, which contains all gateway handles.
// Gateways connecting to the Load Balancer add their handle to the gateways list. New service resources add their connection details to the services table.
// When a service resource registers itself using the registerResource function, the Load Balancer informs all registered gateways of the newly available
// resource. The next outstanding query within the serviceQueue table is allocated immediately to this new resource.
registerGW:{gateways,:.z.w ; select source, address from services};
registerResource:{[name;addr]
  `services upsert (.z.w;addr;name;0N;0N;.z.p);
  (neg gateways)@\:(`addResource;enlist`source`address!(name;addr));
  // Sends resource information to all registered gateway handles
  serviceAvailable[.z.w;name]
  };
// Incoming requests for service allocation arrive with a corresponding sequence number. The combination of gateway handle and sequence number will
// always be unique. The requestService function either provides a service to the gateway or adds the request to the serviceQueue. When a resource is allocated
// to a user query, the resource address is returned to the gateway along with the query sequence number that made the initial request.
sendService:{[gw;h]neg[gw]raze(`serviceAlloc;services[h;`sq`address])};
// Returns query sequence number and resource address to gateway handle
requestService:{[seq;serv]
  res:exec first handle from services where source=serv,null gwHandle;
  // Check if any idle service resources are available
  $[null res;
    addRequestToQueue[seq;serv;.z.w];
    [services[res;`gwHandle`sq`udt]:(.z.w;seq;.z.p);
      sendService[.z.w;res]]]
  };
// If all matching resources are busy, then the gateway handle + sequence number combination is appended to the serviceQueue table along with the service required.
addRequestToQueue:{[seq;serv;gw]`serviceQueue upsert (gw;seq;serv;.z.p)};
// After a service resource has finished processing a request, it sends an asynchronous message to the Load Balancer, executing the returnService function.
// As mentioned previously, if the user disconnects from the gateway prior to being allocated a service resource, the gateway also calls this function.
// The incoming handle differentiates between these two situations.
returnService:{ serviceAvailable . $[.z.w in (0!services)`handle; (.z.w;x); value first select handle,source from services where gwHandle=.z.w,sq=x ] };
// On execution of the serviceAvailable function, the Load Balancer will either mark this resource as free, or allocate the resource to the next gateway +
// sequence number combination that has requested this service, updating the services and serviceQueue tables accordingly.
serviceAvailable:{[zw;serv]
  nxt:first n:select gwHandle,sq from serviceQueue where source=serv;
  serviceQueue::(1#n)_ serviceQueue;
  // Take first request for service and remove from queue
  services[zw;`gwHandle`sq`udt]:(nxt`gwHandle;nxt`sq;.z.p);
  if[count n; sendService[nxt`gwHandle;zw]]
  };
// Any resource that disconnects from the Load Balancer is removed from the services table. If a gateway has disconnected, 
// it is removed from the resource subscriber list gateways and all queued queries for any resources must also be removed, 
// and the resource freed up for other gateways. Unlike other components in this framework, the Load Balancer does not 
// attempt to reconnect to processes, as they may have permanently been removed from the service pool of resources. In a 
// dynamically adjustable system, service resources could be added and removed on demand based on the size of the 
// serviceQueue table. 
.z.pc:{[h] services _:h; gateways::gateways except h; delete from `serviceQueue where gwHandle=h; update gwHandle:0N from `services where gwHandle=h }; 
// If a gateway dies, data services will continue to run queries that have already been routed to them, 
// which will not subsequently be returned to the client. It is also possible that the next query assigned to this 
// resource may experience a delay as the previous query is still being evaluated. As mentioned later, 
// all resources should begin with a timeout function to limit interruption of service.
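When the client hangs, it is worth checking whether the service and the gateway ever registered with the load balancer. The sketch below inspects the load balancer synchronously from a separate q session. It uses only the tables and the list defined in the script above, and assumes the load balancer listens on localhost:5001. An empty `services` table would suggest the service never reached `registerResource`. An empty `gateways` list would suggest the gateway never reached `registerGW`.

```q
/ run in a separate q session; assumes the load balancer listens on localhost:5001
h:hopen `:localhost:5001;
svc:h"services";            / keyed table: one row per registered service resource
pending:h"serviceQueue";    / requests waiting for a free resource
gws:h"gateways";            / handles of gateways that have called registerGW
show svc; show pending; show gws;
hclose h;                   / the LB's .z.pc then runs, but this connection owns no rows
```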

Service

////////////////////////
// Service
////////////////////////
// https://code.kx.com/q/wp/query-routing/#
////////////////////////
// loading order:
// * load balancer: port 5001
// * service: port 5002
// * gateway: port 5003
// * client
////////////////////////
// The example below takes a simple in-memory database containing trade and quote data that users can query.
// An example timeout of ten seconds is assigned, to prevent queries running for too long.
\T 10
\p 5002
LB:0
egQuote:([] date:10#.z.D-1; sym:10#`FDP; time:09:30t+00:30t*til 10; bid:100.+0.01*til 10; ask:101.+0.01*til 10 );
egTrade:([] date:10#.z.D-1; sym:10#`FDP; time:09:30t+00:30t*til 10; price:100.+0.01*til 10; size:10#100 );
// Each instance of a service uses the same service name. Within this example, the service name is hard-coded, but this would ideally be set 
// via a command line parameter. In our example below, our service name is set to `EQUITY_MARKET_RDB. In designing a user-friendly system, 
// service names should be carefully set to clearly describe a service's purpose. Similar processes (with either a different port number or
// running on a different host) can be started up with this service name, increasing the pool of resources available to users. 
// The serviceDetails function is executed on connection to the Load Balancer to register each service address. 
manageConn:{@[{NLB::neg LB::hopen x}; `:localhost:5001; {show "Can't connect to Load Balancer-> ",x}] };
serviceName:`EQUITY_MARKET_RDB; serviceDetails:(`registerResource; serviceName; `$":" sv string (();.z.h;system"p") );
// When a gateway sends the service a request via the queryService function, a unique sequence number assigned by a given gateway arrives as 
// the first component of the incoming asynchronous message. The second component, the query itself, is then evaluated. The results of this query
// are stamped with the same original sequence number and returned to the gateway handle.
// As mentioned previously, query interpretation/validation on the gateway side is outside of the scope of this paper. 
// Any errors that occur due to malformed queries will be returned via protected evaluation from database back to the user. 
// In the situation where the process query times out, 'stop will be returned to the user via the projection errProj. 
// On completion of a request, an asynchronous message is sent to the Load Balancer informing it that the service is now available for the next request. 
execRequest:{[nh;rq]nh(`returnRes;(rq 0;@[value;rq 1;{x}]));nh[]};
queryService:{ errProj:{[nh;sq;er]nh(sq;`$er);nh[]}; @[execRequest[neg .z.w];x;errProj[neg .z.w;x 0]]; NLB(`returnService;serviceName) };
// Note that in the execRequest function, nh is the asynchronous handle to the gateway. Calling nh[] after sending the result causes the outgoing 
// message queue for this handle to be flushed immediately. 
// Like our gateway, the .z.pc handle is set to reconnect to the Load Balancer on disconnect. The .z.ts function retries to connect to the Load Balancer, 
// and once successful the service registers its details. The .z.ts function is executed once on start-up like the gateway to initialize the first connection. 
.z.ts:{manageConn[];if[0<LB;@[NLB;serviceDetails;{show x}];value"\\t 0"]};
.z.pc:{[handle]if[handle~LB;LB::0;value"\\t 10000"]};
.z.ts[];
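`execRequest` wraps each query in `@[value;rq 1;{x}]`, so a query that fails on the service comes back to the user as error text rather than as a table. The self-contained sketch below shows that behaviour using a trimmed-down copy of the script's `egTrade` table.

Note that the script defines `egTrade` and `egQuote`, not `trade`. A test query against `trade` would therefore take the error path, assuming nothing else defines `trade` on that process.

```q
/ trimmed copy of the dummy table from service.q (time column dropped for brevity)
egTrade:([] date:10#.z.D-1; sym:10#`FDP; price:100.+0.01*til 10; size:10#100);

/ the same protected evaluation that execRequest applies to the query string
ok:@[value;"select from egTrade where date=max date";{x}];
bad:@[value;"select from trade where date=max date";{x}];

show ok;    / the 10 dummy rows, all dated yesterday
show bad;   / "trade": the error text for an undefined name, not a table
```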

Hey David - updates!

I think the Query Router white paper might have a couple of quirks that are causing me issues. (Although at the moment I feel like the guy in the 2000s who blames Bill Gates when he stuffs up his VB code.)

I think the main issue is this:

returnRes:{[res]
  uh:first exec uh from queryTable where sq=(res 0);
  // (res 0) is the sequence number
  if[not null uh;(neg uh)(res 1)];
  // (res 1) is the result
  queryTable[(res 0);`ret]:.z.p
  }

In particular:

(neg uh)(res 1)

I think res causes a 'type error because it needs to be a string. I've tried:

(neg uh) .Q.s1 (res 1)

but that doesn’t return anything that shows up anywhere. Interestingly, if I do something like (neg uh)“foo:123”;(neg uh)“”;

from the gateway, then I do get a variable called foo with that value on the client.

My next attempt will be to either adjust the query process so it assigns return data to a variable or go through the async paper again and see what the standard method is.

I will let you know how I go.

Simon
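As I understand it, when the client is not blocking on `h[]`, an incoming async message is handed to `.z.ps`, and the default `.z.ps` simply evaluates it. A bare result, or its `.Q.s1` string, is therefore evaluated and then discarded. That would fit both the `.Q.s1` observation and the `foo:123` experiment above.

One way to get the result into a variable, along the lines Simon suggests, is for the gateway to send a call to a named client function. The sketch below uses a hypothetical callback name, `onResult`, and a hypothetical variant of `returnRes`. It is not the white paper's design, and it assumes `queryTable` as defined in gateway.q.

```q
/ client side: hypothetical callback that the gateway calls by name
lastResult:();
onResult:{[r] lastResult::r};     / keep the most recent result for inspection

/ gateway side: hypothetical variant of returnRes (not the white paper's version)
returnRes:{[res]
  uh:first exec uh from queryTable where sq=(res 0);   / (res 0) is the sequence number
  if[not null uh;(neg uh)(`onResult;res 1)];           / client evaluates onResult[res 1]
  queryTable[(res 0);`ret]:.z.p
  };
```

There is a trade-off with this change. A client that blocks with `x[]`, like the white paper's `gw` helper, would receive the two-item list ``(`onResult;result)`` instead of the bare result. Each type of client needs to expect one convention or the other.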

Hey David - I wanted to follow up on this. I have recently built myself a fancypants new PC. I'm keen to squeeze as much as possible out of it, so I went for a bare-metal environment rather than containers. I have just tried loading up the load balancer example using mserve.q and it seems to be working fine.

I'll update when I try the full query router example and let you know if there are still issues. I would like to get it going in containers; I wonder if the Docker approach would mean using one process per container. I will advise as I discover!!!

Cheers,

Simon

Thanks Dave,

I still have no luck. I should say that if I open a handle to port 5003 directly using

`h_gw set hopen 5003;

I can get it to evaluate, say, "5+6" (returning 11) or "til 5" (returning 0 1 2 3 4).

As you advised, I restructured the gateway function:

gw:{(neg `h_gw[])(`userQuery;x);`h_gw[]""}

I now get an error back from

gw(`EQUITY_MARKET_RDB;"select from trade where date=max date")

where it doesn't seem to recognize the table. It does return properly now, though. I will add logging to each component on the gateway and see what gets called when. I should also say that I took all the code directly from the query routing white paper. Perhaps I'm being too flippant with my cut and paste; either way, this process will build the skills. I will report back with news from the logs.

Simon
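For the logging step Simon plans, one low-touch option is to wrap the existing gateway callbacks rather than edit them. The sketch below uses hypothetical helper names (`logMsg`, `wrap1`, `wrap2`). It assumes it is loaded after the functions it wraps have been defined. Each wrapper keeps the original function and logs its arguments before calling it.

```q
/ timestamped trace line to stdout
logMsg:{-1 string[.z.p]," | ",x;};

/ replace the unary function named fn with a version that logs its argument first
wrap1:{[fn] orig:value fn; fn set {[nm;f;x] logMsg string[nm]," ",.Q.s1 x; f x}[fn;orig];};

/ the same for binary functions such as serviceAlloc[sq;addr]
wrap2:{[fn] orig:value fn; fn set {[nm;f;x;y] logMsg string[nm]," ",.Q.s1 (x;y); f[x;y]}[fn;orig];};

/ hypothetical usage at the end of gateway.q, to trace each callback as it fires:
/ wrap1 `userQuery; wrap2 `serviceAlloc; wrap1 `returnRes;
```

The wrappers call the original inside the same message, so `.z.w` still refers to the caller. Wrapping the same name twice would log each call twice.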

 

I think the issue is that you are blocking on the client with your last command:

x

Interprocess communication | Basics | kdb+ and q documentation (kx.com)

There may be no message sent back to the client, especially if there is an issue on the server, or if you don't have a callback on the calling handle.

Perhaps you meant to async flush?

neg

It might be worthwhile putting nohup in front of your '&' commands. I'd also suggest opening your handle in the projection instead of opening it per call to the gateway. You could add a check in your function to test whether the handle is still valid and reopen it as needed.

Just a note: you could save that function in any file and source it; it doesn't have to be .bashrc unless that's your intention.
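David's suggestion of a cached handle that is checked and reopened as needed could look something like the sketch below. The names `gwAddr`, `gwH` and `gwQuery` are hypothetical. The validity check relies on `.z.W`, whose keys are the process's open handles.

```q
/ hypothetical client helper: one cached handle, reopened if the gateway connection dropped
gwAddr:`:localhost:5003;       / gateway address used in this thread
gwH:0Ni;                       / cached handle; null until first use
gwQuery:{[req]
  if[not gwH in key .z.W; gwH::hopen gwAddr];   / (re)open only when needed
  neg[gwH](`userQuery;req);                       / async request to the gateway
  gwH[]                                           / block until the gateway replies
  };
```

Usage would be, for example, ``gwQuery(`EQUITY_MARKET_RDB;"select from egTrade where date=max date")``.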

Hi David,

Thank you for your speedy response! I have tried the load balancer on basic q processes.

I did the following:

copy the following files to QHOME:

  • loadBalancer.q
  • service.q
  • gateway.q
Next, I defined a function qmserve in .bashrc as follows:
qmserve(){
  q loadBalancer.q -p 5001 &
  q service.q -p 5002 &
  q gateway.q -p 5003 &
  q -p 5004 &
  q -p 5005
}
and ran
source .bashrc
from the command line in the home directory to load up the function.

I then executed the function qmserve and noted 5 processes starting.

At the q prompt I did this:

gw:{h:hopen x;{(neg x)(`userQuery;y);x[]}[h]}[`:localhost:5003]

followed by this:

gw(`EQUITY_MARKET_RDB;"select from trade where date=max date")

The result was a hanging cursor.

Looking in the service.q code, what does LB do when it is used in manageConn?

My thought for troubleshooting was that I should load up the processes separately in multiple terminals, as using '&' suppresses error returns, as I'm sure you know. An alternative view, though, is that my final process doesn't have a '&', so it should return errors normally.

Keen for any thoughts you might have, even if they're only on next steps for my troubleshooting. I wondered if it was worth loading up the scripts in q startup processes wrapped in my qmserve function, but then starting my client q process directly on the command line. I don't think the function wrapper in the bash script should change things, but it never hurts to try.

Simon
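On the question about what LB does in manageConn: `{NLB::neg LB::hopen x}` stores the positive handle returned by `hopen` in `LB`, and its negation in `NLB`. So `LB` is used for synchronous calls and `NLB` for asynchronous ones.

The sketch below shows the difference. It assumes the load balancer is listening on localhost:5001. It also sets a throwaway variable, `probe` (a hypothetical name), on that process.

```q
h:hopen `:localhost:5001;   / positive handle, as stored in LB
nh:neg h;                   / negative handle, as stored in NLB

two:h"1+1";                 / sync: blocks and returns the remote result (2)
nh"probe:42";               / async: queued, no result comes back
nh[];                       / flush the async queue (the service does this with nh[])
back:h"probe";              / sync read-back; messages on one handle are processed in order
hclose h;
```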

Hi Simon,

Thanks for sharing your query.

Just curious: have you tried either of these approaches outside of Developer, on raw q processes, to run the async callbacks?

Regards,

David