```python linenums="1"
import sys
import os
sys.path.insert(0, '/opt/spark/python')
sys.path.insert(0, '/opt/spark/python/lib/py4j-0.10.9.7-src.zip')
os.environ['SPARK_HOME'] = '/opt/spark'
import pyspark

conf = pyspark.SparkConf()
conf.setMaster("spark://spark-master:7077")
conf.set("spark.driver.memory", "1g")
sc = pyspark.SparkContext(conf=conf)
```
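- To confirm that the new `SparkContext` can reach the cluster, a quick sanity check can be run (a minimal sketch, not part of the original notebook; it only uses the `sc` created above):
```python linenums="1"
# Sanity check: distribute a small range and sum it on the executors.
rdd = sc.parallelize(range(10))
print(rdd.sum())   # expected: 45
print(sc.master)   # expected: spark://spark-master:7077
```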
---
## 2. Matrix-vector multiplication
### 2.1. Initial data
```python linenums="1"
import numpy as np
np.random.seed(123)
N = 4
M = np.random.randint(5, size = (N, N))
V = np.random.randint(5, size = N)
P = np.matmul(M, V)
# Print the matrices
print(f"Matrix M:\n {M}")
print(f"Vector N:\n {V}")
print(f"Dot Product P:\n {P}")
```
???note "Output"
```output
Matrix M:
 [[2 4 2 1]
 [3 2 3 1]
 [1 0 1 1]
 [0 0 1 3]]
Vector V:
 [4 0 0 4]
Dot Product P:
 [12 16 8 12]
```
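- As a reference for the distributed versions below, each element of P is the dot product of one row of M with V, e.g. P[0] = 2·4 + 4·0 + 2·0 + 1·4 = 12.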
!!!warning "Initial assumption"
- Vector V fits into memory
- Parallelize matrix M into RDD objects on the cluster.
```python linenums="1"
mspark = sc.parallelize(M)
print(mspark.take(4))
```
???note "Output"
```output
[array([2, 4, 2, 1]), array([3, 2, 3, 1]), array([1, 0, 1, 1]), array([0, 0, 1, 3])]
```
- This will not work well, as we no longer have an indicator of row order. We need to provide some additional information as we parallelize our local data.
- [zipWithIndex](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.zipWithIndex.html)
```python linenums="1"
mspark = sc.parallelize(M).zipWithIndex()
print(mspark.take(4))
```
???note "Output"
```output
[(array([2, 4, 2, 1]), 0), (array([3, 2, 3, 1]), 1), (array([1, 0, 1, 1]), 2), (array([0, 0, 1, 3]), 3)]
```
```python linenums="1"
def vectorDot(mRow):
    return mRow.dot(V)

mspark = sc.parallelize(M).zipWithIndex() \
           .map(lambda item: vectorDot(item[0]))
print(f"MapReduce Dot Product:\n {np.array(mspark.take(4))}")
print(f"Dot Product P:\n {P}")
```
???note "Output"
```output
MapReduce Dot Product:
[12 16 8 12]
Dot Product P:
[12 16 8 12]
```
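- In the code above, V is captured in the `vectorDot` closure and shipped to every task, which is why the initial assumption that V fits into memory matters. While that assumption holds, an explicit broadcast variable is a common alternative (a sketch, not from the original notebook):
```python linenums="1"
# Hypothetical variant: ship V to every executor once via a broadcast variable
# instead of capturing it in the vectorDot closure.
bV = sc.broadcast(V)
mspark_b = sc.parallelize(M).zipWithIndex() \
             .map(lambda item: item[0].dot(bV.value))
print(np.array(mspark_b.take(4)))  # expected: [12 16 8 12]
```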
!!!warning "Remove assumption"
- Vector V no longer fits into memory
### 2.4. Data processing for multiplication
- Both matrix M and vector V are loaded onto Spark
```python linenums="1"
mspark = sc.parallelize(M).zipWithIndex()
vspark = sc.parallelize(V).zipWithIndex()
print(mspark.take(4))
print(vspark.take(4))
```
???note "Output"
```output
[(array([2, 4, 2, 1]), 0),
(array([3, 2, 3, 1]), 1),
(array([1, 0, 1, 1]), 2),
(array([0, 0, 1, 3]), 3)]
[(np.int64(4), 0), (np.int64(0), 1), (np.int64(0), 2), (np.int64(4), 3)]
```
- Swap each pair so that the index becomes the key:
```python linenums="1"
mspark = sc.parallelize(M).zipWithIndex().map(lambda item: (item[1], item[0]))
vspark = sc.parallelize(V).zipWithIndex().map(lambda item: (item[1], item[0]))
print(mspark.take(4))
print(vspark.take(4))
```
???note "Output"
```output
[(0, array([2, 4, 2, 1])),
(1, array([3, 2, 3, 1])),
(2, array([1, 0, 1, 1])),
(3, array([0, 0, 1, 3]))]
[(0, np.int64(4)), (1, np.int64(0)), (2, np.int64(0)), (3, np.int64(4))]
```
- Let's turn RDDs of M and V into a single RDD
```python linenums="1"
pspark = mspark.union(vspark)
print(pspark.take(8))
```
???note "Output"
```output
[(0, array([2, 4, 2, 1])),
(1, array([3, 2, 3, 1])),
(2, array([1, 0, 1, 1])),
(3, array([0, 0, 1, 3])),
(0, np.int64(4)),
(1, np.int64(0)),
(2, np.int64(0)),
(3, np.int64(4))]
```
- Matrix M needs more processing than the simple row-wise parallelization of 2.1. Initial data: each row is flattened into individual cells keyed by column before the `union` with vector V.
```python linenums="1"
def getPos(t):
    res = []
    for idx, x in np.ndenumerate(t[1]):
        res.append((idx[0], (t[0], x)))  # (column, (row, cell value))
    return res

pspark = mspark.flatMap(getPos).union(vspark)
print(pspark.take(20))
```
???note "Output"
```output
[(0, (0, np.int64(2))), (1, (0, np.int64(4))), (2, (0, np.int64(2))), (3, (0, np.int64(1))),
(0, (1, np.int64(3))), (1, (1, np.int64(2))), (2, (1, np.int64(3))), (3, (1, np.int64(1))),
(0, (2, np.int64(1))), (1, (2, np.int64(0))), (2, (2, np.int64(1))), (3, (2, np.int64(1))),
(0, (3, np.int64(0))), (1, (3, np.int64(0))), (2, (3, np.int64(1))), (3, (3, np.int64(3))),
(0, np.int64(4)), (1, np.int64(0)), (2, np.int64(0)), (3, np.int64(4))]
```
- The keys of the above pairs represent the column identifier (for the vector elements, the position in V).
- If the value is a tuple, its first element is the row identifier and its second element is the value of the matrix cell at that row and column; otherwise the value is the element of V at that position.
```python linenums="1"
def getPos(t):
    res = []
    for idx, x in np.ndenumerate(t[1]):
        res.append((idx[0], (t[0], x)))
    return res

pspark = mspark.flatMap(getPos).union(vspark).groupByKey()
print(pspark.take(16))
```
???note "Output"
```output
[(0, <pyspark.resultiterable.ResultIterable object at 0xffff885f62c0>),
(1, <pyspark.resultiterable.ResultIterable object at 0xffff885f4d30>),
(2, <pyspark.resultiterable.ResultIterable object at 0xffff885f6a70>),
(3, <pyspark.resultiterable.ResultIterable object at 0xffff885f6ec0>)]
```
- Convert the grouped values into the Python list type instead of Spark's iterables with `mapValues(list)`.
```python linenums="1"
def getPos(t):
    res = []
    for idx, x in np.ndenumerate(t[1]):
        res.append((idx[0], (t[0], x)))
    return res

pspark = mspark.flatMap(getPos).union(vspark).groupByKey().mapValues(list)
print(pspark.take(16))
```
???note "Output"
```output
[(0, [(1, np.int64(3)), (2, np.int64(1)), (3, np.int64(0)), (0, np.int64(2)), np.int64(4)]),
(1, [(3, np.int64(0)), np.int64(0), (0, np.int64(4)), (1, np.int64(2)), (2, np.int64(0))]),
(2, [(0, np.int64(2)), (1, np.int64(3)), (3, np.int64(1)), (2, np.int64(1)), np.int64(0)]),
(3, [(1, np.int64(1)), np.int64(4), (3, np.int64(3)), (0, np.int64(1)), (2, np.int64(1))])]
```
- Each element in this `pspark` RDD is a key/value pair:
    - The key is the column index
    - The value is a 5-element list:
        - Four elements come from the corresponding column of matrix M; each is a tuple whose key is the row index and whose value is the matrix cell at that row and column.
        - The remaining element is the value of vector V at that column position.
- As shown in the previous output, we cannot assume proper order after union, especially for large scale data
- Need to flatten the final results (a list of tuples instead of a list of lists of tuples)
```python linenums="1"
def getPos(t):
    res = []
    for idx, x in np.ndenumerate(t[1]):
        res.append((idx[0], (t[0], x)))
    return res

def arrayMul(t):
    # t = (column, [four (row, cell) tuples from M plus one bare value from V])
    res = []
    vectorVal = 0
    for x in t[1]:
        if type(x) != tuple:
            vectorVal = x  # the bare element is V at this column
    for x in t[1]:
        if type(x) == tuple:
            res.append((x[0], x[1] * vectorVal))  # (row, partial product)
    return res

pspark = mspark.flatMap(getPos).union(vspark).groupByKey().mapValues(list).flatMap(arrayMul)
print(pspark.take(16))
```
???note "Output"
```output
[(3, np.int64(0)), (2, np.int64(4)), (0, np.int64(8)), (1, np.int64(12)),
(1, np.int64(0)), (2, np.int64(0)), (0, np.int64(0)), (3, np.int64(0)),
(3, np.int64(0)), (2, np.int64(0)), (1, np.int64(0)), (0, np.int64(0)),
(0, np.int64(4)), (2, np.int64(4)), (3, np.int64(12)), (1, np.int64(4))]
```
- Finally, group the partial products by row, convert Spark's iterables into lists, and sum them to obtain the product vector.
```python linenums="1"
dotspark = pspark.groupByKey().mapValues(list).mapValues(sum).map(lambda x: x[1])
print(f"MapReduce Dot Product:\n {np.array(dotspark.take(4))}")
print(f"Dot Product P:\n {P}")
```
???note "Output"
```output
MapReduce Dot Product:
[12 16 8 12]
Dot Product P:
[12 16 8 12]
```
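- As a recap, the complete MapReduce matrix-vector product can be assembled from the steps above into a single pipeline (a sketch that simply re-joins the code already shown):
```python linenums="1"
# Recap sketch: the full MapReduce matrix-vector product assembled end to end.
mspark = sc.parallelize(M).zipWithIndex().map(lambda item: (item[1], item[0]))
vspark = sc.parallelize(V).zipWithIndex().map(lambda item: (item[1], item[0]))

def getPos(t):
    res = []
    for idx, x in np.ndenumerate(t[1]):
        res.append((idx[0], (t[0], x)))  # (column, (row, cell value))
    return res

def arrayMul(t):
    res = []
    vectorVal = 0
    for x in t[1]:
        if type(x) != tuple:
            vectorVal = x  # the bare element is V at this column
    for x in t[1]:
        if type(x) == tuple:
            res.append((x[0], x[1] * vectorVal))  # (row, partial product)
    return res

product = (mspark.flatMap(getPos)
                 .union(vspark)
                 .groupByKey().mapValues(list)
                 .flatMap(arrayMul)
                 .groupByKey().mapValues(sum)
                 .map(lambda x: x[1]))
print(np.array(product.take(4)))  # expected: [12 16 8 12]
```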
---
## 3. Analyzing text data (not using Spark SQL)
### 3.1. Getting MovieLens data
- Downloading movie review data
```python linenums="1"
!wget https://files.grouplens.org/datasets/movielens/ml-32m.zip
!unzip ml-32m.zip
```
```python linenums="1"
!ls -lh ml-32m/
```
- You can review the [README](https://files.grouplens.org/datasets/movielens/ml-32m-README.html) of this data set.
```python linenums="1"
ratings = sc.textFile(working_dir + "/ml-32m/ratings.csv")
print(ratings.take(5))
ratings.cache()
```
```python linenums="1"
%%time
ratings.count()
```
???note "Output"
```output
CPU times: user 252 ms, sys: 28.2 ms, total: 280 ms
Wall time: 1min 40s
32000205
```
- Second count, preloaded
```python linenums="1"
%%time
ratings.count()
```
???note "Output"
```output
CPU times: user 147 ms, sys: 19.8 ms, total: 167 ms
Wall time: 58.4 s
32000205
```
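- To confirm that the speed-up comes from caching, the storage status of the RDD can be inspected (a small check, not part of the original notebook):
```python linenums="1"
# Check whether the ratings RDD is marked for caching and how it is stored.
print(ratings.is_cached)          # True after ratings.cache()
print(ratings.getStorageLevel())  # e.g. Memory Serialized 1x Replicated
```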
???question "Hands-on 1"
- What are the average ratings over the years of each movie?
- Note: don't forget to filter!
???question "Hands-on 2"
- What are the average ratings over the years of each movie?
- Display the results with movie titles instead of movie IDs
- Assumption: movies.csv does not fit in memory
???question "Hands-on 3"
- Identify movies that can be considered cult classics.
???question "Hands-on 4"
- What are the average ratings over the years of each genre?
- Assumption: movies.csv does not fit in memory
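- A possible common starting point for the hands-on questions above (a starter sketch only, it does not solve any of the exercises; the header line and the `userId,movieId,rating,timestamp` column layout come from the MovieLens README, and dropping the header is one case where the "don't forget to filter" hint applies):
```python linenums="1"
# Starter sketch: drop the CSV header and split each rating line into typed fields.
header = ratings.first()
parsed = (ratings.filter(lambda line: line != header)
                 .map(lambda line: line.split(","))
                 .map(lambda f: (int(f[1]), float(f[2]), int(f[3]))))  # (movieId, rating, timestamp)
print(parsed.take(3))
```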