#Create RDD from parallelize
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd=sc.parallelize(data)
print(rdd.collect())
#Create RDD from external Data source
rdd2 = sc.textFile("/home/ubuntu/data")
print(rdd2.collect())
#wholeTextFiles() reads each entire file as a single record.
#It returns a pair RDD with the file path as the key and the file content as the value.
rdd3 = sc.wholeTextFiles("/home/ubuntu/data")
print(rdd3.collect())
# Creates empty RDD with no partition
rdd = spark.sparkContext.emptyRDD()
#Create empty RDD with 10 partitions
rdd2 = spark.sparkContext.parallelize([],10)
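To verify the partition count of the empty RDD created above:
print("empty RDD partition count: " + str(rdd2.getNumPartitions()))
#Outputs: empty RDD partition count: 10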
TRANSFORMATION
map
The map transformation returns a new RDD by applying a function to each element of the source RDD.
a = sc.parallelize(["gowtham", "nandhu", "saro"])
b = a.map(lambda x: (x, 1))
c = b.collect()
print "Key value pair -> %s" % (c)
flatMap
flatMap is similar to map in that it applies a function to every element of an RDD, but flatMap flattens the results into a single collection.
sc.parallelize([2, 3, 4]).flatMap(lambda x: [x,x,x]).collect()
sc.parallelize([1,2,3]).map(lambda x: [x,x,x]).collect()
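The flatMap call above returns a single flattened list, [2, 2, 2, 3, 3, 3, 4, 4, 4], while the map call returns one list per element, [[1, 1, 1], [2, 2, 2], [3, 3, 3]].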
filter
Creates a new RDD by returning only the elements that satisfy the filter condition. For the SQL minded, think of a WHERE clause.
data = sc.textFile("/home/ubuntu/data_1")
rows = data.map(lambda line: line.split(","))
rows.filter(lambda line: "welcome" in line).collect()
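A self-contained sketch of the same filter, using made-up inline rows instead of the external file:
rows = sc.parallelize([
    ["welcome", "to", "thedatatech"],
    ["hello", "world"],
    ["welcome", "back"]
])
#Keep only the rows that contain the field "welcome"
print(rows.filter(lambda fields: "welcome" in fields).collect())
#Outputs: [['welcome', 'to', 'thedatatech'], ['welcome', 'back']]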
mapPartitions
mapPartitions applies the function once per partition, passing it an iterator over that partition's elements, rather than once per element.
a = range(1,10)
parallel = sc.parallelize(a, 3)
def f(iterator): yield sum(iterator)
parallel.mapPartitions(f).collect()
Partition 1: 1+2+3 = 6
Partition 2: 4+5+6 = 15
Partition 3: 7+8+9 = 24
Example 2: without an explicit partition count, parallelize() uses sc.defaultParallelism partitions, so the number of partial sums returned equals that value.
p = sc.parallelize(a)
p.mapPartitions(f).collect()
print(sc.defaultParallelism)
union
rdd1 = sc.parallelize(range(1,10))
rdd2 = sc.parallelize(range(5,15))
rdd1.union(rdd2).collect()
intersection
rdd1 = sc.parallelize(range(1,10))
rdd2 = sc.parallelize(range(5,15))
rdd1.intersection(rdd2).collect()
distinct
rdd1 = sc.parallelize(range(1,10))
rdd2 = sc.parallelize(range(5,15))
rdd1.union(rdd2).distinct().collect()
join
x = sc.parallelize(("gowtham", "nandhu", "rahul")).map(lambda a: (a, 1))
y = sc.parallelize(("saro", "gowtham", "nila")).map(lambda a: (a, 1))
x.join(y).collect()
x.leftOuterJoin(y).collect()
x.rightOuterJoin(y).collect()
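Only the key present in both RDDs ('gowtham') appears in the inner join: [('gowtham', (1, 1))]. The outer joins fill in None for the missing side, so leftOuterJoin also returns ('nandhu', (1, None)) and ('rahul', (1, None)), and rightOuterJoin also returns ('saro', (None, 1)) and ('nila', (None, 1)); ordering may vary.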
groupByKey
data = [("tv",1000),("phone",3000),("tv",4000),("phone",20000),("C",5))]
rdd=spark.sparkContext.parallelize(data)
rdd.groupByKey.collect()
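groupByKey() returns the grouped values as iterables, which do not print readably; applying mapValues(list) makes the result easier to inspect:
#Materialize each group as a plain list
print(rdd.groupByKey().mapValues(list).collect())
#Outputs (ordering may vary): [('tv', [1000, 4000]), ('phone', [3000, 20000]), ('C', [5])]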
reduceByKey
data=[("phone", 1),
("tv", 1),
("ac", 1),
("wc", 1),
("phone", 1),
("tv", 1),
("wc", 1),
("phone", 1),
("tv", 1))]
rdd=spark.sparkContext.parallelize(data)
rdd2=rdd.reduceByKey(lambda a,b: a+b)
for element in rdd2.collect():
    print(element)
sortByKey
myRDD = [("a", 1), ("c", 3), ("b", 4)]
rdd = spark.sparkContext.parallelize(myRDD)
rdd.sortByKey(True).collect()
aggregateByKey
aggregateByKey works like a combiner followed by a reducer: starting from the zero value given as the first argument, the second argument merges values within each partition and the third merges the partial results across partitions.
name = sc.textFile("/home/ubuntu/name")
#Drop the header line (it contains "Count"), then split each row on commas
a = name.filter(lambda line: "Count" not in line).map(lambda line: line.split(","))
#Key by column 1 and sum column 4 per key
b = a.map(lambda n: (str(n[1]), int(n[4]))).aggregateByKey(0, lambda acc, v: acc + v, lambda acc1, acc2: acc1 + acc2).collect()
print(b)
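A self-contained sketch of the same pattern with made-up inline data, so it can be run without the external file:
sales = sc.parallelize([("tv", 1000), ("phone", 3000), ("tv", 4000), ("phone", 20000)], 2)
#Zero value 0, sum within each partition, then sum the partial results across partitions
totals = sales.aggregateByKey(0, lambda acc, v: acc + v, lambda acc1, acc2: acc1 + acc2)
print(totals.collect())
#Outputs (ordering may vary): [('tv', 5000), ('phone', 23000)]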
getNumPartitions() is an RDD method that returns the number of partitions the dataset is split into.
print("initial partition count:"+str(rdd.getNumPartitions()))
#Outputs: initial partition count:2
Set the parallelism manually:
rdd2 = spark.sparkContext.parallelize([1,2,3,4,56,7,8,9,12,3], 10)
print("initial partition count:"+str(rdd2.getNumPartitions()))
repartition
The repartition() method is an expensive operation because it performs a full shuffle of the data across all nodes in the cluster.
reparRdd = rdd.repartition(4)
print("re-partition count:"+str(reparRdd.getNumPartitions()))
coalesce
coalesce() reduces the number of partitions of an RDD. Unlike repartition(), it avoids a full shuffle: instead of redistributing all the data, it merges data into a subset of the existing partitions.
#Reduce the 10-partition rdd2 down to 4 partitions
rdd3 = rdd2.coalesce(4)
print("Coalesce partition count : "+str(rdd3.getNumPartitions()))
ACTION
Count - action
words = sc.parallelize(["hi", "hi", "hi"])
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
reduce - action; it aggregates the elements of the RDD using the given function.
from operator import add
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding -> %i" % (adding))
first - action
a = sc.parallelize(["gowtham", "nandhu", "rahul"])
a.first()
takeSample
Similar to take(), but returns a random sample of n elements. It takes a boolean flag for sampling with or without replacement and an optional random seed, which defaults to None.
a = sc.parallelize(("welcome", "to", "thedatatech", "my", "youtube", "channel and thanks for watching"))
a.takeSample(True, 3)
countByKey
Count the number of elements for each key, and return the result to the master as a dictionary.
a = sc.parallelize(("welcome", "to", "thedatatech", "my", "youtube", "channel and thanks for watching"))
a.map(lambda k: (k,1)).countByKey().items()
saveAsTextFile
a = sc.parallelize(("welcome", "to", "thedatatech", "my", "youtube", "channel and thanks for watching"))
a.saveAsTextFile("/home/ubuntu/output.txt")
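saveAsTextFile() treats the given path as a directory and writes one part-file per partition. A minimal sketch of reading the output back, assuming the path above:
restored = sc.textFile("/home/ubuntu/output.txt")
print(restored.collect())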
Storage levels
from pyspark import SparkContext
import pyspark
cache
words = sc.parallelize(["hi", "hi", "hi"])
words.cache()
caching = words.is_cached
print("Words got cached -> %s" % (caching))
MEMORY_AND_DISK_2
rdd1 = sc.parallelize([1,2])
rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 )
print(rdd1.getStorageLevel())
Other options (in PySpark, data is always stored serialized, so the _SER variants are identical to the corresponding non-SER levels):
DISK_ONLY = StorageLevel(True, False, False, False, 1)
DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
OFF_HEAP = StorageLevel(True, True, True, False, 1)
WORD COUNT
rdd = spark.sparkContext.textFile("/home/ubuntu/word")
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())
rdd3 = rdd2.map(lambda x: (x,1))
print(rdd3.collect())
rdd4 = rdd3.reduceByKey(lambda a,b: a+b)
print(rdd4.collect())
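As an optional follow-up, the word counts can be sorted by frequency, a small sketch building on rdd4 above:
#Swap to (count, word), sort descending, then swap back
rdd5 = rdd4.map(lambda kv: (kv[1], kv[0])).sortByKey(False).map(lambda kv: (kv[1], kv[0]))
print(rdd5.collect())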