# HG changeset patch # User Louis Opter # Date 1478811747 28800 # Node ID 7431abeb5b7cac91340d19c54233465187d57b53 # Parent 24a8464934ff6bc633517febaf59902d4fed9d08 good lightsc fixes, but yeah need streaming in lightsd in the future, this works but sucks diff -r 24a8464934ff -r 7431abeb5b7c add_monolight.patch --- a/add_monolight.patch Tue Nov 08 09:36:37 2016 -0800 +++ b/add_monolight.patch Thu Nov 10 13:02:27 2016 -0800 @@ -589,7 +589,7 @@ new file mode 100644 --- /dev/null +++ b/apps/monolight/monolight/ui/elements.py -@@ -0,0 +1,455 @@ +@@ -0,0 +1,457 @@ +# Copyright (c) 2016, Louis Opter +# +# This file is part of lightsd. @@ -742,6 +742,8 @@ + + if current_action in done: + self._action_queue.task_done() ++ # always retrieve the result, we might have an error to raise: ++ current_action.result() + current_action = None + if next_action in done: + next_action = next_action.result() @@ -1467,7 +1469,7 @@ new file mode 100644 --- /dev/null +++ b/clients/python/lightsc/lightsc/client.py -@@ -0,0 +1,352 @@ +@@ -0,0 +1,392 @@ +# Copyright (c) 2016, Louis Opter +# All rights reserved. +# @@ -1513,6 +1515,7 @@ + List, + NamedTuple, + Sequence, ++ Tuple, +) +from typing import Type # noqa + @@ -1576,6 +1579,11 @@ + } + self.response = asyncio.Future() # type: asyncio.futures.Future + ++ @property ++ def response_or_exception(self) -> Any: ++ ex = self.response.exception() ++ return ex if ex is not None else self.response.result() ++ + +class AsyncJSONRPCLightsClient: + @@ -1637,7 +1645,7 @@ + + futures = [call.response for call in pipeline] + await asyncio.wait(futures, loop=self._loop) -+ return {call.id: call.response.result() for call in pipeline} ++ return {call.id: call.response_or_exception for call in pipeline} + + async def close(self) -> None: + if self._listen_task is not None: @@ -1696,40 +1704,74 @@ + raise + + async def _listen(self) -> None: -+ buf = bytearray() ++ # FIXME: ++ # ++ # This method is fucked, we need to add a real streaming mode on ++ # lightsd's side and then an async version of ijson: ++ ++ buf = bytearray() # those bufs need to be bound to some max size ++ sbuf = str() + + while True: + chunk = await self._reader.read(self.READ_SIZE) + if not len(chunk): # EOF, reconnect + logger.info("EOF, reconnecting...") ++ # XXX: deadlock within the close call in _reconnect? (and if ++ # that's the case maybe you can use an event or something). + await self._reconnect() + return + + buf += chunk + try: -+ json.loads(buf.decode(self.encoding, "ignore")) -+ except Exception: ++ sbuf += buf.decode(self.encoding, "strict") # strict is fucked ++ except UnicodeError: + continue -+ response = json.loads(buf.decode(self.encoding, "surrogateescape")) + buf = bytearray() + -+ batch = response if isinstance(response, list) else [response] -+ for response in batch: -+ id = response["id"] ++ while sbuf: ++ # and this is completely fucked: ++ try: ++ response = json.loads(sbuf) ++ sbuf = str() ++ except Exception: ++ def find_response(delim: str) -> Tuple[Dict[str, Any], str]: ++ offset = sbuf.find(delim) ++ while offset != -1: ++ try: ++ response = json.loads(sbuf[:offset + 1]) ++ return response, sbuf[offset + 1:] ++ except Exception: ++ offset = sbuf.find(delim, offset + 2) ++ return None, sbuf + -+ error = response.get("error") -+ if error is not None: -+ code = error.get("code") -+ msg = error.get("msg") -+ logger.warning("Error {}: {} - {}".format(id, code, msg)) -+ call = self._pending_calls.pop(id) -+ ex = exceptions.LightsClientError(msg) -+ call.response.set_exception(ex) -+ call.timeout_handle.cancel() -+ continue ++ for delim in {"}{", "}[", "]{", "]["}: ++ response, sbuf = find_response(delim) ++ if response is not None: ++ break # yay! ++ else: ++ break # need more data ++ ++ batch = response if isinstance(response, list) else [response] ++ for response in batch: ++ id = response["id"] + -+ logger.info("Response {}: {}".format(id, response["result"])) -+ self._handle_response(id, response["result"]) ++ error = response.get("error") ++ if error is not None: ++ code = error.get("code") ++ msg = error.get("message") ++ logger.warning("Error {}: {} - {}".format( ++ id, code, msg ++ )) ++ call = self._pending_calls.pop(id) ++ ex = exceptions.LightsClientError(msg) ++ call.response.set_exception(ex) ++ call.timeout_handle.cancel() ++ continue ++ ++ logger.info("Response {}: {}".format( ++ id, response["result"] ++ )) ++ self._handle_response(id, response["result"]) + + def batch(self) -> "_AsyncJSONRPCBatch": + return _AsyncJSONRPCBatch(self)