
In this episode, you will learn how to perform a basic ETL (extract, transform, and load) operation with Apache Spark. You will load a CSV file, apply a simple transformation using static functions, and save the result in a PostgreSQL database. The tools used are Eclipse and Java.
If anyone can explain to me why the L in ETL stands for load, please educate me… I would have gone for ETS, with S as save, or ETP, with P standing for publish. It is a mystery to me…
The code is available on GitHub. The process is fully described in the second chapter of Spark in Action, 2nd edition, which you can browse on Manning’s liveBook website.
For convenience, the Java code is included below. It shows the main steps of this small application. After getting a Spark session, I will:
- Read the CSV file (extract).
- Perform a basic data transformation.
- Save the result to a PostgreSQL database (load).
package net.jgp.books.spark.ch02.lab100_csv_to_db;

import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

/**
 * CSV to a relational database.
 *
 * @author jgp
 */
public class CsvToRelationalDatabaseApp {

  /**
   * main() is your entry point to the application.
   *
   * @param args
   */
  public static void main(String[] args) {
    CsvToRelationalDatabaseApp app = new CsvToRelationalDatabaseApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("CSV to DB")
        .master("local")
        .getOrCreate();

    // Step 1: Ingestion
    // ---------
    // Reads a CSV file with header, called authors.csv, stores it in a
    // dataframe
    Dataset<Row> df = spark.read()
        .format("csv")
        .option("header", true)
        .load("data/authors.csv");

    // Step 2: Transform
    // ---------
    // Creates a new column called "name" as the concatenation of lname, a
    // virtual column containing ", " and the fname column
    df = df.withColumn(
        "name",
        concat(df.col("lname"), lit(", "), df.col("fname")));

    // Step 3: Save
    // ----
    // The connection URL, assuming your PostgreSQL instance runs locally on
    // the default port, and the database we use is "spark_labs"
    String dbConnectionUrl = "jdbc:postgresql://localhost/spark_labs";

    // Properties to connect to the database, the JDBC driver is part of our
    // pom.xml
    Properties prop = new Properties();
    prop.setProperty("driver", "org.postgresql.Driver");
    prop.setProperty("user", "jgp");
    prop.setProperty("password", "Spark<3Java");

    // Write in a table called ch02
    df.write()
        .mode(SaveMode.Overwrite)
        .jdbc(dbConnectionUrl, "ch02", prop);

    System.out.println("Process complete");
  }
}
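The comments above mention that the PostgreSQL JDBC driver comes in through the project's pom.xml. If you rebuild the application outside the book's repository, the dependencies would look roughly like this (a sketch; the version numbers are assumptions on my part, check the repository's pom.xml for the exact ones):

<!-- Spark SQL, which brings in the Dataset/DataFrame API -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<!-- PostgreSQL JDBC driver used by the write().jdbc() call -->
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <version>42.2.14</version>
</dependency>

Likewise, the ingestion step expects data/authors.csv to have a header row with at least fname and lname columns. The real file ships with the repository; a minimal stand-in with made-up rows could look like this:

lname,fname
Pascal,Blaise
Doe,Jane

After the transformation step, each row gains a name column, for example "Pascal, Blaise".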
This is a basic ETL process, but it illustrates how simply Apache Spark can move and transform data.
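To verify the load, connect to the spark_labs database and query the table: a simple SELECT fname, lname, name FROM ch02; in psql should return your rows with the new name column. Note that SaveMode.Overwrite drops and recreates the ch02 table on each run, so rerunning the application replaces the data rather than duplicating it.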
More resources:
The YouTube channel for DataFriday lists all episodes. You can attend the live show every Friday morning at 8 AM EST on Zoom.