Wishful Coding

Didn't you ever wish your
computer understood you?

Anatomy of a Channel

I have been trying to implement a CyclicBarrier in core.async, but it took me a while to understand how a channel works.

To support both IOC (the inversion of control behind go blocks) and threads, and still be fast, the implementation is a lot more verbose and complicated than the bunch of promises I initially expected.

It seems all the verbosity and callback hell that can be avoided by using channels is bundled in their implementation. Take this loop from cleanup, which removes inactive handlers. Can you spot the 2 actual lines of code?

(when-not (.isEmpty takes)           ;bla
  (let [iter (.iterator takes)]      ;bla
    (loop [taker (.next iter)]       ;bla
      (when-not (impl/active? taker) ;inactive?
        (.remove iter))              ;KILL!
      (when (.hasNext iter)          ;bla
        (recur (.next iter))))))     ;bla

But let’s go back to the beginning. The basic unit of a channel is a handler, a glorified callback. It has a commit method which returns the callback, an id for avoiding deadlock and a method to check if it is active.

The latter is used with alts!!, so that after one possible channel has been acted upon, the other handlers turn inactive, but let’s stick to a simple <!!.

(defprotocol Handler
  (active? [h] "returns true if has callback. Must work w/o lock")
  (lock-id [h] "a unique id for lock acquisition order, 0 if no lock")
  (commit [h] "commit to fulfilling its end of the transfer, returns cb. Must be called within lock"))

;; no locking, always active.
(defn- fn-handler
  [f]
  (reify
   Lock
   (lock [_])
   (unlock [_])
   
   impl/Handler
   (active? [_] true)
   (lock-id [_] 0)
   (commit [_] f)))
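
To get a feel for how handlers "turn inactive" together, here is a minimal sketch. It is not how core.async implements alts!!; SketchHandler and flag-handler are names I made up, and I leave out lock-id and all locking, so it is only safe on a single thread.

```clojure
;; Simplified stand-in for the Handler protocol above (no lock-id, no locking).
(defprotocol SketchHandler
  (active? [h] "true while the callback has not been claimed")
  (commit [h] "claim the callback and return it"))

(defn flag-handler
  "Hypothetical helper: every handler built on the same flag atom
  turns inactive as soon as one of them is committed."
  [flag f]
  (reify SketchHandler
    (active? [_] @flag)
    (commit [_] (reset! flag false) f)))

(def flag (atom true))
(def a (flag-handler flag (fn [v] [:a v])))
(def b (flag-handler flag (fn [v] [:b v])))

(def before [(active? a) (active? b)])   ; both are still waiting
(def result ((commit a) :fork))          ; a wins and its callback runs
(def after [(active? a) (active? b)])    ; b is switched off as well
```

A channel that finds b in its list later would see it is inactive and drop it, which is what the cleanup loop above does.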

Now let’s look at <!! itself, which is pretty simple. It creates a promise and a callback that delivers the promise. It then calls take! with it. If take! returns nil, no value was available yet, so it derefs the promise and blocks until the callback fires. Otherwise it derefs the box that take! returned.

(defn <!!
  "takes a val from port. Will return nil if closed. Will block
  if nothing is available."
  [port]
  (let [p (promise)
        ret (impl/take! port (fn-handler (fn [v] (deliver p v))))]
    (if ret
      @ret
      (deref p))))
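
The same promise-and-callback trick can be tried without a channel. In this sketch blocking-take and take-fn are hypothetical: take-fn stands in for calling impl/take! on a port, and a delay plays the role of the box, since both can be dereferenced.

```clojure
(defn blocking-take
  "take-fn is a hypothetical stand-in for impl/take! on some port.
  It receives a callback and returns either something derefable
  (value available now) or nil (the callback will be called later)."
  [take-fn]
  (let [p (promise)
        ret (take-fn (fn [v] (deliver p v)))]
    (if ret
      @ret
      (deref p))))

;; Value available right away: take-fn returns something derefable.
(def immediate
  (blocking-take (fn [_cb] (delay :from-buffer))))

;; Nothing available yet: take-fn returns nil and calls back from another thread.
(def deferred
  (blocking-take
   (fn [cb]
     (doto (Thread. (fn [] (Thread/sleep 50) (cb :from-putter)))
       (.setDaemon true)
       (.start))
     nil)))
```

In the second case the calling thread sits in (deref p) until the other thread delivers, which is the blocking part of <!!.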

Now let’s look at the main course, ManyToManyChannel.

If you look through all the verbosity, take! and put! are pretty similar. There are 3 main code paths in each.

  1. There is someone at the other end. Match up the handlers, call their callbacks and return the value or nil in an IDeref.
  2. There is room in the buffer. Take/put the value in the buffer and return the value or nil in an IDeref.
  3. There is no one at the other end and no room in the buffer. Put the handler in a list and return nil.

(take!
 [this handler]
 (.lock mutex)
 ; remove all inactive handlers
 (cleanup this)
 (let [^Lock handler handler
       ; get the actual callback if it is active
       commit-handler (fn []
                        (.lock handler)
                        (let [take-cb (and (impl/active? handler) (impl/commit handler))]
                          (.unlock handler)
                          take-cb))]
   ; If there are items in the buffer,
   ; take one, put it in a box and return it.
   ; We now have a free spot in the buffer
   ; so we try to find an active put handler.
   ; if we find one, put its value in the buffer
   ; and commit its callback.
   (if (and buf (pos? (count buf)))
     (do
       (if-let [take-cb (commit-handler)]
         (let [val (impl/remove! buf)
               iter (.iterator puts)
               cb (when (.hasNext iter)
                    (loop [[^Lock putter val] (.next iter)]
                      (.lock putter)
                      (let [cb (and (impl/active? putter) (impl/commit putter))]
                        (.unlock putter)
                        (.remove iter)
                        (if cb
                          (do (impl/add! buf val)
                              cb)
                          (when (.hasNext iter)
                            (recur (.next iter)))))))]
           (.unlock mutex)
           (when cb
             (dispatch/run cb))
           (box val))
         (do (.unlock mutex)
             nil)))
     ; There is nothing in the buffer to take,
     ; so we search for an active putter.
     ; For each putter, check if both ends are active.
     ; If so, remove it from the list and return the callbacks and value.
     (let [iter (.iterator puts)
           [take-cb put-cb val]
           (when (.hasNext iter)
             (loop [[^Lock putter val] (.next iter)]
               ; funny bit where deadlock is avoided
               (if (< (impl/lock-id handler) (impl/lock-id putter))
                 (do (.lock handler) (.lock putter))
                 (do (.lock putter) (.lock handler)))
               (let [ret (when (and (impl/active? handler) (impl/active? putter))
                           [(impl/commit handler) (impl/commit putter) val])]
                 (.unlock handler)
                 (.unlock putter)
                 (if ret
                   (do
                     (.remove iter)
                     ret)
                   (when-not (impl/active? putter)
                     (.remove iter)
                     (when (.hasNext iter)
                       (recur (.next iter))))))))]
       ; if we found 2 callbacks in the previous step
       ; immediately return the value in a box and commit the putter.
       ; if the channel is closed, return nil in a box.
       ; if the channel is open and there is no matching callback
       ; add the handler to the list of takers
       ; and return nil, without a box.
       (if (and put-cb take-cb)
         (do
           (.unlock mutex)
           (dispatch/run put-cb)
           (box val))
         (if @closed
           (do
             (.unlock mutex)
             (if-let [take-cb (commit-handler)]
               (box nil)
               nil))
           (do
             (.add takes handler)
             (.unlock mutex)
             nil)))))))
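
The three code paths listed earlier can also be poked at from the outside with the public put! and take! functions. This is a small sketch, assuming core.async is on the classpath; the timeout on deref is only there so a mistake does not hang the REPL.

```clojure
(require '[clojure.core.async :refer [chan put! take!]])

;; Paths 3 and 1 on an unbuffered channel.
(def rendezvous
  (let [c (chan)
        p (promise)]
    (take! c #(deliver p %))   ; nobody is putting: the handler is queued
    (put! c :hello)            ; a taker is waiting: the two are matched up
    (deref p 1000 :timeout)))

;; Path 2 on a channel with a buffer of one.
(def buffered
  (let [c (chan 1)
        p (promise)]
    (put! c :fork)             ; room in the buffer: the value is stored
    (take! c #(deliver p %))   ; the buffer has an item: it is handed out
    (deref p 1000 :timeout)))
```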

I wonder if the code could be made more readable with a few macros for locking and iterators.
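
As a rough idea of what such helpers could look like, here is a sketch. call-with-lock, with-lock and remove-when! are names I made up; nothing like them exists in core.async as far as I know, and the maps with an :active key stand in for real handlers.

```clojure
(import '(java.util Collection Iterator LinkedList)
        '(java.util.concurrent.locks Lock ReentrantLock))

(defn call-with-lock
  "Hypothetical helper: call f while holding lock, always unlocking."
  [^Lock lock f]
  (.lock lock)
  (try
    (f)
    (finally (.unlock lock))))

(defmacro with-lock
  "Hypothetical macro: run body while holding lock."
  [lock & body]
  `(call-with-lock ~lock (fn [] ~@body)))

(defn remove-when!
  "Hypothetical helper: remove every element of a mutable
  collection for which pred returns a truthy value."
  [pred ^Collection coll]
  (let [^Iterator iter (.iterator coll)]
    (while (.hasNext iter)
      (when (pred (.next iter))
        (.remove iter)))))

(def mutex (ReentrantLock.))
(def takes (LinkedList. [{:id 1 :active true}
                         {:id 2 :active false}
                         {:id 3 :active true}]))

;; The cleanup loop from the start of this post, in two lines.
(with-lock mutex
  (remove-when! (complement :active) takes))
```

The real take! unlocks the mutex at different points in different branches, so a macro like this would not fit everywhere without restructuring the code.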

Understanding put! is left as an exercise for the reader. Understanding the IOC part is left as an exercise for the Clojure gods.

I will write more about my ideas for barriers and transactions later.

TRC Open-sourced

pepijndevos/irc-deploy

I finally took some time to cut out the private bits from my deploy script and open source it to the world.

This script allows you to deploy an IRC server, bouncer, web interface, bot and some custom modules to anything you can think of: VirtualBox, VPS, EC2, OpenStack, Docker…

The only things that are missing are the SSL certificates and the Pallet service definitions.

You can add your own certificates, generate self-signed ones, or disable SSL altogether:

znc --makepem
openssl req -new -newkey rsa:4096 -days 365 -nodes -x509 -subj "/C=NL/ST=Gelderland/L=Loenen/O=Wishful Coding/CN=*.teamrelaychat.nl" -keyout node.key -out node.cert

Your services are entirely up to you, but I personally use a vbox service for testing:

{:vbox {:provider "vmfest"}}

Now you should be able to just run

lein pallet up --service vbox --roles dev

Of course you can also just let me deploy it for you or hire me to deploy other things.

Dining Philosophers in core.async

For the July Amsterdam Clojure meetup, a lot of people were curious what core.async is all about, so I proposed tackling the dining philosophers problem with core.async.

The problem is explained and a solution proposed in the CSP book section 2.5, but using events rather than channels.

We worked on the problem dojo style, switching the driver seat every few minutes. But with no one really knowing the library very well, progress was slow, and by the end of the meetup we could make one philosopher eat.

One problem we ran into was that go blocks are lexical, so you can’t easily write helper functions that use <! or >!.

(defn putdown [ch] (>! ch :fork))
(go (putdown (chan)))
Exception in thread "async-dispatch-1" java.lang.AssertionError: Assert failed: >! used not in (go ...) block
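
One way around this, as far as I can tell, is to give the helper its own go block and have it return the resulting channel, which the caller can then park on. A minimal sketch, assuming core.async is on the classpath:

```clojure
(require '[clojure.core.async :refer [chan go >! <! <!!]])

;; The helper wraps >! in its own go block and returns that block's channel.
(defn putdown [ch]
  (go (>! ch :fork)))

(def fork (chan 1))

;; Inside another go block, wait for the helper to finish with <!.
(<!! (go (<! (putdown fork))))

;; The fork is now in the channel's buffer.
(def picked-up (<!! fork))
```

The other option is to skip go blocks in helpers and use the blocking >!! and <!! from real threads.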

So this morning I sat down to make this thing work.

During the meetup we had a function that would do something silly to set up a vector with 5 forks represented by channels, which I replaced with something equally silly, until I just came up with this.

I use channels with a buffer of one and put a fork in that buffer. This makes sure a fork can be picked up and put down without blocking, but only once.

(defn set-table []
  (let [table [(chan 1) (chan 1) (chan 1) (chan 1) (chan 1)]]
    (doseq [ch table]
      (>!! ch :fork))
    table))

The butler making sure only 4 philosophers are seated is simply represented as

(chan 4)
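
To see the butler at work without blocking the REPL, the sketch below uses offer! and poll!, the non-blocking variants of >!! and <!!. It assumes a core.async version that has them, and the philosopher names are made up.

```clojure
(require '[clojure.core.async :refer [chan offer! poll!]])

(def butler (chan 4))

;; Five philosophers try to sit down without blocking.
;; offer! returns true on success and nil when the buffer is full.
(def seated (mapv #(offer! butler %) [:p1 :p2 :p3 :p4 :p5]))

;; One gets up, which frees a seat for the one who was refused.
(def left (poll! butler))
(def retry (offer! butler :p5))
```

With the blocking >!! the fifth philosopher would simply wait at the door until someone gets up.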

This leads to the definition of the basic actions

(def pickup <!!)
(def putdown #(>!! % :fork))

(def sit-down #(>!! % :sit))
(def get-up <!!)

The simplified behaviour of a philosopher then becomes

(sit-down butler)
(alts!! my-forks)
(alts!! my-forks)
(println "eating")
(doseq [f my-forks] (putdown f)) ; doseq rather than map, which is lazy and would never run here
(get-up butler)
(println "thinking")
(recur)

And finally we can run 5 philosopher threads.

(doseq [p philosophers]
  (thread (philosopher table butler p)))
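
Putting the pieces together, a complete run could look roughly like this. It is a sketch and not the exact code from the meetup: the rounds argument, the return value, and the way each philosopher picks the forks at seat and seat + 1 are my assumptions, added so the program terminates and can be checked.

```clojure
(require '[clojure.core.async :refer [chan thread alts!! >!! <!!]])

(defn set-table
  "n forks, each a channel with a buffer of one holding :fork."
  [n]
  (let [table (vec (repeatedly n #(chan 1)))]
    (doseq [ch table]
      (>!! ch :fork))
    table))

(defn philosopher
  "Eats `rounds` times and returns the number of meals.
  Using the forks at `seat` and `seat + 1` is an assumption."
  [table butler seat rounds]
  (let [my-forks [(table seat)
                  (table (mod (inc seat) (count table)))]]
    (loop [meals 0]
      (if (< meals rounds)
        (do
          (>!! butler :sit)        ; sit-down
          (alts!! my-forks)        ; pick up whichever fork is free
          (alts!! my-forks)        ; then the other one
          (doseq [f my-forks]      ; eager, unlike map
            (>!! f :fork))         ; putdown
          (<!! butler)             ; get-up
          (recur (inc meals)))
        meals))))

;; thread returns a channel that yields the body's return value.
(def meals
  (let [table  (set-table 5)
        butler (chan 4)
        seats  (mapv #(thread (philosopher table butler % 3)) (range 5))]
    (mapv <!! seats)))
```

Because the butler only seats 4 of the 5, there is always a seated philosopher next to a free fork, so every thread finishes its rounds.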

Be sure to check out the code and output and join the next meetup if you’re in the Netherlands.