changeset 518:7431abeb5b7c

good lightsc fixes, but lightsd will need a real streaming mode in the future; this works for now but it is rough
author Louis Opter <kalessin@kalessin.fr>
date Thu, 10 Nov 2016 13:02:27 -0800
parents 24a8464934ff
children ddce014cc621
files add_monolight.patch
diffstat 1 files changed, 64 insertions(+), 22 deletions(-)
--- a/add_monolight.patch	Tue Nov 08 09:36:37 2016 -0800
+++ b/add_monolight.patch	Thu Nov 10 13:02:27 2016 -0800
@@ -589,7 +589,7 @@
 new file mode 100644
 --- /dev/null
 +++ b/apps/monolight/monolight/ui/elements.py
-@@ -0,0 +1,455 @@
+@@ -0,0 +1,457 @@
 +# Copyright (c) 2016, Louis Opter <louis@opter.org>
 +#
 +# This file is part of lightsd.
@@ -742,6 +742,8 @@
 +
 +            if current_action in done:
 +                self._action_queue.task_done()
++                # always retrieve the result; it re-raises any error stored on the future:
++                current_action.result()
 +                current_action = None
 +            if next_action in done:
 +                next_action = next_action.result()
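
For context, not part of the patch: an asyncio future only surfaces its exception when result() or exception() is called on it; a completed future that is simply dropped makes the event loop log an "exception was never retrieved" warning instead of raising. A minimal standalone sketch of the behaviour the added current_action.result() call relies on (failing_action and main are made-up names):

import asyncio

async def failing_action() -> None:
    raise RuntimeError("boom")

async def main() -> None:
    task = asyncio.ensure_future(failing_action())
    done, _pending = await asyncio.wait({task})
    for future in done:
        try:
            future.result()  # re-raises the RuntimeError stored on the future
        except RuntimeError as ex:
            print("got the error back:", ex)

asyncio.get_event_loop().run_until_complete(main())
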
@@ -1467,7 +1469,7 @@
 new file mode 100644
 --- /dev/null
 +++ b/clients/python/lightsc/lightsc/client.py
-@@ -0,0 +1,352 @@
+@@ -0,0 +1,392 @@
 +# Copyright (c) 2016, Louis Opter <louis@opter.org>
 +# All rights reserved.
 +#
@@ -1513,6 +1515,7 @@
 +    List,
 +    NamedTuple,
 +    Sequence,
++    Tuple,
 +)
 +from typing import Type  # noqa
 +
@@ -1576,6 +1579,11 @@
 +        }
 +        self.response = asyncio.Future()  # type: asyncio.futures.Future
 +
++    @property
++    def response_or_exception(self) -> Any:
++        ex = self.response.exception()
++        return ex if ex is not None else self.response.result()
++
 +
 +class AsyncJSONRPCLightsClient:
 +
@@ -1637,7 +1645,7 @@
 +
 +        futures = [call.response for call in pipeline]
 +        await asyncio.wait(futures, loop=self._loop)
-+        return {call.id: call.response.result() for call in pipeline}
++        return {call.id: call.response_or_exception for call in pipeline}
 +
 +    async def close(self) -> None:
 +        if self._listen_task is not None:
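
Why the property instead of result(), shown with a standalone sketch (the futures and their values are made up, not taken from lightsd): result() re-raises a stored exception, so building the pipeline's result dict with it would abort on the first failed call, while returning exception() as a value lets every call in a batch report its own outcome:

import asyncio

loop = asyncio.get_event_loop()
ok, bad = loop.create_future(), loop.create_future()
ok.set_result({"power": True})
bad.set_exception(ValueError("no such target"))

try:
    {"ok": ok.result(), "bad": bad.result()}  # raises on the failed call
except ValueError:
    pass

def response_or_exception(future: asyncio.Future) -> object:
    ex = future.exception()
    return ex if ex is not None else future.result()

# every entry is filled in, failures included:
print({"ok": response_or_exception(ok), "bad": response_or_exception(bad)})
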
@@ -1696,40 +1704,74 @@
 +            raise
 +
 +    async def _listen(self) -> None:
-+        buf = bytearray()
++        # FIXME:
++        #
++        # This method is a stopgap: lightsd needs a real streaming mode on
++        # its side, and then this client needs an async version of ijson:
++
++        buf = bytearray()  # these buffers need to be bounded to some max size
++        sbuf = str()
 +
 +        while True:
 +            chunk = await self._reader.read(self.READ_SIZE)
 +            if not len(chunk):  # EOF, reconnect
 +                logger.info("EOF, reconnecting...")
++                # XXX: possible deadlock in the close() call inside _reconnect?
++                # (if so, maybe signal the listener with an event instead).
 +                await self._reconnect()
 +                return
 +
 +            buf += chunk
 +            try:
-+                json.loads(buf.decode(self.encoding, "ignore"))
-+            except Exception:
++                sbuf += buf.decode(self.encoding, "strict")  # strict decode + retry is a crude way to handle split multi-byte chars
++            except UnicodeError:
 +                continue
-+            response = json.loads(buf.decode(self.encoding, "surrogateescape"))
 +            buf = bytearray()
 +
-+            batch = response if isinstance(response, list) else [response]
-+            for response in batch:
-+                id = response["id"]
++            while sbuf:
++                # and this ad-hoc splitting of concatenated JSON documents is the ugly part:
++                try:
++                    response = json.loads(sbuf)
++                    sbuf = str()
++                except Exception:
++                    def find_response(delim: str) -> Tuple[Dict[str, Any], str]:
++                        offset = sbuf.find(delim)
++                        while offset != -1:
++                            try:
++                                response = json.loads(sbuf[:offset + 1])
++                                return response, sbuf[offset + 1:]
++                            except Exception:
++                                offset = sbuf.find(delim, offset + 2)
++                        return None, sbuf
 +
-+                error = response.get("error")
-+                if error is not None:
-+                    code = error.get("code")
-+                    msg = error.get("msg")
-+                    logger.warning("Error {}: {} - {}".format(id, code, msg))
-+                    call = self._pending_calls.pop(id)
-+                    ex = exceptions.LightsClientError(msg)
-+                    call.response.set_exception(ex)
-+                    call.timeout_handle.cancel()
-+                    continue
++                    for delim in {"}{", "}[", "]{", "]["}:
++                        response, sbuf = find_response(delim)
++                        if response is not None:
++                            break  # got a complete response
++                    else:
++                        break  # need more data
++
++                batch = response if isinstance(response, list) else [response]
++                for response in batch:
++                    id = response["id"]
 +
-+                logger.info("Response {}: {}".format(id, response["result"]))
-+                self._handle_response(id, response["result"])
++                    error = response.get("error")
++                    if error is not None:
++                        code = error.get("code")
++                        msg = error.get("message")
++                        logger.warning("Error {}: {} - {}".format(
++                            id, code, msg
++                        ))
++                        call = self._pending_calls.pop(id)
++                        ex = exceptions.LightsClientError(msg)
++                        call.response.set_exception(ex)
++                        call.timeout_handle.cancel()
++                        continue
++
++                    logger.info("Response {}: {}".format(
++                        id, response["result"]
++                    ))
++                    self._handle_response(id, response["result"])
 +
 +    def batch(self) -> "_AsyncJSONRPCBatch":
 +        return _AsyncJSONRPCBatch(self)
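
The FIXME in _listen boils down to splitting back-to-back JSON documents out of the read buffer without a streaming parser. Until lightsd grows a real streaming mode, one stdlib-only alternative to scanning for "}{"-style delimiters could be json.JSONDecoder.raw_decode(), which parses a single document starting at a given offset and reports where it stopped. A hedged sketch, not what the patch does; pop_documents is a made-up helper:

import json
from typing import Any, List, Tuple

_decoder = json.JSONDecoder()

def pop_documents(sbuf: str) -> Tuple[List[Any], str]:
    """Parse every complete JSON document off the front of sbuf.

    Returns the parsed documents plus whatever partial data is left over
    (to be completed by the next read from the socket).
    """
    documents = []  # type: List[Any]
    index = 0
    while index < len(sbuf):
        # raw_decode() does not skip leading whitespace by itself:
        while index < len(sbuf) and sbuf[index] in " \t\r\n":
            index += 1
        if index == len(sbuf):
            break
        try:
            document, index = _decoder.raw_decode(sbuf, index)
        except ValueError:
            break  # incomplete document, wait for more data
        documents.append(document)
    return documents, sbuf[index:]

# two pipelined responses plus the beginning of a third:
buf = '{"id": 1, "result": true}{"id": 2, "result": false}{"id": 3,'
documents, rest = pop_documents(buf)
assert documents == [{"id": 1, "result": True}, {"id": 2, "result": False}]
assert rest == '{"id": 3,'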