Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 57 additions & 37 deletions net/rfc6455/conn-api.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -133,46 +133,66 @@
(define ws-executor (make-will-executor))
(void (thread (lambda () (let loop () (will-execute ws-executor) (loop)))))

;; if ws-conn is gc'd while still active, closes it gracefully
(define (ws-conn-will ws-conn)
(unless (ws-conn-closed? ws-conn)
(ws-close! ws-conn)))

;; sets maybe-conn's read-thread-status to e, if maybe-conn is not #f
;; : (U #f ws-conn-base) Any -> Void
(define (try-set-ws-conn-base-read-thread-status! maybe-conn e)
(when maybe-conn
(set-ws-conn-base-read-thread-status! maybe-conn e)))


(define (ws-read-thread)
(thread
(lambda ()
(define thd (current-thread))
(match-define (list 'start! ws-conn) (thread-receive))
(will-register ws-executor ws-conn (lambda (_) (break-thread thd)))
(define ws-conn/weak
(make-weak-box
(match (thread-receive)
[(list 'start! ws-conn) ws-conn])))
(will-register ws-executor (weak-box-value ws-conn/weak) ws-conn-will)


(with-handlers [((lambda (e) #t)
(lambda (e) (set-ws-conn-base-read-thread-status! ws-conn e)))]
(lambda (e) (try-set-ws-conn-base-read-thread-status!
(weak-box-value ws-conn/weak) e)))]
(let loop ((backlog #f))
(match (thread-receive)
[(list 'single payload-type nack-evt ch)
(define (deliver raw-item auto-conv)
(define item ((or (match payload-type
['text bytes->string/utf-8]
['binary values]
['auto #f])
auto-conv)
raw-item))
(sync (handle-evt nack-evt
(lambda (_) (loop (list raw-item auto-conv))))
(handle-evt (channel-put-evt ch item)
(lambda (_) (loop #f)))))
(match backlog
[#f
(sync (handle-evt nack-evt
(lambda (_) (loop #f)))
(handle-evt (ws-conn-base-ip ws-conn)
(lambda (_)
(match (ws-recv** ws-conn)
[(? eof-object?) (void)] ;; terminate!
[(list raw-item auto-conv) (deliver raw-item auto-conv)]))))]
[(list raw-item auto-conv)
(deliver raw-item auto-conv)])]
[(list 'stream o)
(match backlog
[#f
(ws-stream** ws-conn o)
(loop #f)]
[(list raw-item _auto-conv)
(display raw-item o)
(close-output-port o)
(loop #f)])]))))))
(define v (thread-receive))
(define ws-conn (weak-box-value ws-conn/weak))
(when ws-conn
(match v
[(list 'single payload-type nack-evt ch)
(define (deliver raw-item auto-conv)
(define item ((or (match payload-type
['text bytes->string/utf-8]
['binary values]
['auto #f])
auto-conv)
raw-item))
(sync (handle-evt nack-evt
(lambda (_) (loop (list raw-item auto-conv))))
(handle-evt (channel-put-evt ch item)
(lambda (_) (loop #f)))))
(match backlog
[#f
(sync (handle-evt nack-evt
(lambda (_) (loop #f)))
(handle-evt (ws-conn-base-ip ws-conn)
(lambda (_)
(match (ws-recv** ws-conn)
[(? eof-object?) (void)] ;; terminate!
[(list raw-item auto-conv) (deliver raw-item auto-conv)]))))]
[(list raw-item auto-conv)
(deliver raw-item auto-conv)])]
[(list 'stream o)
(match backlog
[#f
(ws-stream** ws-conn o)
(loop #f)]
[(list raw-item _auto-conv)
(display raw-item o)
(close-output-port o)
(loop #f)])])))))))