In this fifth episode of DataFriday, I dig into the schema attached to each dataframe in Apache Spark: I will rename columns, create new columns, and analyze the result.
I will also briefly touch on partitions in Apache Spark.
This post closely follows the first lab (lab #200) of Spark in Action, 2nd edition. The dataset is the list of restaurants in Wake County, North Carolina, USA. Now that the book is published, I will also go a bit beyond it.
The code is available on GitHub.
For convenience, the Java code is included here. It walks through the main steps of this small application:
- Open an Apache Spark session.
- Load a CSV file into a dataframe.
- Manipulate the dataframe’s schema: rename some columns, drop others.
package net.jgp.books.spark.ch03.lab200_ingestion_schema_manipulation;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;
import org.apache.spark.Partition;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* CSV ingestion in a dataframe and manipulation.
*
* @author jgp
*/
public class IngestionSchemaManipulationApp {
/**
* main() is your entry point to the application.
*
* @param args
*/
public static void main(String[] args) {
IngestionSchemaManipulationApp app =
new IngestionSchemaManipulationApp();
app.start();
}
/**
* The processing code.
*/
private void start() {
// Creates a session on a local master
SparkSession spark = SparkSession.builder()
.appName("Restaurants in Wake County, NC")
.master("local")
.getOrCreate();
// Reads a CSV file with header, called
// Restaurants_in_Wake_County_NC.csv,
// stores it in a dataframe
Dataset<Row> df = spark.read().format("csv")
.option("header", "true")
.load("data/Restaurants_in_Wake_County_NC.csv");
System.out.println("*** Right after ingestion");
df.show(5);
df.printSchema();
System.out.println("We have " + df.count() + " records.");
// Let's transform our dataframe
df = df.withColumn("county", lit("Wake"))
.withColumnRenamed("HSISID", "datasetId")
.withColumnRenamed("NAME", "name")
.withColumnRenamed("ADDRESS1", "address1")
.withColumnRenamed("ADDRESS2", "address2")
.withColumnRenamed("CITY", "city")
.withColumnRenamed("STATE", "state")
.withColumnRenamed("POSTALCODE", "zip")
.withColumnRenamed("PHONENUMBER", "tel")
.withColumnRenamed("RESTAURANTOPENDATE", "dateStart")
.withColumnRenamed("FACILITYTYPE", "type")
.withColumnRenamed("X", "geoX")
.withColumnRenamed("Y", "geoY")
.drop("OBJECTID")
.drop("PERMITID")
.drop("GEOCODESTATUS");
df = df.withColumn("id", concat(
df.col("state"),
lit("_"),
df.col("county"), lit("_"),
df.col("datasetId")));
// Shows at most 5 rows from the dataframe
System.out.println("*** Dataframe transformed");
df.show(5);
// The following narrower dataframe is used only for the book, so the
// output fits on the printed page
Dataset<Row> dfUsedForBook = df.drop("address2")
.drop("zip")
.drop("tel")
.drop("dateStart")
.drop("geoX")
.drop("geoY")
.drop("address1")
.drop("datasetId");
dfUsedForBook.show(5, 15);
// end
df.printSchema();
System.out.println("*** Looking at partitions");
Partition[] partitions = df.rdd().partitions();
int partitionCount = partitions.length;
System.out.println("Partition count before repartition: " +
partitionCount);
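// Redistributes the dataframe's data across four partitions; with a small
// file on a local master, the initial partition count is typically one.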
df = df.repartition(4);
System.out.println("Partition count after repartition: " +
df.rdd().partitions().length);
}
}
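If you run the application against the Wake County file, the final printSchema() call should output something close to the sketch below. Since inferSchema is not enabled, every column is typed as a string, and the exact column order depends on the source CSV, so treat this as an approximation:
root
 |-- datasetId: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- address2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- tel: string (nullable = true)
 |-- dateStart: string (nullable = true)
 |-- type: string (nullable = true)
 |-- geoX: string (nullable = true)
 |-- geoY: string (nullable = true)
 |-- county: string (nullable = false)
 |-- id: string (nullable = true)
Note that county is not nullable: lit("Wake") produces a literal, non-null value, whereas the columns coming from the CSV file can always contain nulls.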
This is the first lab of chapter 3; more labs are coming… More details are available in the book as well.
More resources:
The YouTube channel for DataFriday lists all episodes. You can attend the live show every Friday morning at 8 AM EST; check out the DataFriday page.