Hi there,
-
Yes we’re using Prefect to orchestrate PyKX
-
We’re using a @task
to query using PyKX. The connection is done outside of the flow and task (attached code snippet below)
-
Switching to flows does work, but we want some functions to execute in parallel. When we switched to all flows/subflows, it seemed like it was all executing sequentially.
-
We tried doing it in parallel outside of Prefect using from multiprocessing import Pool
and it was operating fine
-
Multiple queries
-
PyKX version 1.5.4
-
Unlicensed mode
-
Using SyncQConnection and IPC to grab handle into port
port = masterConn('getProcessClient[
prefect_pykx;pykx_test]')
conn = kx.SyncQConnection( host='', port=port.py(), username='cleung', password=token)
Code Snippet:
import pykx as kx import subprocess from prefect import flow, task, get_run_logger masterConn = kx.SyncQConnection( host=‘v-kdbr-01’, port=5000, username=‘cleung’, password=token, timeout=3.0 ) user = masterConn(‘.z.u’) print(‘User = ’ + user.py()) port = masterConn(‘getProcessClient[prefect_pykx;
pykx_test]’) print(‘Port = ’ + str(port.py())) masterConn.close() conn = kx.SyncQConnection( host=‘v-kdbr-01’, port=port.py(), username=‘cleung’, password=token) user = conn(’.z.u’) print(‘User = ’ + user.py()) @task def saveRecRevAI260(): logger = get_run_logger() logger.info(“Executing saveRecRevAI260”) conn(’.imq.model.saveRecRevAI260[.dt.shiftstartdr[max dailyDates;neg 1];inLogFile;reportDate]') logger.info("Finished saveRecRevAI260") return 1 @task def saveEstRevAI(): logger = get_run_logger() logger.info("Executing saveEstRevAI") conn('.imq.model.saveEstRevAI[.dt.shiftstartdr[max dailyDates;neg 1];();inLogFile]') logger.info("Finished saveEstRevAI") return 1 @task def saveRetRawNaive(): logger = get_run_logger() logger.info("Executing saveRetRawNaive") conn('.imq.model.saveRetRawNaive[dailyDates;
]‘) logger.info(“Finished saveRetRawNaive”) return 1 @task def saveEstRevStdDev5Yr(): logger = get_run_logger() logger.info (“Executing saveEstRevStdDev5Yr”) conn(".imq.model.saveEstRevStdDev5Yr[dailyDates;reportDate;inLogFile]") logger.info("Finished saveEstRevStdDev5Yr") return 1 @task def saveSRinteractionsEarnItems(): logger = get_run_logger() logger.info("Executing saveSRinteractionsEarnItems") conn(".imq.model.saveSRInteractionsEarnItems[dailyDates;inLogFile]") logger.info("Finished saveSRinteractionsEarnItems") @flow() def ibes_pykx(): logger = get_run_logger() logger.info("Initializing initial arguments") dailyDates = conn('dailyDates: .dt.drb[.dt.shiftdateb[exec max date from QModelD;-3]; exec max date from QModelD];') logFile = conn('inLogFile:
’) saveRecRevAI260_result = saveRecRevAI260.submit() saveRetRawNaive_result = saveRetRawNaive.submit() saveEstRevAI_result = saveEstRevAI(wait_for=[saveRecRevAI260_result]) saveSRinteractionsEarnItems_result = saveSRinteractionsEarnItems(wait_for=[saveRetRawNaive_result]) # saveEstRevStdDev5Yr_result = saveEstRevStdDev5Yr(wait_for=[saveEstRevAI_result]) if name == " main": ibes_pykx()
Error Message:
4:15:15.041 | DEBUG | Task run ‘saveRecRevAI260-0’ - Beginning execution… 14:15:15.046 | INFO | Task run ‘saveRecRevAI260-0’ - Executing saveRecRevAI260 14:15:15.108 | DEBUG | Task run ‘saveRetRawNaive-0’ - Beginning execution… 14:15:15.112 | INFO | Task run ‘saveRetRawNaive-0’ - Executing saveRetRawNaive 14:15:16.486 | DEBUG | prefect.client - Connecting to API at http://v-tm-qeq-05:4200/api/ 14:16:31.335 | ERROR | Task run ‘saveRecRevAI260-0’ - Encountered exception during execution: Traceback (most recent call last): File “C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py”, line 1533, in orchestrate_task_run result = await run_sync(task.fn, *args, **kwargs) File “C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py”, line 156, in run_sync_in_interruptible_worker_thread tg.start_soon( File “\v-tm-qeq-05\Prefect\lib\site-packages\anyio_backends_asyncio.py”, line 662, in aexit raise exceptions[0] File “\v-tm-qeq-05\Prefect\lib\site-packages\anyio\to_thread.py”, line 31, in run_sync return await get_asynclib().run_sync_in_worker_thread( File “\v-tm-qeq-05\Prefect\lib\site-packages\anyio_backends_asyncio.py”, line 937, in run_sync_in_worker_thread return await future File “\v-tm-qeq-05\Prefect\lib\site-packages\anyio_backends_asyncio.py”, line 867, in run result = context.run(func, *args) File “C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py”, line 135, in capture_worker_thread_and_result result = __fn(*args,**kwargs) File “//v-tm-qeq-05/PrefectScripts/cleung/Script/ibes_pykx.py”, line 24, in saveRecRevAI260 conn(‘.imq.model.saveRecRevAI260[.dt.shiftstartdr[max dailyDates;neg 1];inLogFile;`reportDate]’) File “\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py”, line 806, in__call__return self._call(query, *args, wait=wait) File “\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py”, line 816, in _call return self._recv(locked=True) File “\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py”, line 511, in _recv res = callback(key.fileobj) File “\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py”, line 535, in _recv_socket size = chunks[4] IndexError: list index out of range 14:16:31.558 | ERROR | Task run ‘saveRecRevAI260-0’ - Finished in state Failed(‘Task run encountered an exception: IndexError: list index out of range\n’) 14:16:31.622 | ERROR | Flow run ‘chocolate-marmot’ - Encountered exception during execution: Traceback (most recent call last): File “C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py”, line 665, in orchestrate_flow_run result = await run_sync(flow_call) File “C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py”, line 156, in run_sync_in_interruptible_worker_thread tg.start_soon( File “\v-tm-qeq-05\Prefect\lib\site-packages\anyio_backends_asyncio.py”, line 662, in__aexit__raise exceptions[0] File “\v-tm-qeq-05\Prefect\lib\site-packages\anyio\to_thread.py”, line 31, in run_sync return await get_asynclib().run_sync_in_worker_thread( File “\v-tm-qeq-05\Prefect\lib\site-packages\anyio_backends_asyncio.py”, line 937, in run_sync_in_worker_thread return await future File “\v-tm-qeq-05\Prefect\lib\site-packages\anyio_backends_asyncio.py”, line 867, in run result = context.run(func, *args) File “C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py”, line 135, in capture_worker_thread_and_result result =__fn(*args, **kwargs) File “//v-tm-qeq-05/PrefectScripts/cleung/Script/ibes_pykx.py”, line 69, in ibes_pykx saveEstRevAI_result = saveEstRevAI(wait_for=[saveRecRevAI260_result]) File “C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\tasks.py”, line 469, in call return enter_task_run_engine( File “C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py”, line 965, in enter_task_run_engine return run_async_from_worker_thread(begin_run) File “C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py”, line 177, in run_async_from_worker_thread return anyio.from_thread.run(call) File “\v-tm-qeq-05\Prefect\lib\site-packages\anyio\from_thread.py”, line 49, in run return asynclib.run_async_from_thread(func, *args) File “\v-tm-qeq-05\Prefect\lib\site-packages\anyio_backends_asyncio.py”, line 970, in run_async_from_thread return f.result() File “\v-tm-qeq-05\Prefect\lib\concurrent\futures_base.py”, line 444, in result return self.__get_result() File “\v-tm-qeq-05\Prefect\lib\concurrent\futures_base.py”, line 389, in__get_result raise self._exception File “C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py”, line 1114, in get_task_call_return_value return await future._result() File “C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\futures.py”, line 237, in _result return await final_state.result(raise_on_failure=raise_on_failure, fetch=True) File “C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\states.py”, line 103, in _get_state_result raise MissingResult( prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
Thanks a lot for your help by the way. Greatly appreciate it!!