# PySpark End-to-End Example
```python
# Import comet_ml before pyspark so Comet can instrument the run.
from comet_ml import Experiment

from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import SparkSession, SQLContext

# Create (or reuse) the Spark session and a SQLContext for CSV loading.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
def run_logistic_regression(training_data, test_data):
    """Train a logistic regression model and log results to Comet.ml.

    Args:
        training_data: DataFrame with "features" (Vector) and "label" columns
            used to fit the model.
        test_data: DataFrame with the same schema, used for evaluation.

    Side effects:
        Creates a Comet.ml Experiment and logs the hyperparameters and the
        train/test metrics to it.
    """
    experiment = Experiment(project_name='pyspark-example')

    # Model: L1/L2-mixed (elastic net) logistic regression.
    lr = LogisticRegression(
        maxIter=10,
        regParam=0.3,
        elasticNetParam=0.8)
    model = lr.fit(training_data)
    training_summary = model.summary

    predictions = model.transform(test_data)
    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

    metrics = {
        'train_auc_score': training_summary.areaUnderROC,
        'train_accuracy': training_summary.accuracy,
        # Default metric for BinaryClassificationEvaluator is areaUnderROC.
        'test_auc_roc_score': evaluator.evaluate(predictions),
        'test_auc_pr_score': evaluator.evaluate(
            predictions, {evaluator.metricName: "areaUnderPR"})
    }

    experiment.log_parameters(lr._input_kwargs)  # logging hyperparams to Comet.ml
    experiment.log_metrics(metrics)  # logging metrics to Comet.ml
def main():
    """Load the breast-cancer CSV, build train/test sets, and run the experiment."""
    df = sqlContext.read.format('com.databricks.spark.csv').options(
        header='true', inferschema='true').load('./data/breast_cancer.csv')

    # Splitting into train and test sets. Beware: it sorts the dataset.
    (train_df, test_df) = df.randomSplit([0.7, 0.3])

    # The last column is the label; all preceding columns become the
    # dense feature vector.
    training_data = train_df.rdd.map(lambda x: (
        Vectors.dense(x[0:-1]), x[-1])).toDF(["features", "label"])
    test_data = test_df.rdd.map(lambda x: (
        Vectors.dense(x[0:-1]), x[-1])).toDF(["features", "label"])

    run_logistic_regression(training_data, test_data)
if name == 'main': main() ```