Clojure — transducers], reducer and other dregs

lately, a certain known transducers — a new feature of the not yet released Clojure 1.7. At the time of writing, relevant Сlojure 1.7-alpha5, but have already appeared a fair amount of ports transducers] in a variety of languages: Python, Ruby, JavaScript, PHP, Java, C++, Lua, Erlang. And... it's, frankly, a bit discouraging. After quite a long time ago (in Clojure 1.5) added the library reducers. So that's about reduceri nobody said, anywhere, nothing ported, although, like as they do similar things... Or not?

Let's see what we needed in Clojure, all these reducers & transducers (we really need?), how they work, how to use them... And finally find out whether it is time to throw the reducers in the dump.

It is wrong to describe concepts that emerged in Clojure outside the context of the language. Because next will be a lot of listings in Clojure. But there will be no calculus. In General, basic knowledge of Clojure is appropriate (especially the idea of sequences), but do not have to know Haskell. Also advance warning that all of the listings are standard features in fact changed very much, sometimes even "slightly" broken. All for the benefit of simplification. Ah Yes, picture the same burrito.

the

Fold


So, Clojure is a functional language, which means ordinary imperative cycle not good.
Well, okay, we didn't want — there's funkcionalnosti reduce!

the
(defn my-reduce
([rf coll] ;; this option for convenience
(if-let [s (seq coll)]
(my-reduce rf (first s) (next s))
(rf)))
([rf acc coll]
(if-let [[x &xs] (seq coll)]
(recur (rf acc x) xs)
acc)))

In fact, reduce, of course, realized bit different, but it doesn't matter now, forget it. The function rf (let's call it the f-function) here takes two arguments: the first is a "moving state"; the second element of the sequence coll. If the initial state is not specified, then use (first coll) or (rf). Check the entire collection coll, for each element called rf, while "dragging" the state of the acc. When the elements over, then simply return acc.

A small example. Let's say we have a list of strings, I want to calculate their total length.
Here is imperative code with a loop:

the
(defn length-of-strings [strings]
(with-local-vars [acc 0] ;; Yes, Clojure has local variables!
(doseq [c strings]
(var-set
acc
(+ @acc (count c)))) ;; actually all work
@acc))

The loop is a simple counter acc (number). At each iteration, we suppose it equal to (+ @acc (count c)).
And now, once more, only through reduce:

the
(defn length-of-strings [coll]
(my-reduce
(fn ([acc c] (+ acc (count c)))) ;; our f-function
0 ;; starting value
coll))

In case, if you temporarily "forgotten" about laziness, it can be a primitive operation implemented like map or filter.

the
(defn my-map [f coll]
(my-reduce
(fn [acc c] (conj acc (f c)))
[]
coll))

(defn my-filter [p coll]
(my-reduce
(fn [acc c] (if (p c) (conj acc c) acc))
[]
coll))

