After experimenting a bit with PySpark, I feel it is often easier to handle these tasks in R / Python. Most of what we do is repetitive across R / Python / Spark / SQL:
- Data pipeline tasks at the DB level
- One-hot encoding can also be done with basic T-SQL code
- While working in NLP, it makes sense to use TF-IDF vectorizers (a quick PySpark sketch follows this list)
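A minimal TF-IDF sketch using PySpark's ml feature transformers; the sample sentences, column names, and numFeatures value here are made up purely for illustration:

from pyspark.sql.session import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
spark = SparkSession.builder.getOrCreate()
docs = spark.createDataFrame([(0, "spark is fast"), (1, "spark handles big data")], ["id", "text"])
#Tokenize, hash terms into a fixed-size space, then weight by inverse document frequency
tokens = Tokenizer(inputCol="text", outputCol="words").transform(docs)
tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1024).transform(tokens)
idf_model = IDF(inputCol="rawFeatures", outputCol="features").fit(tf)
idf_model.transform(tf).select("id", "features").show(truncate=False)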
#https://spark.apache.org/docs/2.3.0/sql-programming-guide.html
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
df1 = spark.createDataFrame([("Happy",1.0),("Sad",0.9),("Happy",1.5),("Coffee",3.0)],["Name","Size"])
print(df1.count())
df2 = spark.createDataFrame([("Happy",94110),("Happy",94103),("Tea",7012),("Coffee",10504)],["Name","Zip"])
print(df2.count())
df1.printSchema()
#Register both DataFrames as temp views so they can be queried with SQL
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
#Put the join condition in ON, not WHERE: a WHERE filter would silently turn
#LEFT/RIGHT joins back into inner joins. explain() prints the physical plan.
spark.sql("SELECT * FROM df1 JOIN df2 ON df1.Name = df2.Name").show()
spark.sql("SELECT * FROM df1 JOIN df2 ON df1.Name = df2.Name").explain()
spark.sql("SELECT * FROM df1 INNER JOIN df2 ON df1.Name = df2.Name").explain()
spark.sql("SELECT * FROM df1 LEFT JOIN df2 ON df1.Name = df2.Name").explain()
spark.sql("SELECT * FROM df1 RIGHT JOIN df2 ON df1.Name = df2.Name").explain()
#pip install optimuspyspark
#https://github.com/FavioVazquez/dataday2018/blob/master/02_pyspark_ml.ipynb
#http://lintool.github.io/SparkTutorial/
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName("TestApp").getOrCreate()
sc = spark.sparkContext
#Load data with an explicit schema
from pyspark.sql.types import *
labels = [('INFANT_ALIVE_AT_REPORT',IntegerType()),('BIRTH_PLACE',StringType()),('MOTHER_AGE',IntegerType()),('FATHER_AGE',IntegerType()),
          ('CIG_BEFORE',IntegerType()),('CIG1',IntegerType()),('CIG2',IntegerType()),('CIG3',IntegerType()),
          ('MOTHER_HEIGHT',IntegerType()),('MOTHER_PRE_WEIGHT',IntegerType()),('MOTHER_DELIVERY_WEIGHT',IntegerType()),('MOTHER_WEIGHT_GAIN',IntegerType()),
          ('DIABETES_PRE',IntegerType()),('DIABETES_GEST',IntegerType()),('HYP_PRE',IntegerType()),('HYP_GEST',IntegerType()),('PREV_BIRTH',IntegerType())]
schema = StructType([StructField(e[0],e[1],False) for e in labels])
births = spark.read.csv(r'E:/births_transformed.csv',header=True,schema=schema)
births.select('BIRTH_PLACE').show()
#Create transformations
from pyspark.ml.feature import OneHotEncoder, VectorAssembler
#BIRTH_PLACE arrives as a string code; cast it to an integer so it can be one-hot encoded
births = births.withColumn('BIRTH_PLACE_INT',births['BIRTH_PLACE'].cast(IntegerType()))
births.select('BIRTH_PLACE_INT').show()
encoder = OneHotEncoder(inputCol='BIRTH_PLACE_INT',outputCol='BIRTH_PLACE_VEC')
#Collate all feature columns (everything after the label and BIRTH_PLACE) into one vector
featuresCreator = VectorAssembler(inputCols=[col[0] for col in labels[2:]]+[encoder.getOutputCol()],outputCol='features')
#Create an estimator
from pyspark.ml.classification import LogisticRegression
logistic = LogisticRegression(maxIter=10,regParam=0.01,labelCol='INFANT_ALIVE_AT_REPORT')
#Create a pipeline: encode -> assemble -> classify
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[encoder,featuresCreator,logistic])
#Fit the model on a 70/30 train/test split
births_train, births_test = births.randomSplit([0.7,0.3],seed=666)
model = pipeline.fit(births_train)
test_model = model.transform(births_test)
print(test_model.toPandas().head())
#Model performance
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test_model,{evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test_model,{evaluator.metricName:'areaUnderPR'}))
#Saving the (unfitted) pipeline
pipelinePath = 'modeldata'
pipeline.write().overwrite().save(pipelinePath)
loadpipeline = Pipeline.load(pipelinePath)
loadpipeline.fit(births_train).transform(births_test).take(1)
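The snippet above persists the unfitted Pipeline and refits it on load. A fitted model can be saved and reloaded directly with PipelineModel instead; a minimal sketch, where the 'fittedmodeldata' path is just an example:

from pyspark.ml import PipelineModel
modelPath = 'fittedmodeldata'  #example path, not from the original post
model.write().overwrite().save(modelPath)
loaded_model = PipelineModel.load(modelPath)
loaded_model.transform(births_test).take(1)  #no refit needed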
#http://spark.apache.org/docs/1.2.1/mllib-clustering.html
import os
from math import sqrt
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3.6'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6'
from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
#Parse "user,product,rating" lines into Rating objects
data = sc.textFile("hdfs://ip-xx-xx-xx-xx:8020//user//bdhcluster16jan//test.data")
ratings = data.map(lambda l: l.split(',')).map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)
# Evaluate the model on the training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))
#https://spark.apache.org/docs/latest/api/python/_modules/pyspark/mllib/recommendation.html#Rating
#https://www.quora.com/What-is-the-Alternating-Least-Squares-method-in-recommendation-systems-And-why-does-this-algorithm-work-intuition-behind-this
#A tiny in-memory example: (user, product, rating) triples
r1 = (1, 1, 1.0)
r2 = (1, 2, 2.0)
r3 = (2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
model = ALS.trainImplicit(ratings, 1, seed=10)
print(model.predict(2, 2))
testset = sc.parallelize([(1, 2), (1, 1)])
model = ALS.train(ratings, 2, seed=0)
print(model.predictAll(testset).collect())
model = ALS.train(ratings, 4, seed=10)
print(model.userFeatures().collect())
print(model.recommendUsers(1, 2))
print(model.recommendProducts(1, 2))
print(model.rank)
#https://stackoverflow.com/questions/47585723/kmeans-clustering-in-pyspark
import os
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3.6'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6'
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
df = spark.createDataFrame([[0, 33.3, -17.5],
                            [1, 40.4, -20.5],
                            [2, 28.0, -23.9],
                            [3, 29.5, -19.0],
                            [4, 32.8, -18.84]],
                           ["other", "lat", "long"])
df.show()
#Assemble lat/long into a single feature vector column
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show()
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=2, seed=1)  # 2 clusters here
model = kmeans.fit(new_df.select('features'))
transformed = model.transform(new_df)
transformed.show()
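To sanity-check the clustering, the learned centroids can be printed, and ClusteringEvaluator (available since Spark 2.3) gives a silhouette score. A small sketch assuming the default 'features' and 'prediction' column names produced above:

#Inspect the learned centroids
for center in model.clusterCenters():
    print(center)
#Silhouette score: closer to 1 means well-separated clusters
from pyspark.ml.evaluation import ClusteringEvaluator
print(ClusteringEvaluator().evaluate(transformed))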
import os
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3.6'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6'
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
df = spark.createDataFrame([(1,4),(2,5),(3,6)],["A","B"])
print(df.count())
df.show()
#Group by
from pyspark.sql import functions as F
df = spark.createDataFrame([('a',33),('b',11),('a',22)],['names','age'])
df.show()
gby = df.groupBy(df.names)
print(gby)
gby.agg(F.min(df.age)).show()
gby.agg(F.max(df.age)).show()
#https://spark.apache.org/docs/2.2.1/sql-programming-guide.html
#https://github.com/FavioVazquez/dataday2018/blob/master/01_pyspark_df.ipynb
#https://jsonformatter.curiousconcept.com/
string_JSON_RDD = sc.parallelize(("""{
  "id":"123",
  "name":"Argenis",
  "age":19,
  "eyeColor":"brown"
}""",
"""{
  "id":"234",
  "name":"Liliana",
  "age":22,
  "eyeColor":"green"
}""",
"""{
  "id":"345",
  "name":"Ana",
  "age":23,
  "eyeColor":"blue"
}"""))
#Create a DataFrame from the JSON strings
swimmers_JSON = spark.read.json(string_JSON_RDD)
swimmers_JSON.show(2)
swimmers_JSON.show()
#Create a temp view and use it in the next step with Spark SQL
swimmers_JSON.createOrReplaceTempView("swimmersJSONView")
spark.sql("select * from swimmersJSONView").show()
#Infer schema using reflection:
#Spark automatically determined the schema by reviewing the JSON data
swimmers_JSON.printSchema()
#Programmatically specifying the schema instead
from pyspark.sql.types import *
string_CSV_RDD = sc.parallelize([(123,'Argenis',19,'brown'),(234,'Liliana',22,'green'),(345,'Ana',23,'blue')])
schema = StructType([
    StructField("id",LongType(),True),
    StructField("name",StringType(),True),
    StructField("age",LongType(),True),
    StructField("eyecolor",StringType(),True)
])
#Apply the schema
swimmers = spark.createDataFrame(string_CSV_RDD,schema)
#Create a view over the typed DataFrame
swimmers.createOrReplaceTempView("swimmers")
swimmers.printSchema()
spark.sql("select * from swimmers").show()
spark.sql("select * from swimmers").filter("id=123").show()
swimmers.select("id","age").filter("age=22").show()
spark.sql("select * from swimmers where age=22").show()
spark.sql("select * from swimmers where eyecolor like 'b%'").show()
import os
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3.6'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6'
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
df = spark.createDataFrame([(1,4),(2,5),(3,6)],["A","B"])
print(df.count())
df.show()
from pyspark.sql.types import *
string_CSV_RDD = sc.parallelize([(123, 'Argenis', 19, 'brown'), (234, 'Liliana', 22, 'green'), (345, 'Ana', 23, 'blue')])
# Define the schema explicitly with StructType and the pyspark.sql.types field types
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])
# Apply the schema to the RDD and create a DataFrame
swimmers = spark.createDataFrame(string_CSV_RDD, schema)
# Create a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")
#Show values
swimmers.show()
#Get the id, age where age = 22
swimmers.select("id","age").filter("age=22").show()
#Search by eye color
swimmers.select("name","eyeColor").filter("eyeColor like 'b%'").show()
#explain() shows how Catalyst plans each query
swimmers.select("name","eyeColor").filter("eyeColor like 'b%'").explain()
swimmers.select("id","age").filter("age=22").explain()
swimmers.select("id","age").filter("age>20").explain()
swimmers.select("name","eyeColor").filter("name like 'Ana%'").explain()
Happy Learning!!!