2 Concurrency
2.1 Syntax
(require neuron/syntax) | package: neuron-lib |
syntax
(forever body ...)
syntax
(while expr body ...)
syntax
(until expr body ...)
syntax
(apply-values proc expr)
> (apply-values list (values 1 2 3)) '(1 2 3)
2.2 Exchanger
The Neuron Technical Report explains the difference between exchangers and channels.
(require neuron/exchanger) | package: neuron-lib |
An exchanger is a channel-based primitive that both synchronizes a pair of threads and passes a value from one to the other. Exchangers are synchronous, fair, and support multiple senders and receivers, but can not be used as synchronizable events directly.
The participants of an exchange can be characterized by two orthogonal factors: control flow and data flow. In an exchange, one side waits for the other to initiate. If the initiating side is transmitting, then the waiting side is receiving. Similarly, if the initiating side is receiving, then the waiting side is transmitting. With this distinction, forwarding exchangers with precise control flow semantics can be defined.
procedure
(exchanger? v) → boolean?
v : any/c
procedure
procedure
ex1 : exchanger? ex2 : exchanger?
procedure
(accept #:from ex) → exchanger?
ex : exchanger?
procedure
v : any/c ex : exchanger?
procedure
ex : exchanger?
2.2.1 Process Exchangers
(require neuron/process/exchanger) | package: neuron-lib |
procedure
tx : exchanger? rx : exchanger? v : any/c
procedure
rx : exchanger?
procedure
rx : exchanger? tx : exchanger?
procedure
tx : exchanger? v : any/c
procedure
ex1 : exchanger? ex2 : exchanger?
procedure
ex1 : exchanger? ex2 : exchanger? proc : (-> any/c any/c)
procedure
rx : exchanger? tx : exchanger? ex : exchanger? = (make-exchanger)
procedure
tx : exchanger? rx : exchanger? v : any/c
procedure
rx : exchanger?
procedure
(receiver-evt rx tx) → evt?
rx : exchanger? tx : exchanger?
procedure
(emitter-evt tx v) → evt?
tx : exchanger? v : any/c
procedure
(forwarder-evt ex1 ex2) → evt?
ex1 : exchanger? ex2 : exchanger?
procedure
(filterer-evt ex1 ex2 #:with proc) → evt?
ex1 : exchanger? ex2 : exchanger? proc : (-> any/c any/c)
procedure
(coupler-evt rx tx [ex]) → evt?
rx : exchanger? tx : exchanger? ex : exchanger? = (make-exchanger)
2.3 Event
(require neuron/event) | package: neuron-lib |
> (sync (evt-set (wrap-evt (thread (λ () (sleep 0.1) (write 1))) (λ _ 1)) (wrap-evt (thread (λ () (write 2))) (λ _ 2)))) 21
'(1 2)
procedure
(evt-sequence make-evt ...+ [ #:then make-result]) → evt? make-evt : (-> evt?) make-result : (-> any/c any) = values
> (sync (evt-sequence (λ () (wrap-evt (thread (λ () (sleep 0.1) (write 1))) (λ _ 1))) (λ () (wrap-evt (thread (λ () (write 2))) (λ _ 2))))) 12
2
procedure
(evt-series [ #:init init] make-evt ...+ [ #:then make-result]) → evt? init : any/c = (void) make-evt : (-> any/c evt?) make-result : (-> any/c any) = values
> (sync (evt-series #:init 1 (λ (x) (wrap-evt (thread (λ () (write x))) (λ _ (+ x 2)))) (λ (x) (wrap-evt (thread (λ () (write x))) (λ _ (+ x 4)))))) 13
7
> (sync (evt-loop #:init 1 (λ (x) (if (> x 5) (raise x) (wrap-evt always-evt (λ _ (+ x 1))))))) uncaught exception: 6
2.4 Process
(require neuron/process) | package: neuron-lib |
A process is a thread-like concurrency primitive. Processes are made from threads by replacing the thread mailbox with a few other features:
A pair of exchangers: one for transmitting and another for receiving.
An out-of-band command handler.
An on-stop hook that is called when a process ends gracefully, but not when it dies abruptly.
An on-dead hook that is called unconditionally when a process terminates.
A process can be applied as a procedure, which invokes its command handler, or handler. The command handler is a list of procedures, and the result of a command is the same as the result of the first procedure in the list to return a value other than unhandled. If every procedure returns unhandled or the list is empty, unhandled-command is raised.
> (define π (start (process deadlock) #:command (bind ([A 1] [B (λ _ 2)]) #:else unhandled))) > (π 'A) 1
> ((π 'B) 5) 2
> (π '(x y)) uncaught exception: #(struct:unhandled-command
#<procedure:process> ((x y)))
A process can be used as a synchronizable event. A process is ready for synchronization when dead? would return #t. The synchronization result is the process itself.
Unhandled exceptions are fatal. Attempting to synchronize a process killed by an unhandled exception re-raises the exception.
> (sync (process (λ () (raise 'VAL)))) uncaught exception: VAL
Processes are created explicitly by the process function. Use start to install hooks and handlers.
struct
(struct unhandled-command (process args) #:transparent) process : process? args : (listof any/c)
procedure
(process-tx π) → transmitter?
π : process?
procedure
(process-rx π) → transmitter?
π : process?
syntax
(start π-expr hooks-and-handlers ...)
hooks-and-handlers = #:on-stop on-stop | #:on-dead on-dead | #:command handler
> (define π (start (process deadlock) #:on-stop (λ () (displayln 'STOP1)) #:on-dead (λ () (displayln 'DEAD1)) #:on-stop (λ () (displayln 'STOP2)) #:on-dead (λ () (displayln 'DEAD2)) #:command add1)) > (π 1) 2
> (stop π)
STOP1
STOP2
DEAD1
DEAD2
procedure
2.5 Messaging
(require neuron/process/messaging) | package: neuron-lib |
procedure
(forward-to π) → void?
π : process?
procedure
(forward-from π) → void?
π : process?
procedure
(forward-to-evt π) → evt?
π : process?
procedure
(forward-from-evt π) → evt?
π : process?
procedure
(couple-evt π1 π2) → void?
π1 : process? π2 : process?
2.6 Control
(require neuron/process/control) | package: neuron-lib |
procedure
(proxy π [ #:filter-to to-proc #:filter-from from-proc]) → process? π : process? to-proc : (or/c (-> any/c any/c) #f) = #f from-proc : (or/c (-> any/c any/c) #f) = #f
procedure
(proxy-from π [#:with proc]) → process?
π : process? proc : (or/c (-> any/c any/c) #f) = #f
'sink – returns snk
'source – returns src
> (define π-out (server add1)) > (define π-in (sink (λ (x) (give π-out (* x 2))))) > (call (stream π-in π-out) 3) 7
Commands:
'peers – returns an alist of active peers
'add π – adds process π to the set of active peers; returns the key associated with π
'get key – returns the process associated with key, or #f if no such process exists
'drop key – drops the process associated with key; returns #t if key was in use, #f otherwise.
> (define times (let ([N -1]) (service (λ _ (set! N (add1 N)) N) #:on-drop (λ (k _) (displayln `(STOP ,k))))))
> (for ([i 10]) (times `(add ,(server (curry * i)))))
> (writeln (for/list ([i 10]) (call times (list i 3)))) ((0 0) (1 3) (2 6) (3 9) (4 12) (5 15) (6 18) (7 21) (8 24) (9 27))
> (for ([i 10] #:when (even? i)) (times `(drop ,i)))
(STOP 0)
(STOP 2)
(STOP 4)
(STOP 6)
(STOP 8)
> (writeln (for/list ([i 10] #:when (odd? i)) (call times (list i 4)))) ((1 4) (3 12) (5 20) (7 28) (9 36))
> (stop times)
(STOP 9)
(STOP 7)
(STOP 5)
(STOP 3)
(STOP 1)
> (define i 0) > (define t (current-inexact-milliseconds))
> (wait (simulator (λ (p) (printf "~a ~a\n" p (- (current-inexact-milliseconds) t)) (when (> i 2) (die)) (set! i (add1 i)) (sleep 0.25))))
100.0 101.669189453125
100.0 351.734130859375
100.0 601.796142578125
100.0 851.869140625
A bridge will attempt to forward unrecognized commands—
procedure
(shutdown-evt π) → evt?
π : process?