2 Concurrency

2.1 Syntax

 (require neuron/syntax) package: neuron-lib

syntax

(forever body ...)

Evaluates bodys repeatedly.

syntax

(while expr body ...)

Evaluates bodys repeatedly for as long as expr evaluates to #t.

syntax

(until expr body ...)

Evaluates bodys repeatedly for as long as expr evaluates to #f.
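The looping forms above can be exercised with a small sketch like the following. It assumes neuron-lib is installed; the variable names and the use of let/ec to leave a forever loop are illustrative choices, not something the library prescribes.

```racket
#lang racket/base
(require neuron/syntax)

;; while: the body runs for as long as the test evaluates to #t.
(define i 0)
(while (< i 3)
  (set! i (add1 i)))

;; until: the body runs for as long as the test evaluates to #f.
(define j 3)
(until (zero? j)
  (set! j (sub1 j)))

;; forever has no exit condition of its own, so this sketch escapes
;; from the loop with an escape continuation.
(define k
  (let/ec break
    (let ([n 0])
      (forever
        (set! n (add1 n))
        (when (= n 5) (break n))))))
```

After this runs, i is 3, j is 0, and k is 5.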

syntax

(apply-values proc expr)

Evaluates expr and then applies proc to the resulting values.

Example:
> (apply-values list (values 1 2 3))

'(1 2 3)

2.2 Exchanger

The Neuron Technical Report explains the difference between exchangers and channels.

 (require neuron/exchanger) package: neuron-lib

An exchanger is a channel-based primitive that both synchronizes a pair of threads and passes a value from one to the other. Exchangers are synchronous, fair, and support multiple senders and receivers, but cannot be used directly as synchronizable events.

The participants of an exchange can be characterized by two orthogonal factors: control flow and data flow. In an exchange, one side waits for the other to initiate. If the initiating side is transmitting, then the waiting side is receiving. Similarly, if the initiating side is receiving, then the waiting side is transmitting. With this distinction, forwarding exchangers with precise control flow semantics can be defined.

procedure

(exchanger? v)  boolean?

  v : any/c
Returns #t if v is an exchanger, #f otherwise.

procedure

(make-exchanger)  exchanger?

Creates and returns a new exchanger.

procedure

(offer ex1 #:to ex2)  void?

  ex1 : exchanger?
  ex2 : exchanger?
Blocks until ex2 is ready to accept ex1.

procedure

(accept #:from ex)  exchanger?

  ex : exchanger?
Blocks until an exchanger is offered to ex.

procedure

(put v #:into ex)  void?

  v : any/c
  ex : exchanger?
Blocks until an exchanger is ready to get v from ex.

procedure

(get #:from ex)  any/c

  ex : exchanger?
Blocks until an exchanger puts a value into ex.
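As a rough illustration of the four primitives above, the following sketch pairs put with get and offer with accept across two threads. It assumes neuron-lib is installed; the names ex, ctrl, and data are hypothetical.

```racket
#lang racket/base
(require neuron/exchanger)

;; put and get rendezvous on the same exchanger.
(define ex (make-exchanger))
(define putter (thread (λ () (put 'hello #:into ex))))
(define v (get #:from ex))          ; blocks until the helper thread puts
(thread-wait putter)

;; offer and accept pass one exchanger through another.
(define ctrl (make-exchanger))
(define data (make-exchanger))
(define offerer (thread (λ () (offer data #:to ctrl))))
(define got (accept #:from ctrl))   ; blocks until data is offered to ctrl
(thread-wait offerer)
```

Here v should be 'hello and got should be the same exchanger as data.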

2.2.1 Process Exchangers

 (require neuron/process/exchanger) package: neuron-lib

procedure

(giver tx rx v)  void?

  tx : exchanger?
  rx : exchanger?
  v : any/c
Offers tx to rx, then puts v into tx.

procedure

(taker rx)  any/c

  rx : exchanger?
Gets a value from an exchanger accepted from rx.

procedure

(receiver rx tx)  any/c

  rx : exchanger?
  tx : exchanger?
Offers rx to tx, then gets a value from rx.

procedure

(emitter tx v)  void?

  tx : exchanger?
  v : any/c
Puts v into an exchanger accepted from tx.
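The two initiation patterns described by giver/taker and receiver/emitter can be sketched as follows. This is a minimal illustration that assumes neuron-lib is installed; all exchanger names are hypothetical.

```racket
#lang racket/base
(require neuron/exchanger neuron/process/exchanger)

;; Sender initiates: giver offers tx1 to rx1, then puts 42 into tx1.
;; taker accepts an exchanger from rx1 and gets the value from it.
(define tx1 (make-exchanger))
(define rx1 (make-exchanger))
(define g (thread (λ () (giver tx1 rx1 42))))
(define taken (taker rx1))
(thread-wait g)

;; Receiver initiates: receiver offers rx2 to tx2, then gets from rx2.
;; emitter accepts an exchanger from tx2 and puts 'x into it.
(define tx2 (make-exchanger))
(define rx2 (make-exchanger))
(define e (thread (λ () (emitter tx2 'x))))
(define received (receiver rx2 tx2))
(thread-wait e)
```

In both halves the value travels from the transmitting side to the receiving side; only the side that makes the offer differs.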

procedure

(forwarder ex1 ex2)  void?

  ex1 : exchanger?
  ex2 : exchanger?
Offers an exchanger accepted from ex1 to ex2.

procedure

(filterer ex1 ex2 #:with proc)  void?

  ex1 : exchanger?
  ex2 : exchanger?
  proc : (-> any/c any/c)
Forwards a value from ex1 to ex2. Applies proc to the value being forwarded.

procedure

(coupler rx tx [ex])  void?

  rx : exchanger?
  tx : exchanger?
  ex : exchanger? = (make-exchanger)
Offers ex to rx and tx.

procedure

(giver-evt tx rx v)  evt?

  tx : exchanger?
  rx : exchanger?
  v : any/c
Returns a fresh synchronizable event that becomes ready for synchronization when (giver tx rx v) would not block.

procedure

(taker-evt rx)  evt?

  rx : exchanger?
Returns a constant synchronizable event that becomes ready for synchronization when (taker rx) would not block, and the synchronization result is the value taken through rx.

procedure

(receiver-evt rx tx)  evt?

  rx : exchanger?
  tx : exchanger?
Returns a constant synchronizable event that becomes ready for synchronization when (receiver rx tx) would not block, and the synchronization result is the value received through rx.

procedure

(emitter-evt tx v)  evt?

  tx : exchanger?
  v : any/c
Returns a fresh synchronizable event that becomes ready for synchronization when (emitter tx v) would not block.

procedure

(forwarder-evt ex1 ex2)  evt?

  ex1 : exchanger?
  ex2 : exchanger?
Returns a constant synchronizable event that becomes ready for synchronization when (forwarder ex1 ex2) would not block.

procedure

(filterer-evt ex1 ex2 #:with proc)  evt?

  ex1 : exchanger?
  ex2 : exchanger?
  proc : (-> any/c any/c)
Returns a constant synchronizable event that becomes ready for synchronization when (filterer ex1 ex2 #:with proc) would not block.

procedure

(coupler-evt rx tx [ex])  evt?

  rx : exchanger?
  tx : exchanger?
  ex : exchanger? = (make-exchanger)
Returns a constant synchronizable event that becomes ready for synchronization when (coupler rx tx ex) would not block.

2.3 Event

 (require neuron/event) package: neuron-lib

procedure

(evt-set evt ...)  evt?

  evt : evt?
Returns a fresh synchronizable event that becomes ready for synchronization when all evts are ready for synchronization. The synchronization result is a list of the synchronization results of evts in the order specified.

Example:
> (sync
   (evt-set
    (wrap-evt (thread (λ () (sleep 0.1) (write 1))) (λ _ 1))
    (wrap-evt (thread (λ () (write 2))) (λ _ 2))))

21

'(1 2)

procedure

(evt-sequence make-evt    
  ...+    
  [#:then make-result])  evt?
  make-evt : (-> evt?)
  make-result : (-> any/c any) = values
Returns a fresh synchronizable event that becomes ready for synchronization when all events generated by make-evts are ready for synchronization. Calls each make-evt in the order specified and immediately syncs the result. Wraps the last make-evt in a handle-evt that applies the synchronization result of the previous event to make-result. The synchronization result of the sequence is the synchronization result of its final event.

Example:
> (sync
   (evt-sequence
    (λ () (wrap-evt (thread (λ () (sleep 0.1) (write 1))) (λ _ 1)))
    (λ () (wrap-evt (thread (λ () (write 2))) (λ _ 2)))))

12

2

procedure

(evt-series [#:init init]    
  make-evt ...+    
  [#:then make-result])  evt?
  init : any/c = (void)
  make-evt : (-> any/c evt?)
  make-result : (-> any/c any) = values
Returns a fresh synchronizable event that becomes ready for synchronization when all events generated by make-evts have become ready for synchronization. Calls each make-evt in the order specified and immediately syncs the result. Applies make-evt first to init, then to the synchronization result of the previous event. Wraps the last make-evt in a handle-evt that applies the synchronization result of the previous event to make-result. The synchronization result of the series is the synchronization result of its final event.

Example:
> (sync
   (evt-series
    #:init 1
    (λ (x) (wrap-evt (thread (λ () (write x))) (λ _ (+ x 2))))
    (λ (x) (wrap-evt (thread (λ () (write x))) (λ _ (+ x 4))))))

13

7

procedure

(evt-loop [#:init init] next-evt)  evt?

  init : any/c = (void)
  next-evt : (-> any/c evt?)
Returns a fresh synchronizable event that is never ready for synchronization. Repeatedly calls next-evt and immediately syncs the result. Applies next-evt first to init, then to the synchronization result of the previous event.

Example:
> (sync
   (evt-loop
    #:init 1
    (λ (x)
      (if (> x 5)
          (raise x)
          (wrap-evt always-evt (λ _ (+ x 1)))))))

uncaught exception: 6

2.4 Process

 (require neuron/process) package: neuron-lib

A process is a thread-like concurrency primitive. Processes are made from threads by replacing the thread mailbox with a few other features:

A process can be applied as a procedure, which invokes its command handler, or handler. The command handler is a list of procedures, and the result of a command is the same as the result of the first procedure in the list to return a value other than unhandled. If every procedure returns unhandled or the list is empty, unhandled-command is raised.

> (define π
    (start
     (process deadlock)
     #:command (bind ([A 1]
                      [B (λ _ 2)])
                     #:else unhandled)))
> (π 'A)

1

> ((π 'B) 5)

2

> (π '(x y))

uncaught exception: #(struct:unhandled-command #<procedure:process> ((x y)))

A process can be used as a synchronizable event. A process is ready for synchronization when dead? would return #t. The synchronization result is the process itself.

Unhandled exceptions are fatal. Attempting to synchronize a process killed by an unhandled exception re-raises the exception.

> (sync (process (λ () (raise 'VAL))))

uncaught exception: VAL

Processes are created explicitly by the process function. Use start to install hooks and handlers.

Return the value unhandled from a command handler to indicate that it will not handle a command.

struct

(struct unhandled-command (process args)
    #:transparent)
  process : process?
  args : (listof any/c)
Raised when a command handler applied to args returns unhandled.

procedure

(process? v)  boolean?

  v : any/c
Returns #t if v is a process, #f otherwise.

procedure

(process thunk)  process?

  thunk : (-> any)
Calls thunk with no arguments in a new process. Returns immediately with a process descriptor value.

procedure

(process-tx π)  transmitter?

  π : process?
Returns the transmitting exchanger of π.

procedure

(process-rx π)  transmitter?

  π : process?
Returns the receiving exchanger of π.

syntax

(start π-expr hooks-and-handlers ...)

 
hooks-and-handlers = #:on-stop on-stop
  | #:on-dead on-dead
  | #:command handler
Installs hooks-and-handlers into all processes created in the lexical scope of π-expr.

Example:
> (define π
    (start
     (process deadlock)
     #:on-stop (λ () (displayln 'STOP1))
     #:on-dead (λ () (displayln 'DEAD1))
     #:on-stop (λ () (displayln 'STOP2))
     #:on-dead (λ () (displayln 'DEAD2))
     #:command add1))
> (π 1)

2

> (stop π)

STOP1

STOP2

DEAD1

DEAD2

procedure

(command π v ...)  any

  π : process?
  v : any/c
Applies the command handler of π to vs and returns the result. Does not raise unhandled-command if the result is unhandled.
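The difference between command and direct application can be sketched like this. The example assumes neuron-lib is installed; the handler, which accepts numbers and declines everything else, is a hypothetical one written for illustration.

```racket
#lang racket/base
(require neuron/process)

;; A handler that accepts numbers and declines everything else.
(define π
  (start
   (process deadlock)
   #:command (λ (v) (if (number? v) (add1 v) unhandled))))

(define handled (command π 1))     ; the handler's result
(define declined (command π 'x))   ; the unhandled value, no exception

;; Applying the process directly raises unhandled-command instead.
(define applied
  (with-handlers ([unhandled-command?
                   (λ (e) (unhandled-command-args e))])
    (π 'x)))

(kill π)
```

Under these assumptions handled is 2, declined is the unhandled value, and applied is the argument list '(x) carried by the exception.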

procedure

(stop π)  void?

  π : process?
Gracefully terminates the execution of π if it is running. Blocks until π is dead. If π is already dead, stop has no effect.

procedure

(kill π)  void?

  π : process?
Immediately terminates the execution of π if it is running. Blocks until π is dead. If π is already dead, kill has no effect.

procedure

(wait π)  void?

  π : process?
Blocks until π is ready for synchronization. Equivalent to (void (sync π)).

procedure

(dead? π)  boolean?

  π : process?
Returns #t if π has terminated, #f otherwise.

procedure

(alive? π)  boolean?

  π : process?
Returns #t if π is not dead, #f otherwise.
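A short sketch of the lifecycle procedures above, assuming neuron-lib is installed. It uses deadlock, as the other examples on this page do, to keep the process alive until it is killed.

```racket
#lang racket/base
(require neuron/process)

(define π (process deadlock))   ; hangs until terminated
(define before (alive? π))

(kill π)                        ; blocks until π is dead
(define after (dead? π))

;; A dead process is ready for synchronization, and the
;; synchronization result is the process itself.
(define result (sync π))
```

Here before and after should both be #t, and result should be π.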

procedure

(current-process)  process?

Returns the process descriptor for the currently executing process.

procedure

(quit v ...)  void?

  v : any/c
Gracefully terminates the current process, ignoring any arguments.

procedure

(die v ...)  void?

  v : any/c
Immediately terminates the current process, ignoring any arguments.

procedure

(deadlock v ...)  void?

  v : any/c
Hangs the current process, ignoring any arguments.

2.5 Messaging

 (require neuron/process/messaging) package: neuron-lib

procedure

(give π [v])  boolean?

  π : process?
  v : any/c = (void)
Blocks until π is ready to accept v on its receiving exchanger, or until π is dead. Returns #t if π accepted v, #f otherwise.

procedure

(take)  any/c

Blocks until a sender is ready to provide a value on the receiving exchanger of the current process. Returns the provided value.

procedure

(recv π)  any/c

  π : process?
Blocks until π is ready to provide a value through its transmitting exchanger, or until π is dead. Returns the provided value, or eof if π died.

procedure

(emit [v])  void?

  v : any/c = (void)
Blocks until a receiver is ready to accept the value v through the transmitting exchanger of the current process.

procedure

(call π [v])  any/c

  π : process?
  v : any/c = (void)
Gives v to π and then immediately recvs from π. Returns the received value.
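The basic messaging procedures can be combined as in the following sketch, which assumes neuron-lib is installed. The processes echo and inc are hypothetical examples.

```racket
#lang racket/base
(require neuron/process neuron/process/messaging)

;; A one-shot process: take a value, emit it back, then finish.
(define echo (process (λ () (emit (take)))))
(define accepted (give echo 'ping))
(define reply (recv echo))
(wait echo)

;; Once the process is dead, give reports #f and recv returns eof.
(define late-give (give echo 'pong))
(define late-recv (recv echo))

;; call combines give and recv against a looping process.
(define inc
  (process (λ () (let loop () (emit (add1 (take))) (loop)))))
(define two (call inc 1))
(kill inc)
```

Under the semantics described above, accepted is #t, reply is 'ping, late-give is #f, late-recv is eof, and two is 2.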

procedure

(forward-to π)  void?

  π : process?
Takes a value and gives it to π.

procedure

(forward-from π)  void?

  π : process?
Emits a value received from π.

procedure

(filter-to π #:with proc)  void?

  π : process?
  proc : (-> any/c any/c)
Takes a value, applies proc to it, and gives the result to π.

procedure

(filter-from π #:with proc)  void?

  π : process?
  proc : (-> any/c any/c)
Receives a value from π, applies proc to it, and emits the result.

procedure

(couple π1 π2)  void?

  π1 : process?
  π2 : process?
Receives a value from π1 and gives it to π2.
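As a hedged illustration of the forwarding and filtering procedures, the sketch below builds a relay process in front of a server. It assumes neuron-lib is installed and that server (from neuron/process/control) behaves as documented later on this page; the names doubler and relay are hypothetical.

```racket
#lang racket/base
(require racket/function
         neuron/syntax
         neuron/process
         neuron/process/messaging
         neuron/process/control)

(define doubler (server (curry * 2)))

;; relay: take a value, add 1 to it, and give the result to doubler;
;; then emit whatever doubler emits in response.
(define relay
  (process
   (λ ()
     (forever
       (filter-to doubler #:with add1)
       (forward-from doubler)))))

(define answer (call relay 3))   ; 3 is incremented to 4, then doubled

(kill relay)
(kill doubler)
```

If the procedures behave as described, answer is 8.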

procedure

(give-evt π [v])  evt?

  π : process?
  v : any/c = (void)
Returns a fresh synchronizable event that becomes ready for synchronization when π is ready to accept the value v on its receiving exchanger, or when π is dead. The synchronization result is #t if π accepted v, #f otherwise.

procedure

(take-evt)  evt?

Returns a constant synchronizable event that becomes ready for synchronization when a sender is ready to provide a value on the receiving exchanger of the current process. The synchronization result is the provided value.

procedure

(recv-evt π)  evt?

  π : process?
Returns a constant synchronizable event that becomes ready for synchronization when π is ready to provide a value through its transmitting exchanger, or when π is dead. The synchronization result is the provided value, or eof if π died.

procedure

(emit-evt [v])  evt?

  v : any/c = (void)
Returns a fresh synchronizable event that becomes ready for synchronization when a receiver is ready to accept the value v through the transmitting exchanger of the current process.

procedure

(forward-to-evt π)  evt?

  π : process?
Returns a constant synchronizable event that becomes ready for synchronization when a value has been taken and then given to π.

procedure

(forward-from-evt π)  evt?

  π : process?
Returns a constant synchronizable event that becomes ready for synchronization when a value has been received from π and then emitted.

procedure

(filter-to-evt π #:with proc)  evt?

  π : process?
  proc : (-> any/c any/c)
Returns a constant synchronizable event that becomes ready for synchronization when (filter-to π #:with proc) would not block.

procedure

(filter-from-evt π #:with proc)  evt?

  π : process?
  proc : (-> any/c any/c)
Returns a constant synchronizable event that becomes ready for synchronization when (filter-from π #:with proc) would not block.

procedure

(couple-evt π1 π2)  evt?

  π1 : process?
  π2 : process?
Returns a constant synchronizable event that becomes ready for synchronization when a value has been received from π1 and then given to π2.
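Because the -evt variants return ordinary synchronizable events, they can be combined with sync and sync/timeout from racket/base. The following sketch assumes neuron-lib is installed; quiet and chatty are hypothetical processes.

```racket
#lang racket/base
(require neuron/process neuron/process/messaging)

(define quiet (process deadlock))             ; never emits
(define chatty (process (λ () (emit 'hi))))   ; emits once

;; Wait on both processes at once; only chatty can supply a value.
(define first-value (sync (recv-evt quiet) (recv-evt chatty)))

;; sync/timeout returns #f when nothing becomes ready in time.
(define timed-out (sync/timeout 0.1 (recv-evt quiet)))

(kill quiet)
```

Here first-value should be 'hi and timed-out should be #f.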

2.6 Control

 (require neuron/process/control) package: neuron-lib

procedure

(server proc)  process?

  proc : (-> any/c any/c)
Applies proc to each value taken and then emits the result.

Example:
> (define π (server add1))
> (call π 1)

2

> (call π -1)

0

procedure

(proxy π    
  [#:filter-to to-proc    
  #:filter-from from-proc])  process?
  π : process?
  to-proc : (or/c (-> any/c any/c) #f) = #f
  from-proc : (or/c (-> any/c any/c) #f) = #f
Forwards values to and from π. Filters taken values with to-proc when not #f. Filters emitted values with from-proc when not #f. Stops π when it stops. Dies when π dies.

Example:
> (call (proxy (server (curry * 3))) 2)

6

procedure

(proxy-to π [#:with proc])  process?

  π : process?
  proc : (or/c (-> any/c any/c) #f) = #f
Gives all values taken to π. Filters taken values with proc when not #f. Stops π when it stops. Dies when π dies.

procedure

(proxy-from π [#:with proc])  process?

  π : process?
  proc : (or/c (-> any/c any/c) #f) = #f
Emits all values emitted by π. Filters emitted values with proc when not #f. Stops π when it stops. Dies when π dies.
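The filtering options of proxy and proxy-from can be sketched as follows, assuming neuron-lib is installed. The particular filters (doubling, number->string, add1) are arbitrary choices for illustration.

```racket
#lang racket/base
(require racket/function
         neuron/process
         neuron/process/messaging
         neuron/process/control)

;; Double values on the way in and stringify results on the way out.
(define π
  (proxy (server add1)
         #:filter-to (curry * 2)
         #:filter-from number->string))
(define answer (call π 3))   ; 3 is doubled to 6, incremented to 7

;; proxy-from relays only emitted values, here incrementing each one.
(define ones (proxy-from (source (λ () 1)) #:with add1))
(define relayed (recv ones))

(stop π)
(stop ones)
```

Under the documented behavior, answer is the string "7" and relayed is 2.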

procedure

(sink proc)  process?

  proc : (-> any/c any)
Applies proc to each value taken and ignores the result.

Example:
> (define i 0)
> (define π (sink (λ (x) (set! i (+ i x)))))
> (give π 1)

#t

> (give π 2)

#t

> i

3

procedure

(source proc)  process?

  proc : (-> any/c)
Calls proc repeatedly and emits each result.

Example:
> (define π (source random))
> (recv π)

0.1591727701267079

> (recv π)

0.9636171831359096

procedure

(stream snk src)  process?

  snk : process?
  src : process?
Forwards to snk and from src. Stops snk and src when it stops. Dies when both snk and src die.

Commands:
  • 'sink – returns snk

  • 'source – returns src

Example:
> (define π-out (server add1))
> (define π-in (sink (λ (x) (give π-out (* x 2)))))
> (call (stream π-in π-out) 3)

7

procedure

(service key-proc [#:on-drop on-drop])  process?

  key-proc : (-> process? any/c)
  on-drop : (-> any/c process? any) = void
Associates processes to keys generated by key-proc. When given (list key v), forwards v to the process associated with key. Emits (list key v) when the process associated with key emits v. Applies on-drop to each key–process pair it drops. Drops each process that dies. Drops every process when it stops.

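Commands (those exercised in the example below):
  • (list 'add π) – adds π to the service

  • (list 'drop key) – drops the process associated with key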

Example:
> (define times
    (let ([N -1])
      (service
       (λ _ (set! N (add1 N)) N)
       #:on-drop (λ (k _) (displayln `(STOP ,k))))))
> (for ([i 10])
    (times `(add ,(server (curry * i)))))
> (writeln
   (for/list ([i 10])
     (call times (list i 3))))

((0 0) (1 3) (2 6) (3 9) (4 12) (5 15) (6 18) (7 21) (8 24) (9 27))

> (for ([i 10] #:when (even? i))
    (times `(drop ,i)))

(STOP 0)

(STOP 2)

(STOP 4)

(STOP 6)

(STOP 8)

> (writeln
   (for/list ([i 10] #:when (odd? i))
     (call times (list i 4))))

((1 4) (3 12) (5 20) (7 28) (9 36))

> (stop times)

(STOP 9)

(STOP 7)

(STOP 5)

(STOP 3)

(STOP 1)

procedure

(simulator proc [#:rate rate])  process?

  proc : (-> real? any)
  rate : real? = 10
Repeatedly calls proc at a frequency of up to rate times per second. Applies proc to the period corresponding to rate in milliseconds.

Example:
> (define i 0)
> (define t (current-inexact-milliseconds))
> (wait
   (simulator
    (λ (p)
      (printf "~a ~a\n" p (- (current-inexact-milliseconds) t))
      (when (> i 2) (die))
      (set! i (add1 i))
      (sleep 0.25))))

100.0 101.669189453125

100.0 351.734130859375

100.0 601.796142578125

100.0 851.869140625

procedure

(pipe π ...+)  process?

  π : process?
Calls πs in series, implicitly starting with take and ending with emit. Stops all πs when it stops. Dies when any π dies.

Example:
> (define π
    (pipe
     (server add1)
     (server (curry * 3))
     (server sub1)))
> (call π 2)

8

procedure

(bridge π1 π2)  process?

  π1 : process?
  π2 : process?
Forwards from π1 to π2, and vice versa. Stops π1 and π2 when it stops. Dies when π1 or π2 dies.

A bridge will attempt to forward unrecognized commands, first to π1 and then to π2, before raising unhandled-command.

Example:
> (wait
   (bridge
    (server add1)
    (process (λ () (emit 1) (writeln (take))))))

2

procedure

(managed π)  process?

  π : process?
Forwards non-eof values to and from π. Stops π when it stops. Dies when π dies.

Example:
> (define π (managed (server add1)))
> (call π 1)

2

> (shutdown π)
> (dead? π)

#t

procedure

(shutdown π)  void?

  π : process?
Gives eof to π and blocks until it dies.

procedure

(shutdown-evt π)  evt?

  π : process?
Gives eof to π and returns a synchronizable event that becomes ready for synchronization when π dies. The synchronization result is π.
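A small sketch of shutdown-evt with a timeout, assuming neuron-lib is installed. The one-second limit is an arbitrary choice for illustration.

```racket
#lang racket/base
(require neuron/process neuron/process/control)

(define π (managed (server add1)))

;; Give eof to π, then wait up to one second for it to die.
;; sync/timeout would return #f if π were still alive after that.
(define result (sync/timeout 1 (shutdown-evt π)))
(define finished (dead? π))
```

If π shuts down in time, result is π itself and finished is #t.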