Does PyKX IPC work with multithreading?

https://learninghub.kx.com/forums/topic/pykx-ipc-work-with-multithreading

Hi there, 

I have a PyKX process that uses IPC to connect to a port, and we're running some kdb+ functions that involve multithreading (cutPeach). However, every time I run it, I get the following error:

Traceback (most recent call last):
  File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py", line 1533, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "//v-tm-qeq-05/PrefectScripts/cleung/Script/ibes_pykx.py", line 40, in saveRetRawNaive
    conn('.imq.model.saveRetRawNaive[dailyDates;`]')
  File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 806, in __call__
    return self._call(query, *args, wait=wait)
  File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 816, in _call
    return self._recv(locked=True)
  File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 511, in _recv
    res = callback(key.fileobj)
  File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 535, in _recv_socket
    size = chunks[4]
IndexError: list index out of range

Any guidance would be appreciated. Thanks!

The issue you are having is unrelated to peach/multithreading on the kdb+ side.

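The issue is the reuse of the same connection across @task blocks.

The different @task functions are reading from the same connection concurrently, so one task can consume bytes that belong to another task's response. This leads to junk data being passed through, which is consistent with the IndexError raised in _recv_socket while it parses the incoming message.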
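A minimal sketch of the usual remedy, assuming each worker thread can afford its own connection: give every thread a private connection instead of sharing one socket. `FakeConnection`, `get_connection` and `run_query` are hypothetical stand-ins so the example runs without a kdb+ server; in real code the factory would open a PyKX connection, for example `kx.SyncQConnection(host, port)` with your own (assumed) host and port.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeConnection:
    """Hypothetical stand-in for a PyKX IPC connection such as kx.SyncQConnection."""

    def __init__(self):
        # Remember which thread opened this connection.
        self.owner = threading.get_ident()

    def __call__(self, query):
        # A real connection would send `query` over its own socket and block for the reply.
        return self.owner, query


_local = threading.local()  # per-thread storage: each thread sees its own attributes


def get_connection():
    """Return this thread's private connection, opening it on first use."""
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = FakeConnection()  # real code might open kx.SyncQConnection(host, port) here
        _local.conn = conn
    return conn


def run_query(query):
    """Run a query on the calling thread's own connection; True if no sharing occurred."""
    owner, _ = get_connection()(query)
    return owner == threading.get_ident()


with ThreadPoolExecutor(max_workers=4) as pool:
    ok = list(pool.map(run_query, [f"q{i}" for i in range(20)]))
print(all(ok))  # True
```

With PyKX itself, a simpler variant of the same idea is to open the connection inside each @task (for instance with `with kx.SyncQConnection(host, port) as conn:`) rather than passing one shared `conn` between tasks.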

kdb+ processes incoming queries sequentially (by default, on its main thread). This means that even though the log messages from the tasks make it look as if they run in parallel, once the queries arrive at kdb+ they are always processed one after the other. For this reason, switching to @flow will not be slower and will give safe, consistent results.
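To illustrate why client-side concurrency does not speed up a server that handles one query at a time, here is a small pure-Python simulation: one server thread takes requests from a FIFO queue and evaluates them one by one, while several client threads submit at once. `serve`, `run_clients` and `WORK_SECONDS` are hypothetical names, and this only loosely models a single-threaded q process; it is not how kdb+ is implemented.

```python
import queue
import threading
import time
from concurrent.futures import Future

WORK_SECONDS = 0.05  # assumed cost of evaluating one query on the server


def serve(requests):
    """Single server thread: handle requests strictly one at a time, in arrival order."""
    while True:
        item = requests.get()
        if item is None:  # sentinel: stop serving
            return
        query, reply = item
        time.sleep(WORK_SECONDS)  # simulate evaluating the query
        reply.set_result(f"done:{query}")


def run_clients(n_clients):
    """Send n_clients concurrent requests to one sequential server; return (replies, seconds)."""
    requests = queue.Queue()
    server = threading.Thread(target=serve, args=(requests,))
    server.start()
    replies = [Future() for _ in range(n_clients)]

    def client(i):
        requests.put((f"q{i}", replies[i]))
        replies[i].result()  # block until the server answers this request

    start = time.perf_counter()
    clients = [threading.Thread(target=client, args=(i,)) for i in range(n_clients)]
    for t in clients:
        t.start()
    for t in clients:
        t.join()
    elapsed = time.perf_counter() - start

    requests.put(None)  # tell the server to stop
    server.join()
    return [f.result() for f in replies], elapsed


replies, elapsed = run_clients(8)
# The clients ran concurrently, but the server still did the work serially,
# so elapsed is at least 8 * WORK_SECONDS rather than a single WORK_SECONDS.
print(f"{len(replies)} replies in {elapsed:.2f}s")
```

Running the same eight queries one after another from a single client would take roughly the same total time, which is the point made above about @flow not being slower.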

Hi,

  1. Are you running PyKX under Prefect? www.prefect.io
  2. Are you connecting and querying using PyKX in @task or @flow?
  3. If using @task, does switching to @flow (i.e. subflows) stop the issue from happening?
  4. Do you see the same issue if you query in a standalone python process outside of Prefect?
  5. Are you making a single query when the issue happens or are multiple queries being run?
  6. What version of PyKX are you running? pykx.__version__
  7. Are you running in licensed or unlicensed mode? pykx.licensed
  8. How did you create the connection conn?

We added functionality for this in PyKX version 2.3.0:

https://code.kx.com/pykx/2.4/release-notes/changelog.html#pykx-230

If you enable beta features as well as PyKX threading, all calls into q from any Python thread will be executed as if they were made from the main thread. This allows multithreaded Python programs to use IPC connections in licensed mode.
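The general idea of routing every q call through one designated thread, whichever Python thread issues it, can be sketched with a single-worker executor. This is only an illustration of the pattern, not PyKX's implementation; `fake_q`, `call_q` and `worker` are hypothetical names, with `fake_q` standing in for a call into q.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# One dedicated thread plays the role of the only thread allowed to call into q.
_q_thread = ThreadPoolExecutor(max_workers=1, thread_name_prefix="q-owner")


def fake_q(query):
    """Hypothetical stand-in for a call into q; reports which thread ran it."""
    return threading.current_thread().name, query


def call_q(query):
    """Safe to call from any thread: the work is handed to the dedicated thread."""
    return _q_thread.submit(fake_q, query).result()


def worker(i, out):
    out[i] = call_q(f"query {i}")


results = {}
threads = [threading.Thread(target=worker, args=(i, results)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every call, whichever thread issued it, ran on the same "q-owner" thread.
print({name for name, _ in results.values()})
_q_thread.shutdown()  # plays a role similar in spirit to kx.shutdown_thread()
```

Because the dedicated thread runs one submitted call at a time, callers never touch the underlying resource concurrently; the trade-off is that those calls are serialized.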

You can enable this functionality like this:

import os
# Both variables must be set before pykx is imported
os.environ['PYKX_THREADING'] = '1'
os.environ['PYKX_BETA_FEATURES'] = '1'
import pykx as kx

You will also want to ensure that kx.shutdown_thread() is called when the script finishes. The safest way to do this is within a try/finally block, like this:

if __name__ == '__main__':
    try:
        main()
    finally:
        kx.shutdown_thread()
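The reason for try/finally is that the cleanup call runs whether main() returns normally or raises. A minimal sketch with hypothetical stand-ins (`shutdown_thread` here is a local function, not PyKX's) shows both paths:

```python
calls = []


def shutdown_thread():
    """Hypothetical stand-in for kx.shutdown_thread(); records that it ran."""
    calls.append("shutdown")


def run(main):
    """Run main() and always shut down the worker thread afterwards."""
    try:
        main()
    finally:
        shutdown_thread()


def ok_main():
    calls.append("main ok")


def failing_main():
    calls.append("main failed")
    raise RuntimeError("query failed")


run(ok_main)
try:
    run(failing_main)
except RuntimeError:
    pass  # the error still propagates, but cleanup has already happened

print(calls)  # ['main ok', 'shutdown', 'main failed', 'shutdown']
```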

More information about this functionality and an example can be found within our documentation:

https://code.kx.com/pykx/2.4/examples/threaded_execution/threading.html