Hi all,
I’ve had a frustrating experience attempting to set up a load balancer and query router on a collection of q processes, both started independently and from KX Developer. I wondered whether anyone can spot the schoolboy error or has any insight to share.
I’ve tried:
- using the base load balancing server script:
https://github.com/KxSystems/kdb/blob/master/e/mserve.q
Process:
Start 4 q processes on ports 5001 to 5004.
Start a 5th process with this script loaded on startup.
Try loading an HDB and querying it, either directly on that 5th process or by sending queries to the 5th process from a Developer session set up as a client.
Result: I get nothing back.
- using the full query router
https://code.kx.com/q/wp/query-routing/
Process:
Start up 3 processes on ports 5001 to 5003 with the load balancer, service and gateway scripts loaded respectively (I start them in that order: load balancer, then service, then gateway). The only changes I made to the scripts were to add the text from the white paper as comments, to aid my understanding, and to initialise the SEQ variable in the gateway script with a randomly generated number. The service script just creates dummy tables, but I wanted to make as few changes as possible to get the POC running.
In any case, when I try connecting to the gateway with Developer as a client, I have no success. I have added the scripts in the post below, as I can’t attach more than one file at a time and the system appears clever enough to thwart my attempt to get around the restriction on attaching zips.
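In the scripts below, the gateway's returnRes sends each result back with (neg uh)(res 1), i.e. as an asynchronous message whose payload is the result itself rather than a function call. A client that has not defined .z.ps will simply evaluate that payload, so nothing is kept. A minimal client-side sketch, assuming the gateway listens on localhost:5003 and the EQUITY_MARKET_RDB service from the service script has registered; results and gw are hypothetical names:

```q
// hypothetical client: collect every asynchronous message (results or error symbols) sent back by the gateway
results:();
.z.ps:{[msg] results::results,enlist msg};

// open a handle to the gateway; return a null int instead of signalling if it is not running
gw:@[hopen;`:localhost:5003;{-1 "gateway not reachable: ",x; 0Ni}];

// send the request asynchronously as (service name; query string)
if[not null gw; neg[gw](`userQuery;(`EQUITY_MARKET_RDB;"select from egTrade"))];
```

Replies then accumulate in results as they arrive; since they are asynchronous, they are processed once the client process is idle.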
From an earlier post to the Community, I understand that Developer doesn’t support using peach with separate processes rather than threads. This has me thinking that one of the following is true:
- I am screwing up the code implementation
- I am screwing up the code implementation AND can’t use developer with these multiprocess architectures
- the code as shown in the above links contains errors
- there is a bug in the q version available separately from the platform offering, meaning you can’t do asynchronous callbacks in either Developer or a raw q process.
I hope minds greater than my own might be able to shed light on which of these 4 options is true and what the solution might be. In summary, I have yet to successfully execute an asynchronous callback.
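For comparison, a basic asynchronous callback between two q processes usually follows this shape: the server replies on the caller's asynchronous handle (neg .z.w) and names a function for the client to run. The function names square and onResult are hypothetical. The sketch can be tried in a single q session, because .z.w is 0 for a local call and handle 0 evaluates the message in the same process:

```q
// server side (hypothetical): compute, then reply asynchronously to whichever handle made the request
square:{[x] neg[.z.w](`onResult;x*x)}

// client side (hypothetical): the server's reply invokes this handler
lastResult:0N;
onResult:{[r] lastResult::r; -1 "callback received: ",string r}

// a remote client would send neg[h](`square;3), where h is a handle to the server;
// locally .z.w is 0, so handle 0 evaluates the reply in this same process
square 3
```

In a real two-process setup, onResult has to exist on the client before the reply arrives.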
Regards,
Simon
Gateway
```q
////////////////////////
// Gateway
////////////////////////
// https://code.kx.com/q/wp/query-routing/#
////////////////////////
// loading order:
// * load balancer: p 5001
// * service: p 5002
// * gateway: p 5003
// * client
////////////////////////
// When a connection is opened to the Load Balancer, the handle is set to the variable LB, which will be referenced throughout this paper.
// As asynchronous messages are used throughout this framework, we also create the variable NLB,
// which is assigned with the negative handle to the load balancer.
// \p 5555
\p 5003
manageConn:{@[{NLB::neg LB::hopen x};`:localhost:5001;{show x}]};
registerGWFunc:{addResource LB(`registerGW;`)};

// The gateway connects to the Load Balancer and retrieves the addresses of all service resources, establishing a connection to each.
// This is the only time the gateway uses synchronous IPC communication to ensure it has all of the details it requires before accepting
// user queries. After the gateway registers itself as a subscriber for any new resources that come available, all future communication is
// sent via asynchronous messages.
resources:([address:()] source:();sh:());
addResource:{
  `resources upsert `address xkey update sh:{hopen first x}each address from x
  };

// The gateway process creates and maintains an empty query table. The complexity of this table is at the developer's discretion. In this example we'll record:
// * Unique sequence number per query (sq)
// * Handle from user process (uh)
// * Timestamps for when the query was received, when the query got sent to an available resource, and when the query results are sent back
//   to the user (rec, snt, ret respectively)
// * The user ID (usr)
// * The service handle (sh)
// * The service requested by user (serv)
// * The user's query
queryTable:([sq:`int$()]
  uh:`int$();
  rec:`timestamp$();
  snt:`timestamp$();
  ret:`timestamp$();
  usr:`$();
  sh:`int$();
  serv:`$();
  query:()
  );

// This table could be extended to include more information by making small changes to the code in this paper.
// These fields could include the status of a query, error messages received from service or the total time a query took from start to end.
// As mentioned previously, users make requests by calling the userQuery function on the gateway. This function takes a two-item list argument: (Service;Query).
// The gateway will validate the existence of a service matching the name passed to userQuery and send an error if no such resource exists.
// We are setting outside the scope of this paper any further request validation, including access permissioning.
// For further details on access control, please refer to the technical white paper "Permissions with kdb+".
// When a user sends her query via the userQuery function, we assign the query a unique sequence number and publish an asynchronous request to the
// Load Balancer to be assigned an available resource.
// initialise the query id generator.
SEQ:first 1?0;
userQuery:{
  $[(serv:x 0) in exec distinct source from resources; // valid service?
    [queryTable,:(SEQ+:1;.z.w;.z.p;0Np;0Np;.z.u;0N;serv;x 1);
     NLB(`requestService;SEQ;serv)];
    (neg .z.w)(`$"Service Unavailable")]
  };

// The addResource function defined earlier is used to add new service instances to the plant, while the serviceAlloc function is used to
// pass back an allocated resource for a given query sequence number. The query is retrieved by its sequence number from queryTable and
// sent to the allocated service resource. If the user has since disconnected from the gateway before a resource could be provided, the gateway
// informs the Load Balancer to make this resource free again by executing the returnService function in the Load Balancer. After each event,
// the timestamp fields are updated within the queryTable.
serviceAlloc:{[sq;addr]
  $[null queryTable[sq;`uh];
    // Check if user is still waiting on results
    NLB(`returnService;sq);
    // Service no longer required
    [(neg sh:resources[addr;`sh]) (`queryService;(sq;queryTable[sq;`query]));
     // Send query to allocated resource, update queryTable
     queryTable[sq;`snt`sh]:(.z.p;sh)]]
  };

// When a service returns results to the gateway, the results arrive tagged with the same sequence number sent in the original query. This
// incoming message packet executes the returnRes function, which uses the sequence number to identify the user handle and return the results.
// If the user has disconnected before the results can be returned then the user handle field uh will be set to null (through the .z.pc trigger)
// causing nothing further to be done.
returnRes:{[res]
  uh:first exec uh from queryTable where sq=(res 0);
  // (res 0) is the sequence number
  if[not null uh;(neg uh)(res 1)];
  // (res 1) is the result
  queryTable[(res 0);`ret]:.z.p
  };

// In the situation where a process disconnects from the gateway, .z.pc establishes what actions to take. As mentioned, a disconnected user
// will cause queryTable to be updated with a null user handle. If the user currently has no outstanding queries, the gateway has nothing to
// do. If a service disconnects from the gateway whilst processing an outstanding user request, then all users that have outstanding
// requests to this database are informed and the database is purged from the available resources table.
// If our Load Balancer connection has dropped, all users with queued queries will be informed. All connections are disconnected and purged
// from the resources table. This ensures that all new queries will be returned directly to users as the Load Balancer is unavailable to
// respond to their request. A timer is set to attempt to reconnect to the Load Balancer. On reconnection, the gateway will re-register
// itself, pull all available resources and establish new connections. The .z.ts trigger is executed once, on script startup,
// to initialize and register the process.
.z.pc:{[handle]
  // if handle is for a user process, set the query handle (uh) as null
  update uh:0N from `queryTable where uh=handle;
  // if handle is for a resource process, remove from resources
  delete from `resources where sh=handle;
  // if any user query is currently being processed on the service which
  // disconnected, send message to user
  if[count sq:exec distinct sq from queryTable where sh=handle,null ret;
    returnRes'[sq cross `$"Service Disconnect"]];
  if[handle~LB; // if handle is Load Balancer
    // Send message to each connected user, which has not received results
    (neg exec uh from queryTable where not null uh,null snt)@\:`$"Service Unavailable";
    // Close handle to all resources and clear resources table
    hclose each (0!resources)`sh;
    delete from `resources;
    // update queryTable to close outstanding user queries
    update snt:.z.p,ret:.z.p from `queryTable where not null uh,null snt;
    // reset LB handle and set timer of 10 seconds
    // to try and reconnect to Load Balancer process
    LB::0; NLB::0; value"\\t 10000"]
  };
.z.ts:{
  manageConn[];  // [] is required to call it: a bare manageConn; only references the function
  if[0<LB;@[registerGWFunc;`;{show x}];value"\\t 0"]
  };
.z.ts[];  // run once on startup (a bare .z.ts; would not execute it)
```
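On the SEQ change: queryTable declares its sq key as an int column, whereas first 1?0 returns a long (0 is a long literal) and SEQ+:1 adds a long 1. A minimal sketch that keeps a randomly initialised counter as an int, which avoids a possible type mismatch when the row is appended; the helper name nextSeq and the 1000000 upper bound are assumptions for illustration:

```q
// random starting point in [0;1000000), generated as an int to match sq:`int$()
SEQ:first 1?1000000i;
// hypothetical helper: add 1i (not 1) so SEQ stays an int instead of widening to a long
nextSeq:{SEQ::SEQ+1i; SEQ}
```

userQuery could then call nextSeq[] once per request, keep the value in a local and use it both in the queryTable row and in the requestService message, in place of SEQ+:1. The same consideration applies to the 0N placeholder for the int sh column (0Ni is the int null).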
Load Balancer
```q
////////////////////////
// LoadBalancer
////////////////////////
// https://code.kx.com/q/wp/query-routing/#
////////////////////////
// loading order:
// * load balancer: p 5001
// * service: p 5002
// * gateway: p 5003
// * client
////////////////////////
// Within our Load Balancer there are two tables and a list:
// \p 1234
\p 5001
services:([handle:`int$()]
  address:`$();
  source:`$();
  gwHandle:`int$();
  sq:`int$();
  udt:`timestamp$()
  );
serviceQueue:([gwHandle:`int$();sq:`int$()]
  source:`$();
  time:`timestamp$()
  );
gateways:();

// The service table maintains all available instances/resources of services registered and the gateways currently using each service resource.
// The serviceQueue maintains a list of requests waiting on resources. A list is also maintained, called gateways, which contains all gateway handles.
// Gateways connecting to the Load Balancer add their handle to the gateways list. New service resources add their connection details to the services table.
// When a service resource registers itself using the registerResource function, the Load Balancer informs all registered gateways of the newly available
// resource. The next outstanding query within the serviceQueue table is allocated immediately to this new resource.
registerGW:{gateways,:.z.w ; select source, address from services};
registerResource:{[name;addr]
  `services upsert (.z.w;addr;name;0N;0N;.z.p);
  (neg gateways)@\:(`addResource;enlist`source`address!(name;addr));
  // Sends resource information to all registered gateway handles
  serviceAvailable[.z.w;name]
  };

// Incoming requests for service allocation arrive with a corresponding sequence number. The combination of gateway handle and sequence number will
// always be unique. The requestService function either provides a service to the gateway or adds the request to the serviceQueue. When a resource is allocated
// to a user query, the resource address is returned to the gateway along with the query sequence number that made the initial request.
sendService:{[gw;h]neg[gw]raze(`serviceAlloc;services[h;`sq`address])};
// Returns query sequence number and resource address to gateway handle
requestService:{[seq;serv]
  res:exec first handle from services where source=serv,null gwHandle;
  // Check if any idle service resources are available
  $[null res;
    addRequestToQueue[seq;serv;.z.w];
    [services[res;`gwHandle`sq`udt]:(.z.w;seq;.z.p);
     sendService[.z.w;res]]]
  };

// If all matching resources are busy, then the gateway handle + sequence number combination is appended to the serviceQueue table along with the service required.
addRequestToQueue:{[seq;serv;gw]`serviceQueue upsert (gw;seq;serv;.z.p)};

// After a service resource has finished processing a request, it sends an asynchronous message to the Load Balancer, executing the returnService function.
// As mentioned previously, if the user disconnects from the gateway prior to being allocated a service resource, the gateway also calls this function.
// The incoming handle differentiates between these two situations.
returnService:{
  serviceAvailable . $[.z.w in (0!services)`handle;
    (.z.w;x);
    value first select handle,source from services where gwHandle=.z.w,sq=x]
  };

// On execution of the serviceAvailable function, the Load Balancer will either mark this resource as free, or allocate the resource to the next gateway +
// sequence number combination that has requested this service, updating the services and serviceQueue tables accordingly.
serviceAvailable:{[zw;serv]
  nxt:first n:select gwHandle,sq from serviceQueue where source=serv;
  serviceQueue::(1#n)_ serviceQueue; // Take first request for service and remove from queue
  services[zw;`gwHandle`sq`udt]:(nxt`gwHandle;nxt`sq;.z.p);
  if[count n;
    sendService[nxt`gwHandle;zw]]
  };

// Any resource that disconnects from the Load Balancer is removed from the services table. If a gateway has disconnected,
// it is removed from the resource subscriber list gateways and all queued queries for any resources must also be removed,
// and the resource freed up for other gateways. Unlike other components in this framework, the Load Balancer does not
// attempt to reconnect to processes, as they may have permanently been removed from the service pool of resources. In a
// dynamically adjustable system, service resources could be added and removed on demand based on the size of the
// serviceQueue table.
.z.pc:{[h]
  services _:h;
  gateways::gateways except h;
  delete from `serviceQueue where gwHandle=h;
  update gwHandle:0N from `services where gwHandle=h
  };
// If a gateway dies, data services will continue to run queries that have already been routed to them,
// which will not subsequently be returned to the client. It is also possible that the next query assigned to this
// resource may experience a delay as the previous query is still being evaluated. As mentioned later,
// all resources should begin with a timeout function to limit interruption of service.
```
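When a chain of asynchronous calls silently returns nothing, it helps to see which messages actually reach each process. A debugging sketch, not part of the white paper, that can be loaded into any of the three processes after its script: it prints each incoming asynchronous message with its sender handle and then evaluates it with value, which is what happens when no .z.ps is defined.

```q
// hypothetical debugging wrapper: log every incoming async message, then evaluate it as usual
.z.ps:{[msg]
  -1 string[.z.p]," handle ",string[.z.w],": ",-3!msg;
  value msg}
```

Synchronous requests, such as the gateway's LB(`registerGW;`) call, go through .z.pg instead and will not appear in this log.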
Service
```q
////////////////////////
// Service
////////////////////////
// https://code.kx.com/q/wp/query-routing/#
////////////////////////
// loading order:
// * load balancer: p 5001
// * service: p 5002
// * gateway: p 5003
// * client
////////////////////////
// The example below takes a simple in-memory database containing trade and quote data that users can query.
// An example timeout of ten seconds is assigned, to prevent queries running for too long.
\T 10
\p 5002
LB:0
egQuote:([]
  date:10#.z.D-1;
  sym:10#`FDP;
  time:09:30t+00:30t*til 10;
  bid:100.+0.01*til 10;
  ask:101.+0.01*til 10
  );
egTrade:([]
  date:10#.z.D-1;
  sym:10#`FDP;
  time:09:30t+00:30t*til 10;
  price:100.+0.01*til 10;
  size:10#100
  );

// Each instance of a service uses the same service name. Within this example, the service name is hard-coded, but this would ideally be set
// via a command line parameter. In our example below, our service name is set to EQUITY_MARKET_RDB. In designing a user-friendly system,
// service names should be carefully set to clearly describe a service's purpose. Similar processes (with either a different port number or
// running on a different host) can be started up with this service name, increasing the pool of resources available to users.
// The serviceDetails function is executed on connection to the Load Balancer to register each service address.
manageConn:{@[{NLB::neg LB::hopen x};
  `:localhost:5001;
  {show "Can't connect to Load Balancer-> ",x}]
  };
serviceName:`EQUITY_MARKET_RDB;
serviceDetails:(`registerResource;
  serviceName;
  `$":" sv string (();.z.h;system"p")
  );

// When a gateway sends the service a request via the queryService function, a unique sequence number assigned by a given gateway arrives as
// the first component of the incoming asynchronous message. The second component, the query itself, is then evaluated. The results of this query
// are stamped with the same original sequence number and returned to the gateway handle.
// As mentioned previously, query interpretation/validation on the gateway side is outside of the scope of this paper.
// Any errors that occur due to malformed queries will be returned via protected evaluation from database back to the user.
// In the situation where the process query times out, 'stop will be returned to the user via the projection errProj.
// On completion of a request, an asynchronous message is sent to the Load Balancer informing it that the service is now available for the next request.
execRequest:{[nh;rq]nh(`returnRes;(rq 0;@[value;rq 1;{x}]));nh[]};
queryService:{
  errProj:{[nh;sq;er]nh(sq;`$er);nh[]};
  @[execRequest[neg .z.w];x;errProj[neg .z.w;x 0]];
  NLB(`returnService;serviceName)
  };
// Note that in the execRequest function, nh is the asynchronous handle to the gateway. Calling nh after sending the result causes the outgoing
// message queue for this handle to be flushed immediately.
// Like our gateway, the .z.pc handle is set to reconnect to the Load Balancer on disconnect. The .z.ts function retries to connect to the Load Balancer,
// and once successful the service registers its details. The .z.ts function is executed once on start-up like the gateway to initialize the first connection.
// [] is required on manageConn and .z.ts below: a bare name only references the function and never runs it
.z.ts:{manageConn[];if[0<LB;@[NLB;serviceDetails;{show x}];value"\\t 0"]};
.z.pc:{[handle]if[handle~LB;LB::0;value"\\t 10000"]};
.z.ts[];
```
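The success path in execRequest sends (`returnRes;(sq;result)), which the gateway evaluates as a call to returnRes, whereas errProj sends (sq;`$er) without the returnRes name, so it is worth checking whether an error or timeout reply can reach the user at all. A sketch of a single helper (the name reply is hypothetical) that builds the same message shape for both outcomes and can be tested locally without any IPC; it converts the error text to a symbol, as errProj does:

```q
// hypothetical helper: build the (`returnRes;(sq;result)) message for a request rq:(sq;query),
// trapping any error so that failures travel back through returnRes as well
reply:{[rq] (`returnRes;(rq 0;@[value;rq 1;{`$x}]))}
// inside queryService a service could then send it with: neg[.z.w] reply x; neg[.z.w][]
```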