"No one is harder on a talented person than the person themselves" - Linda Wilkinson ; "Trust your guts and don't follow the herd" ; "Validate direction not destination" ;

October 05, 2018

Deep Dive PySpark Examples - Big Data Setup - Part II

After experimenting with PySpark for a while, I feel the data handling itself is easier in R / Python. Most of what we need to achieve is repetitive across R / Python / Spark / SQL.

  • Data pipeline tasks can stay at the DB level
  • One-hot encoding can also be done with basic T-SQL code
  • While working on NLP it makes sense to use TF-IDF vectorizers (a quick PySpark sketch follows right after this list)
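For the TF-IDF point above, a minimal PySpark sketch with the built-in Tokenizer / HashingTF / IDF transformers; the sample sentences and column names are just placeholders:
from pyspark.sql.session import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
spark = SparkSession.builder.getOrCreate()
#placeholder documents
docs = spark.createDataFrame([(0,"spark makes big data simple"),(1,"spark handles big data")],["id","text"])
#split text into words, hash the words into term-frequency vectors, then rescale with IDF
words = Tokenizer(inputCol="text", outputCol="words").transform(docs)
tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=32).transform(words)
idfModel = IDF(inputCol="rawFeatures", outputCol="features").fit(tf)
idfModel.transform(tf).select("id","features").show(truncate=False)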
#https://spark.apache.org/docs/2.3.0/sql-programming-guide.html
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
df1 = spark.createDataFrame([("Happy",1.0),("Sad",0.9),("Happy",1.5),("Coffee",3.0)],["Name","Size"])
print(df1.count())
df2 = spark.createDataFrame([("Happy",94110),("Happy",94103),("Tea",7012),("Coffee",10504)],["Name","Zip"])
print(df2.count())
df1.printSchema()
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
spark.sql("SELECT * FROM df1 join df2 WHERE df1.Name = df2.Name").show()
spark.sql("SELECT * FROM df1 join df2 WHERE df1.Name = df2.Name").explain()
spark.sql("SELECT * FROM df1 inner join df2 WHERE df1.Name = df2.Name").explain()
spark.sql("SELECT * FROM df1 left join df2 WHERE df1.Name = df2.Name").explain()
spark.sql("SELECT * FROM df1 right join df2 WHERE df1.Name = df2.Name").explain()
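The same joins can also be written through the DataFrame API; a minimal sketch reusing the df1 and df2 created above:
#DataFrame API equivalents of the SQL joins above
df1.join(df2, on="Name", how="inner").show()
df1.join(df2, on="Name", how="left").explain()
df1.join(df2, on="Name", how="right").explain()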
#pip install optimuspyspark
#https://github.com/FavioVazquez/dataday2018/blob/master/02_pyspark_ml.ipynb
#http://lintool.github.io/SparkTutorial/
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName("TestApp").getOrCreate()
sc = spark.sparkContext
#Load Data
from pyspark.sql.types import *
labels = [('INFANT_ALIVE_AT_REPORT',IntegerType()),('BIRTH_PLACE',StringType()),('MOTHER_AGE',IntegerType()),('FATHER_AGE',IntegerType()),('CIG_BEFORE',IntegerType()),
('CIG1',IntegerType()),('CIG2',IntegerType()),('CIG3',IntegerType()),('MOTHER_HEIGHT',IntegerType()),
('MOTHER_PRE_WEIGHT',IntegerType()),('MOTHER_DELIVERY_WEIGHT',IntegerType()),('MOTHER_WEIGHT_GAIN',IntegerType()),('DIABETES_PRE',IntegerType()),
('DIABETES_GEST',IntegerType()),('HYP_PRE',IntegerType()),('HYP_GEST',IntegerType()),('PREV_BIRTH',IntegerType())]
schema = StructType([StructField(e[0],e[1],False) for e in labels])
births = spark.read.csv(r'E:/births_transformed.csv',header=True,schema=schema)
print(births.select('BIRTH_PLACE').show())
#Create transformations
from pyspark.ml.feature import *
births = births.withColumn('BIRTH_PLACE_INT',births['BIRTH_PLACE'].cast(IntegerType()))
print(births.select('BIRTH_PLACE_INT').show())
encoder = OneHotEncoder(inputCol='BIRTH_PLACE_INT',outputCol='BIRTH_PLACE_VEC')
#Collate all columns into one
featuresCreator=VectorAssembler(inputCols=[col[0] for col in labels[2:]]+[encoder.getOutputCol()],outputCol='features')
#create an estimator
from pyspark.ml.classification import LogisticRegression
#Create model
logistic = LogisticRegression(maxIter=10,regParam=0.01,labelCol='INFANT_ALIVE_AT_REPORT')
#create a pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[encoder,featuresCreator,logistic])
#fit the model
births_train, births_tests = births.randomSplit([0.7,0.3],seed=666)
model = pipeline.fit(births_train)
test_model = model.transform(births_tests)
print(test_model.toPandas().head())
#Model performance
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test_model,{evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test_model,{evaluator.metricName:'areaUnderPR'}))
#Saving the model
pipelinePath = 'modeldata'
pipeline.write().overwrite().save(pipelinePath)
loadpipeline = Pipeline.load(pipelinePath)
loadpipeline.fit(births_train).transform(births_tests).take(1)
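The fitted model can be persisted in the same way; a minimal sketch, where the directory name is just a placeholder:
from pyspark.ml import PipelineModel
modelPath = 'fittedmodeldata' #placeholder directory
model.write().overwrite().save(modelPath)
loadedModel = PipelineModel.load(modelPath)
loadedModel.transform(births_tests).take(1)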
#http://spark.apache.org/docs/1.2.1/mllib-clustering.html
from pyspark import SparkConf, SparkContext
import sys
from pyspark.mllib.clustering import KMeans
from numpy import array
from math import sqrt
import requests
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import os
import time
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3.6'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6'
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
data = sc.textFile("hdfs://ip-xx-xx-xx-xx:8020//user//bdhcluster16jan//test.data")
ratings = data.map(lambda l: l.split(',')).map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)
# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))
#https://spark.apache.org/docs/latest/api/python/_modules/pyspark/mllib/recommendation.html#Rating
#https://www.quora.com/What-is-the-Alternating-Least-Squares-method-in-recommendation-systems-And-why-does-this-algorithm-work-intuition-behind-this
r1 = (1, 1, 1.0)
r2 = (1, 2, 2.0)
r3 = (2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
model = ALS.trainImplicit(ratings, 1, seed=10)
model.predict(2, 2)
testset = sc.parallelize([(1, 2), (1, 1)])
model = ALS.train(ratings, 2, seed=0)
model.predictAll(testset).collect()
model = ALS.train(ratings, 4, seed=10)
model.userFeatures().collect()
print(model.recommendUsers(1, 2))
print(model.recommendProducts(1, 2))
print(model.rank)
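MatrixFactorizationModel is imported above mainly for persisting a trained ALS model; a small sketch, where the save path is just a placeholder:
savePath = 'als_model_data' #placeholder path, must not already exist
model.save(sc, savePath)
sameModel = MatrixFactorizationModel.load(sc, savePath)
print(sameModel.predictAll(testset).collect())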
#https://stackoverflow.com/questions/47585723/kmeans-clustering-in-pyspark
import os
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3.6'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6'
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
df = spark.createDataFrame([[0, 33.3, -17.5],
[1, 40.4, -20.5],
[2, 28., -23.9],
[3, 29.5, -19.0],
[4, 32.8, -18.84]
],
["other","lat", "long"])
print(df.show())
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
print(new_df.show())
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=2, seed=1) # 2 clusters here
model = kmeans.fit(new_df.select('features'))
transformed = model.transform(new_df)
print(transformed.show())
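The fitted centroids and within-cluster cost can be inspected as well; a small sketch, assuming Spark 2.x where KMeansModel still exposes computeCost:
#cluster centers and sum of squared distances of points to their nearest center
print(model.clusterCenters())
print(model.computeCost(new_df))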
import os
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3.6'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6'
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
df = spark.createDataFrame([(1,4),(2,5),(3,6)],["A","B"])
print(df.count())
print(df.show())
#Group By
from pyspark.sql import functions as F
df = spark.createDataFrame([('a',33),('b',11),('a',22)],['names','age'])
df.show()
gby = df.groupBy(df.names)
print(gby)
print(gby.agg(F.min(df.age)).show())
print(gby.agg(F.max(df.age)).show())
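Several aggregates can also be computed in a single pass over the grouped data; the aliases below are just for readability:
gby.agg(F.min('age').alias('min_age'), F.max('age').alias('max_age'), F.count('age').alias('cnt')).show()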
#https://spark.apache.org/docs/2.2.1/sql-programming-guide.html
#https://github.com/FavioVazquez/dataday2018/blob/master/01_pyspark_df.ipynb
#https://jsonformatter.curiousconcept.com/
string_JSON_RDD = sc.parallelize(("""{
"id":"123",
"name":"Argenis",
"age":19,
"eyeColor":"brown"
}""",
"""{
"id":"234",
"name":"Liliana",
"age":22,
"eyeColor":"green"
}""",
"""{
"id":"345",
"name":"Ana",
"age":23,
"eyeColor":"blue"
}"""))
#create Dataframe
swimmers_JSON = spark.read.json(string_JSON_RDD)
swimmers_JSON.show(2)
swimmers_JSON.show()
#create temp table
swimmers_JSON.createOrReplaceTempView("swimmersJSONView")
swimmers_JSON.show()
#Use this view swimmersJSON in next step
#sparksql
spark.sql("select * from swimmersJSONView").show()
#Infer schema using reflection
#automatically determines the schema of the data by reviewing the JSON records.
swimmers_JSON.printSchema()
#programmatically specifying schema
from pyspark.sql.types import *
string_CSV_RDD = sc.parallelize([(123,'Argenis',19,'brown'),(234,'Liliana',22,'green'),(345,'Ana',23,'blue')])
schemaString = "id name age eyecolor"
schema = StructType([
StructField("id",LongType(),True),
StructField("name",StringType(),True),
StructField("age",LongType(),True),
StructField("eyecolor",StringType(),True)
])
#Apply the schema
swimmers = spark.createDataFrame(string_CSV_RDD,schema)
#create view using schema
swimmers.createOrReplaceTempView("swimmers")
swimmers.printSchema()
spark.sql("select * from swimmers")
spark.sql("select * from swimmers").show()
spark.sql("select * from swimmers").filter("id=123").show()
swimmers.select("id","age").filter("age=22").show()
spark.sql("select * from swimmers where age=22").show()
spark.sql("select * from swimmers where eyecolor like 'b%'").show()
import os
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3.6'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6'
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
df = spark.createDataFrame([(1,4),(2,5),(3,6)],["A","B"])
print(df.count())
print(df.show())
from pyspark.sql.types import *
string_CSV_RDD = sc.parallelize([(123, 'Argenis', 19, 'brown'), (234, 'Liliana', 22, 'green'), (345, 'Ana', 23, 'blue')])
# The schema is encoded in a string, using StructType we define the schema using various pyspark.sql.types
schemaString = "id name age eyeColor"
schema = StructType([
StructField("id", LongType(), True),
StructField("name", StringType(), True),
StructField("age", LongType(), True),
StructField("eyeColor", StringType(), True)
])
# Apply the schema to the RDD and Create DataFrame
swimmers = spark.createDataFrame(string_CSV_RDD, schema)
# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")
#show values
swimmers.show()
#get the id, age where age = 22
swimmers.select("id","age").filter("age=22").show()
#search by eye color
swimmers.select("name","eyeColor").filter("eyeColor like 'b%'").show()
swimmers.select("name","eyeColor").filter("eyeColor like 'b%'").explain()
swimmers.select("id","age").filter("age=22").explain()
swimmers.select("id","age").filter("age>20").explain()
swimmers.select("name","eyeColor").filter("name like 'Ana%'").explain()

Happy Learning!!!

