PySpark count rows on condition

count doesn’t sum Trues, it only counts the number of non-null values. To count the True values, you need to convert the conditions to 1/0 and then sum:

import pyspark.sql.functions as F

cnt_cond = lambda cond: F.sum(F.when(cond, 1).otherwise(0))
test.groupBy('x').agg(
    cnt_cond(F.col('y') > 12453).alias('y_cnt'),
    cnt_cond(F.col('z') > 230).alias('z_cnt')
).show()

+---+-----+-----+
|  x|y_cnt|z_cnt|
+---+-----+-----+
| bn| …
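For context, a self-contained sketch of the same sum-over-condition pattern; the toy DataFrame and thresholds below are invented for illustration:

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[*]").appName("count-on-condition").getOrCreate()

# Toy stand-in for the `test` DataFrame in the excerpt
test = spark.createDataFrame(
    [("bn", 13000, 100), ("bn", 12000, 300), ("mb", 20000, 250)],
    ["x", "y", "z"],
)

# count() only counts non-null values, so turn each condition into a 1/0 flag and sum
cnt_cond = lambda cond: F.sum(F.when(cond, 1).otherwise(0))

test.groupBy("x").agg(
    cnt_cond(F.col("y") > 12453).alias("y_cnt"),
    cnt_cond(F.col("z") > 230).alias("z_cnt"),
).show()
```

An equivalent shorthand is F.count(F.when(cond, 1)), since when() without otherwise() yields null when the condition is false and count() skips nulls.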

Read more

Mind blown: RDD.zip() method

It is not true that RDDs are always unordered. An RDD has a guaranteed order if it is the result of a sortBy operation, for example. An RDD is not a set; it can contain duplicates. Partitioning is not opaque to the caller, and can be controlled and queried. Many operations do preserve both partitioning …
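A minimal toy sketch of the points above (zip, duplicates, visible partitioning, ordering after sortBy); the data is made up:

```python
from pyspark import SparkContext

sc = SparkContext("local[*]", "rdd-zip-sketch")

# zip pairs up the i-th elements of two RDDs; it requires the same number of
# partitions and the same number of elements per partition, which a map over
# the same parent RDD guarantees.
nums = sc.parallelize([3, 1, 2, 1], 2)      # contains a duplicate: an RDD is not a set
tens = nums.map(lambda x: x * 10)

print(nums.zip(tens).collect())             # [(3, 30), (1, 10), (2, 20), (1, 10)]
print(nums.getNumPartitions())              # partitioning can be queried: 2
print(nums.sortBy(lambda x: x).collect())   # [1, 1, 2, 3], order guaranteed after sortBy
```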

Read more

Determining optimal number of Spark partitions based on workers, cores and DataFrame size

Yes, a Spark application has one and only one Driver. What is the relationship between numWorkerNodes and numExecutors? A worker can host multiple executors; you can think of the worker as the machine/node of your cluster and the executor as a process (executing on a core) that runs on that worker. So …
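As a rough sketch of how these quantities feed into partition sizing; the executor and core counts below are placeholders, and the 2-3 partitions per core figure is only a commonly cited rule of thumb:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partition-sizing-sketch").getOrCreate()

# Hypothetical sizing: 6 executors spread across the worker nodes, 4 cores each
num_executors = 6
cores_per_executor = 4
total_cores = num_executors * cores_per_executor

df = spark.range(0, 10_000_000)

# Aim for a small multiple of the total core count, so every core has work
# while no single task becomes too large
df = df.repartition(total_cores * 3)
print(df.rdd.getNumPartitions())   # 72
```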

Read more

Spark add new column to dataframe with value from previous row

You can use the lag window function as follows:

from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()

## +---+---+-------+
## | id|num|new_col|
## +---+---+-------+
## |  2|3.0|    5.0|
## |  3|7.0|    3.0|
## |  4|9.0|    7.0|
## +---+---+-------+

but …
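Since the window above has no partition column, Spark moves all the data into a single partition; a sketch of the same lag pattern with a partitioning key (the grp column and its values are invented for illustration):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[*]").appName("lag-sketch").getOrCreate()

df = spark.createDataFrame(
    [("a", 1, 5.0), ("a", 2, 3.0), ("a", 3, 7.0), ("b", 1, 2.0), ("b", 2, 4.0)],
    ["grp", "id", "num"],
)

# Partitioning the window by "grp" computes the lag independently per group
w = Window.partitionBy("grp").orderBy(col("id"))

df.select("*", lag("num").over(w).alias("prev_num")).show()
```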

Read more

How can I update a broadcast variable in spark streaming?

Extending the answer by @Rohan Aletty. Here is sample code for a BroadcastWrapper that refreshes the broadcast variable based on some TTL:

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();
    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) …
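The wrapper above is Java; a rough PySpark analog of the same idea (unpersist the stale broadcast and re-broadcast once a TTL expires) might look like the sketch below. The load_reference_data helper and the TTL values are placeholders:

```python
import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("broadcast-refresh-sketch").getOrCreate()
sc = spark.sparkContext


def load_reference_data():
    # Placeholder: in practice re-read a file, table or external service here
    return {"loaded_at": time.time()}


class BroadcastWrapper:
    """Re-broadcasts the reference data when it is older than ttl_seconds."""

    def __init__(self, sc, ttl_seconds=3600):
        self.sc = sc
        self.ttl_seconds = ttl_seconds
        self.last_updated = time.time()
        self.broadcast_var = sc.broadcast(load_reference_data())

    def get(self):
        if time.time() - self.last_updated > self.ttl_seconds:
            # Drop the stale copy from the executors, then publish a fresh one
            self.broadcast_var.unpersist(blocking=False)
            self.broadcast_var = self.sc.broadcast(load_reference_data())
            self.last_updated = time.time()
        return self.broadcast_var


wrapper = BroadcastWrapper(sc, ttl_seconds=600)
# Inside foreachRDD / foreachBatch, read the data via wrapper.get().value
```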

Read more

Spark RDD – Mapping with extra arguments

You can use an anonymous function either directly in a flatMap:

json_data_rdd.flatMap(lambda j: processDataLine(j, arg1, arg2))

or to curry processDataLine:

f = lambda j: processDataLine(j, arg1, arg2)
json_data_rdd.flatMap(f)

You can generate processDataLine like this:

def processDataLine(arg1, arg2):
    def _processDataLine(dataline):
        return …  # Do something with dataline, arg1, arg2
    return _processDataLine

json_data_rdd.flatMap(processDataLine(arg1, arg2))

toolz library provides …
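A small runnable variant of the same idea, binding the extra arguments with functools.partial from the standard library; the RDD contents and the toy process_data_line below are invented:

```python
from functools import partial
from pyspark import SparkContext

sc = SparkContext("local[*]", "extra-args-sketch")


def process_data_line(line, prefix, factor):
    # Toy stand-in for processDataLine: emit one tagged, scaled value per line
    return [(prefix, line * factor)]


rdd = sc.parallelize([1, 2, 3])

# partial binds prefix and factor, leaving only the line argument for flatMap
print(rdd.flatMap(partial(process_data_line, prefix="x", factor=10)).collect())
# [('x', 10), ('x', 20), ('x', 30)]
```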

Read more

Apache Spark does not delete temporary directories

Three SPARK_WORKER_OPTS exist to support worker application folder cleanup, copied here for further reference from the Spark docs: spark.worker.cleanup.enabled, default value is false, enables periodic cleanup of worker / application directories. Note that this only affects standalone mode, as YARN works differently. Only the directories of stopped applications are cleaned up. spark.worker.cleanup.interval, default is 1800, …
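These are worker-side settings for standalone mode, typically passed through SPARK_WORKER_OPTS in conf/spark-env.sh on each worker; a sketch with example values (not recommendations), the TTL shown being one week in seconds:

```bash
# conf/spark-env.sh on each standalone worker
SPARK_WORKER_OPTS="$SPARK_WORKER_OPTS \
  -Dspark.worker.cleanup.enabled=true \
  -Dspark.worker.cleanup.interval=1800 \
  -Dspark.worker.cleanup.appDataTtl=604800"
```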

Read more