import pyspark from pyspark.sql import SparkSession from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler from pyspark.ml.evaluation import BinaryClassificationEvaluator from pyspark.ml import Pipeline, Model from pyspark.ml.classification import RandomForestClassifier import json ''' Read data with Spark SQL ''' spark = SparkSession.builder.getOrCreate() df_data = spark.read.csv(path="german_credit_data_biased_training.csv", sep=",", header=True, inferSchema=True) df_data.head() spark_df = df_data (train_data, test_data) = spark_df.randomSplit([0.8, 0.2], 24) print("Number of records for training: " + str(train_data.count())) print("Number of records for evaluation: " + str(test_data.count())) spark_df.printSchema() si_CheckingStatus = StringIndexer(inputCol = 'CheckingStatus', outputCol = 'CheckingStatus_IX') si_CreditHistory = StringIndexer(inputCol = 'CreditHistory', outputCol = 'CreditHistory_IX') si_LoanPurpose = StringIndexer(inputCol = 'LoanPurpose', outputCol = 'LoanPurpose_IX') si_ExistingSavings = StringIndexer(inputCol = 'ExistingSavings', outputCol = 'ExistingSavings_IX') si_EmploymentDuration = StringIndexer(inputCol = 'EmploymentDuration', outputCol = 'EmploymentDuration_IX') si_Sex = StringIndexer(inputCol = 'Sex', outputCol = 'Sex_IX') si_OthersOnLoan = StringIndexer(inputCol = 'OthersOnLoan', outputCol = 'OthersOnLoan_IX') si_OwnsProperty = StringIndexer(inputCol = 'OwnsProperty', outputCol = 'OwnsProperty_IX') si_InstallmentPlans = StringIndexer(inputCol = 'InstallmentPlans', outputCol = 'InstallmentPlans_IX') si_Housing = StringIndexer(inputCol = 'Housing', outputCol = 'Housing_IX') si_Job = StringIndexer(inputCol = 'Job', outputCol = 'Job_IX') si_Telephone = StringIndexer(inputCol = 'Telephone', outputCol = 'Telephone_IX') si_ForeignWorker = StringIndexer(inputCol = 'ForeignWorker', outputCol = 'ForeignWorker_IX') si_Label = StringIndexer(inputCol="Risk", outputCol="label").fit(spark_df) label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=si_Label.labels) va_features = VectorAssembler(inputCols=["CheckingStatus_IX", "CreditHistory_IX", "LoanPurpose_IX", "ExistingSavings_IX", "EmploymentDuration_IX", "Sex_IX", \ "OthersOnLoan_IX", "OwnsProperty_IX", "InstallmentPlans_IX", "Housing_IX", "Job_IX", "Telephone_IX", "ForeignWorker_IX", \ "LoanDuration", "LoanAmount", "InstallmentPercent", "CurrentResidenceDuration", "LoanDuration", "Age", "ExistingCreditsCount", \ "Dependents"], outputCol="features") ''' Train Model with RF classifier ''' classifier = RandomForestClassifier(featuresCol="features") pipeline = Pipeline(stages=[si_CheckingStatus, si_CreditHistory, si_EmploymentDuration, si_ExistingSavings, si_ForeignWorker, si_Housing, si_InstallmentPlans, si_Job, si_LoanPurpose, si_OthersOnLoan,\ si_OwnsProperty, si_Sex, si_Telephone, si_Label, va_features, classifier, label_converter]) model = pipeline.fit(train_data) predictions = model.transform(test_data) evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol="prediction") area_under_curve = evaluatorDT.evaluate(predictions) # default evaluation is areaUnderROC print("areaUnderROC = %g" % area_under_curve) print(model) print(predictions) # Persistent model, pipeline, and training data model.write().overwrite().save('model') train_data.write.option("header", "true").mode("overwrite").csv('train_data') evaluation_metrics = { 'metrics': [ { "name": "areaUnderROC", "value": area_under_curve, "threshold": 0.7 } ] } with open('evaluation.json', 'w') as f: json.dump(evaluation_metrics, f, indent=2) f.close()