|
| 1 | +import std/asyncdispatch |
| 2 | +import std/os |
| 3 | +import std/strutils |
| 4 | + |
| 5 | + |
| 6 | + |
| 7 | +## |
| 8 | +## # Channel[T] |
| 9 | +## |
| 10 | +## As far as I know Nim's asyncdispatch has many features but lacks an awaitable |
| 11 | +## channel type. It uses Futures as the work horse of async communication instead. |
| 12 | +## |
| 13 | +## I craeted a very simple async-awaitable channel type so I can match the |
| 14 | +## reference Go implementation this benchmark originated from. |
| 15 | +## |
| 16 | + |
| 17 | +type Channel[T] = object |
| 18 | + ## An simple one-item, async-awaitable Channel. |
| 19 | + untilIsEmpty: Future[void] |
| 20 | + untilIsFull: Future[void] |
| 21 | + val: T |
| 22 | + |
| 23 | + |
| 24 | +proc newChannel[T](): ref Channel[T] = |
| 25 | + ## Initializer. Allocate a new ref Channel object on the heap. |
| 26 | + result = new Channel[T] |
| 27 | + result[].untilIsEmpty = newFuture[void]() |
| 28 | + result[].untilIsFull = newFuture[void]() |
| 29 | + result[].untilIsEmpty.complete() |
| 30 | +
|
| 31 | +
|
| 32 | +proc send[T](chan: ref Channel[T], val: T) {.async.} = |
| 33 | + # Accept val if empty, otherwise, suspend until empty. |
| 34 | + await chan[].untilIsEmpty |
| 35 | + chan[].untilIsEmpty = newFuture[void]() |
| 36 | + chan[].val = val |
| 37 | + chan[].untilIsFull.complete() |
| 38 | +
|
| 39 | +
|
| 40 | +proc recv[T](chan: ref Channel[T]): Future[T] {.async.} = |
| 41 | + # Return held val if full, otherwise, suspend until full. |
| 42 | + await chan[].untilIsFull |
| 43 | + chan[].untilIsFull = newFuture[void]() |
| 44 | + result = chan[].val |
| 45 | + chan[].untilIsEmpty.complete() |
| 46 | +
|
| 47 | +
|
| 48 | +
|
| 49 | +## |
| 50 | +## # Benchmark |
| 51 | +## |
| 52 | +## Below, "Concurrent Prime Sieve" that matches Go reference implementation. |
| 53 | +## |
| 54 | +## [X] Uses coroutines. |
| 55 | +## [X] Uses a coroutine scheduler. |
| 56 | +## [X] Uses an async channel for communitating between coroutines. |
| 57 | +## [X] Same 3 functions, structured like the reference. |
| 58 | +## |
| 59 | +
|
| 60 | +
|
| 61 | +proc generate(chan: ref Channel[int]) {.async.} = |
| 62 | + ## Send the sequence 2, 3, 4, ... to cannel `chan`. |
| 63 | + for i in 2 .. int.high: |
| 64 | + await chan.send(i) |
| 65 | +
|
| 66 | +
|
| 67 | +proc filter(inChan, outChan: ref Channel[int], prime: int) {.async.} = |
| 68 | + ## Copy the values from channel `inChan` to channel `outChan`, removing those |
| 69 | + ## divisible by `prime`. |
| 70 | + while true: |
| 71 | + let i = await inChan.recv() # revieve value from `inChan` |
| 72 | + if i mod prime != 0: |
| 73 | + await outChan.send(i) # send `i` to `outChan` |
| 74 | +
|
| 75 | +
|
| 76 | +proc main(n: int) {.async.} = |
| 77 | + ## The prime sieve: Daisy-chain filter processes. |
| 78 | + var firstChan = newChannel[int]() # craete a new channel |
| 79 | + asyncCheck generate(firstChan) # launch generate coroutine |
| 80 | + for i in 0 ..< n: |
| 81 | + let prime = await firstChan.recv() |
| 82 | + echo prime |
| 83 | + var secondChan = newChannel[int]() |
| 84 | + asyncCheck filter(firstChan, secondChan, prime) |
| 85 | + firstChan = secondChan |
| 86 | +
|
| 87 | +
|
| 88 | +when isMainModule: |
| 89 | +
|
| 90 | + let n = if paramCount() > 0: parseInt(paramStr(1)) else: 100 |
| 91 | + waitFor main(n) |
| 92 | +
|
| 93 | +
|
0 commit comments