Source code for qe_api_client.engine_communicator

from websocket import create_connection
import ssl


[docs] class EngineCommunicator: def __init__(self, url): self.url = url self.ws = create_connection(self.url) # Holds session object. Required for Qlik Sense Sept. 2017 and later self.session = self.ws.recv()
[docs] @staticmethod def send_call(self, call_msg): self.ws.send(call_msg) return self.ws.recv()
[docs] @staticmethod def close_qvengine_connection(self): self.ws.close()
[docs] class SecureEngineCommunicator(EngineCommunicator): def __init__(self, url, user_directory, user_id, ca_certs, certfile, keyfile, app_id=None): self.url = "wss://" + url + ":4747/app/" + str(app_id) certs = ({"ca_certs": ca_certs, "certfile": certfile, "keyfile": keyfile, "cert_reqs": ssl.CERT_NONE, "server_side": False}) ssl.match_hostname = lambda cert, hostname: True header = f'X-Qlik-User: UserDirectory={user_directory}; UserId={user_id}' # NOQA self.ws = create_connection(self.url, sslopt=certs, cookie=None, header={header}) self.session = self.ws.recv()