Wishful Coding

Didn't you ever wish your computer understood you?

A new seque

I have used clojure.core/seque quite a bit lately, and I love it.

Creates a queued seq on another (presumably lazy) seq s. The queued
seq will produce a concrete seq in the background, and can get up to
n items ahead of the consumer. n-or-q can be an integer n buffer
size, or an instance of java.util.concurrent BlockingQueue. Note
that reading from a seque can block if the reader gets ahead of the
producer.
First I used it in Begame for decoupling the frame computation from the rendering. But for most games, this needs to work without the frames getting ahead of rendering.

This morning I wanted to make a lazy heapsort in one line, rather than with my own heap implementation.

That is when problems started to occur. While seque optionally takes a BlockingQueue as argument, it is not made to support exotic queues like SynchronousQueue(for Begame) and PriorityBlockingQueue(for heapsort), which do a bit more than just threading items through a queue.

Long story short, I submitted a bug report and wrote something that works for me.

Update: I'm not even sure It's possible anymore to write seque so that it works in all cases.

(ns test
(:import [java.util.concurrent
BlockingQueue
LinkedBlockingQueue
SynchronousQueue
PriorityBlockingQueue
CyclicBarrier])
(:use clojure.test)
(:refer-clojure :exclude [seque]))

;; some BlockingQueues are...
;; not ordered
;; not finite
;; unable to contain nil
;; without content
;; mutable
;; dropping or recieving extra items

;; seque...
;; fills the que on another thread
;; propagates errors
;; replaces nil with a sentinel
;; detects the end of the input seq
;; can be consumed on any thread

(defn seque
"Creates a queued seq on another (presumably lazy) seq s. The queued
seq will produce a concrete seq in the background, and can get up to
n items ahead of the consumer. n-or-q can be an integer n buffer
size, or an instance of java.util.concurrent BlockingQueue. Note
that reading from a seque can block if the reader gets ahead of the
producer."

{:added "1.0"}
([s] (seque 100 s))
([n-or-q s]
(let [^BlockingQueue q (if (instance? BlockingQueue n-or-q)
n-or-q
(LinkedBlockingQueue. (int n-or-q)))
NIL (Object.) ;nil sentinel since BQ doesn't support nils
s (map (fnil identity NIL) s)
channel (LinkedBlockingQueue.)
fill (fn fill [s]
(try
(if (seq s)
(do
(.put channel #(.take q))
(.put q (first s))
(recur (next s)))
(.put channel #(throw (InterruptedException.))))
(catch Exception e
(.put channel #(throw e)))))
fut (future (fill s))
drain (fn drain []
(lazy-seq
(try
(cons ((.take channel)) (drain))
(catch InterruptedException e nil))))]
(map #(if (identical? % NIL) nil %) (drain)))))

(defn trickle
[slow]
(map #(do (Thread/sleep 100) %) slow))

(deftest identity?
(is (= (seque (range 10)) (range 10))))

(deftest priority
(is (= 4 (count (seque (PriorityBlockingQueue.)
[:a :b :c :a])))))

(deftest synchronous
(is (= (seque (SynchronousQueue.)
(range 10))
(range 10))))

(deftest nils
(is (= (seque [nil true false []])
[nil true false []])))

(deftest errors
(is (thrown? Throwable
(dorun (seque (map #(throw (Exception. (str %))) (range 10)))))))

(deftest threads
(let [s (seque (SynchronousQueue.) (range 10))
f1 (future (doall s))
f2 (future (doall s))]
(is (= (range 10) @f1 @f2))))

(defn moronic [] ; untestable
(let [q (LinkedBlockingQueue. 100)]
(future (dotimes [_ 1000] (Thread/yield) (.poll q)))
(future (dotimes [_ 1000] (Thread/yield) (.offer q :foo)))
(seque q (range 1000))))
Pepijn de Vos