Mind blown: RDD.zip() method

It is not true that RDDs are always unordered. An RDD has a guaranteed order if it is the result of a sortBy operation, for example. An RDD is not a set; it can contain duplicates. Partitioning is not opaque to the caller, and can be controlled and queried. Many operations do preserve both partitioning … Read more
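A minimal PySpark sketch of those points, assuming an existing SparkContext named sc (the values are made up for illustration):

rdd = sc.parallelize([3, 1, 2]).sortBy(lambda x: x)  # a sorted RDD has a guaranteed order
squares = rdd.map(lambda x: x * x)                   # map preserves order and partitioning
pairs = rdd.zip(squares)                             # zip requires the same number of partitions
                                                     # and elements per partition on both sides
print(rdd.getNumPartitions())                        # partitioning can be queried ...
print(rdd.repartition(4).getNumPartitions())         # ... and controlled
print(pairs.collect())                               # [(1, 1), (2, 4), (3, 9)]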

Determining optimal number of Spark partitions based on workers, cores and DataFrame size

Yes, a Spark application has one and only one driver. What is the relationship between numWorkerNodes and numExecutors? A worker can host multiple executors; you can think of the worker as the machine/node of your cluster and the executor as a process (executing on a core) that runs on that worker. So … Read more
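A rough PySpark sketch of how those pieces map onto configuration; the application name, the numbers, and the 2-4 partitions-per-core rule of thumb are illustrative assumptions, not part of the original answer:

from pyspark.sql import SparkSession

# one and only one driver per application; executors are processes spread over the worker nodes
spark = (SparkSession.builder
         .appName("partition-sizing-example")      # hypothetical name
         .config("spark.executor.instances", "8")  # executors, possibly several per worker node
         .config("spark.executor.cores", "4")      # concurrent tasks per executor
         .getOrCreate())

total_cores = 8 * 4                 # numExecutors * coresPerExecutor
num_partitions = total_cores * 3    # a common rule of thumb: 2-4 partitions per core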

Apache Spark does not delete temporary directories

Three SPARK_WORKER_OPTS exist to support worker/application folder cleanup; they are copied here from the Spark docs for further reference: spark.worker.cleanup.enabled, default value false, enables periodic cleanup of worker/application directories. Note that this only affects standalone mode, as YARN works differently. Only the directories of stopped applications are cleaned up. spark.worker.cleanup.interval, default 1800, … Read more
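These are worker-side settings, so in standalone mode they are typically passed through SPARK_WORKER_OPTS in conf/spark-env.sh; a sketch of what that might look like (the TTL value is only an example):

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true \
  -Dspark.worker.cleanup.interval=1800 \
  -Dspark.worker.cleanup.appDataTtl=604800"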

What are the differences between saveAsTable and insertInto in different SaveMode(s)?

DISCLAIMER I’ve been exploring insertInto for some time and, although I’m far from an expert in this area, I’m sharing the findings for the greater good. Does insertInto always expect the table to exist? Yes (per the table name and the database). Moreover, not all tables can be inserted into, i.e. a (permanent) table, a temporary … Read more
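A short PySpark sketch of the contrast, assuming a hypothetical table my_db.events and DataFrames df and other_df:

# saveAsTable creates the table (with the DataFrame's schema) when it does not exist yet
df.write.mode("overwrite").saveAsTable("my_db.events")

# insertInto expects my_db.events to exist already and resolves columns by position,
# so the DataFrame's column order must match the table definition
other_df.write.insertInto("my_db.events")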

PySpark: modify column values when another column value satisfies a condition

You can use when and otherwise like this:

from pyspark.sql.functions import *

df\
    .withColumn('Id_New', when(df.Rank <= 5, df.Id).otherwise('other'))\
    .drop(df.Id)\
    .select(col('Id_New').alias('Id'), col('Rank'))\
    .show()

This gives output as:

+-----+----+
|   Id|Rank|
+-----+----+
|    a|   5|
|other|   7|
|other|   8|
|    d|   1|
+-----+----+

How spark read a large file (petabyte) when file can not be fit in spark’s main memory

First of all, Spark only starts reading in the data when an action (like count, collect or write) is called. Once an action is called, Spark loads the data in partitions; the number of concurrently loaded partitions depends on the number of cores you have available. So in Spark you can think of 1 … Read more
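A small illustration of that laziness; the path and the SparkSession name spark are assumptions for the example:

df = spark.read.text("s3://some-bucket/huge-dataset/")  # no data is read yet, only file metadata
print(df.rdd.getNumPartitions())                        # the input is split into many partitions
df.count()                                              # the action triggers the actual read,
                                                        # a few partitions at a time, bounded by
                                                        # the number of available cores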

Spark DataFrame: count distinct values of every column

In pySpark you could do something like this, using countDistinct():

from pyspark.sql.functions import col, countDistinct

df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns))

Similarly in Scala:

import org.apache.spark.sql.functions.countDistinct
import org.apache.spark.sql.functions.col

df.select(df.columns.map(c => countDistinct(col(c)).alias(c)): _*)

If you want to speed things up at the potential loss of accuracy, you could also use approxCountDistinct().

How to optimize shuffle spill in Apache Spark application

Performance-tuning Spark requires quite a bit of investigation and learning. There are a few good resources, including this video. Spark 1.4 has some better diagnostics and visualisation in the interface, which can help you. In summary, you spill when the size of the RDD partitions at the end of the stage exceeds the … Read more
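One common knob, shown as a hedged example rather than a prescription: give the shuffle more, smaller partitions so that each one fits in execution memory (the DataFrame and column name are placeholders):

spark.conf.set("spark.sql.shuffle.partitions", "400")  # default is 200
result = (df.repartition(400, "customer_id")           # smaller partitions are less likely to spill
            .groupBy("customer_id")
            .count())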

How to bin in PySpark?

You can use the Bucketizer feature transformer from Spark's ml library.

values = [("a", 23), ("b", 45), ("c", 10), ("d", 60), ("e", 56), ("f", 2), ("g", 25), ("h", 40), ("j", 33)]
df = spark.createDataFrame(values, ["name", "ages"])

from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(splits=[0, 6, 18, 60, float('Inf')], inputCol="ages", outputCol="buckets")
df_buck = bucketizer.setHandleInvalid("keep").transform(df)
df_buck.show()

output … Read more