
Thursday, February 10, 2022

Moving to a new address

This blog has been moved to a new address at https://incipientanalyst.wordpress.com/

There haven't been many updates to the blog recently, but I hope to post new content more regularly.

Please bookmark the new address: https://incipientanalyst.wordpress.com/

See you on the new platform.

Monday, January 22, 2018

Random Forest in Pyspark

Random Forest is a commonly used classification technique. In this post, I'll demonstrate how to train a Random Forest classifier in PySpark.

We will work with three datasets: the train data, the test data and the scoring data.
The train data is the data on which the Random Forest model is trained, the test data is used to measure the model's performance, and the scoring data is the data for which we predict the Y variable from the attributes.

The train data and the test data look like the following:

Customer_ID | Y_var | X1 | X2 | X3 | ...

The scoring data looks like the following:

Customer_ID | X1 | X2 | X3 | ...

where Y_var is the dependent variable and X1 to Xn are the independent attributes.
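To make the column positions concrete, here is a small pure-Python sketch of how a row in each layout splits into a label and a feature vector, mirroring the row[1] / row[2:] and row[0] / row[1:] slicing used in Step 4 of the code below. The sample rows and the helper names split_train_row and split_score_row are hypothetical, and plain tuples stand in for PySpark Row and LabeledPoint objects.

```python
# Hypothetical rows following the layouts above (values are made up for illustration).
# Train/test layout: (Customer_ID, Y_var, X1, X2, X3)
train_row = (1001, 0.0, 100.45, 31.25, 76.12)
# Scoring layout: (Customer_ID, X1, X2, X3)
score_row = (2001, 110.45, 25.53, 70.0)


def split_train_row(row):
    """Return (label, features): column 1 is Y_var, columns 2 onwards are X1..Xn."""
    return float(row[1]), [float(x) for x in row[2:]]


def split_score_row(row):
    """Return (customer_id, features): column 0 is Customer_ID, columns 1 onwards are X1..Xn."""
    return row[0], [float(x) for x in row[1:]]


label, features = split_train_row(train_row)
print(label, features)              # 0.0 [100.45, 31.25, 76.12]

customer_id, score_features = split_score_row(score_row)
print(customer_id, score_features)  # 2001 [110.45, 25.53, 70.0]
```

Keeping the ID as its own element of the tuple, rather than in a label slot, leaves it as an integer instead of converting it to a float.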

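The complete code is saved at the following GitHub location:

The first step is to import the data into PySpark. When we import with sqlContext.sql, the result is a pyspark.sql DataFrame, as the type() check below shows. We then convert these DataFrames to RDDs, because the RDD-based RandomForest in pyspark.mllib, used below, trains on an RDD of LabeledPoint rather than on a DataFrame.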

# ------------------------------------------------------------------------------
# Step 1:
# Creating a PySpark DataFrame from a Hive table
# Importing the train data, the test data and the scoring data
# ------------------------------------------------------------------------------

data_train = sqlContext.sql("SELECT * FROM my_db.Sample_50pct_train")
data_test = sqlContext.sql("SELECT * FROM my_db.Sample_50pct_test")
data_score = sqlContext.sql("SELECT * FROM my_db.Sample_scoring")


# ------------------------------------------------------------------------------
# Type of object
# ------------------------------------------------------------------------------
type(data_train)
# <class 'pyspark.sql.dataframe.DataFrame'>


# ------------------------------------------------------------------------------
# Step 2:
# Converting to a pyspark RDD from pyspark Dataframe
# ------------------------------------------------------------------------------

data_train_rdd = data_train.rdd
data_test_rdd = data_test.rdd
data_score_rdd = data_score.rdd
type(data_train_rdd)
#<class 'pyspark.rdd.RDD'>


# In order to run the Random Forest in PySpark (pyspark.mllib), we need to convert the DataFrame to an RDD of LabeledPoint.
# An RDD of LabeledPoint looks as follows:

# [LabeledPoint(0.0, [100.45, 31.25, 76.12]), LabeledPoint(1.0, [110.45, 25.53, 70.0])]
# where 0.0 and 1.0 are the labels (Y values) of the two records, and each vector holds that record's X1, X2, X3.
# So every record is rewritten as a LabeledPoint. We can convert an RDD of rows to an RDD of LabeledPoint as follows:


# ------------------------------------------------------------------------------
# Step 3:
# Importing libraries for converting the data frame to a dense vector
# We need to convert this Data Frame to an RDD of LabeledPoint.
# ------------------------------------------------------------------------------

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# ------------------------------------------------------------------------------
# Step 4(a):
# For the train and test data, the structure is as follows:
#    The first column is Customer_ID; the second column is the Y variable; and the third column onwards are the X variables
# ------------------------------------------------------------------------------

transformed_train_df = data_train_rdd.map(lambda row: LabeledPoint(row[1], Vectors.dense(row[2:])))
transformed_test_df = data_test_rdd.map(lambda row: LabeledPoint(row[1], Vectors.dense(row[2:])))


# ------------------------------------------------------------------------------
# Step 4(b):
# For the scoring data, the structure is as follows:
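#   The first column is the Customer_ID; and the second column onwards are the X variables
# There is no Y variable here, so Customer_ID is placed in the label slot of the
# LabeledPoint only to carry it through to the output. LabeledPoint stores the label
# as a float, so IDs larger than 2**53 could lose precision.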
# ------------------------------------------------------------------------------

transformed_score_df = data_score_rdd.map(lambda row: LabeledPoint(row[0], Vectors.dense(row[1:])))


# ------------------------------------------------------------------------------
# Step 5:
# Random Forest model
# ------------------------------------------------------------------------------

from pyspark.mllib.tree import RandomForest

# ------------------------------------------------------------------------------
# Step 5(a):
# Parameters for the Random Forest model
# ------------------------------------------------------------------------------

RANDOM_SEED = 10904
RF_NUM_TREES = 100
RF_MAX_DEPTH = 4
RF_MAX_BINS = 100


# ------------------------------------------------------------------------------
# Step 5(b):
# Training a Random Forest model on the dataset
# ------------------------------------------------------------------------------

# categoricalFeaturesInfo={} treats every feature as continuous
model = RandomForest.trainClassifier(transformed_train_df,
                                     numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=RF_NUM_TREES, featureSubsetStrategy="log2",
                                     impurity="entropy", maxDepth=RF_MAX_DEPTH,
                                     maxBins=RF_MAX_BINS, seed=RANDOM_SEED)


# ------------------------------------------------------------------------------
# Step 5(c):
# Make predictions and compute accuracy
# ------------------------------------------------------------------------------

predictions = model.predict(transformed_test_df.map(lambda x: x.features))
labels_and_predictions = transformed_test_df.map(lambda x: x.label).zip(predictions)
model_accuracy = labels_and_predictions.filter(lambda x: x[0] == x[1]).count() / float(transformed_test_df.count())
print("Model accuracy: %.3f%%" % (model_accuracy * 100))


# ------------------------------------------------------------------------------
# Step 5(d):
# Model evaluation
# ------------------------------------------------------------------------------

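from pyspark.mllib.evaluation import BinaryClassificationMetrics

# BinaryClassificationMetrics expects (score, label) pairs, so swap the (label, prediction) order.
# The scores here are hard 0/1 predictions rather than probabilities, so both areas
# rest on a single threshold and should be read as a rough guide only.
metrics = BinaryClassificationMetrics(labels_and_predictions.map(lambda x: (x[1], x[0])))
print("Area under Precision/Recall (PR) curve: %.3f%%" % (metrics.areaUnderPR * 100))
print("Area under Receiver Operating Characteristic (ROC) curve: %.3f%%" % (metrics.areaUnderROC * 100))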


# ------------------------------------------------------------------------------
# Step 6:
# Scoring dataset
# ------------------------------------------------------------------------------
score_predictions = model.predict(transformed_score_df.map(lambda x: x.features))
score_labels_and_predictions = transformed_score_df.map(lambda x: x.label).zip(score_predictions)


# ------------------------------------------------------------------------------
# Step 7:
# Creating a final hive table for the scored data
# ------------------------------------------------------------------------------

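# spark is the SparkSession available in Spark 2.x and later.
# Naming the columns avoids the default _1/_2 names; the casts match the target table's types.
score_df = spark.createDataFrame(score_labels_and_predictions, ["Customer_ID", "prediction"])
score_df.createOrReplaceTempView("scoring_file")
spark.sql("drop table if exists my_db.pyspark_scored_sample")
spark.sql("create table my_db.pyspark_scored_sample (Customer_ID bigint, prediction int)")
spark.sql("insert into my_db.pyspark_scored_sample select cast(Customer_ID as bigint), cast(prediction as int) from scoring_file")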
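As a rough cross-check of Steps 5(c) and 5(d), here is a dependency-free sketch on made-up (label, prediction) pairs. It computes accuracy the same way as the filter/count in Step 5(c), and shows what a ROC area amounts to when the scores are hard 0/1 predictions: assuming the area is computed with the trapezoidal rule over the points (0, 0), (FPR, TPR) and (1, 1), it reduces to the mean of sensitivity and specificity. The data and the function names accuracy and roc_area_hard are hypothetical.

```python
# Hypothetical (label, prediction) pairs, in the same order as labels_and_predictions.
pairs = [(1.0, 1.0), (0.0, 0.0), (1.0, 0.0), (0.0, 0.0), (0.0, 1.0), (1.0, 1.0)]


def accuracy(pairs):
    """Share of records whose prediction equals the label (as in Step 5(c))."""
    return sum(1 for label, pred in pairs if label == pred) / len(pairs)


def roc_area_hard(pairs):
    """Trapezoidal ROC area when the 'scores' are hard 0/1 predictions.

    The curve only has the points (0, 0), (FPR, TPR) and (1, 1), so the area
    simplifies to (TPR + TNR) / 2, the mean of sensitivity and specificity.
    """
    tp = sum(1 for l, p in pairs if l == 1.0 and p == 1.0)
    fn = sum(1 for l, p in pairs if l == 1.0 and p == 0.0)
    tn = sum(1 for l, p in pairs if l == 0.0 and p == 0.0)
    fp = sum(1 for l, p in pairs if l == 0.0 and p == 1.0)
    tpr = tp / (tp + fn)
    fpr = fp / (fp + tn)
    # Two trapezoids: (0, 0) -> (fpr, tpr) and (fpr, tpr) -> (1, 1)
    return fpr * tpr / 2 + (1 - fpr) * (tpr + 1) / 2


print("Accuracy: %.3f" % accuracy(pairs))                          # Accuracy: 0.667
print("ROC area (hard predictions): %.3f" % roc_area_hard(pairs))  # ROC area (hard predictions): 0.667
```

With probability scores instead of hard predictions, the curve gets one point per distinct score, which is what makes the ROC area more informative than a single accuracy figure.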


Tuesday, August 8, 2017

Generic Python Code for Classification Techniques

Here is generic Python code to run different classification techniques such as Logistic Regression, Decision Tree, Random Forest and Support Vector Machines (SVM). The code automatically reports metrics such as concordance and discordance, a classification table, precision and recall, accuracy, and either the coefficient estimates (linear models) or the variable importance (tree-based models).
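Concordance is measured over every (event, non-event) pair: a pair is concordant when the event (Y = 1) has a higher predicted probability than the non-event, discordant when it is lower, and tied when the two are equal; Somers' D is (concordant - discordant) / pairs. The brute-force sketch below counts these pairs directly on made-up probabilities; the function in Part 1 arrives at the same counts faster by sorting the non-event probabilities and using bisect. The data and the name concordance_stats are hypothetical.

```python
from itertools import product


def concordance_stats(labels, probs):
    """Brute-force concordant/discordant/tied counts over all (event, non-event) pairs."""
    ones = [p for y, p in zip(labels, probs) if y == 1]
    zeros = [p for y, p in zip(labels, probs) if y == 0]
    conc = disc = ties = 0
    for p1, p0 in product(ones, zeros):
        if p1 > p0:
            conc += 1      # event scored higher than non-event
        elif p1 < p0:
            disc += 1      # event scored lower than non-event
        else:
            ties += 1      # equal scores
    pairs = len(ones) * len(zeros)
    somers_d = (conc - disc) / pairs
    return conc, disc, ties, pairs, somers_d


# Made-up outcomes and predicted probabilities of Y = 1
labels = [1, 0, 1, 0, 1, 0]
probs = [0.9, 0.3, 0.6, 0.6, 0.2, 0.1]
conc, disc, ties, pairs, somers_d = concordance_stats(labels, probs)
print(conc, disc, ties, pairs, round(somers_d, 2))  # 6 2 1 9 0.44
```

The brute-force loop is quadratic in the number of records, which is why the sorted-list approach is preferable on real datasets.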

For a sample dataset and the full output in Python (Jupyter) notebook format, you can visit my GitHub page.

The code has two parts: first, the generic function that runs the technique and computes all the metrics; second, the code that calls the function with the right parameters.

We'll start with importing the necessary libraries and packages.

# Import required libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# Jupyter/IPython magic: keep it in a notebook, remove it in a plain .py script
%matplotlib inline

from sklearn.linear_model import LogisticRegression
# sklearn.cross_validation was removed in scikit-learn 0.20; these now live in sklearn.model_selection
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold   # For K-fold cross validation
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier, export_graphviz
from sklearn import svm
from sklearn import metrics
from scipy import stats as stat
# from ggplot import *   # not used by the code below (and the ggplot package is no longer maintained)
import statsmodels.formula.api as smf

Part 1: The generic function:

# Generic function for fitting a classification model and assessing its performance:
def classification_model(model, data, predictors, outcome, testdata):        
        
    #Fit the model:
    model.fit(data[predictors],data[outcome])
    
    #THE FOLLOWING CODE CALCULATES THE ESTIMATES OF COEFFICIENTS
    def MLE_coefficient():
        if name_model in ['LinearSVC','LogisticRegression']:
            # Note: scikit-learn's LogisticRegression applies L2 regularization by default (C=1.0),
            # so these are penalized estimates rather than pure maximum likelihood estimates.
            # DataFrame.append was removed in pandas 2.0, so pd.concat is used to put the intercept first.
            Coefficient = pd.DataFrame(model.coef_.tolist()).transpose()
            Intercept = pd.DataFrame(model.intercept_)
            Variable = pd.concat([Intercept, Coefficient], ignore_index=True)
            Predictor = pd.DataFrame(['Intercept'] + list(predictors))
            MLE = pd.merge(Predictor, Variable, how='inner', left_index=True, right_index=True)
            MLE.columns = ['Independent_Variables','Beta_Values']
            print('\nMaximum Likelihood Estimator Table: \n==================================== \n', MLE)
        elif name_model in ['RandomForestClassifier','DecisionTreeClassifier']:
            Variable = pd.DataFrame(model.feature_importances_.tolist())
            Predictor = pd.DataFrame(predictors)
            MLE = pd.merge(Predictor, Variable, how='inner', left_index=True, right_index=True)
            MLE.columns = ['Independent_Variables','Feature_Importance']
            print('\nFeature Importance Table: \n==================================== \n', MLE)

    #THE FOLLOWING CODE CALCULATES CONCORDANCE AND DISCORDANCE
    def concordance_discordance():
        Probability = model.predict_proba(data[predictors])
        Probability1 = pd.DataFrame(Probability)
        Probability1.columns = ['Prob_0','Prob_1']
        # reset_index so the outcomes line up with Probability1 (indexed 0..n-1),
        # even when data carries a shuffled index, e.g. from train_test_split
        TruthTable = pd.merge(data[[outcome]].reset_index(drop=True), Probability1[['Prob_1']], how='inner', left_index=True, right_index=True)
        zeros = TruthTable[(TruthTable[outcome]==0)].reset_index(drop=True)
        ones = TruthTable[(TruthTable[outcome]==1)].reset_index(drop=True)
        from bisect import bisect_left, bisect_right
        zeros_list = sorted(zeros['Prob_1'])
        zeros_length = len(zeros_list)
        ties = 0
        conc = 0
        # For each event (Y = 1), count non-events with a lower probability (concordant)
        # and with an equal probability (tied); all remaining pairs are discordant
        for prob_one in ones['Prob_1']:
            cur_conc = bisect_left(zeros_list, prob_one)
            cur_ties = bisect_right(zeros_list, prob_one) - cur_conc
            conc += cur_conc
            ties += cur_ties
        pairs_tested = zeros_length * len(ones.index)
        disc = pairs_tested - conc - ties
        print("Pairs = ", pairs_tested)
        print("Conc = ", conc)
        print("Disc = ", disc)
        print("Tied = ", ties)
        # Percentages of all pairs; Somers' D stays on a -1 to 1 scale
        concordance = round(100 * conc/pairs_tested,2)
        discordance = round(100 * disc/pairs_tested,2)
        ties_perc = round(100 * ties/pairs_tested,2)
        Somers_D = round((conc - disc)/pairs_tested,2)
        print("Concordance = ", concordance, "%")
        print("Discordance = ", discordance, "%")
        print("Tied = ", ties_perc, "%")
        print("Somers D = ", Somers_D)
    
    #THE FOLLOWING CODE GIVES OUT THE CLASSIFICATION TABLE TO DETERMINE THE CUT-OFF P (HIGHEST F-MEASURE)
    def classification_table_func():
        Probability = model.predict_proba(data[predictors])
        Probability1 = pd.DataFrame(Probability)
        Probability1.columns = ['Prob_0','Prob_1']
        # Align the actual outcomes with Probability1, which is indexed 0..n-1
        Actual = data[[outcome]].reset_index(drop=True)
        prob_min = round(Probability1.Prob_1.min() + 0.01,2)
        prob_max = round(Probability1.Prob_1.max() - 0.01,2)
        prob_val = np.arange(prob_min, prob_max, 0.01).tolist()
        ROC = []
        CT = []
        # Try each cut-off p: predict 1 when Prob_1 > p and record the resulting metrics
        for p in prob_val:
            TruthTable = pd.merge(Actual, Probability1, how='inner', left_index=True, right_index=True)
            # .ix was removed in pandas 1.0; .loc performs the same label-based assignment
            TruthTable.loc[TruthTable['Prob_1'] > p, 'Predicted'] = 1
            TruthTable.loc[TruthTable['Prob_1'] <= p, 'Predicted'] = 0
            Freq = pd.crosstab(TruthTable[outcome],TruthTable['Predicted'])
            TN = Freq.iloc[0,0]
            FN = Freq.iloc[1,0]
            FP = Freq.iloc[0,1]
            TP = Freq.iloc[1,1]
            Precision = TP/(TP + FP)
            Recall = TP/(TP + FN)
            F_measure = stat.hmean([ Precision,Recall])
            Specificity = TN/(TN + FP)
            Specificity_Inv = 1 - Specificity
            Sensitivity = Recall
            ROC.append({'V1_Prob': p, 'V2_Specificity_Inverse': Specificity_Inv, 'V3_Sensitivity': Sensitivity })
            CT.append({'V1_Prob': p, 'V2_Precision': Precision, 'V3_Recall': Recall, 'V4_F_Measure': F_measure })
        ROC_data = pd.DataFrame(ROC)
        FPR = ROC_data.V2_Specificity_Inverse
        TPR = ROC_data.V3_Sensitivity
        Classification_Table = pd.DataFrame(CT)
        #print('\nClassification Table: \n=============================== \n', Classification_Table)
        # The chosen cut-off is the one with the highest F-measure; it is stored as a global
        # so that cross_validation() can reuse it on the test data
        global threshold_prob
        threshold_prob = Classification_Table.loc[Classification_Table['V4_F_Measure'].idxmax(), 'V1_Prob']
        print('\nThreshold Probability Value: \t', threshold_prob)
        #THE FOLLOWING CODE CREATES THE FREQUENCY TABLE FOR RECALL AND PRECISION CALCULATION
        def frequency_table(prob):
            TruthTable = pd.merge(Actual, Probability1, how='inner', left_index=True, right_index=True)
            TruthTable.loc[TruthTable['Prob_1'] > prob, 'Predicted'] = 1
            TruthTable.loc[TruthTable['Prob_1'] <= prob, 'Predicted'] = 0
            Freq = pd.crosstab(TruthTable[outcome],TruthTable['Predicted'])
            TN = Freq.iloc[0,0]
            FN = Freq.iloc[1,0]
            FP = Freq.iloc[0,1]
            TP = Freq.iloc[1,1]
            Precision = TP/(TP + FP)
            Recall = TP/(TP + FN)
            F_measure = stat.hmean([ Precision,Recall])
            Correct_Identification = TN + TP
            Obs_Count = Freq[0].sum() + Freq[1].sum()
            Accuracy = Correct_Identification/Obs_Count
            print('\n==================================================')
            print('Accuracy (Using Precision-Recall and F-Measure):')
            print('==================================================')
            print('In-Sample Accuracy: \n=======================================')
            print('\nFrequency Table: \n===============================\n', Freq)
            print('\nAccuracy: %s'% "{0:.3%}".format(Accuracy))
            print('Precision: %s'% "{0:.3%}".format(Precision))
            print('Recall: %s' % "{0:.3%}".format(Recall))
            print('F-Measure: %s'% "{0:.3%}".format(F_measure))
            #print('\nFrequency Table (Percentage): \n=============================== \n', pd.crosstab(TruthTable[outcome],TruthTable['Predicted']).apply(lambda r: r/Obs_Count))
            #print('\nFrequency Table (Percentage of Actual Totals): \n=============================== \n', pd.crosstab(TruthTable[outcome],TruthTable['Predicted']).apply(lambda r: r/r.sum(), axis=1))
            #print('\nFrequency Table (Percentage of Predicted Totals): \n=============================== \n', pd.crosstab(TruthTable[outcome],TruthTable['Predicted']).apply(lambda r: r/r.sum(), axis=0))
        frequency_table(threshold_prob)
        
        
    #THE FOLLOWING CODE APPLIES THE IN-SAMPLE CUT-OFF TO THE TEST DATA (OUT-OF-SAMPLE ACCURACY)
    def cross_validation():
        classification_table_func()
        Probability = model.predict_proba(testdata[predictors])
        Probability1 = pd.DataFrame(Probability)
        Probability1.columns = ['Prob_0','Prob_1']
        # Align the test outcomes with Probability1, which is indexed 0..n-1
        Actual = testdata[[outcome]].reset_index(drop=True)

        #THE FOLLOWING CODE CREATES THE FREQUENCY TABLE FOR RECALL AND PRECISION CALCULATION
        def frequency_table(prob):
            TruthTable = pd.merge(Actual, Probability1, how='inner', left_index=True, right_index=True)
            TruthTable.loc[TruthTable['Prob_1'] > prob, 'Predicted'] = 1
            TruthTable.loc[TruthTable['Prob_1'] <= prob, 'Predicted'] = 0
            Freq = pd.crosstab(TruthTable[outcome],TruthTable['Predicted'])
            TN = Freq.iloc[0,0]
            FN = Freq.iloc[1,0]
            FP = Freq.iloc[0,1]
            TP = Freq.iloc[1,1]
            Precision = TP/(TP + FP)
            Recall = TP/(TP + FN)
            F_measure = stat.hmean([ Precision,Recall])
            Correct_Identification = TN + TP
            Obs_Count = Freq[0].sum() + Freq[1].sum()
            Accuracy = Correct_Identification/Obs_Count
            print('\nOut-of-Sample Accuracy: \n=======================================')
            print('\nFrequency Table: \n===============================\n', Freq)
            print('\nAccuracy: %s' % "{0:.3%}".format(Accuracy))
            print('Precision: %s'% "{0:.3%}".format(Precision))
            print('Recall: %s'% "{0:.3%}".format(Recall))
            print('F-Measure: %s'% "{0:.3%}".format(F_measure))
            #print('\nFrequency Table (Percentage): \n=============================== \n', pd.crosstab(TruthTable[outcome],TruthTable['Predicted']).apply(lambda r: r/Obs_Count))
        frequency_table(threshold_prob)
    
    def validation():
        print('\n==================================================')
        print('Accuracy (Using inbuilt "accuracy_score" function):')
        print('================================================== \n')

        # The model was already fitted at the top of classification_model. Re-fitting here
        # would give a different forest for RandomForestClassifier unless random_state is fixed,
        # so the fitted model is reused to keep all metrics consistent.
        predictions = model.predict(data[predictors])
        accuracy = metrics.accuracy_score(data[outcome], predictions)
        print("In-Sample Accuracy: %s" % "{0:.3%}".format(accuracy))

        predictions = model.predict(testdata[predictors])
        accuracy = metrics.accuracy_score(testdata[outcome], predictions)
        print("Out-of-Sample Accuracy: %s" % "{0:.3%}".format(accuracy))
    
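    # Class name of the estimator, e.g. 'LogisticRegression'. Parsing repr(model) is fragile:
    # recent scikit-learn versions print LogisticRegression() without quoted parameters,
    # and the old string split then left a stray quote in front of the name.
    name_model = type(model).__name__
    print(name_model)
    MLlist = ['LogisticRegression', 'DecisionTreeClassifier', 'RandomForestClassifier' ,'SVC', 'LinearSVC']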
    
    if name_model == 'LogisticRegression':
        MLE_coefficient()
        concordance_discordance()
        cross_validation()
        validation()
        
    elif name_model == 'DecisionTreeClassifier':
        #MLE_coefficient()
        concordance_discordance()
        cross_validation()
        validation()
        
    elif name_model == 'RandomForestClassifier':
        MLE_coefficient()
        concordance_discordance()
        cross_validation()
        validation()
        
    elif name_model == 'SVC':
        #MLE_coefficient()
        concordance_discordance()
        cross_validation()
        validation()

    elif name_model == 'LinearSVC':
        MLE_coefficient()
        #concordance_discordance()
        #cross_validation()
        validation()
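The cut-off search inside classification_table_func boils down to a few steps: try each candidate cut-off p, predict 1 when the probability exceeds p, compute precision, recall and their harmonic mean (the F-measure), and keep the p with the highest F-measure. Below is a dependency-free sketch on made-up data; f_measure_at and best_threshold are hypothetical helper names, and the loop rounds p at each step instead of using np.arange, to avoid floating-point drift in the cut-off values.

```python
def f_measure_at(labels, probs, p):
    """Precision, recall and F-measure when predicting 1 for probabilities > p."""
    preds = [1 if q > p else 0 for q in probs]
    tp = sum(1 for y, yhat in zip(labels, preds) if y == 1 and yhat == 1)
    fp = sum(1 for y, yhat in zip(labels, preds) if y == 0 and yhat == 1)
    fn = sum(1 for y, yhat in zip(labels, preds) if y == 1 and yhat == 0)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    # Harmonic mean of precision and recall (same as scipy.stats.hmean for two positive values)
    f = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f


def best_threshold(labels, probs, step=0.01):
    """Scan cut-offs from min+step up to (not including) max-step; return (best p, its F-measure)."""
    lo = round(min(probs) + step, 2)
    hi = round(max(probs) - step, 2)
    best_p, best_f = None, -1.0
    p = lo
    while p < hi:
        f = f_measure_at(labels, probs, p)[2]
        if f > best_f:       # strict '>' keeps the lowest cut-off when F-measures tie
            best_p, best_f = p, f
        p = round(p + step, 2)
    return best_p, best_f


# Made-up outcomes and predicted probabilities of Y = 1
labels = [1, 1, 0, 1, 0, 0]
probs = [0.9, 0.8, 0.7, 0.4, 0.3, 0.1]
print("Best cut-off: %.2f, F-measure: %.3f" % best_threshold(labels, probs))
# Best cut-off: 0.30, F-measure: 0.857
```

Because the comparison is a strict '>', ties keep the lowest cut-off, which matches taking the first maximum of the F-measure column.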

Part 2: Calling the function:

outcome_var = 'Response'
predictor_var = ['Var1','Var2','Var3_2','Var3_3','Var3_4']
model = LogisticRegression()
#model = DecisionTreeClassifier()
#model = RandomForestClassifier(n_estimators=10)
#model = svm.SVC(probability=True, C=0.1, gamma='auto', kernel='rbf')
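# train and test are pandas DataFrames holding the outcome and predictor columns
# (see the sample dataset on my GitHub page); SVC needs probability=True for predict_proba
classification_model(model,train,predictor_var,outcome_var,test)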