## Parallel collections without combiners
Just as it is possible to define custom sequential collections without defining their builders, it is possible to define parallel collections without defining their combiners. The consequence of not having a combiner is that transformer methods (e.g. `map`, `flatMap`, `collect`, `filter`, …) will by default return a standard collection type which is nearest in the hierarchy. For example, ranges do not have builders, so mapping the elements of a range creates a vector.
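For instance, here is how this plays out for a sequential range (the REPL output shown is what Scala 2.12 prints; the exact static type may vary across versions):

```scala
scala> (1 to 5).map(_ * 2)
res0: scala.collection.immutable.IndexedSeq[Int] = Vector(2, 4, 6, 8, 10)
```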
In the following example we define a parallel string collection. Since strings are logically immutable sequences, we have parallel strings inherit `immutable.ParSeq[Char]`:
```scala
import scala.collection.parallel.{immutable, SeqSplitter}
import scala.collection.mutable.ArrayBuffer

class ParString(val str: String)
extends immutable.ParSeq[Char] {
```
Next, we define methods found in every immutable sequence:
```scala
  def apply(i: Int) = str.charAt(i)

  def length = str.length
```
We also have to define the sequential counterpart of this parallel collection. In this case, we return the `WrappedString` class:
```scala
  def seq = new collection.immutable.WrappedString(str)
```
Finally, we have to define a splitter for our parallel string collection. We name the splitter `ParStringSplitter` and have it inherit a sequence splitter, that is, `SeqSplitter[Char]`:
```scala
  def splitter = new ParStringSplitter(str, 0, str.length)

  class ParStringSplitter(private var s: String, private var i: Int, private val ntl: Int)
  extends SeqSplitter[Char] {

    final def hasNext = i < ntl

    final def next = {
      val r = s.charAt(i)
      i += 1
      r
    }
```
Above, `ntl` represents the total length of the string, `i` is the current position and `s` is the string itself.
Parallel collection iterators, or splitters, require a few more methods in addition to `next` and `hasNext`, which are found in sequential collection iterators. First of all, they have a method called `remaining` which returns the number of elements this splitter has yet to traverse. Next, they have a method called `dup` which duplicates the current splitter.
```scala
    def remaining = ntl - i

    def dup = new ParStringSplitter(s, i, ntl)
```
Finally, the methods `split` and `psplit` are used to create splitters which traverse subsets of the elements of the current splitter. The method `split` has the contract that it returns a sequence of splitters which traverse disjoint, non-overlapping subsets of the elements that the current splitter traverses, none of which is empty. If the current splitter has 1 or fewer elements, then `split` just returns a sequence containing this splitter. The method `psplit` has to return a sequence of splitters which traverse exactly as many elements as specified by the `sizes` parameter. If the `sizes` parameter specifies fewer elements than the current splitter, then an additional splitter with the rest of the elements is appended at the end. If the `sizes` parameter requires more elements than there are remaining in the current splitter, an empty splitter is appended for each such size. Finally, calling either `split` or `psplit` invalidates the current splitter.
```scala
    def split = {
      val rem = remaining
      if (rem >= 2) psplit(rem / 2, rem - rem / 2)
      else Seq(this)
    }

    def psplit(sizes: Int*): Seq[ParStringSplitter] = {
      val splitted = new ArrayBuffer[ParStringSplitter]
      for (sz <- sizes) {
        val next = (i + sz) min ntl
        splitted += new ParStringSplitter(s, i, next)
        i = next
      }
      if (remaining > 0) splitted += new ParStringSplitter(s, i, ntl)
      splitted
    }
  }
}
```
Above, `split` is implemented in terms of `psplit`, which is often the case with parallel sequences. Implementing a splitter for parallel maps, sets or iterables is often easier, since it does not require `psplit`.
Thus, we obtain a parallel string class. The only downside is that calling transformer methods such as `filter` will not produce a parallel string, but a parallel vector instead, which may be suboptimal: producing a string again from the vector after filtering may be costly.
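To see the class in action, here is a small hypothetical session (the variable names are illustrative):

```scala
val txt = "A custom text " * 25000
val partxt = new ParString(txt)

// accessor methods such as count are now executed in parallel via the splitter
val caps = partxt.count(_.isUpper)

// but without a combiner, transformers fall back to the nearest standard type:
// the result here is a parallel vector, not a ParString
val filtered = partxt.filter(_ != ' ')
```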
## Parallel collections with combiners
Let’s say we want to `filter` the characters of the parallel string, to get rid of commas, for example. As noted above, calling `filter` produces a parallel vector, and we want to obtain a parallel string (since some interface in the API might require a sequential string).
To avoid this, we have to write a combiner for the parallel string collection. This time we will also inherit the `ParSeqLike` trait to ensure that the return type of `filter` is more specific: a `ParString` instead of a `ParSeq[Char]`. `ParSeqLike` has a third type parameter which specifies the type of the sequential counterpart of the parallel collection (unlike sequential `*Like` traits, which have only two type parameters).
```scala
import scala.collection.parallel.{immutable, Combiner, ParSeqLike}

class ParString(val str: String)
extends immutable.ParSeq[Char]
   with ParSeqLike[Char, ParString, collection.immutable.WrappedString]
```
All the methods remain the same as before, but we add an additional protected method `newCombiner`, which is used internally by `filter`:
```scala
  protected[this] override def newCombiner: Combiner[Char, ParString] = new ParStringCombiner
```
Next we define the `ParStringCombiner` class. Combiners are subtypes of builders, and they introduce an additional method called `combine`, which takes another combiner as an argument and returns a new combiner containing the elements of both the current and the argument combiner. The current and the argument combiner are invalidated after calling `combine`. If the argument is the same object as the current combiner, then `combine` just returns the current combiner. This method is expected to be efficient, having logarithmic running time with respect to the number of elements in the worst case, since it is called multiple times during a parallel computation.
Our `ParStringCombiner` will internally maintain a sequence of string builders. It will implement `+=` by adding an element to the last string builder in the sequence, and `combine` by concatenating the lists of string builders of the current and the argument combiner. The `result` method, which is called at the end of the parallel computation, will produce a parallel string by appending all the string builders together. This way, elements are copied only once at the end instead of being copied every time `combine` is called. Ideally, we would like to parallelize this process and copy the elements in parallel (this is done for parallel arrays), but without tapping into the internal representation of strings this is the best we can do; we have to live with this sequential bottleneck.
```scala
private class ParStringCombiner extends Combiner[Char, ParString] {
  var sz = 0
  val chunks = new ArrayBuffer[StringBuilder] += new StringBuilder
  var lastc = chunks.last

  def size: Int = sz

  def +=(elem: Char): this.type = {
    lastc += elem
    sz += 1
    this
  }

  def clear = {
    chunks.clear
    chunks += new StringBuilder
    lastc = chunks.last
    sz = 0
  }

  def result: ParString = {
    val rsb = new StringBuilder
    for (sb <- chunks) rsb.append(sb)
    new ParString(rsb.toString)
  }

  def combine[U <: Char, NewTo >: ParString](other: Combiner[U, NewTo]) =
    if (other eq this) this else {
      val that = other.asInstanceOf[ParStringCombiner]
      sz += that.sz
      chunks ++= that.chunks
      lastc = chunks.last
      this
    }
}
```
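With the combiner in place, `filter` can rebuild a parallel string. A brief hypothetical usage sketch:

```scala
val partxt = new ParString("one,two,three")

// filter now assembles its result through ParStringCombiner,
// so it returns a ParString rather than a parallel vector
val nocommas: ParString = partxt.filter(_ != ',')
```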
## How do I implement my combiner in general?
There are no predefined recipes: it depends on the data structure at hand, and usually requires a bit of ingenuity on the implementer’s part. However, there are a few approaches usually taken:
- Concatenation and merge. Some data structures have efficient implementations (usually logarithmic) of these operations. If the collection at hand is backed by such a data structure, its combiner can be the collection itself (a minimal sketch of this approach follows the list). Finger trees, ropes and various heaps are particularly suitable for such an approach.
- Two-phase evaluation. An approach taken in parallel arrays and parallel hash tables, it assumes the elements can be efficiently partially sorted into concatenable buckets from which the final data structure can be constructed in parallel. In the first phase different processors populate these buckets independently and concatenate the buckets together. In the second phase, the data structure is allocated and different processors populate different parts of the data structure in parallel using elements from disjoint buckets. Care must be taken that different processors never modify the same part of the data structure, otherwise subtle concurrency errors may occur. This approach is easily applicable to random access sequences, as we have shown in the previous section.
- A concurrent data structure. While the last two approaches do not actually require any synchronization primitives in the data structure itself, they assume that it can be constructed concurrently in a way such that two different processors never modify the same memory location. There exist a large number of concurrent data structures that can be modified safely by multiple processors: concurrent skip lists, concurrent hash tables, split-ordered lists and concurrent AVL trees, to name a few. An important consideration in this case is that the concurrent data structure has a horizontally scalable insertion method. For concurrent parallel collections the combiner can be the collection itself, and a single combiner instance is shared between all the processors performing a parallel operation.
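As an illustration of the first approach, here is a minimal sketch built around a toy `Conc` tree with constant-time concatenation (the `Conc` type and `ConcCombiner` are made up for this example; a real implementation would use a balanced structure such as a rope or a finger tree):

```scala
import scala.collection.parallel.Combiner

// A toy immutable tree with O(1) concatenation, standing in for ropes/finger trees.
sealed trait Conc[+T] { def size: Int }
case object Empty extends Conc[Nothing] { def size = 0 }
final case class Single[T](elem: T) extends Conc[T] { def size = 1 }
final case class Concat[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
  val size = left.size + right.size
}

// The combiner is essentially the collection itself: combine allocates one node.
class ConcCombiner[T](private var tree: Conc[T] = Empty) extends Combiner[T, Conc[T]] {
  def size = tree.size
  def +=(elem: T): this.type = { tree = Concat(tree, Single(elem)); this }
  def clear(): Unit = tree = Empty
  def result(): Conc[T] = tree
  def combine[U <: T, NewTo >: Conc[T]](other: Combiner[U, NewTo]) =
    if (other eq this) this else {
      val that = other.asInstanceOf[ConcCombiner[T]]
      tree = Concat(tree, that.tree)
      this
    }
}
```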
## Integration with the collections framework
Our `ParString` class is not complete yet. Although we have implemented a custom combiner which will be used by methods such as `filter`, `partition`, `takeWhile` or `span`, most transformer methods require an implicit `CanBuildFrom` evidence (see the Scala collections guide for a full explanation). To make it available and to completely integrate `ParString` with the collections framework, we have to mix in an additional trait called `GenericParTemplate` and define the companion object of `ParString`:
```scala
import scala.collection.generic.{CanCombineFrom, GenericParTemplate}

class ParString(val str: String)
extends immutable.ParSeq[Char]
   with GenericParTemplate[Char, ParString]
   with ParSeqLike[Char, ParString, collection.immutable.WrappedString] {

  def companion = ParString
```
Inside the companion object we provide implicit evidence for the `CanBuildFrom` parameter:
```scala
object ParString {
  implicit def canBuildFrom: CanCombineFrom[ParString, Char, ParString] =
    new CanCombineFrom[ParString, Char, ParString] {
      def apply(from: ParString) = newCombiner
      def apply() = newCombiner
    }

  def newBuilder: Combiner[Char, ParString] = newCombiner

  def newCombiner: Combiner[Char, ParString] = new ParStringCombiner

  def apply(elems: Char*): ParString = {
    val cb = newCombiner
    cb ++= elems
    cb.result
  }
}
```
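With the `CanCombineFrom` evidence in scope, generic transformer methods also preserve the parallel string type. A brief hypothetical sketch:

```scala
val ps = ParString("hello world")

// map now builds its result through ParStringCombiner,
// so the result is a ParString rather than a parallel vector
val shouted: ParString = ps.map(_.toUpper)
```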
## Further customizations: concurrent and other collections
Implementing a concurrent collection (unlike parallel collections, concurrent collections are ones that can be concurrently modified, like `collection.concurrent.TrieMap`) is not always straightforward. Combiners in particular often require a lot of thought. In most parallel collections described so far, combiners use a two-step evaluation. In the first step the elements are added to the combiners by different processors and the combiners are merged together. In the second step, after all the elements are available, the resulting collection is constructed.
Another approach to combiners is to construct the resulting collection as the elements arrive. This requires the collection to be thread-safe: a combiner must allow concurrent element insertion. In this case one combiner is shared by all the processors.
To parallelize a concurrent collection, its combiners must override the method `canBeShared` to return `true`. This will ensure that only one combiner is created when a parallel operation is invoked. Next, the `+=` method must be thread-safe. Finally, the method `combine` still returns the current combiner if the current combiner and the argument combiner are the same, and is free to throw an exception otherwise.
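A minimal sketch of such a shared combiner, here accumulating elements into a `collection.concurrent.TrieMap` used as a set (the `SharedTrieCombiner` name is made up for the example):

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.parallel.Combiner

class SharedTrieCombiner[T] extends Combiner[T, TrieMap[T, Unit]] {
  private val trie = TrieMap.empty[T, Unit]

  // a single combiner instance is shared by all worker threads
  override def canBeShared = true

  // += must therefore be thread-safe; TrieMap insertion is
  def +=(elem: T): this.type = { trie.put(elem, ()); this }

  def result(): TrieMap[T, Unit] = trie
  def clear(): Unit = trie.clear()
  def size = trie.size

  // with a shared combiner, combine is only ever called with `this`
  def combine[U <: T, NewTo >: TrieMap[T, Unit]](other: Combiner[U, NewTo]) =
    if (other eq this) this
    else throw new UnsupportedOperationException("shared combiner should never be merged")
}
```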
Splitters are divided into smaller splitters to achieve better load balancing. By default, the information returned by the `remaining` method is used to decide when to stop dividing the splitter. For some collections, calling the `remaining` method may be costly and some other means should be used to decide when to divide the splitter. In this case, one should override the `shouldSplitFurther` method in the splitter.
The default implementation divides the splitter if the number of remaining elements is greater than the collection size divided by eight times the parallelism level.
```scala
def shouldSplitFurther[S](coll: ParIterable[S], parallelismLevel: Int) =
  remaining > thresholdFromSize(coll.size, parallelismLevel)
```
Equivalently, a splitter can hold a counter of how many times it was split and implement `shouldSplitFurther` by returning `true` if the split count is less than `3 + log(parallelismLevel)`. This avoids having to call `remaining`.
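A sketch of that variant as a method fragment (the `level` field, incremented each time a child splitter is created in `split`, is an assumption of the example):

```scala
// split further while the split depth is below ~ 3 + log2(parallelismLevel)
def shouldSplitFurther[S](coll: ParIterable[S], parallelismLevel: Int) =
  level < 3 + math.log(parallelismLevel) / math.log(2)
```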
Furthermore, if calling `remaining` is not a cheap operation for a particular collection (i.e. it requires evaluating the number of elements in the collection), then the method `isRemainingCheap` in splitters should be overridden to return `false`.
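In a splitter, that is a one-line override (sketch):

```scala
// signals that computing remaining is expensive for this splitter
override def isRemainingCheap = false
```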
Finally, if the `remaining` method in splitters is extremely cumbersome to implement, you can override the method `isStrictSplitterCollection` in its collection to return `false`. Such collections will fail to execute some methods which rely on splitters being strict, i.e. returning a correct value in the `remaining` method. Importantly, this does not affect methods used in for-comprehensions.
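On the collection itself, that is likewise a one-line override (sketch):

```scala
// opts this collection out of methods that require strict splitters
override def isStrictSplitterCollection = false
```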