import json
import logging
import attr
[docs]
@attr.s(eq=False)
class QMPMonitor:
monitor_out = attr.ib()
monitor_in = attr.ib()
[docs]
def __attrs_post_init__(self):
self.logger = logging.getLogger(f"{self}:")
self._negotiate_capabilities()
def _negotiate_capabilities(self):
greeting = self._read_parse_json()
if not greeting.get('QMP'):
raise QMPError("QMP greeting message invalid")
self.monitor_in.write(json.dumps({"execute": "qmp_capabilities"}).encode("utf-8"))
self.monitor_in.write("\n".encode("utf-8"))
self.monitor_in.flush()
answer = self._read_parse_json()
if not "return" in answer:
raise QMPError(f"Could not connect to QMP: {answer}")
def _read_parse_json(self):
line = self.monitor_out.readline().decode('utf-8')
self.logger.debug("Received line: %s", line.rstrip("\r\n"))
if not line:
raise QMPError("Received empty response")
return json.loads(line)
[docs]
def execute(self, command, arguments={}):
json_command = {"execute": command, "arguments": arguments}
self.monitor_in.write(json.dumps(json_command).encode("utf-8"))
self.monitor_in.write("\n".encode("utf-8"))
self.monitor_in.flush()
answer = self._read_parse_json()
# skip all asynchronous events
while answer.get('event'):
answer = self._read_parse_json()
if "error" in answer:
raise QMPError(answer['error'])
return answer['return']
[docs]
@attr.s(eq=False)
class QMPError(Exception):
msg = attr.ib(validator=attr.validators.instance_of(str))