PySpark count rows on condition

count doesn't sum Trues, it only counts the number of non-null values. To count the True values, you need to convert the conditions to 1 / 0 and then sum:

import pyspark.sql.functions as F

cnt_cond = lambda cond: F.sum(F.when(cond, 1).otherwise(0))
test.groupBy('x').agg(
    cnt_cond(F.col('y') > 12453).alias('y_cnt'),
    cnt_cond(F.col('z') > 230).alias('z_cnt')
).show()

+---+-----+-----+
|  x|y_cnt|z_cnt|
+---+-----+-----+
| bn| …

Read more
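
For readers who want to run the excerpt above end to end, here is a minimal, self-contained sketch of the same technique; the SparkSession setup, the test DataFrame and its rows are invented for illustration, only the cnt_cond pattern comes from the answer:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy data: two groups in x, numeric columns y and z (values invented)
test = spark.createDataFrame(
    [("bn", 12500, 100), ("bn", 9000, 400), ("mb", 30000, 250)],
    ("x", "y", "z"))

cnt_cond = lambda cond: F.sum(F.when(cond, 1).otherwise(0))

test.groupBy("x").agg(
    cnt_cond(F.col("y") > 12453).alias("y_cnt"),
    cnt_cond(F.col("z") > 230).alias("z_cnt")
).show()

Because count skips nulls, F.count(F.when(cond, True)) is an equivalent way to count matching rows without the explicit 1 / 0 conversion.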

Determining optimal number of Spark partitions based on workers, cores and DataFrame size

Yes, a Spark application has one and only one Driver. What is the relationship between numWorkerNodes and numExecutors? A worker can host multiple executors; you can think of the worker as the machine/node of your cluster and the executor as a process (executing in a core) that runs on that worker. So …

Read more
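
As a rough illustration of how those knobs fit together (the numbers below are invented for the sketch, not tuning advice): with 3 worker nodes, 2 executors per worker and 4 cores per executor, up to 3 * 2 * 4 = 24 tasks can run at the same time, and a small multiple of that total is a common starting point for the number of partitions.

from pyspark.sql import SparkSession

# Hypothetical sizing: 3 workers x 2 executors x 4 cores = 24 concurrent tasks
spark = (SparkSession.builder
         .appName("partition-sizing-sketch")
         .config("spark.executor.instances", "6")       # numExecutors (2 per worker)
         .config("spark.executor.cores", "4")           # cores per executor
         .config("spark.sql.shuffle.partitions", "48")  # ~2x total cores as a starting point
         .getOrCreate())

# Repartitioning an existing DataFrame to the same target would look like:
# df = df.repartition(48)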

Spark add new column to dataframe with value from previous row

You can use the lag window function as follows

from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()

## +---+---+-------+
## | id|num|new_col|
## +---+---+-------+
## |  2|3.0|    5.0|
## |  3|7.0|    3.0|
## |  4|9.0|    7.0|
## +---+---+-------+

but …

Read more
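
One thing worth flagging about the snippet above: a window with an empty partitionBy() pulls every row into a single partition, which is fine for a toy example but does not scale. Below is a sketch of the per-group variant; the grp column and the sample rows are invented for illustration:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Invented data with a grouping column so the window can be partitioned
df = spark.createDataFrame(
    [("a", 1, 5.0), ("a", 2, 3.0), ("b", 1, 7.0), ("b", 2, 9.0)],
    ("grp", "id", "num"))

# lag is computed independently within each grp, ordered by id
w = Window.partitionBy("grp").orderBy(col("id"))
df.select("*", lag("num").over(w).alias("prev_num")).show()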

Spark RDD – Mapping with extra arguments

You can use an anonymous function either directly in a flatMap

json_data_rdd.flatMap(lambda j: processDataLine(j, arg1, arg2))

or to curry processDataLine

f = lambda j: processDataLine(j, arg1, arg2)
json_data_rdd.flatMap(f)

You can generate processDataLine like this:

def processDataLine(arg1, arg2):
    def _processDataLine(dataline):
        return …  # Do something with dataline, arg1, arg2
    return _processDataLine

json_data_rdd.flatMap(processDataLine(arg1, arg2))

toolz library provides …

Read more
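
The excerpt is cut off at the toolz mention, so here is an alternative sketch using functools.partial from the standard library to bind the extra arguments; processDataLine's body, the argument values and the sample RDD are all placeholders, not part of the original answer:

from functools import partial
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

def processDataLine(dataline, arg1, arg2):
    # Placeholder body: emit the line tagged with the extra arguments
    return [(dataline, arg1, arg2)]

json_data_rdd = sc.parallelize(["line1", "line2"])  # stand-in for the question's RDD

# Bind arg1/arg2 up front; flatMap then only has to supply dataline
process = partial(processDataLine, arg1=10, arg2="foo")
print(json_data_rdd.flatMap(process).collect())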

PySpark: modify column values when another column value satisfies a condition

You can use when and otherwise like -

from pyspark.sql.functions import *

df\
    .withColumn('Id_New', when(df.Rank <= 5, df.Id).otherwise('other'))\
    .drop(df.Id)\
    .select(col('Id_New').alias('Id'), col('Rank'))\
    .show()

this gives output as -

+-----+----+
|   Id|Rank|
+-----+----+
|    a|   5|
|other|   7|
|other|   8|
|    d|   1|
+-----+----+
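
To make the example above reproducible, here is a self-contained sketch; the input rows are invented to roughly match the output shown (any Id values with Ranks 5, 7, 8 and 1 would do):

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

spark = SparkSession.builder.getOrCreate()

# Invented rows: two of them have Rank > 5 and become 'other'
df = spark.createDataFrame(
    [("a", 5), ("b", 7), ("c", 8), ("d", 1)], ("Id", "Rank"))

(df
 .withColumn("Id_New", when(df.Rank <= 5, df.Id).otherwise("other"))
 .drop(df.Id)
 .select(col("Id_New").alias("Id"), col("Rank"))
 .show())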

How to take a random row from a PySpark DataFrame?

You can simply call takeSample on a RDD:

df = sqlContext.createDataFrame(
    [(1, "a"), (2, "b"), (3, "c"), (4, "d")], ("k", "v"))
df.rdd.takeSample(False, 1, seed=0)
## [Row(k=3, v='c')]

If you don't want to collect you can simply take a higher fraction and limit:

df.sample(False, 0.1, seed=0).limit(1)

Don't pass a seed, and you should get a different …

Read more
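
A caveat and a small sketch to go with the excerpt: sampling a fixed fraction is not guaranteed to return any rows at all on a small DataFrame, so another common option is to shuffle with rand() and keep one row. The DataFrame below just mirrors the toy data from the answer; the approach only makes sense when a full shuffle-and-sort is affordable:

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, "a"), (2, "b"), (3, "c"), (4, "d")], ("k", "v"))

# Order by a random column and keep the first row; this sorts everything,
# so it is only reasonable when the DataFrame is not huge
df.orderBy(rand()).limit(1).show()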