Aleksandar Prokopec, Heather Miller
Amidst the shift in recent years by processor manufacturers from single to multi-core architectures, academia and industry alike have conceded that Popular Parallel Programming remains a formidable challenge.
Parallel collections were included in the Scala standard library in an effort to facilitate parallel programming by sparing users from low-level parallelization details, meanwhile providing them with a familiar and simple high-level abstraction. The hope was, and still is, that implicit parallelism behind a collections abstraction will bring reliable parallel execution one step closer to the workflow of mainstream developers.
The idea is simple– collections are a well-understood and frequently-used programming abstraction. And given their regularity, they’re able to be efficiently parallelized, transparently. By allowing a user to “swap out” sequential collections for ones that are operated on in parallel, Scala’s parallel collections take a large step forward in enabling parallelism to be easily brought into more code.
Take the following, sequential example, where we perform a monadic operation on some large collection:
val list = (1 to 10000).toList list.map(_ + 42)
To perform the same operation in parallel, one must simply invoke the
par method on the sequential collection,
list. After that, one can use a parallel collection in the same way one would normally use a sequential collection. The above example can be parallelized by simply doing the following:
list.par.map(_ + 42)
The design of Scala’s parallel collections library is inspired by and deeply integrated with Scala’s (sequential) collections library (introduced in 2.8). It provides a parallel counterpart to a number of important data structures from Scala’s (sequential) collection library, including:
collection.concurrent.TrieMaps are new in 2.10)
In addition to a common architecture, Scala’s parallel collections library additionally shares extensibility with the sequential collections library. That is, like normal sequential collections, users can integrate their own collection types and automatically inherit all of the predefined (parallel) operations available on the other parallel collections in the standard library.
To attempt to illustrate the generality and utility of parallel collections, we provide a handful of simple example usages, all of which are transparently executed in parallel.
Note: Some of the following examples operate on small collections, which isn’t recommended. They’re provided as examples for illustrative purposes only. As a general heuristic, speed-ups tend to be noticeable when the size of the collection is large, typically several thousand elements. (For more information on the relationship between the size of a parallel collection and performance, please see the appropriate subsection of the performance section of this guide.)
Using a parallel
map to transform a collection of
String to all-uppercase:
scala> val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin) scala> lastNames.map(_.toUpperCase) res0: scala.collection.parallel.immutable.ParSeq[String] = ParVector(SMITH, JONES, FRANKENSTEIN, BACH, JACKSON, RODIN)
fold on a
scala> val parArray = (1 to 1000000).toArray.par parArray: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3, ... scala> parArray.fold(0)(_ + _) res0: Int = 1784293664
Using a parallel
filter to select the last names that come alphabetically after the letter “K”.
scala> val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin) scala> lastNames.filter(_.head >= 'J') res0: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Jackson, Rodin)
Parallel collections are meant to be used in exactly the same way as sequential collections– the only noteworthy difference is how to obtain a parallel collection.
Generally, one has two choices for creating a parallel collection:
First, by using the
new keyword and a proper import statement:
import scala.collection.parallel.immutable.ParVector val pv = new ParVector[Int]
Second, by converting from a sequential collection:
val pv = Vector(1,2,3,4,5,6,7,8,9).par
What’s important to expand upon here are these conversion methods– sequential collections can be converted to parallel collections by invoking the sequential collection’s
par method, and likewise, parallel collections can be converted to sequential collections by invoking the parallel collection’s
Of Note: Collections that are inherently sequential (in the sense that the elements must be accessed one after the other), like lists, queues, and streams, are converted to their parallel counterparts by copying the elements into a similar parallel collection. An example is
List– it’s converted into a standard immutable parallel sequence, which is a
ParVector. Of course, the copying required for these collection types introduces an overhead not incurred by any other collection types, like
While the parallel collections abstraction feels very much the same as normal sequential collections, it’s important to note that its semantics differs, especially with regards to side-effects and non-associative operations.
In order to see how this is the case, first, we visualize how operations are performed in parallel. Conceptually, Scala’s parallel collections framework parallelizes an operation on a parallel collection by recursively “splitting” a given collection, applying an operation on each partition of the collection in parallel, and re-“combining” all of the results that were completed in parallel.
These concurrent, and “out-of-order” semantics of parallel collections lead to the following two implications:
Given the concurrent execution semantics of the parallel collections framework, operations performed on a collection which cause side-effects should generally be avoided, in order to maintain determinism. A simple example is by using an accessor method, like
foreach to increment a
var declared outside of the closure which is passed to
scala> var sum = 0 sum: Int = 0 scala> val list = (1 to 1000).toList.par list: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3,… scala> list.foreach(sum += _); sum res01: Int = 467766 scala> var sum = 0 sum: Int = 0 scala> list.foreach(sum += _); sum res02: Int = 457073 scala> var sum = 0 sum: Int = 0 scala> list.foreach(sum += _); sum res03: Int = 468520
Here, we can see that each time
sum is reinitialized to 0, and
foreach is called again on
sum holds a different value. The source of this non-determinism is a data race– concurrent reads/writes to the same mutable variable.
In the above example, it’s possible for two threads to read the same value in
sum, to spend some time doing some operation on that value of
sum, and then to attempt to write a new value to
sum, potentially resulting in an overwrite (and thus, loss) of a valuable result, as illustrated below:
ThreadA: read value in sum, sum = 0 value in sum: 0 ThreadB: read value in sum, sum = 0 value in sum: 0 ThreadA: increment sum by 760, write sum = 760 value in sum: 760 ThreadB: increment sum by 12, write sum = 12 value in sum: 12
The above example illustrates a scenario where two threads read the same value,
0, before one or the other can sum
0 with an element from their partition of the parallel collection. In this case,
0 and sums it with its element,
0+760, and in the case of
0 with its element,
0+12. After computing their respective sums, they each write their computed value in
ThreadB, it writes first, only for the value in
sum to be overwritten shortly after by
ThreadB, in effect completely overwriting (and thus losing) the value
Given this “out-of-order” semantics, also must be careful to perform only associative operations in order to avoid non-determinism. That is, given a parallel collection,
pcoll, one should be sure that when invoking a higher-order function on
pcoll, such as
pcoll.reduce(func), the order in which
func is applied to the elements of
pcoll can be arbitrary. A simple, but obvious example is a non-associative operation such as subtraction:
scala> val list = (1 to 1000).toList.par list: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3,… scala> list.reduce(_-_) res01: Int = -228888 scala> list.reduce(_-_) res02: Int = -61000 scala> list.reduce(_-_) res03: Int = -331818
In the above example, we take a
reduce, and pass to it
_-_, which simply takes two unnamed elements, and subtracts the first from the second. Due to the fact that the parallel collections framework spawns threads which, in effect, independently perform
reduce(_-_) on different sections of the collection, the result of two runs of
reduce(_-_) on the same collection will not be the same.
Note: Often, it is thought that, like non-associative operations, non-commutative operations passed to a higher-order function on a parallel collection likewise result in non-deterministic behavior. This is not the case, a simple example is string concatenation– an associative, but non- commutative operation:
scala> val strings = List("abc","def","ghi","jk","lmnop","qrs","tuv","wx","yz").par strings: scala.collection.parallel.immutable.ParSeq[java.lang.String] = ParVector(abc, def, ghi, jk, lmnop, qrs, tuv, wx, yz) scala> val alphabet = strings.reduce(_++_) alphabet: java.lang.String = abcdefghijklmnopqrstuvwxyz
The “out of order” semantics of parallel collections only means that the operation will be executed out of order (in a temporal sense. That is, non-sequentially), it does not mean that the result will be re-”combined” out of order (in a spatial sense). On the contrary, results will generally always be reassembled in order– that is, a parallel collection broken into partitions A, B, C, in that order, will be reassembled once again in the order A, B, C. Not some other arbitrary order like B, C, A.
For more on how parallel collections split and combine operations on different parallel collection types, see the Architecture section of this guide.blog comments powered by Disqus