Random Forest is a widely used classification technique. In this blog post, I'll demonstrate how to run a Random Forest in PySpark.
We will work with three datasets: train data, test data and scoring data.
The train data is the data on which the Random Forest model will be trained. The test data is where we will evaluate model performance. Finally, the scoring data is the set for which we will predict the Y variable from the attributes.
The train data and the test data look like the following:
Customer_ID, Y_var, X1, X2, X3, ..., Xn
The scoring data looks like the following:
Customer_ID, X1, X2, X3, ..., Xn
where Y_var is the dependent variable and X1 to Xn are the independent attributes.
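For example, a single (hypothetical) train record might look like 1001, 0, 100.45, 31.25, 76.12, ..., where 1001 is the Customer_ID, 0 is the value of Y_var and the remaining numbers are the X attributes; the corresponding scoring record would simply omit the Y_var value. (The ID and values here are made up purely for illustration.)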
The complete code is saved in the GitHub location:
The first step is to import the data into PySpark. As we can see, when we import using sqlContext.sql, the resulting object is a pyspark.sql DataFrame. We will then convert these DataFrames to RDD format.
# ------------------------------------------------------------------------------
# Step 1:
# Creating a PySpark DataFrame from a Hive table
# Importing the train data, the test data and the scoring data
# ------------------------------------------------------------------------------
data_train = sqlContext.sql("SELECT * FROM my_db.Sample_50pct_train")
data_test = sqlContext.sql("SELECT * FROM my_db.Sample_50pct_test")
data_score = sqlContext.sql("SELECT * FROM my_db.Sample_scoring")
# ------------------------------------------------------------------------------
# Type of object
# ------------------------------------------------------------------------------
type(data_train)
# <class 'pyspark.sql.dataframe.DataFrame'>
# ------------------------------------------------------------------------------
# Step 2:
# Converting a PySpark DataFrame to a PySpark RDD
# ------------------------------------------------------------------------------
data_train_rdd = data_train.rdd
data_test_rdd = data_test.rdd
data_score_rdd = data_score.rdd
type(data_train_rdd)
#<class 'pyspark.rdd.RDD'>
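# (Optional) A quick sanity check: each element of the RDD is a Row whose fields
# follow the column order described above (Customer_ID, Y_var, X1, X2, ...).
# The values shown in the comment below are illustrative, not actual output.
data_train_rdd.take(1)
# e.g. [Row(Customer_ID=1001, Y_var=0, X1=100.45, X2=31.25, X3=76.12)]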
# In order to run the Random Forest in PySpark, we need to convert the DataFrame to an RDD of LabeledPoint.
# The LabeledPoint RDD looks as follows:
# [LabeledPoint(0.0, [100.45, 31.25, 76.12]), LabeledPoint(1.0, [110.45, 25.53, 70.0])]
# where 0.0 and 1.0 are the Y variables for the two records, and the accompanying vector contains X1, X2, X3.
# So, every record is re-written as a LabeledPoint. We can convert an RDD to an RDD of LabeledPoints as follows:
# ------------------------------------------------------------------------------
# Step 3:
# Importing the libraries for converting the DataFrame rows to dense vectors
# We need to convert this DataFrame to an RDD of LabeledPoint
# ------------------------------------------------------------------------------
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
# ------------------------------------------------------------------------------
# Step 4(a):
# For the train and test data, the structure is as follows:
# The first column is Customer_ID; the second column is the Y variable; and the third column onwards are the X variables
# ------------------------------------------------------------------------------
transformed_train_df = data_train_rdd.map(lambda row: LabeledPoint(row[1], Vectors.dense(row[2:])))
transformed_test_df = data_test_rdd.map(lambda row: LabeledPoint(row[1], Vectors.dense(row[2:])))
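# (Optional) Verify the transformation on a couple of records; the result should
# resemble the LabeledPoint examples shown above (the values here are illustrative).
transformed_train_df.take(2)
# e.g. [LabeledPoint(0.0, [100.45,31.25,76.12]), LabeledPoint(1.0, [110.45,25.53,70.0])]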
# ------------------------------------------------------------------------------
# Step 4(b):
# For the scoring data, the structure is as follows:
# The first column is Customer_ID; and the second column onwards are the X variables
# ------------------------------------------------------------------------------
transformed_score_df = data_score_rdd.map(lambda row: LabeledPoint(row[0], Vectors.dense(row[1:])))
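# Note: the scoring data has no Y variable, so Customer_ID is placed in the label
# slot of the LabeledPoint. This keeps the ID attached to each record so it can be
# zipped back with the predictions in Step 6.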
# ------------------------------------------------------------------------------
# Step 5:
# Random Forest model
# ------------------------------------------------------------------------------
from pyspark.mllib.tree import RandomForest
# ------------------------------------------------------------------------------
# Step 5(a):
# Parameters for the Random Forest model
# ------------------------------------------------------------------------------
RANDOM_SEED = 10904
RF_NUM_TREES = 100
RF_MAX_DEPTH = 4
RF_MAX_BINS = 100
# ------------------------------------------------------------------------------
# Step 5(b):
# Training a Random Forest model on the dataset
# ------------------------------------------------------------------------------
model = RandomForest.trainClassifier(transformed_train_df, \
                                     numClasses=2, categoricalFeaturesInfo={}, \
                                     numTrees=RF_NUM_TREES, featureSubsetStrategy="log2", \
                                     impurity="entropy", maxDepth=RF_MAX_DEPTH, \
                                     maxBins=RF_MAX_BINS, seed=RANDOM_SEED)
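# (Optional) Inspect the trained model. numTrees(), totalNumNodes() and
# toDebugString() are standard methods on the MLlib RandomForestModel; the debug
# string for 100 trees can be very long, so it is left commented out here.
print(model.numTrees())
print(model.totalNumNodes())
# print(model.toDebugString())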
# ------------------------------------------------------------------------------
# Step 5(c):
# Make predictions and compute accuracy
# ------------------------------------------------------------------------------
predictions = model.predict(transformed_test_df.map(lambda x: x.features))
labels_and_predictions = transformed_test_df.map(lambda x: x.label).zip(predictions)
model_accuracy = labels_and_predictions.filter(lambda x: x[0] == x[1]).count() / float(transformed_test_df.count())
print("Model accuracy: %.3f%%" % (model_accuracy * 100))
# ------------------------------------------------------------------------------
# Step 5(d):
# Model evaluation
# ------------------------------------------------------------------------------
from pyspark.mllib.evaluation import BinaryClassificationMetrics
# BinaryClassificationMetrics expects (score, label) pairs, so flip the order
predictions_and_labels = labels_and_predictions.map(lambda x: (x[1], x[0]))
metrics = BinaryClassificationMetrics(predictions_and_labels)
print("Area under Precision/Recall (PR) curve: %.3f" % (metrics.areaUnderPR * 100))
print("Area under Receiver Operating Characteristic (ROC) curve: %.3f" % (metrics.areaUnderROC * 100))
# ------------------------------------------------------------------------------
# Step 6:
# Scoring dataset
# ------------------------------------------------------------------------------
score_predictions = model.predict(transformed_score_df.map(lambda x: x.features))
score_labels_and_predictions = transformed_score_df.map(lambda x: x.label).zip(score_predictions)
# ------------------------------------------------------------------------------
# Step 7:
# Creating a final Hive table for the scored data
# ------------------------------------------------------------------------------
score_df = spark.createDataFrame(score_labels_and_predictions)
score_df.createOrReplaceTempView("scoring_file")
spark.sql("drop table if exists my_db.pyspark_scored_sample")
spark.sql("create table my_db.pyspark_scored_sample (Customer_ID bigint, prediction int)")
spark.sql("insert into my_db.pyspark_scored_sample select * from scoring_file")
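# An alternative to the create/insert statements above: name the columns when
# building the DataFrame and write it directly with saveAsTable. The column names
# here are illustrative; also note that both values come back as floats from the
# LabeledPoint/zip, so cast them if your table schema requires integer types.
score_df_named = spark.createDataFrame(score_labels_and_predictions, ["Customer_ID", "prediction"])
score_df_named.write.mode("overwrite").saveAsTable("my_db.pyspark_scored_sample")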