# Source: scraped file listing (76 lines, 3.8 KiB, Python)
# Train a random-forest credit-risk classifier on the German credit dataset
# using Spark ML, then persist the model, training data, and evaluation metrics.

import json

import pyspark  # NOTE(review): not referenced in this chunk -- confirm before removing
from pyspark.ml import Model, Pipeline  # NOTE(review): Model not referenced in this chunk
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import (  # NOTE(review): OneHotEncoder not referenced in this chunk
    IndexToString,
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
)
from pyspark.sql import SparkSession
''' Read data with Spark SQL '''

spark = SparkSession.builder.getOrCreate()

# inferSchema requires an extra pass over the CSV, but gives typed columns.
df_data = spark.read.csv(
    path="german_credit_data_biased_training.csv",
    sep=",",
    header=True,
    inferSchema=True,
)

# Pull the first row as an eager sanity check that the file parsed (result discarded).
df_data.head()

spark_df = df_data

# 80/20 train/test split; seed 24 fixed for reproducibility.
(train_data, test_data) = spark_df.randomSplit([0.8, 0.2], 24)

print(f"Number of records for training: {train_data.count()}")
print(f"Number of records for evaluation: {test_data.count()}")

spark_df.printSchema()
# Index each categorical string column into a numeric *_IX column so the
# VectorAssembler below can consume it.
si_CheckingStatus = StringIndexer(inputCol='CheckingStatus', outputCol='CheckingStatus_IX')
si_CreditHistory = StringIndexer(inputCol='CreditHistory', outputCol='CreditHistory_IX')
si_LoanPurpose = StringIndexer(inputCol='LoanPurpose', outputCol='LoanPurpose_IX')
si_ExistingSavings = StringIndexer(inputCol='ExistingSavings', outputCol='ExistingSavings_IX')
si_EmploymentDuration = StringIndexer(inputCol='EmploymentDuration', outputCol='EmploymentDuration_IX')
si_Sex = StringIndexer(inputCol='Sex', outputCol='Sex_IX')
si_OthersOnLoan = StringIndexer(inputCol='OthersOnLoan', outputCol='OthersOnLoan_IX')
si_OwnsProperty = StringIndexer(inputCol='OwnsProperty', outputCol='OwnsProperty_IX')
si_InstallmentPlans = StringIndexer(inputCol='InstallmentPlans', outputCol='InstallmentPlans_IX')
si_Housing = StringIndexer(inputCol='Housing', outputCol='Housing_IX')
si_Job = StringIndexer(inputCol='Job', outputCol='Job_IX')
si_Telephone = StringIndexer(inputCol='Telephone', outputCol='Telephone_IX')
si_ForeignWorker = StringIndexer(inputCol='ForeignWorker', outputCol='ForeignWorker_IX')

# The label indexer is fitted eagerly (on the full dataset) so its learned
# labels are available to IndexToString for mapping predictions back to strings.
si_Label = StringIndexer(inputCol="Risk", outputCol="label").fit(spark_df)
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=si_Label.labels)

# BUG FIX: the original inputCols listed "LoanDuration" twice, silently
# double-weighting that feature in the assembled vector.
va_features = VectorAssembler(
    inputCols=[
        "CheckingStatus_IX", "CreditHistory_IX", "LoanPurpose_IX", "ExistingSavings_IX",
        "EmploymentDuration_IX", "Sex_IX", "OthersOnLoan_IX", "OwnsProperty_IX",
        "InstallmentPlans_IX", "Housing_IX", "Job_IX", "Telephone_IX", "ForeignWorker_IX",
        "LoanDuration", "LoanAmount", "InstallmentPercent", "CurrentResidenceDuration",
        "Age", "ExistingCreditsCount", "Dependents",
    ],
    outputCol="features",
)
''' Train Model with RF classifier '''

classifier = RandomForestClassifier(featuresCol="features")

# Stage order: all string indexers first, then label indexer, feature
# assembly, the classifier, and finally prediction-index -> label conversion.
pipeline = Pipeline(stages=[
    si_CheckingStatus, si_CreditHistory, si_EmploymentDuration, si_ExistingSavings,
    si_ForeignWorker, si_Housing, si_InstallmentPlans, si_Job, si_LoanPurpose,
    si_OthersOnLoan, si_OwnsProperty, si_Sex, si_Telephone,
    si_Label, va_features, classifier, label_converter,
])

model = pipeline.fit(train_data)

predictions = model.transform(test_data)

# BUG FIX: AUC must be computed from the classifier's continuous scores in
# the "rawPrediction" column; the original used the hard 0/1 "prediction"
# column, which collapses the ROC curve to a single operating point.
evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
area_under_curve = evaluatorDT.evaluate(predictions)

# default evaluation is areaUnderROC
print("areaUnderROC = %g" % area_under_curve)
print(model)
print(predictions)
# Persist model, pipeline, and training data
model.write().overwrite().save('model')
train_data.write.option("header", "true").mode("overwrite").csv('train_data')

# Metrics payload consumed downstream (e.g. by a CI/CD quality gate that
# compares "value" against "threshold" -- presumably; confirm with consumer).
evaluation_metrics = {
    'metrics': [
        {
            "name": "areaUnderROC",
            "value": area_under_curve,
            "threshold": 0.7,
        }
    ]
}

# The context manager closes the file; the original's explicit f.close()
# inside the with-block was redundant and has been dropped.
with open('evaluation.json', 'w') as f:
    json.dump(evaluation_metrics, f, indent=2)