Spark RDD Boot Camp

#Create RDD from parallelize

data = [1,2,3,4,5,6,7,8,9,10,11,12]

rdd=sc.parallelize(data)

print(rdd.collect())



#Create RDD from external Data source
rdd2 = sc.textFile("/home/ubuntu/data")
print(rdd2.collect())


#Reads each entire file into the RDD as a single record.
The wholeTextFiles() function returns a PairRDD with the key being the file path and the value being the file content.

rdd3 = sc.wholeTextFiles("/home/ubuntu/data")
print(rdd3.collect())


# Creates empty RDD with no partition
rdd = spark.sparkContext.emptyRDD()
#Create empty RDD with partition
rdd2 = spark.sparkContext.parallelize([],10)

TRANSFORMATION

map 
The map transformation returns a new RDD by applying a function to each element of the RDD.

a = sc.parallelize (
   ["gowtham", 
   "nandhu", 
   "saro"]
)
b = a.map(lambda x: (x, 1))
c = b.collect()
print "Key value pair -> %s" % (c)



flatMap
flatMap is similar to map in that it applies a function to every element of the RDD, but flatMap then flattens the results into a single collection.

sc.parallelize([2, 3, 4]).flatMap(lambda x: [x,x,x]).collect()
sc.parallelize([1,2,3]).map(lambda x: [x,x,x]).collect()
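
To make the flattening visible, here is a small side-by-side sketch (the inline comments show what collect() returns for this data):

nums = sc.parallelize([2, 3, 4])
print(nums.flatMap(lambda x: [x, x, x]).collect())   # [2, 2, 2, 3, 3, 3, 4, 4, 4]
print(nums.map(lambda x: [x, x, x]).collect())       # [[2, 2, 2], [3, 3, 3], [4, 4, 4]]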


filter

Creates a new RDD by returning only the elements that satisfy the filter predicate.  For the SQL-minded, think WHERE clause.

data = sc.textFile("/home/ubuntu/data_1")
rows = data.map(lambda line: line.split(","))
rows.filter(lambda line: "welcome" in line).collect()


mapPartitions

mapPartitions applies a function once per partition (the function receives an iterator over that partition's elements) instead of once per element.

a = range(1,10)
parallel = sc.parallelize(a, 3)
def f(iterator): yield sum(iterator)
parallel.mapPartitions(f).collect()


Partition 1: 1+2+3 = 6

Partition 2: 4+5+6 = 15

Partition 3: 7+8+9 = 24
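
To verify which elements land in which partition, glom() returns each partition's contents as a list; for the parallel RDD above the split comes out as three groups of three:

print(parallel.glom().collect())
# [[1, 2, 3], [4, 5, 6], [7, 8, 9]]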



example 2 - no partition count given, so sc.defaultParallelism partitions are used
p = sc.parallelize(a)
p.mapPartitions(f).collect()


print(sc.defaultParallelism)


union

rdd1 = sc.parallelize(range(1,10))
rdd2 = sc.parallelize(range(5,15))

rdd1.union(rdd2).collect()


intersection


rdd1 = sc.parallelize(range(1,10))
rdd2 = sc.parallelize(range(5,15))

rdd1.intersection(rdd2).collect()


distinct


rdd1 = sc.parallelize(range(1,10))
rdd2 = sc.parallelize(range(5,15))

rdd1.union(rdd2).distinct().collect()
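
With rdd1 holding 1-9 and rdd2 holding 5-14, the union contains 5-9 twice; distinct() removes the duplicates, so the result is the numbers 1 through 14 (order not guaranteed):

result = rdd1.union(rdd2).distinct().collect()
print(sorted(result))
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]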


join 

x = sc.parallelize(("gowtham", "nandhu", "rahul")).map(lambda a: (a, 1))
y = sc.parallelize(("saro", "gowtham", "nila")).map(lambda a: (a, 1))

x.join(y).collect()
x.leftOuterJoin(y).collect()
x.rightOuterJoin(y).collect()
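
Given these keys (x has gowtham, nandhu, rahul; y has saro, gowtham, nila), the three joins produce results along these lines (ordering may vary across runs):

x.join(y)           -> only keys present in both: [('gowtham', (1, 1))]
x.leftOuterJoin(y)  -> all keys from x: [('gowtham', (1, 1)), ('nandhu', (1, None)), ('rahul', (1, None))]
x.rightOuterJoin(y) -> all keys from y: [('gowtham', (1, 1)), ('saro', (None, 1)), ('nila', (None, 1))]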


groupByKey


data = [("tv",1000),("phone",3000),("tv",4000),("phone",20000),("C",5))]
rdd=spark.sparkContext.parallelize(data)
rdd.groupByKey.collect()


reduceByKey

data=[("phone", 1),
  ("tv", 1),
  ("ac", 1),
  ("wc", 1),
  ("phone", 1),
  ("tv", 1),
  ("wc", 1),
  ("phone", 1),
  ("tv", 1))]

rdd=spark.sparkContext.parallelize(data)
rdd2=rdd.reduceByKey(lambda a,b: a+b)
for element in rdd2.collect():
    print(element)
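
For the data above, the loop prints one pair per key; the expected counts are ('phone', 3), ('tv', 3), ('ac', 1), ('wc', 2), though the print order may vary.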

sortByKey

myRDD = [("a", 1), ("c", 3), ("b", 4)]
rdd = spark.sparkContext.parallelize(myRDD)
rdd.sortByKey(True).collect()

aggregateByKey

It is effectively a combiner plus a reducer: aggregateByKey takes a zero value, a function that merges a value into the accumulator within a partition, and a function that merges accumulators across partitions.

name = sc.textFile("/home/ubuntu/name")
# Drop the header line, then split each row into fields
a = name.filter(lambda line: "Count" not in line).map(lambda line: line.split(","))
# Key on column 1, sum column 4 per key
b = a.map(lambda n: (str(n[1]), int(n[4]))).aggregateByKey(0, lambda acc, v: acc + v, lambda acc1, acc2: acc1 + acc2).collect()
print(b)
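
Since the layout of the file above is not shown, here is a self-contained sketch of the same pattern using inline data (hypothetical values, not taken from the file):

sales = sc.parallelize([("tv", 1000), ("phone", 3000), ("tv", 4000), ("phone", 20000)])
# zero value 0; within a partition add each value to the accumulator;
# across partitions add the partial sums together
totals = sales.aggregateByKey(0, lambda acc, v: acc + v, lambda a, b: a + b)
print(totals.collect())   # [('tv', 5000), ('phone', 23000)]  (order may vary)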

getNumPartitions() – an RDD method that returns the number of partitions the dataset is split into.

print("initial partition count:"+str(rdd.getNumPartitions()))
#Outputs: initial partition count:2

Set the number of partitions manually –
rdd2 = spark.sparkContext.parallelize([1,2,3,4,56,7,8,9,12,3], 10)
print("initial partition count:"+str(rdd2.getNumPartitions()))

repartition

The repartition() method is a very expensive operation, as it shuffles data across all nodes in the cluster.

reparRdd = rdd.repartition(4)
print("re-partition count:"+str(reparRdd.getNumPartitions()))

coalesce

coalesce() reduces the number of partitions of an RDD. Unlike repartition(), it avoids a full shuffle: instead of creating new partitions, it moves data into a subset of the existing partitions.

rdd3 = rdd2.coalesce(4)
print("Coalesce size : "+str(rdd3.getNumPartitions()))

ACTION

Count - action

words = sc.parallelize (
   ["hi", 
   "hi", 
   "hi"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)



reduce - action; it aggregates all elements of the RDD using the given function

from operator import add

nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding -> %i" % (adding))


first - action 
a = sc.parallelize(["gowtham", "nandhu", "rahul"])
a.first()

takeSample

Similar to take, but returns a random sample of size n. Takes a boolean for sampling with or without replacement, plus an optional random-generator seed that defaults to None.


a = sc.parallelize(("welcome", "to", "thedatatech", "my", "youtube", "channel and thanks for watching"))
a.takeSample(True, 3)


countByKey
Count the number of elements for each key, and return the result to the master as a dictionary.

a = sc.parallelize(("welcome", "to", "thedatatech", "my", "youtube", "channel and thanks for watching"))
a.map(lambda k: (k,1)).countByKey().items()


saveAsTextFile

a = sc.parallelize(("welcome", "to", "thedatatech", "my", "youtube", "channel and thanks for watching"))
a.map(lambda k: (k,1)).countByKey().items()
a.saveAsTextFile("/home/ubuntu/output.txt")


Storage levels

from pyspark import SparkContext
import pyspark

cache

words = sc.parallelize(
   ["hi",
   "hi",
   "hi"]
)
words.cache()
caching = words.is_cached
print("Words got cached -> %s" % (caching))


MEMORY_AND_DISK_2 

rdd1 = sc.parallelize([1,2])
rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 )
print(rdd1.getStorageLevel())

Other Options 

DISK_ONLY = StorageLevel(True, False, False, False, 1)

DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)

MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)

MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)

MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)

MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)

MEMORY_ONLY = StorageLevel(False, True, False, False, 1)

MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)

MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)

MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)

OFF_HEAP = StorageLevel(True, True, True, False, 1) 
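
The StorageLevel constructor arguments above are, in order: useDisk, useMemory, useOffHeap, deserialized, replication. A minimal sketch of picking one of the other options (rdd_disk is just a new example RDD):

rdd_disk = sc.parallelize([1, 2])
rdd_disk.persist(pyspark.StorageLevel.DISK_ONLY)
print(rdd_disk.getStorageLevel())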


WORD COUNT


rdd = spark.sparkContext.textFile("/home/ubuntu/word") 
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())
rdd3 = rdd2.map(lambda x: (x,1))
print(rdd3.collect())
rdd4 = rdd3.reduceByKey(lambda a,b: a+b)
print(rdd4.collect())
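
To list the most frequent words first, one way is to flip each (word, count) pair and sort by the count with sortByKey, reusing the pipeline above:

rdd5 = rdd4.map(lambda x: (x[1], x[0])).sortByKey(False)
print(rdd5.collect())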
