From 3ecb15ca12899df0d46d978e10d2b6be7a545dc3 Mon Sep 17 00:00:00 2001 From: johnd0e <1838643+johnd0e@users.noreply.github.com> Date: Wed, 17 Apr 2024 23:03:14 +0200 Subject: [PATCH 1/3] Simplify SSE parsing Every message ends with two newlines Specs: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format --- openai/init.moon | 31 ++++++------------------------- 1 file changed, 6 insertions(+), 25 deletions(-) diff --git a/openai/init.moon b/openai/init.moon index c844ac3..3119db0 100644 --- a/openai/init.moon +++ b/openai/init.moon @@ -97,27 +97,6 @@ parse_completion_chunk = types.partial { } } --- lpeg pattern to read a json data block from the front of a string, returns --- the json blob and the rest of the string if it could parse one -consume_json_head = do - import C, S, P from require "lpeg" - - -- this pattern reads from the front just enough characters to consume a - -- valid json object - consume_json = P (str, pos) -> - str_len = #str - for k=pos+1,str_len - candidate = str\sub pos, k - parsed = false - pcall -> parsed = cjson.decode candidate - if parsed - return k + 1 - - return nil -- fail - - S("\t\n\r ")^0 * P("data: ") * C(consume_json) * C(P(1)^0) - - parse_error_message = types.partial { error: types.partial { message: types.string\tag "message" @@ -248,13 +227,15 @@ class OpenAI accumulation_buffer ..= chunk while true - json_blob, rest = consume_json_head\match accumulation_buffer - unless json_blob + line, rest = accumulation_buffer\match "^(.-)\r?\n\r?\n(.-)$" + unless line break accumulation_buffer = rest - if chunk = parse_completion_chunk cjson.decode json_blob - chunk_callback chunk + json_blob = line\match "^data:%s+(.-)%s*$" + if json_blob and json_blob~="[DONE]" + if chunk = parse_completion_chunk cjson.decode json_blob + chunk_callback chunk ... From 3a05a89467ae90f6e44e9bea8a45dcfc6be971ea Mon Sep 17 00:00:00 2001 From: johnd0e <1838643+johnd0e@users.noreply.github.com> Date: Fri, 6 Sep 2024 00:04:57 +0200 Subject: [PATCH 2/3] More strict parsing according SSE spec --- openai/init.moon | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/openai/init.moon b/openai/init.moon index 3119db0..650c519 100644 --- a/openai/init.moon +++ b/openai/init.moon @@ -227,15 +227,24 @@ class OpenAI accumulation_buffer ..= chunk while true - line, rest = accumulation_buffer\match "^(.-)\r?\n\r?\n(.-)$" - unless line + event, rest = accumulation_buffer\match "^(.-)\r?\n\r?\n(.-)$" + unless event break accumulation_buffer = rest - json_blob = line\match "^data:%s+(.-)%s*$" - if json_blob and json_blob~="[DONE]" - if chunk = parse_completion_chunk cjson.decode json_blob - chunk_callback chunk + while event and #event>0 + field, value, rest_evt = event\match "^(.-):%s+([^\r\n]+)(.-)$" + switch field + when "data" + unless value=="[DONE]" + chunk_callback (json.decode value) + when "event","id","retry","" --comment + nil -- noop + when nil + error "Cannot parse SSE event: "..event + else + error ('Unknown field "%s" with value "%s"')\format(field, value) + event = #rest_evt>0 and rest_evt\match"^\r?\n(.*)" ... From 1836199f43fcedd9bfa952be80afe35686bc7372 Mon Sep 17 00:00:00 2001 From: johnd0e <1838643+johnd0e@users.noreply.github.com> Date: Fri, 6 Sep 2024 00:08:09 +0200 Subject: [PATCH 3/3] Even stricter --- openai/init.moon | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/openai/init.moon b/openai/init.moon index 650c519..89b6198 100644 --- a/openai/init.moon +++ b/openai/init.moon @@ -219,11 +219,14 @@ class OpenAI assert types.function(chunk_callback), "Must provide chunk_callback function when streaming response" accumulation_buffer = "" + streamed = false (...) -> chunk = ... - if type(chunk) == "string" + if chunk == nil + assert not streamed or accumulation_buffer\match"^%s*$", "buffer not empty" + elseif type(chunk) == "string" accumulation_buffer ..= chunk while true @@ -236,8 +239,9 @@ class OpenAI field, value, rest_evt = event\match "^(.-):%s+([^\r\n]+)(.-)$" switch field when "data" + streamed = true unless value=="[DONE]" - chunk_callback (json.decode value) + chunk_callback (cjson.decode value) when "event","id","retry","" --comment nil -- noop when nil