# Reservoir Sampling in MapReduce

[Image source: Dan Kernler, http://faculty.elgin.edu/dkernler/statistics/ch01/1-3.html (CC BY-NC-SA 3.0)]
We consider the problem of picking a random sample of a given size $k$ from a large dataset of some unknown size $n$. The hidden assumption here is that $n$ is large enough that the whole dataset does not fit into main memory, whereas the desired sample does.

Let's first review how this problem is tackled in a sequential setting; then we'll proceed with a distributed MapReduce solution.

#### Reservoir sampling

One of the most common sequential approaches to this problem is the so-called reservoir sampling. The algorithm works as follows. The data arrives as a stream, and the solution keeps a vector of $k$ elements (the reservoir), initialized with the first $k$ elements of the stream and then updated incrementally: when the $i$-th element arrives (with $i \gt k$), pick a random integer $r$ uniformly in $\{1,\ldots,i\}$, and if $r$ happens to be in $\{1,\ldots,k\}$, replace the $r$-th element of the reservoir with the current element.

A simple implementation in Python is the following. The input items are the lines coming from the standard input:
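```python
# reservoir_sampling.py

import sys, random

k = int(sys.argv[1])
S, c = [], 0  # S is the reservoir, c counts the items seen so far

for x in sys.stdin:
    if c < k:
        S.append(x)               # fill the reservoir with the first k items
    else:
        r = random.randint(0, c)  # uniform in {0, ..., c}: c + 1 choices for the (c+1)-th item
        if r < k:
            S[r] = x              # replace with probability k / (c + 1)
    c += 1

print(''.join(S), end='')
```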

You can test it from the console as follows to pick 3 distinct random numbers between 1 and 100:
```bash
for i in {1..100}; do echo $i; done | python ./reservoir_sampling.py 3
```

###### Why does it work? The math behind it (*)

(Feel free to skip this section if math and probability are not your friends.)

Let's convince ourselves that every element belongs to the final solution with the same probability. Let $x_i$ be the $i$-th element and $S_i$ be the solution obtained after examining the first $i$ elements. We will show that $\Pr[x_j \in S_i] = k/i$ for all $j\le i$ with $k\le i\le n$. This implies that every element is in the final solution $S_n$ with probability exactly $k/n$.

The proof is by induction on $i$. The base case $i=k$ is clearly true, since the first $k$ elements are in the solution with probability exactly 1. Now consider the $i$-th element for some $i>k$. This element enters the solution $S_i$ with probability exactly $k/i$, namely when $r\in\{1,\ldots,k\}$. On the other hand, any element $x_j$ with $j\lt i$ is in $S_i$ only if it was in $S_{i-1}$ and is not kicked out by the $i$-th element. By the induction hypothesis, $\Pr[x_j \in S_{i-1}]= k/(i-1)$. An element already in the reservoir is kicked out only if $r$ equals its position, which happens with probability $1/i$ independently of what happened before, so it survives with probability $1-1/i = (i-1)/i$. We conclude that $\Pr[x_j \in S_{i}] = \frac{k}{i-1}\cdot\frac{i-1}{i} = \frac{k}{i}$.

#### MapReduce solution

How do we move from a sequential solution to a distributed one? To make the problem more concrete, let's say we have a number of files where each line is one of the input elements (the number of lines over all files sums up to $n$), and we'd like to select exactly $k$ of those lines.

###### The naive solution

The simplest solution is to reduce the distributed problem to a sequential one: use a single reducer and have every mapper send every line to that reducer. The reducer can then apply the reservoir sampling algorithm to the data. The problem with this approach, though, is that the mappers send the whole dataset to a single reducer.
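A minimal in-memory sketch of the naive scheme, assuming the input has already been split into one Python list per mapper (the function names and the split layout are hypothetical, for illustration only). Every mapper forwards every line under the same key, so the single reducer sees all $n$ records and runs reservoir sampling over them; the random generator is passed in explicitly so runs can be seeded.

```python
import random
from itertools import chain

def reservoir_sample(stream, k, rng):
    """Sequential reservoir sampling: k items chosen uniformly at random from the stream."""
    S = []
    for c, x in enumerate(stream):  # c = number of items seen before x
        if c < k:
            S.append(x)
        else:
            r = rng.randint(0, c)   # uniform over the c + 1 items seen so far
            if r < k:
                S[r] = x
    return S

def naive_job(splits, k, rng):
    """Identity mappers send every line to one reducer, which runs reservoir sampling."""
    shuffled = list(chain.from_iterable(splits))  # all n records reach the single reducer
    return reservoir_sample(shuffled, k, rng), len(shuffled)

if __name__ == "__main__":
    splits = [list(range(1, 51)), list(range(51, 81)), list(range(81, 101))]
    sample, shuffled = naive_job(splits, 3, random.Random(0))
    print(sample, shuffled)  # shuffled is 100: the whole dataset
```

The second return value makes the cost explicit: the number of records crossing the network equals the input size, regardless of $k$.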
###### A better approach

The core insight behind reservoir sampling is that picking a random sample of size $k$ is equivalent to generating a random permutation (ordering) of the elements and picking the top $k$ elements. Indeed, a random sample can be generated as follows: associate a random float id with each element and pick the elements with the $k$ largest ids. Since the ids induce a uniformly random ordering of the elements (assuming the ids are distinct), the elements associated with the $k$ largest ids form a uniformly random subset of size $k$.

We start by implementing this new algorithm in a streaming sequential setting. The goal is to incrementally keep track of the $k$ elements with the largest ids seen so far. A data structure well suited to this goal is the binary min-heap. We can use it as follows: we initialize the heap with the first $k$ elements, each associated with a random id. Then, when a new element arrives, we associate a random id with it: if its id is larger than the smallest id in the heap (the heap's root), we replace the heap's root with this new element.

A simple implementation in Python is the following:

```python
# rand_subset_seq.py

import sys, random
from heapq import heappush, heapreplace

k = int(sys.argv[1])
H = []

for x in sys.stdin:
    r = random.random()  # this is the id
    if len(H) < k:
        heappush(H, (r, x))
    elif r > H[0][0]:    # H[0] is the root of the heap, H[0][0] its id
        heapreplace(H, (r, x))

print(''.join([x for (r, x) in H]), end='')
```

Again, the following test picks 3 distinct random numbers between 1 and 100:

```bash
for i in {1..100}; do echo $i; done | python ./rand_subset_seq.py 3
```
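To see that the heap really keeps the $k$ largest ids, here is a small sketch that compares it with the obvious $O(n \log n)$ alternative of drawing all ids and sorting them. Both functions are hypothetical helpers; they draw ids from the same seeded generator in the same order, so they should select the same elements. The element's position is stored in each tuple only so that a tie on the id (extremely unlikely with floats) never falls back to comparing the elements themselves.

```python
import random
from heapq import heappush, heapreplace

def top_k_with_heap(items, k, rng):
    """Streaming version: a min-heap of size k holding the largest ids seen so far."""
    H = []  # entries are (id, position, item); H[0] has the smallest kept id
    for pos, x in enumerate(items):
        r = rng.random()
        if len(H) < k:
            heappush(H, (r, pos, x))
        elif r > H[0][0]:
            heapreplace(H, (r, pos, x))
    return sorted(x for (_, _, x) in H)

def top_k_with_sort(items, k, rng):
    """Reference version: draw every id, sort in decreasing order, keep the first k."""
    tagged = [(rng.random(), pos, x) for pos, x in enumerate(items)]
    tagged.sort(reverse=True)
    return sorted(x for (_, _, x) in tagged[:k])

if __name__ == "__main__":
    a = top_k_with_heap(range(1, 101), 3, random.Random(42))
    b = top_k_with_sort(range(1, 101), 3, random.Random(42))
    print(a, b, a == b)
```

The heap version never holds more than $k$ entries, which is what makes it usable when only the sample fits in memory.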
By looking at the problem in this new light, we can now provide an improved MapReduce implementation. The idea is to compute the ordering in a distributed fashion: each mapper associates a random id with each of its elements and keeps track of its top $k$ elements. The top $k$ elements of each mapper are then sent to a single reducer, which completes the job by extracting the top $k$ elements among all of them. This is correct because each of the global top $k$ elements is necessarily among the top $k$ elements of its own mapper. Notice that in this case the amount of data sent out by the map phase is reduced to at most $k$ elements per mapper, as opposed to the whole dataset.
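The scheme can be simulated in memory to check that merging per-mapper candidates gives the same result as a global top $k$. This is a sketch under the assumption that each input split is a Python list; `map_top_k`, `reduce_top_k`, and `simulate_job` are hypothetical names, not Hadoop APIs, and the mappers run one after another instead of in parallel.

```python
import random
from heapq import heappush, heapreplace, nlargest

def map_top_k(split, k, rng):
    """Mapper: attach a random id to each element and keep the k largest (id, element) pairs."""
    H = []
    for x in split:
        r = rng.random()
        if len(H) < k:
            heappush(H, (r, x))
        elif r > H[0][0]:
            heapreplace(H, (r, x))
    return H

def reduce_top_k(candidates, k):
    """Single reducer: the k elements with the largest ids among all candidates."""
    return [x for (r, x) in nlargest(k, candidates)]

def simulate_job(splits, k, rng):
    """Run the mappers, then the reducer; also report how many records were shuffled."""
    candidates = [pair for split in splits for pair in map_top_k(split, k, rng)]
    return reduce_top_k(candidates, k), len(candidates)

if __name__ == "__main__":
    splits = [list(range(1, 51)), list(range(51, 81)), list(range(81, 101))]
    sample, shuffled = simulate_job(splits, 3, random.Random(0))
    print(sorted(sample), shuffled)  # shuffled is 9 = 3 mappers x k, not 100
```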

An important trick we can use is the fact that the Hadoop framework automatically presents the records to the reducer sorted by key, from lowest to highest. Therefore, by using the negation of the id as the key, the first $k$ elements read by the reducer are exactly the top $k$ elements we are looking for.
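A tiny sketch of the negation trick, mimicking a numeric sort (lowest key first) on the record format the mapper emits, `'%f\t%s' % (-r, x)`. The helper names and the hand-picked ids are made up for illustration.

```python
def mapper_line(r, x):
    """Format one record the way the mapper does: negated id, a tab, the element."""
    return '%f\t%s' % (-r, x)

def first_k_after_sort(lines, k):
    """Sort records by numeric key, lowest first, and keep the first k values."""
    ordered = sorted(lines, key=lambda line: float(line.split('\t', 1)[0]))
    return [line.split('\t', 1)[1] for line in ordered[:k]]

lines = [mapper_line(r, x) for (r, x) in [(0.25, 'a'), (0.75, 'b'), (0.5, 'c'), (0.1, 'd')]]
print(first_k_after_sort(lines, 2))  # ['b', 'c']: the elements with ids 0.75 and 0.5
```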

We now provide the mapper and reducer code in Python, to be used with Hadoop Streaming.

The following is the code for the mapper:
```python
#!/usr/bin/env python3
# rand_subset_m.py

import sys, random
from heapq import heappush, heapreplace

k = int(sys.argv[1])
H = []  # min-heap of (id, element) pairs holding this mapper's top k

for x in sys.stdin:
    r = random.random()
    if len(H) < k:
        heappush(H, (r, x))
    elif r > H[0][0]:
        heapreplace(H, (r, x))

for (r, x) in H:
    # by negating the id, the reducer receives the elements from the highest id to the lowest
    sys.stdout.write('%f\t%s' % (-r, x))
```

The reducer simply returns the first $k$ elements received:

```python
#!/usr/bin/env python3
# rand_subset_r.py

import sys

k = int(sys.argv[1])
c = 0

for line in sys.stdin:
    (r, x) = line.split('\t', 1)  # r is the negated id, x the element (with its newline)
    sys.stdout.write(x)
    c += 1
    if c == k:
        break
```

We can test the code by simulating the map-reduce framework. First, add the execution flag to the mapper and reducer files (e.g., chmod +x ./rand_subset_m.py and chmod +x ./rand_subset_r.py). Then we pipe the data to the mapper, sort the mapper output, and pipe it to the reducer.
```bash
k=3; for i in {1..100}; do echo $i; done | ./rand_subset_m.py $k | sort -k1,1n | ./rand_subset_r.py $k
```

###### Running the Hadoop job

We can finally run our Python MapReduce job with Hadoop. If you don't have Hadoop installed, you will first need to set it up on your machine. We leverage Hadoop Streaming to pass the data between our map and reduce phases via standard input and output. Run the following command, replacing [myinput] and [myoutput] with your desired locations. Here, we assume that the environment variable HADOOP_INSTALL refers to the Hadoop installation directory.

```bash
k=10  # set k to what you need
hadoop jar ${HADOOP_INSTALL}/contrib/streaming/hadoop-*streaming*.jar \
    -D mapred.reduce.tasks=1 \
    -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D mapred.text.key.comparator.options=-n \
    -file ./rand_subset_m.py -mapper "./rand_subset_m.py $k" \
    -file ./rand_subset_r.py -reducer "./rand_subset_r.py $k" \
    -input [myinput] -output [myoutput]
```

The first flag sets a single reducer, whereas the second and third make Hadoop sort the keys numerically (as opposed to using string comparison).
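Why the numeric option matters can be seen with a few keys in the mapper's format. For keys of the form `-0.dddddd`, plain string comparison orders them by increasing absolute value, that is, by decreasing numeric value, so without it the reducer would read the candidates with the smallest ids first. The snippet only illustrates the two orderings in Python; it does not call Hadoop.

```python
# keys as written by the mapper for ids 0.9, 0.1 and 0.35
keys = ['%f' % -r for r in (0.9, 0.1, 0.35)]

string_order = sorted(keys)              # plain string comparison
numeric_order = sorted(keys, key=float)  # numeric comparison, as requested by -n

print(string_order)   # ['-0.100000', '-0.350000', '-0.900000']
print(numeric_order)  # ['-0.900000', '-0.350000', '-0.100000']
```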
###### Further notes
The algorithm-savvy reader has probably noticed that while reservoir sampling takes linear time (every step takes constant time), the same cannot be said of the heap-based approach. Each heap operation takes $O(\log k)$ time, so a trivial bound on the overall running time is $O(n \log k)$. However, this bound can be improved, since the heap replacement is executed only when the id of the $i$-th element is larger than the id at the root of the heap. This happens exactly when the $i$-th id is among the $k$ largest of the first $i$ ids, which, since the ids are random, happens with probability $k/i$. Therefore the expected number of heap replacements is $\sum_{i=k+1}^n k/i \approx k \ln(n/k)$. The overall expected running time is then $O(n + k\log(n/k)\log k)$, which is essentially linear in $n$ unless $k$ is comparable to $n$.
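The $k/i$ argument can be checked numerically with a short simulation that counts how often the heap root is replaced. `count_replacements` and `expected_replacements` are hypothetical helpers, and the parameters ($n=1000$, $k=10$, 200 runs) are arbitrary; only the ids matter here, so the heap stores bare floats.

```python
import math
import random
from heapq import heappush, heapreplace

def count_replacements(n, k, rng):
    """Run the heap-based sampler on n random ids and count the heapreplace calls."""
    H, replacements = [], 0
    for _ in range(n):
        r = rng.random()
        if len(H) < k:
            heappush(H, r)
        elif r > H[0]:  # the current id is among the k largest seen so far
            heapreplace(H, r)
            replacements += 1
    return replacements

def expected_replacements(n, k):
    """Exact expectation: the sum of k/i for i = k+1, ..., n."""
    return sum(k / i for i in range(k + 1, n + 1))

n, k, runs = 1000, 10, 200
rng = random.Random(0)
average = sum(count_replacements(n, k, rng) for _ in range(runs)) / runs
print(average, expected_replacements(n, k), k * math.log(n / k))
```

For these parameters the exact sum is about 45.6 and $k\ln(n/k)$ is about 46.1, both small compared with the $n = 1000$ constant-time steps.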

#### What if the sample doesn't fit into memory?

So far we have worked under the assumption that the desired sample fits into memory. While this is usually the case, there are scenarios in which the assumption may not hold. After all, in the big data world, 1% of a huge dataset may still be too much to keep in memory!

A simple solution for generating large samples is to modify the mapper so that it outputs every item along with a random id as key. The MapReduce framework then sorts the items by id (essentially generating a random permutation of the elements), and the (single) reducer can be left as is, picking the first $k$ elements. The drawback of this approach is, again, that the whole dataset needs to be sent to a single reducer. Moreover, even if the reducer does not store the $k$ items in memory, it has to go through them, which can be time-consuming if $k$ is very large (say $k=n/2$).
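In memory, this simple variant amounts to sorting the elements by a random key and keeping a prefix. A minimal sketch, where `shuffle_then_prefix` is a hypothetical name and the list sort stands in for the framework's shuffle:

```python
import random

def shuffle_then_prefix(items, k, rng):
    """Map: tag every item with a random id; shuffle: sort by id; reduce: keep the first k."""
    keyed = [(rng.random(), x) for x in items]  # every item is sent to the reducer
    keyed.sort(key=lambda pair: pair[0])         # the framework's sort by key
    return [x for (_, x) in keyed[:k]]           # the single reducer streams the first k

if __name__ == "__main__":
    sample = shuffle_then_prefix(range(1, 101), 50, random.Random(0))
    print(len(sample), len(set(sample)))  # 50 50
```

Sorting only by the id keeps the items themselves out of the comparison, which matters if they are not mutually comparable.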

We now discuss a different approach that uses multiple reducers. The key idea is the following: suppose we have $\ell$ buckets, and generate a random ordering of the elements by first putting each element in a random bucket and then generating a random ordering within each bucket. The elements in the first bucket are considered smaller (with respect to the ordering) than the elements in the second bucket, and so on. Then, if we want to pick a sample of size $k$, we collect all of the elements in the first $j$ buckets, where $j$ is the largest index such that these buckets together contain $t \le k$ elements, and then pick the remaining $k-t$ elements from the next bucket. Here $\ell$ is a parameter chosen so that $n/\ell$ elements (the expected size of a bucket) fit into memory. Note the key aspect that buckets can be processed in parallel.
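A sequential, in-memory sketch of the bucket idea, assuming the whole input is a Python list (in the real job each bucket is handled by a reducer). Here `bucketed_sample` is a hypothetical helper, buckets are numbered from 0, and Python's `random.sample` stands in for "random ordering within the bucket, then take a prefix", which selects a uniformly random subset of the requested size just the same.

```python
import random

def bucketed_sample(items, k, l, rng):
    """Pick k items: whole buckets while the running total stays <= k, then a partial bucket."""
    buckets = [[] for _ in range(l)]
    for x in items:
        buckets[rng.randrange(l)].append(x)  # each element goes to a uniformly random bucket
    sample, t = [], 0                        # t = number of elements taken so far
    for bucket in buckets:
        if t + len(bucket) <= k:             # the whole bucket fits
            sample.extend(bucket)
            t += len(bucket)
        else:                                # take k - t random elements, then stop
            sample.extend(rng.sample(bucket, k - t))
            break
    return sample

if __name__ == "__main__":
    s = bucketed_sample(range(1, 101), 37, 10, random.Random(0))
    print(len(s), len(set(s)))  # 37 37
```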

The implementation is as follows. Mappers associate with each element an id $(j,r)$, where $j$ is a random index in $\{1,2,\ldots,\ell\}$ used as the key, and $r$ is a random float used for secondary sorting. In addition, mappers keep track of the number of elements with key less than $j$ and with key at most $j$ (for $1\le j\le \ell$), and transmit this information to the reducers. The reducer associated with some key (bucket) $j$ acts as follows: if the number of elements with key less than or equal to $j$ is at most $k$, it outputs all elements in bucket $j$; otherwise, if the number of elements with key strictly less than $j$ is $t\lt k$, it runs reservoir sampling to pick $k-t$ random elements from the bucket; in the remaining case, that is, when the number of elements with key strictly less than $j$ is at least $k$, it outputs nothing.
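The three cases can be written as a small pure function of the two global counts. `bucket_action` is a hypothetical helper that returns how many elements of the bucket end up in the sample; the example bucket sizes are chosen arbitrarily.

```python
from itertools import accumulate

def bucket_action(less_count, upto_count, k):
    """How many elements of a bucket go into the sample.

    less_count: elements in buckets with a smaller key
    upto_count: elements in buckets with a key less than or equal to this one
    """
    if upto_count <= k:    # case 1: output the whole bucket
        return upto_count - less_count
    if less_count >= k:    # case 3: earlier buckets already provide k elements
        return 0
    return k - less_count  # case 2: reservoir-sample k - less_count elements

sizes = [3, 4, 0, 5, 2]         # elements per bucket, buckets in key order
upto = list(accumulate(sizes))  # [3, 7, 7, 12, 14]
less = [0] + upto[:-1]          # [0, 3, 7, 7, 12]
taken = [bucket_action(lc, uc, 9) for lc, uc in zip(less, upto)]
print(taken, sum(taken))        # [3, 4, 0, 2, 0] 9
```

Exactly one bucket (here the fourth) falls in case 2, and the contributions always add up to $k$ whenever $n \ge k$.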

After outputting the elements, each mapper sends its counts for every bucket $j$ to the reducer in charge of that bucket, using -1 as the secondary key so that this information is presented to the reducer before the bucket's elements.
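Because each mapper sends partial sums over its own elements, and sums add up, the reducer of bucket $j$ recovers the global counts simply by adding the pairs it receives. A small sketch of this, with hypothetical helper names and made-up per-mapper bucket sizes:

```python
from itertools import accumulate

def mapper_counts(sizes):
    """(less_count, upto_count) per bucket for one mapper; sizes[j] = its elements in bucket j."""
    upto = list(accumulate(sizes))
    less = [0] + upto[:-1]
    return list(zip(less, upto))

def reducer_totals(per_mapper):
    """For every bucket, add up the (less_count, upto_count) pairs sent by all mappers."""
    return [tuple(map(sum, zip(*pairs))) for pairs in zip(*per_mapper)]

m1, m2 = mapper_counts([3, 0, 2]), mapper_counts([1, 4, 1])
print(m1)                        # [(0, 3), (3, 3), (3, 5)]
print(reducer_totals([m1, m2]))  # [(0, 4), (4, 8), (8, 11)], same as mapper_counts([4, 4, 3])
```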
```python
#!/usr/bin/env python3
# rand_large_subset_m.py

import sys, random

l = int(sys.argv[1])
S = [0 for j in range(l)]  # S[j] = number of elements this mapper put in bucket j

for x in sys.stdin:
    (j, r) = (random.randint(0, l-1), random.random())
    S[j] += 1
    sys.stdout.write('%d\t%f\t%s' % (j, r, x))

for j in range(l):  # compute partial sums
    prev = 0 if j == 0 else S[j-1]  # number of elements with key less than j
    S[j] += prev                    # number of elements with key less than or equal to j
    print('%d\t-1\t%d\t%d' % (j, prev, S[j]))  # secondary key is -1 so the reducer gets this first
```

The reducer first reads the counts for each bucket and decides what to do accordingly.
```python
#!/usr/bin/env python3
# rand_large_subset_r.py

import sys, random
from itertools import groupby

k = int(sys.argv[1])

def records(stream):
    # Each line is "j \t r \t payload". For count records (r == -1) the payload is
    # "less_count \t upto_count" from one mapper; otherwise it is an element.
    for line in stream:
        (j, r, x) = line.split('\t', 2)
        yield (j, float(r), x)

# Records arrive sorted by (j, r): all records of a bucket are contiguous,
# and its count records (r == -1) come before its elements.
for j, bucket in groupby(records(sys.stdin), key=lambda rec: rec[0]):
    less_count, upto_count = 0, 0  # aggregated over all mappers
    S, i = [], 0                   # reservoir and number of elements seen in this bucket
    for (_, r, x) in bucket:
        if r == -1:  # aggregate mappers' information
            (lc, uc) = x.split('\t', 1)
            less_count, upto_count = less_count + int(lc), upto_count + int(uc)
        elif upto_count <= k:  # case 1: output the whole bucket
            sys.stdout.write(x)
        elif less_count >= k:  # case 3: do not output anything
            pass
        else:  # case 2: reservoir sampling picking m = k - less_count elements
            m = k - less_count  # local, so k stays unchanged for later buckets
            if i < m:
                S.append(x)
            else:
                s = random.randint(0, i)  # i is the 0-based index of this element
                if s < m:
                    S[s] = x
            i += 1
    sys.stdout.write(''.join(S))
```


The following bash statement tests the code with $\ell=10$ and $k=50$ (note the sort flag to simulate secondary sorting):
```bash
l=10; k=50; for i in {1..100}; do echo $i; done | ./rand_large_subset_m.py $l | sort -k1,1n -k2,2n | ./rand_large_subset_r.py $k
```

###### Running the Hadoop job

Again, we're assuming you have Hadoop ready to crunch data. To run our Python MapReduce job with Hadoop, run the following command, replacing [myinput] and [myoutput] with your desired locations.

```bash
k=100000  # set k to what you need
l=50      # set the number of "buckets"
r=16      # set the number of "reducers" (depends on your cluster)
hadoop jar ${HADOOP_INSTALL}/contrib/streaming/hadoop-*streaming*.jar \
    -D mapred.reduce.tasks=$r \
    -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D stream.num.map.output.key.fields=2 \
    -D mapred.text.key.partitioner.options=-k1,1 \
    -D mapred.text.key.comparator.options="-k1n -k2n" \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -file ./rand_large_subset_m.py -mapper "./rand_large_subset_m.py $l" \
    -file ./rand_large_subset_r.py -reducer "./rand_large_subset_r.py $k" \
    -input [myinput] -output [myoutput]
```

Note how we enabled secondary key sorting as explained in the Hadoop Streaming quickguide. Each map output record is composed of the bucket $j$, the random id $r$, and the rest. We use stream.num.map.output.key.fields to set the key to the pair $(j, r)$. We use mapred.text.key.partitioner.options along with the -partitioner argument to partition only over $j$. Finally, we use mapred.text.key.comparator.options along with mapred.output.key.comparator.class to sort by $j$ in numerical order and then by $r$, again in numerical order.

###### Further notes

While this approach is general and can be used even when $k$ is small, it still has the overhead of transmitting the whole dataset from the map phase to the reduce phase (although not to a single machine/reducer). When the sample fits in memory, the other approach we discussed is faster and should be preferred.

1. I think your implementation of reservoir sampling is not correct. In line 11 the randint should be inclusive c. If you take k=1 and only 2 samples, then the end result would always be the 2nd sample. You can also test it by running the sampler many times and check that all input samples have equal chance. You will see that the first k items in the sequence have less chance to be picked. If the randint is inclusive c everything is fine.

1. The index is 0-based.

2. Even with zero-based indexing, it has to be inclusive c. Try it out for yourself by letting k=2 and using an input of 2 lines. The output will always be the second line. (Yes, I've tested this on your code.) So:

This
r = random.randint(0,c-1)
Should be
r = random.randint(0,c)

Other than that, great work!

2. Hello there,

I'm glad you appreciated my image as a nice way to represent random sampling. Could you please cite it? I use the CC BY-NC-SA 3.0 license on all my works, so you're free to use it as long as you cite the original source. It was originally hosted here: http://faculty.elgin.edu/dkernler/statistics/ch01/1-3.html.

Thanks,
Dan Kernler

1. Thanks for the image Dan, I added a link to the original source.
