Rayon: data parallelism in Rust
Over the last week or so, I’ve been working on an update to Rayon, my experimental library for data parallelism in Rust. I’m pretty happy with the way it’s been going, so I wanted to write a blog post to explain what I’ve got so far.
Rayon’s goal is to make it easy to add parallelism to your sequential code – so basically to take existing for loops or iterators and make them run in parallel. For example, if you have an existing iterator chain like this:
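A sketch with made-up names – say `stores` is a vector of stores and `compute_price` a method that prices a shopping list:

```rust
// Total the price of the list across every store,
// using the standard sequential iterator:
let total_price: u32 = stores.iter()
                             .map(|store| store.compute_price(&list))
                             .sum();
```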
then you could convert that to run in parallel just by changing from the standard sequential iterator to Rayon’s parallel iterator:
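The same chain, with only the iterator call changed (written here against today’s Rayon prelude):

```rust
use rayon::prelude::*;

// Identical to the sequential version, except that
// `iter()` has become `par_iter()`:
let total_price: u32 = stores.par_iter()
                             .map(|store| store.compute_price(&list))
                             .sum();
```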
Of course, part of making parallelism easy is making it safe. Rayon guarantees that using its APIs will not introduce data races.
This blog post explains how Rayon works. It starts by describing the core Rayon primitive (`join`) and explains how that is implemented. I look in particular at how many of Rust’s features come together to let us implement `join` with very low runtime overhead and with strong safety guarantees. I then explain briefly how the parallel iterator abstraction is built on top of `join`.
I do want to emphasize, though, that Rayon is very much a *work in progress*. I expect the design of the parallel iterator code in particular to see a lot of, well, iteration (no pun intended), since the current setup is not as flexible as I would like. There are also various corner cases that are not correctly handled, notably around panic propagation and cleanup. Still, Rayon is definitely usable today for certain tasks. I’m pretty excited about it, and I hope you will be too!
At the beginning of this post, I showed an example of using a parallel iterator to do a map-reduce operation:
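Here it is again, with the same illustrative names as before:

```rust
use rayon::prelude::*;

let total_price: u32 = stores.par_iter()
                             .map(|store| store.compute_price(&list))
                             .sum();
```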
In fact, though, parallel iterators are just a small utility library built atop a more fundamental primitive: `join`. Using `join` is very simple. You invoke it with two closures, as shown below, and it will potentially execute them in parallel. Once they have both finished, it will return:
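In sketch form, with `do_something` and `do_something_else` standing in for arbitrary work:

```rust
// Potentially runs the two closures in parallel;
// returns once both have finished.
rayon::join(|| do_something(),
            || do_something_else());
```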
The fact that the two closures *potentially* run in parallel is key: the decision of whether or not to use parallel threads is made dynamically, based on whether idle cores are available. The idea is that you can basically annotate your programs with calls to `join` to indicate where parallelism might be a good idea, and let the runtime decide when to take advantage of that.
This approach of *potential parallelism* is, in fact, the key point of difference between Rayon’s approach and crossbeam’s scoped threads. Whereas in crossbeam, when you put two bits of work onto scoped threads, they will always execute concurrently with one another, calling `join` in Rayon does not necessarily imply that the code will execute in parallel. This not only makes for a simpler API, it can make for more efficient execution. This is because knowing when parallelism is profitable is difficult to predict in advance, and it always requires a certain amount of global context: for example, does the computer have idle cores? What other parallel operations are happening right now? In fact, one of the main points of this post is to advocate for potential parallelism as the basis for Rust data parallelism libraries, in contrast to the guaranteed concurrency that we have seen thus far.
This is not to say that there is no role for guaranteed concurrency like what crossbeam offers. Potential parallelism semantics also imply some limits on what your parallel closures can do. For example, if I try to use a channel to communicate between the two closures in `join`, that will likely deadlock. The right way to think about `join` is that it is a parallelization hint for an otherwise sequential algorithm. Sometimes that’s not what you want – some algorithms are inherently parallel. (Note though that it is perfectly reasonable to use types like `Mutex`, `AtomicU32`, etc. from within a `join` call – you just don’t want one closure to block waiting for the other.)
`join` is a great primitive for *divide-and-conquer* algorithms. These algorithms tend to divide up the work into two roughly equal parts and then recursively process those parts. For example, we can implement a parallel version of quicksort like so:
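A sketch of that quicksort, assuming a `partition` helper (not shown) that partitions `v` around a pivot and returns the pivot’s index:

```rust
fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        // Sort the two halves, potentially in parallel:
        rayon::join(|| quick_sort(lo),
                    || quick_sort(hi));
    }
}
```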
In fact, the only difference between this version of quicksort and a sequential one is that we call `rayon::join` at the end!
Behind the scenes, `join` is implemented using a technique called *work stealing*. As far as I know, work stealing was first introduced as part of the Cilk project, and it has since become a fairly standard technique (in fact, the name Rayon is an homage to Cilk).
The basic idea is that, on each call to `join(a, b)`, we have identified two tasks `a` and `b` that could safely run in parallel, but we don’t know yet whether there are idle threads. All that the current thread does is to add `b` to a local queue of pending work and then go and immediately start executing `a`. Meanwhile, there is a pool of other active threads (typically one per CPU, or something like that). Whenever it is idle, each thread goes off to scour the pending-work queues of other threads: if it finds an item there, it will steal it and execute it itself. So, in this case, while the first thread is busy executing `a`, another thread might come along and start executing `b`.
Once the first thread finishes with `a`, it then checks: did somebody else start executing `b` already? If not, we can execute it ourselves. If so, we should wait for them to finish: but while we wait, we can go off and steal from other processors, and thus try to help drive the overall process towards completion.
In Rust-y pseudocode, `join` thus looks something like this (the actual code works somewhat differently; for example, it allows for each operation to have a result):
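Something like this, where the queue and stealing helpers are stand-ins for the real deque operations:

```rust
fn join<A, B>(oper_a: A, oper_b: B)
    where A: FnOnce() + Send,
          B: FnOnce() + Send,
{
    // Advertise `oper_b` to other threads as something
    // they might want to steal:
    let job = push_onto_local_queue(oper_b);

    // Execute `oper_a` ourselves:
    oper_a();

    // Check whether anybody stole `oper_b`:
    if let Some(oper_b) = pop_from_local_queue(&job) {
        // Not stolen: execute it ourselves.
        oper_b();
    } else {
        // Stolen: wait for the thief to finish it, and
        // in the meantime try to steal work from others.
        while !job.is_complete() {
            steal_work_from_others();
        }
    }
}
```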
What makes work stealing so elegant is that it adapts naturally to the CPU’s load. That is, if all the workers are busy, then `join(a, b)` basically devolves into executing each closure sequentially (i.e., `a(); b();`). This is no worse than the sequential code. But if there are idle threads available, then we get parallelism.
Rayon is still fairly young, and I don’t have a lot of sample programs to test (nor have I spent a lot of time tuning it). Nonetheless, you can get pretty decent speedups even today, but it does take a bit more tuning than I would like. For example, with a tweaked version of quicksort, I see the following parallel speedups on my 4-core MacBook Pro (hence, 4x is basically the best you could expect):
| Array Length | Speedup |
|---|---|
| 1K | 0.95x |
| 32K | 2.19x |
| 64K | 3.09x |
| 128K | 3.52x |
| 512K | 3.84x |
| 1024K | 4.01x |
The change that I made from the original version is to introduce *sequential fallback*. Basically, we just check if we have a small array (in my code, less than 5K elements). If so, we fall back to a sequential version of the code that never calls `join`. This can actually be done without any code duplication using traits, as you can see from the demo code. (If you’re curious, I explain the idea in an appendix below.)
Hopefully, further optimizations will mean that sequential fallback is less necessary – but it’s worth pointing out that higher-level APIs like the parallel iterator I alluded to earlier can also handle the sequential fallback for you, so that you don’t have to actively think about it.
In any case, if you don’t do sequential fallback, then the results you see are not as good, though they could be a lot worse:
| Array Length | Speedup |
|---|---|
| 1K | 0.41x |
| 32K | 2.05x |
| 64K | 2.42x |
| 128K | 2.75x |
| 512K | 3.02x |
| 1024K | 3.10x |
In particular, keep in mind that this version of the code is pushing a parallel task for all subarrays down to length 1. If the array is 512K or 1024K, that’s a lot of subarrays and hence a lot of task pushing, but we still get a speedup of 3.10x. I think the reason that the code does as well as it does is that it gets the big things right – that is, Rayon avoids memory allocation and virtual dispatch, as described in the next section. Still, I would like to do better than 0.41x for a 1K array (and I think we can).
As you can see above, to make this scheme work, you really want to drive down the overhead of pushing a task onto the local queue. After all, the expectation is that most tasks will never be stolen, because there are far fewer processors than there are tasks. Rayon’s API is designed to leverage several Rust features and drive this overhead down:
- `join` is defined generically with respect to the closure types of its arguments. This means that monomorphization will generate a distinct copy of `join` specialized to each callsite. This in turn means that when `join` invokes `oper_a()` and `oper_b()` (as opposed to the relatively rare case where they are stolen), those calls are statically dispatched, which means that they can be inlined. It also means that creating a closure requires no allocation.
- Because `join` blocks until both of its closures are finished, we are able to make full use of stack allocation. This is good both for users of the API and for the implementation: for example, the quicksort example above relied on being able to access an `&mut [T]` slice that was provided as input, which only works because `join` blocks. Similarly, the implementation of `join` itself is able to completely avoid heap allocation and instead rely solely on the stack (e.g., the closure objects that we place into our local work queue are allocated on the stack).

As you saw above, the overhead for pushing a task is reasonably low, though not nearly as low as I would like. There are various ways to reduce it further:
- Some of the overhead comes from synchronization around the work queue; in principle, we only need to signal other (hungry) threads that might steal a task when there is actually work worth stealing.
- I have also not spent much time micro-optimizing the hot path of `join`, for example, and it seems likely that there is low-hanging fruit there.

Earlier I mentioned that Rayon also guarantees data-race freedom. This means that you can add parallelism to previously sequential code without worrying about introducing weird, hard-to-reproduce bugs.
There are two kinds of mistakes we have to be concerned about. First, the two closures might share some mutable state, so that changes made by one would affect the other. For example, if I modify the above example so that it (incorrectly) calls `quick_sort` on `lo` in both closures, then I would hope that this will not compile:
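The broken version would look something like this:

```rust
fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        rayon::join(|| quick_sort(lo),
                    || quick_sort(lo)); // <-- copy-paste bug: should be `hi`
    }
}
```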
And indeed I will see an error along these lines:
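```
error: closure requires unique access to `lo` but it is already borrowed
    rayon::join(|| quick_sort(lo),
                || quick_sort(lo));
                   ^~~~~~~~~~~~~~
```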
Similar errors arise if I try to have one closure process `lo` (or `hi`) and the other process `v`, which overlaps with both of them.
Side note: This example may seem artificial, but in fact this is an actual bug that I made (or rather, would have made) while implementing the parallel iterator abstraction I describe later. It’s very easy to make these sorts of copy-and-paste errors, and it’s very nice that Rust makes this kind of error a non-event, rather than a crashing bug.
Another kind of bug one might have is to use a non-threadsafe type from within one of the `join` closures. For example, Rust offers a non-atomic reference-counted type called `Rc`. Because `Rc` uses non-atomic instructions to update the reference counter, it is not safe to share an `Rc` between threads. If one were to do so, as I show in the following example, the ref count could easily become incorrect, which would lead to double frees or worse:
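For instance (a sketch; `share_rc` is a made-up function):

```rust
use std::rc::Rc;

fn share_rc(rc: Rc<i32>) {
    // Both closures try to use the same non-atomic `Rc`:
    rayon::join(|| println!("{}", rc),
                || println!("{}", rc));
}
```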
But of course if I try that example, I get a compilation error along these lines:
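```
error: the trait `core::marker::Sync` is not implemented
       for the type `alloc::rc::Rc<i32>` [E0277]
note: `alloc::rc::Rc<i32>` cannot be shared between threads safely
```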
As you can see in the final note, the compiler is telling us that you cannot share `Rc` values across threads.
So you might wonder what kind of deep wizardry is required for the `join` function to enforce both of these invariants? In fact, the answer is surprisingly simple. The first error, which I got when I shared the same `&mut` slice across two closures, falls out from Rust’s basic type system: you cannot have two closures that are both in scope at the same time and both access the same `&mut` slice. This is because `&mut` data is supposed to be uniquely accessed, and hence if you had two closures, they would both have access to the same *unique* data. Which of course makes it not so unique.
(In fact, this was one of the great epiphanies for me in working on Rust’s type system. Previously, I thought that *dangling pointers* in sequential programs and *data races* were sort of distinct bugs: but now I see them as two heads of the same Hydra. Basically both are caused by having rampant aliasing and mutation, and both can be solved by ownership and borrowing. Nifty, no?)
So what about the second error, the one I got for sending an `Rc` across threads? This occurs because the `join` function declares that its two closures must be `Send`. `Send` is the Rust name for a trait that indicates whether data can be safely transferred across threads. So when `join` declares that its two closures must be `Send`, it is saying that it must be safe for the data those closures can reach to be transferred to another thread *and back again*.
At the start of this post, I gave an example of using a parallel iterator:
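```rust
use rayon::prelude::*;

// Same illustrative names as at the start of the post:
let total_price: u32 = stores.par_iter()
                             .map(|store| store.compute_price(&list))
                             .sum();
```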
But since then, I’ve just focused on `join`. As I mentioned earlier, the parallel iterator API is really just a pretty simple wrapper around `join`. At the moment, it’s more of a proof of concept than anything else. But what’s really nifty about it is that it does not require any unsafe code related to parallelism – that is, it just builds on `join`, which encapsulates all of the unsafety. (To be clear, there is a small amount of unsafe code related to managing uninitialized memory when collecting into a vector. But this has nothing to do with parallelism; you’ll find similar code in `Vec`. This code is also wrong in some edge cases because I’ve not had time to do it properly.)
I don’t want to go too far into the details of the existing parallel iterator code because I expect it to change. But the high-level idea is that we have this trait `ParallelIterator`, which has the following core members:
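In rough form (the signatures here are approximations; the real code has evolved):

```rust
pub trait ParallelIterator {
    type Item;

    /// State shared among all worker threads; must be `Sync`.
    type Shared: Sync;

    /// Per-thread state, split at each `join`; must be `Send`.
    type State: ParallelIteratorState<Shared = Self::Shared,
                                      Item = Self::Item> + Send;

    /// Divide the iterator into shared and per-thread state.
    fn state(self) -> (Self::Shared, Self::State);
}
```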
The idea is that the method `state` divides up the iterator into some *shared* state and some *per-thread* state. The shared state will (potentially) be accessible by all worker threads, so it must be `Sync` (sharable across threads). The per-thread state will be split for each call to `join`, so it only has to be `Send` (transferrable to a single other thread).
The `ParallelIteratorState` trait represents some chunk of the remaining work (e.g., a subslice to be processed). It has three methods:
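Roughly (again, the exact signatures are approximations):

```rust
pub trait ParallelIteratorState: Sized {
    type Item;
    type Shared: Sync;

    /// How much work remains in this chunk?
    fn len(&mut self) -> usize;

    /// Divide this state into two pieces at `index`.
    fn split_at(self, index: usize) -> (Self, Self);

    /// Produce all values in this chunk, passing each to `op`.
    fn for_each<OP>(self, shared: &Self::Shared, op: OP)
        where OP: FnMut(Self::Item);
}
```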
The `len` method gives an idea of how much work remains. The `split_at` method divides this state into two other pieces. The `for_each` method produces all the values in this chunk of the iterator. So, for example, the parallel iterator for a slice `&[T]` would:

- implement `len` by just returning the length of the slice,
- implement `split_at` by splitting the slice into two subslices,
- implement `for_each` by iterating over the array and invoking `op` on each element.

Given these two traits, we can implement a parallel operation like collection by following the same basic template. We check how much work there is: if it’s too much, we split into two pieces. Otherwise, we process sequentially (note that this automatically incorporates the sequential fallback we saw before):
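In pseudocode, the template looks something like this; `THRESHOLD`, `process`, and the body of the sequential arm are placeholders, not Rayon’s actual API:

```rust
const THRESHOLD: usize = 1024; // placeholder cutoff

fn process<S>(shared: &S::Shared, mut state: S)
    where S: ParallelIteratorState + Send,
          S::Shared: Sync,
{
    if state.len() > THRESHOLD {
        // Too much work for one thread: split in two
        // and recurse, potentially in parallel.
        let mid = state.len() / 2;
        let (left, right) = state.split_at(mid);
        rayon::join(|| process(shared, left),
                    || process(shared, right));
    } else {
        // Small enough: process this chunk sequentially.
        // (This is the automatic sequential fallback.)
        state.for_each(shared, |item| {
            // ... consume `item`, e.g. write it into
            // the vector being collected ...
        });
    }
}
```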
See, for example, the code to collect into a vector or to reduce a stream of values into one.
I’m pretty excited about this latest iteration of Rayon. It’s dead simple to use, very expressive, and I think it has a lot of potential to be very efficient.
It’s also very gratifying to see how elegant data parallelism in Rust has become. This is the result of a long evolution and a lot of iteration. In Rust’s early days, for example, it took a strict, Erlang-like approach, where you just had parallel tasks communicating over channels, with no shared memory. This is good for the high levels of your application, but not so good for writing a parallel quicksort. Gradually though, as we refined the type system, we got closer and closer to a smooth version of parallel quicksort.
If you look at some of my earlier designs, it should be clear that the current iteration of Rayon is by far the smoothest yet. What I particularly like is that it is simple for users, but also simple for implementors – that is, it doesn’t require any crazy Rust type system tricks or funky traits to achieve safety here. I think this is largely due to two key developments:
- IMHTWAMA, which was the decision to make `&mut` references be non-aliasable and to remove `const` (read-only, but not immutable) references. This basically meant that Rust authors were now writing data-race-free code by default.
- The change to the `Send` trait to permit borrowed references. Prior to this RFC, which was authored by Joshua Yanovski, we had the constraint that for data to be `Send`, it had to be `'static` – meaning it could not have any references into the stack. This was a holdover from the Erlang-like days, when all threads were independent, asynchronous workers, but none of us saw it. This led to some awful contortions in my early designs to try to find alternate traits to express the idea of data that was threadsafe but also contained stack references. Thankfully Joshua had the insight that simply removing the `'static` bound would make this all much smoother!

Earlier, I mentioned that for peak performance in the quicksort demo, you want to fall back to sequential code if the array size is too small. It would be a drag to have to maintain two copies of the quicksort routine. Fortunately, we can use Rust traits to generate those two copies automatically from a single source. This appendix explains the trick that I used in the demo code.
First, you define a trait `Joiner` that abstracts over the `join` function:
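It looks something like this (signatures approximate the demo code):

```rust
trait Joiner {
    /// True if this is the parallel mode, false otherwise.
    fn is_parallel() -> bool;

    /// Either calls `rayon::join` or simply runs
    /// `oper_a(); oper_b();` in order.
    fn join<A, RA, B, RB>(oper_a: A, oper_b: B) -> (RA, RB)
        where A: FnOnce() -> RA + Send,
              B: FnOnce() -> RB + Send,
              RA: Send, RB: Send;
}
```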
This `Joiner` trait has two implementations, corresponding to sequential and parallel mode:
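Two zero-sized marker types select the mode at compile time:

```rust
struct Parallel;
impl Joiner for Parallel {
    fn is_parallel() -> bool { true }
    fn join<A, RA, B, RB>(oper_a: A, oper_b: B) -> (RA, RB)
        where A: FnOnce() -> RA + Send,
              B: FnOnce() -> RB + Send,
              RA: Send, RB: Send
    {
        rayon::join(oper_a, oper_b) // defer to Rayon
    }
}

struct Sequential;
impl Joiner for Sequential {
    fn is_parallel() -> bool { false }
    fn join<A, RA, B, RB>(oper_a: A, oper_b: B) -> (RA, RB)
        where A: FnOnce() -> RA + Send,
              B: FnOnce() -> RB + Send,
              RA: Send, RB: Send
    {
        (oper_a(), oper_b()) // just run them in order
    }
}
```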
Now we can rewrite `quick_sort` to be generic over a type `J: Joiner`, indicating whether this is the parallel or sequential implementation. The parallel version will, for small arrays, convert over to sequential mode:
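And the generic routine itself; the 5K cutoff matches the threshold mentioned earlier, and `partition` is the same helper as before:

```rust
fn quick_sort<J: Joiner, T: PartialOrd + Send>(v: &mut [T]) {
    if v.len() <= 1 {
        return;
    }

    if J::is_parallel() && v.len() <= 5 * 1024 {
        // Too small to be worth forking:
        // switch over to sequential mode.
        return quick_sort::<Sequential, T>(v);
    }

    let mid = partition(v);
    let (lo, hi) = v.split_at_mut(mid);
    J::join(|| quick_sort::<J, T>(lo),
            || quick_sort::<J, T>(hi));
}
```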
http://smallcultfollowing.com/babysteps/blog/2015/12/18/rayon-data-parallelism-in-rust/