To implement take the option reduce will not fit — the loop always runs through the whole sequence (it's not there any Haskell where everything is lazy).

In order to overcome this shortcoming in version 1.5 added a special spike marker reduced and the corresponding predicate offer?. At the same time rewrote reduce, to get something like this:
the
(defn my-reduce
([rf coll]
(if-let [s (seq coll)]
(my-reduce rf (first s) (next s))
(rf)))
([rf acc coll]
(if-let [[x &xs] (seq coll)]
(let [ret (rf acc x)]
(if (reduced? ret)
@ret
(recur ret rf xs)))
acc)))

As soon as the f-function will return (reduced ...), the loop terminates and returns the value of @ret.

the
(defn take-r [n coll]
(my-reduce
(fn [[n1 acc] c]
(if (pos? n1)
[(dec n1) (conj acc c)]
(reduced acc)))
[n []]
coll))

;; the function supports an infinite sequence!
(take-r 5 (range))
;; => [0 1 2 3 4]

It is impossible not to recall the wonderful function reductions. Essentially this is an analogue of reduce only returns lazy list of all intermediate values of acc, not just the last. It is very handy to use when debugging. Write the step of the algorithm as a function that runs reduce on the collection of the input data. If suddenly something not so, the model reduce to reductions, run in the REPL and get all the intermediate steps. With cycles it just won't work — it will be necessary to fence crutches debug, which is not very convenient.

Is reductions useful in and of itself, there are some factorials to calculate:

the
;; => *lazy* sequence of factorials
(def factorials (reductions *' (cons 1 (map inc (range)))))

(nth factorials 20)
;; => 2432902008176640000

Clojure uses sequences to iterate over collections. If we decide to run through the vector, a hash table or a simple iterator, then the heap will create a fair amount of temporary objects.

The obvious optimization, which asks in this situation is to implement a specialized reduce for those collections for which it makes sense. Well, if the collection is not amenable to such optimizations, then use a standard implementation similar to the one shown in the beginning of the article. It is a special Protocol clojure.core.protocol/CollReduce. When the object collection supports it, then this implementation will be used inside clojure.core/reduce. Therefore, reduce in Clojure is usually faster than a similar loop doseq.

the

transformers


A transformer is a function that takes one f-function and returns a new.
For example, here is the transformer "increase by 1":

the
(defn inc-t [rf]
(fn [acc c] (rf acc (inc c))))

;; and immediately an example of using
(reduce + 0 (map inc [1 2 3 4]))
;; => 14

(reduce (inc-t +) 0 [1 2 3 4])
;; => 14

Can be the case generalized a bit to allow instead of inc to specify any function:

the
(defn map-t [f]
(fn [rf]
(fn [acc c] (rf acc (f c)))))

(def inc-t (map-t inc))
(def dec-t (map-t dec))
;; ...

(reduce (inc-t +) 0 [1 2 3 4])
;; => 14

Here, for example, the transformer "filter feeders":

the
(defn filter-t [pred]
(fn [rf]
(fn [acc c]
(if (pred c)
(rf acc c)
acc))))

(def odd?-t (filter-t odd?))
(def even?-t (filter-t even?))

;; example
(reduce (even?-t *) 1 [1 2 3 4])
;; => 8

Is it possible to combine multiple transformers? Of course!

the
(defn odd?-inc-t [rf]
(odd?-t (inc-rf t)))

;; ..or a little more canonically
(def odd?-inc-t (comp (filter-t odd?) (map-t inc)))

;; which is logically equivalent..
(def odd?-inc-t
(comp
(fn [rf]
(fn [acc c]
(if (odd? c) (rf acc c) acc)))
(fn [rf]
(fn [acc c]
(rf acc (inc c))))))

;; which will give the equivalent of such a function
(defn odd?-inc-t [rf]
(fn [acc c]
(if (odd? c)
(rf acc (inc c))
acc)))

;; example usage
(reduce * 1 (->> [1 2 3 4 5] (map inc) (filter odd?)))
;; => 15

(reduce (odd?-inc-t *) 1 [1 2 3 4 5])
;; ==> 15

You should pay attention that the transformers are in "reverse" order. If we want the elements of the collection were processed by a transformer A before they get into B, then glue them as you need (comp A B). Now the trick:

the
(def cc (vec (range 1000000)))

(time (reduce + 0 (->> cc (filter odd?) (map inc))))
;; "Elapsed time: 171.390143 msecs"
;; => 250000500000

(time (reduce ((comp (filter-t odd?) (map-t inc)) +) 0 cc))
;; "Elapsed time: 93.246015 msecs"
;; => 250000500000

Here's how increase speed on level ground! All, of course, depends on many details and various nansov, so in reality the gain can be different. In General, I want to say that you should not take this piece of code as a benchmark.
But the overall results is not surprising. When using map and filter is created 2 the intermediate sequence. We run through the original vector, create a temporary list of filtered values. Then run through this list and build another one, but with enhanced elements. And finally, the check is on it, summing up the values.

On the other hand, the option of transformers does not create any temporary collections. Instead, the source elements are immediately applied and odd? and inc.

the

Where are my reducer?


And it was all good until version 1.5 has not introduced a new standard library clojure.core.reducers. Exactly a separate library, you'll need to explicitly import. It also announced its version of map, filter, take-while, and others. And, of course, they are not compatible with regular versions of clojure.core. Therefore, it is better to write (require '[clojure.core.reducers :as r]) instead of simply (use 'clojure.core.reducers).

So, what is reducer? Brief and stupid: reducer is any object which reducatse. Any collection in terms of clojure.core.reducers is reducer. Hash-table — reducer. And java.lang.String reducer. Well, nil, of course toe. View definition:

the
(defn reducer [coll xf]
;; `xf` is the transformer
(reify
clojure.core.protocols/CollReduce
(coll-reduce [this f1]
(let [f2 (xf f1)]
(clojure.core.protocols/coll-reduce coll f2 (f2))))
(coll-reduce [this f1 init]
(let [f2 (xf f1)]
(clojure.core.protocols/coll-reduce coll f2 init)))))

Here, a collection coll, and returns a new one, at which to run reduce, and only that. To add an element or delete, or even go through the elements. But before starting reduce f-the function will be passed through transformer xf.

the
(def nums [1 2 3 4 5])
(def nums+1 (reducer nums inc-t))

(reduce + 0 nums)
;; => 15

(reduce + 0 nums+1)
;; => 20

As already mentioned, in the reducers library announced their own versions of map, filter, take-while, and the like. All of them take reducer and return a new, to which is attached a matching transformer.

So could be a clojure.core.reducers/map (she looks very inace):

the
(def map-r [f coll]
(reducer coll (map f t)))

And now a few examples of how all this stuff can be used:

the
(require '[clojure.core.reducers :as r])

(def nums [1 2 3 4 5 6 7 8])

(type (map inc nums))
;; = > clojure.lang.LazySeq

(reduce conj [] (map inc nums))
;; = > ; [2 3 4 5 6 7 8 9]

(type (r/map inc nums))
;; = > clojure.core.reducers$folder$reify__1234
;; there is no sequence

(reduce conj [] (r/map inc nums))
;; = > ; [2 3 4 5 6 7 8 9]
;; but still able reducirse

(reduce conj [] (r/filter odd? nums))
;; => [1 3 5 7]

(reduce + 0 (->> nums (r/map inc) (r/map inc)))
;; => 52
;; ~~ (+ 0 (inc (inc 1)) (inc (inc 2)) ...)

(reduce + 0 (->> nums (r/filter odd?) (r/map inc)))
;; => 20
;; ~~ (+ 0 (inc 1) (inc 3) ...)

the

Parallelize


To be honest, "reducers" so named for good reason. It would be better to "folders". Because in addition to the Protocol CollReduce (which appeared long before reducers), the library announced another, more important, the Protocol CollFold:

the
(defprotocol CollFold
(coll-fold [n combinef reducef coll]))

In principle very similar, only the f-functions two now, but also added a strange argument n. Idea: for some collections, you can proberutsya in several threads. Briefly: divide it into blocks about the size of n elements, each piece is collapsible with #(reduce reducef (combinef) %), then a list of results (one per unit) turn again, but this time with #(reduce combinef %).

Reduser who knows how to fold itself in parallel, is called folder.
Only 2 standard collection Protocol supports the CollFold — vector and the hash table.

the
(def v (vec (range 10000000)))

;; linearly, 1 stream
(time (reduce + v))
;; "Elapsed time: 648.897616 msecs"
;; => 49999995000000

;; in several threads
(time (r/coll-fold v 512 + +))
;; "Elapsed time: 187.414147 msecs"
;; => 49999995000000

All standard reducer for which it makes sense to implement CollFold. This, for example, r/map, r/filter, r/mapcat, r/flatten. On the other hand, r/take, r/take-while, r/drop do not support parallelization. The above cited implementation of r/map. Here's an updated version:

the
(def map-r [f coll]
;; just replaced `reducer` to `folder`
(folder coll (map f t)))

Directly use coll-fold is not necessary — for everyday needs there is a wrapper fold. It is set to the default value for n (the block size) is 512. In General, the hint is clear — reducers are clearly intended for large collections (>1K elements). And again: do not use coll-fold directly call fold.

Oh, there's foldcat. A kind of accelerated (due to multithreading) option #(reduce conj [] %). This function returns the objects clojure.core.reducers.Catthat implement the Counted and Sequable and CollFold.

the
(r/map inc [1 2 3])
;; => #<reducers$folder$reify__.....
;; how did *this* to use?

;; well.. can this case using `reduce` to drive
(reduce conj [] (r/map inc [1 2 3]))
;; => [2 3 4]

;; what's with the speed...
(def v (vec (range 1000000)))

(time (count (reduce conj [] (r/map inc v))))
;; "Elapsed time: 90.124397 msecs"
;; => 1000000

;; something is not very, and if in `foldcat`
(time (count (r/foldcat (r/map inc v))))
;; "Elapsed time: 25.054988 msecs"
;; => 1000000

(time (count (r/foldcat (r/map inc (r/foldcat (r/map inc v))))))
;; "Elapsed time: 32.054988 msecs"
;; => 1000000

;; result is `foldcat`, incidentally, is also foldable (Hello, multithreading)
(satisfies? r/CollFold (r/foldcat []))
;; => true

the

On stage break...


Unlike reduceron, transducers] is no longer a separate library. It is rather a concept (read idea) that will be integrated directly in the module clojure.core. Waiting for this stuff in version 1.7 (quite a bit left).

Briefly: transducers] — those same transformers, only rebranding otherwise named. Well, almost: the f-function can now accept not only 0 and 2 arguments, but also 1. And the transducer, respectively, is a function from 0-1-2-ary f-function in a 0-1-2-arnou new.

the
(def typical-transducer
(fn [rf]
(fn ([] ...) ;; return the initial element
([acc] ...) ;; clear...
([acc c] ...))) ;; actually, there all the most important as before

;; new version of `map-t`, 33% better than the old one
(defn map-t-improved [f]
(fn [rf]
(fn ([] (rf)) ;; parasivam on
([acc] (rf acc)) ;; parasivam on
([acc c] (rf acc (f c)))))) ;; model `c` with `(f c)`

0-ary f-function as before, can be invoked if desired the initial element. Option 2-ary is used to, in fact, most of the reduction. A 1-ary variant is called at the very end of all the work (for completion reduce). It is needed in cases when you need to "add" new items in the last.

Example: transducer dedupeto allow replays from the collection:

the
(defn my-dedupe []
(fn [rf]
;; careful!
(let [prev (atom ::none)]
(fn ;; f is our function
([] (rf))
([acc] (rf acc))
([acc c]
(let [p @prev]
(reset! prev c)
(if (= p c)
acc
(rf acc c))))))))

(def rf ((my-dedupe) +))

(reduce rf 0 [1 1, 2 2, 3, 1 1])
;; => 7

(reduce rf 0 [1 1, 2 2, 3, 1 1])
;; => 6
;; oops... `rf` is not pure, it is impossible to use it 2 times

A subtle point, our transducer returns a new f-function. Moreover, this f-function has mutable state and is able to do 3 different things (1 per arity). Well, just an object of some kind. But in this case, the transducer condition has, it only acts as a kind of factory.

As an example use a 1-Arnage variant of the f-function result partition-all. Simplified implementation:

the
(defn partition-all-t [n]

(let [buffer (java.util.ArrayList. n)] ;; as!
(fn
([] (rf))
([acc]
(if (.buffer isEmpty)
;; if buffer is empty - continue probressive
(rf acc)
;; otherwise...
(let [v (vec (.toArray buffer)) ;; convert the buffer to a vector
acc' (rf acc v)] ;; discharged via the 2-ary `rf`
;; now we can traverse further
(rf acc'))))
([acc c]
(.add buffer c)
(if (= n (.size buffer))
;; if buffer overflowed - "reset" it
(let [v (vec (.toArray buffer))]
(.clear buffer)
(rf acc v))
;; otherwise do nothing
acc))))))

;; use what you got (has not specified the initial element, because (conj) => [])
(reduce ((partition-all-t 3) conj) (range 10))
; >> ClassCastException java.lang.Long cannot be cast to clojure.lang.IPersistentCollection
;; does not work...

;; okay, but if you specify []... 
(reduce ((partition-all-t 3) conj) [] (range 10))
;; = > ; [[0 1 2] [3 4 5] [6 7 8]]
;; works, but is wrong...

Hmm... No 0-ary or 1-ary options ((partition-all-t 3) conj) has not been called — the usual reduce knows nothing about all these innovations. It is 0-ary variant is only if the collection is empty 1-ary causes never.

Therefore created a new function transduce. Here it is, in contrast to the "legacy" reduce, invokes (rf) always, unless the initial state is explicitly set. And this function is guaranteed to cause (rf acc), and exactly once. And transduce itself calls our transducer and hides mutable f-function from our eyes. In other words, all the dirty work (in terms of side effects) is done "under the hood".

the
;; pass the transducer itself is immutable and not the result of his work
(transduce (partition-all-t 3) conj (range 10))
;; = > ; [[0 1 2] [3 4 5] [6 7 8] [9]]
;; AI... the works!

;; composition of transducers] (back)
(transduce (comp (filter odd?) (partition-all-t 3)) conj (range 10))
;; = > ; [[1 3 5] [7 9]]

What if you try transduce instead of reduce to use?

the
(reduce (identity -) 0 [1 2 3 4])
;; => -10
;; ~~ (- (- (- (- 0 1) 2) 3) 4)

(transduce identity - 0 [1 2 3 4])
;; => 10
;; not true!

It turns out that directly replace reduce to transduce does not work — stopping new requirement 1-ary f-function. In our example, after calculations transduce causes (- acc), which changes the sign of the result is reversed. To correct the situation will help completing:

the
((completing -) 3 2)
;; => 1

((identity) 1)
;; => -1

((completing -) 1)
;; => 1

(transduce completing - 0 [1 2 3 4])
;; => -10
;; now right!

In the kernel language there are special functions for working with transformers transducers]. It is expected that also added a set standard most of these transducers]. And in order not to produce many new features (too many of them, confused in the two accounts), I decided to upgrade the existing map, filter, take, interpose, mapcat the company:

the
(map inc [1 2 3])
;; => (2 3 4)

(map inc)
;; => #<core$map$fn__4525 clojure.core$map$fn__4525@2d9e9038>
;; this transducer!

;; you can do so
(transduce (map inc) + [1 2 3])
;; => 9

(transduce (comp (map inc) (filter even?)) + [1 2 3])
;; => 6
;; ~~ (+ (inc 1) (inc 3)) => 6

In addition to the transduce there are several functions for working with transducers]

the
;;  apply  the transducer to the collection
(sequence (map inc) [1 2 3])
;; => (2 3 4)

;; this is equivalent to such code
(transduce (map inc) conj [1 2 3])
;; => [2 3 4]

;; But...
;; the function `sequence` performs transducer *lazy* !
;; `transduce` such a focus is not going to work
(take 5 (sequence (map inc) (range)))
;; = > (1 2 3 4 5)

;; a function `into` also added support for transducers]
(into [9] (map inc) [1 2 3])
;; => [9 2 3 4]

But the most fun feature is eduction. It returns a proxy object on which to call seq, reduce or to a java iterator. Expected that this object will simply call transduce or sequnce. A small thing, but handy.

the
(def odds (eduction (filter odd?) (range)))
(def evens (eduction (remove odd?) (range)))

;; you can work with both sequential
(take 5 odds)
;; = > (1 3 5 7 9)

;; in memory to construct a sequence of the first 100500 numbers
;; but the links at the beginning will not remain - sequence will be going to GC

;; => 2010001

;; and here will immediately reduce running (no time LazyCol)
;; ~= (reduce ((filter even?) ((take 100500) +)) 0 (range))
(transduce (take 100500) + evens)
;; => 10100149500

Stop, stop, stop. It's suspiciously reminiscent of clojure.core.reducers/reducer... that, however, could only collapse, but there is also a seq is allowed to run. So r/reducer to throw in the trash! But not r/folder, and he knows how to multi-threading!

the
(require '[clojure.core.reducers :as r])
(def v (vec (range 1000000)))

(time (transduce (map inc) + v))
;; "Elapsed time: 120.193971 msecs"
;; = > 500000500000

(time (r/fold + (r/folder v (map inc))))
;; "Elapsed time: 37.597224 msecs"
;; = > 500000500000

;; but be careful!
(transduce (take 100500) + v)
;; => 5050074750

(r/fold + (r/reducer v (take 100500)))
;; => 5050074750
;; true

;; reducer outdated - better use eduction
(r/fold + (eduction (take 100500) v))
;; => 5050074750

(reduce + (r/folder v (take 100500)))
;; => 5050074750
;; even so true

(r/fold + (r/folder v (take 100500)))
;; => 109071345018
;; oops...
;; not every transducer parallelepiped (turn in folder)

When using transducers] is achieved.abouthigher performance compared to the normal map/filter/etc (based on lazy sequences), and bomost flexibility/abstraction. Notice that here we are talking about clojur'tion sequences: the level of abstraction and speed transducers] is comparable to the usual iterators/enumerators/generators (in different languages they decided to call different).

But back to Clojure. Early core.async was a whole lot of functions of the form map>, map<, filter<, filter>, etc., Now they upali (well, as removed until only deprecatio). But allowed to specify a convertible transducer when you create the channel:

the
;; do not forget to connect library to the project.clj
(require '[clojure.core.async :as a])

;; with a normal transducer
(def xf (filter odd?))

;; and a channel buffer
(def ch (a/chan 10 xf))

;; put in channel number from 0 to 9 and closed it
(a/onto-chan ch (range 10))

;; get number of channel
(a/<!! (a/into [] ch))
;; => [1 3 5 7 9]

The transducer can be mounted only on buffered channels. And before the element will be in the buffer, it handles our transducer. There are all sorts of pipeline'I they also work with transducers].

the

Summing up


A variety of reducer/transducers] — all the essence of the generalization of the convolution operation. And therefore, they require the f-function with 2 arguments to its useful.

In addition to 2-arnoga better at the same time to define a 0-ary — it can be used, if not specified the initial state of the convolution. And may not be used: if the source collection is not empty, then reduce will take its first element. But transduce so mean not received any initial condition is passed to it explicitly or use 0-ary call to f-function.

On the other hand, transduce requires f-function more — always need a 1-ary variant. Which, in General, often nothing does. Seriously, usually ([x] x) — the meaningful implementation in this case. But we are lazy to rewrite old functions (0/2-ary) we are lazy, so we use a wrapper completing, which adds an empty 1-ary variant.

Further, reduceri based on the transformers. A transformer = a function of type rf - > rf. In fact, reduser is a collection which is firmly screwed to the transformer. And, every time we reduce for this collection run, the first transformer "spoils" our f-funkciju.

The transducer ~= transformer only requires the 1-ary f-function. So it is always defined by the most ill-fated 1-arnik, and proudly so everyone claimed: "well, of course I don't use the outdated transformers, only transducers]".

All transducers] is not limited to working with collections. You can fasten to the channels, streams, I / o, queues, observers etc. In General to everything on that fancy enough.
Total:
the
    the
  • create a new processing algorithm sequences, you should try to write it as a transducer in use;
  • the
  • when a bunch of data in memory, no I/O does not need lazy — use the reducers;
  • the
  • want to handle the collection and run it with reduce — would be nice to try transduce;
  • the
  • but this is not necessarily premature optimization is not good;
  • the
  • filtered channels, clever subscribers to the event... transducers] here and asks;
  • the
  • need lazy sequences or not know what are we going to use the good old map, filter and their friends.
Article based on information from habrahabr.ru

Популярные сообщения из этого блога

Approval of WSUS updates: import, export, copy

Kaspersky Security Center — the fight for automation

The Hilbert curve vs. Z-order