Here are a few quick recipes for solving common issues with Apache Spark. All examples are based on Java 8 (although I do not consciously use any of the version 8 features) and on Spark v1.6.2 and Spark v2.0.0. The examples (and more) can be cloned from GitHub: https://github.com/jgperrin/net.jgp.labs.spark.git.
Build a DataFrame from a Text File
Spark v2.0.0
package net.jgp.labs.spark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class TextFileToDataFrame {
  public static void main(String[] args) {
    System.out.println("Working directory = " + System.getProperty("user.dir"));
    TextFileToDataFrame app = new TextFileToDataFrame();
    app.start();
  }
  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("DataFrame from Text File")
        .master("local[*]")
        .getOrCreate();
    String filename = "data/simple-data-file.txt";
    Dataset<Row> df = spark.read().text(filename);
    df.show();
  }
}
Spark v1.6.2
package net.jgp.labs.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
public class FirstTextFile {
  public static void main(String[] args) {
    System.out.println("Working directory = " + System.getProperty("user.dir"));
    FirstTextFile app = new FirstTextFile();
    app.start();
  }
  private void start() {
    SparkConf conf = new SparkConf().setAppName("DataFrame from Text File").setMaster("local");
    SparkContext sc = new SparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    String filename = "data/simple-data-file.txt";
    DataFrame df = sqlContext.read().text(filename);
    df.show();
  }
}
The System.out.println() call at the start of main() dumps the current working directory, which is always useful if you lose your files (it never happens to me, of course).
Output
And the output is:
Working directory = /Users/jgp/git/net.jgp.labs.spark
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
+-----+
The file data/simple-data-file.txt simply contains a list of integers from 1 to 7, one per line.
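Note that spark.read().text() loads every line as a string, even when the file only contains numbers. If you need to work with the values as integers, a minimal sketch (not part of the original listing) is to cast the column:
// Assumes: import org.apache.spark.sql.types.DataTypes;
Dataset<Row> numbers = df.withColumn("value", df.col("value").cast(DataTypes.IntegerType));
numbers.printSchema();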
Connect Locally
Spark v2.0.0
package net.jgp.labs.spark;
import org.apache.spark.sql.SparkSession;
public class ConnectLocally {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("Hello Spark").master("local").getOrCreate();
    System.out.println("Hello, Spark v." + spark.version());
  }
}
The output is:
Hello, Spark v.2.0.0
Spark v1.6.2
package net.jgp.labs.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
public class HelloSpark {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Hello Spark").setMaster("local");
    SparkContext sc = new SparkContext(conf);
    System.out.println("Hello, Spark v." + sc.version());
  }
}
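Neither listing stops Spark explicitly; if you chain several of these small examples in the same JVM, it does not hurt to release the resources once you are done (a hint, not part of the original code):
// Spark v2.0.0
spark.stop();
// Spark v1.6.2
sc.stop();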
Connect Remotely
This example shows how to connect to a remote Spark server/cluster and check that it is live.
Spark v2.0.0
package net.jgp.labs.spark;
import org.apache.spark.sql.SparkSession;
public class ConnectRemotely {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("myApp").master("spark://10.0.100.120:7077").getOrCreate();
    System.out.println("Hello, Spark v." + spark.version());
  }
}
Spark v1.6.2
package net.jgp.labs.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
public class ConnectRemotely {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("myApp").setMaster("spark://10.0.100.120:7077");
    SparkContext sc = new SparkContext(conf);
    System.out.println("Hello, Remote Spark v." + sc.version());
  }
}
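One thing to keep in mind: as soon as you run real jobs, the remote executors also need your application classes. A minimal sketch for v2.0.0, assuming your application is packaged as target/my-app.jar (a hypothetical path), is to ship the jar when building the session; with v1.6.2, SparkConf.setJars() plays the same role.
SparkSession spark = SparkSession.builder()
    .appName("myApp")
    .master("spark://10.0.100.120:7077")
    .config("spark.jars", "target/my-app.jar") // hypothetical path to your packaged application
    .getOrCreate();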
Reading a CSV File into a Dataset
Read a CSV file composed of tuples and load them into a Dataset. This example uses the native CSV reader introduced in Apache Spark v2.0.0.
Spark v2.0.0
package net.jgp.labs.spark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class CsvToDataset {
  public static void main(String[] args) {
    System.out.println("Working directory = " + System.getProperty("user.dir"));
    CsvToDataset app = new CsvToDataset();
    app.start();
  }
  private void start() {
    SparkSession spark = SparkSession.builder().appName("CSV to Dataset").master("local").getOrCreate();
    String filename = "data/tuple-data-file.csv";
    Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
        .option("header", "false").load(filename);
    df.show();
  }
}
Output
+---+---+
|_c0|_c1|
+---+---+
|  1| 15|
|  2| 25|
|  3| 35|
|  4| 45|
|  5| 55|
|  6| 65|
|  7| 75|
+---+---+
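If you prefer not to rely on inferSchema (which costs an extra pass over the file), you can declare the schema yourself. This is only a sketch, not part of the original example, and the column names are my own:
// Assumes: import org.apache.spark.sql.types.DataTypes;
// Assumes: import org.apache.spark.sql.types.StructType;
StructType schema = new StructType()
    .add("label", DataTypes.IntegerType)
    .add("value", DataTypes.IntegerType);
Dataset<Row> df = spark.read().format("csv")
    .option("header", "false")
    .schema(schema)
    .load(filename);
df.show();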
Migration from Databricks CSV parser
Note: with the Databricks version of the CSV parser, columns were named C0, C1, and so on. With the standard CSV parser in v2.0.0, they are called _c0, _c1…
You can add the following code before df.show() to make your v2.0.0 output compatible with your v1.6.2 code.
// Rename each _cN column (standard parser naming) to CN (Databricks naming)
int count = df.columns().length;
for (int i = 0; i < count; i++) {
  String oldColName = "_c" + i;
  String newColName = "C" + i;
  df = df.withColumn(newColName, df.col(oldColName)).drop(oldColName);
}
Output:
+---+---+
| C0| C1|
+---+---+
|  1| 15|
|  2| 25|
|  3| 35|
|  4| 45|
|  5| 55|
|  6| 65|
|  7| 75|
+---+---+
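A shorter way to achieve the same renaming, which works with both versions, is withColumnRenamed():
// Equivalent renaming loop using withColumnRenamed()
for (int i = 0; i < df.columns().length; i++) {
  df = df.withColumnRenamed("_c" + i, "C" + i);
}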
Register and Call an Internal UDF (User Defined Function)
This basic UDF multiplies the value of an integer column by 2.
Spark v2.0.0
package net.jgp.labs.spark;
import static org.apache.spark.sql.functions.callUDF;
import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
public class BasicUdfFromTextFile implements Serializable {
  private static final long serialVersionUID = 3492970200940899011L;
  public static void main(String[] args) {
    System.out.println("Working directory = " + System.getProperty("user.dir"));
    BasicUdfFromTextFile app = new BasicUdfFromTextFile();
    app.start();
  }
  private void start() {
    SparkSession spark = SparkSession.builder().appName("CSV to Dataset").master("local").getOrCreate();
    spark.udf().register("x2Multiplier", new UDF1<Integer, Integer>() {
      private static final long serialVersionUID = -5372447039252716846L;
      @Override
      public Integer call(Integer x) {
        return x * 2;
      }
    }, DataTypes.IntegerType);
    String filename = "data/tuple-data-file.csv";
    Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
        .option("header", "false").load(filename);
    df = df.withColumn("label", df.col("_c0")).drop("_c0");
    df = df.withColumn("value", df.col("_c1")).drop("_c1");
    df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
    df.show();
  }
}
Spark v1.6.2
package net.jgp.labs.spark;
import static org.apache.spark.sql.functions.callUDF;
import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
public class BasicUdfFromTextFile implements Serializable {
  private static final long serialVersionUID = 3492970200940899011L;
  public static void main(String[] args) {
    BasicUdfFromTextFile app = new BasicUdfFromTextFile();
    app.start();
  }
  private void start() {
    SparkConf conf = new SparkConf().setAppName("Basic UDF from Text File").setMaster("local");
    SparkContext sc = new SparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    sqlContext.udf().register("x2Multiplier", new UDF1<Integer, Integer>() {
      private static final long serialVersionUID = -5372447039252716846L;
      @Override
      public Integer call(Integer x) {
        return x * 2;
      }
    }, DataTypes.IntegerType);
    String filename = "data/tuple-data-file.csv";
    DataFrame df = sqlContext.read().format("com.databricks.spark.csv").option("inferSchema", "true")
        .option("header", "false").load(filename);
    df = df.withColumn("label", df.col("C0")).drop("C0");
    df = df.withColumn("value", df.col("C1")).drop("C1");
    df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
    df.show();
  }
}
Output
+-----+-----+---+
|label|value| x2|
+-----+-----+---+
|    1|   15| 30|
|    2|   25| 50|
|    3|   35| 70|
|    4|   45| 90|
|    5|   55|110|
|    6|   65|130|
|    7|   75|150|
+-----+-----+---+
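Once registered, the UDF is also available from SQL. A small sketch for the v2.0.0 API, reusing the df built in the v2.0.0 listing (the view name is my own):
// Expose the Dataset as a temporary view and call the UDF from a SQL statement
df.createOrReplaceTempView("tuples");
Dataset<Row> df2 = spark.sql("SELECT label, value, x2Multiplier(value) AS x2 FROM tuples");
df2.show();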
Register and Call an External UDF (User Defined Function)
This basic UDF multiplies the value of an integer column by 2. This example uses two files: the application itself and the UDF class.
Spark v2.0.0
package net.jgp.labs.spark;
import static org.apache.spark.sql.functions.callUDF;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import net.jgp.labs.spark.udf.Multiplier2;
public class BasicExternalUdfFromTextFile {
  public static void main(String[] args) {
    System.out.println("Working directory = " + System.getProperty("user.dir"));
    BasicExternalUdfFromTextFile app = new BasicExternalUdfFromTextFile();
    app.start();
  }
  private void start() {
    SparkSession spark = SparkSession.builder().appName("CSV to Dataset").master("local").getOrCreate();
    spark.udf().register("x2Multiplier", new Multiplier2(), DataTypes.IntegerType);
    String filename = "data/tuple-data-file.csv";
    Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
        .option("header", "false").load(filename);
    df = df.withColumn("label", df.col("_c0")).drop("_c0");
    df = df.withColumn("value", df.col("_c1")).drop("_c1");
    df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
    df.show();
  }
}
Spark v1.6.2
package net.jgp.labs.spark;
import static org.apache.spark.sql.functions.callUDF;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import net.jgp.labs.spark.udf.Multiplier2;
public class BasicExternalUdfFromTextFile {
  public static void main(String[] args) {
    System.out.println("Working directory = " + System.getProperty("user.dir"));
    BasicExternalUdfFromTextFile app = new BasicExternalUdfFromTextFile();
    app.start();
  }
  private void start() {
    SparkConf conf = new SparkConf().setAppName("Basic UDF from Text File").setMaster("local");
    SparkContext sc = new SparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    sqlContext.udf().register("x2Multiplier", new Multiplier2(), DataTypes.IntegerType);
    String filename = "data/tuple-data-file.csv";
    DataFrame df = sqlContext.read().format("com.databricks.spark.csv").option("inferSchema", "true")
        .option("header", "false").load(filename);
    df = df.withColumn("label", df.col("C0")).drop("C0");
    df = df.withColumn("value", df.col("C1")).drop("C1");
    df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
    df.show();
  }
}
UDF Code
The UDF (User Defined Function) code is the same for both Apache Spark v1.6.2 and v2.0.0.
package net.jgp.labs.spark.udf;
import org.apache.spark.sql.api.java.UDF1;
public class Multiplier2 implements UDF1<Integer, Integer> {
  private static final long serialVersionUID = -4519338105113996424L;
  @Override
  public Integer call(Integer t1) throws Exception {
    return t1 * 2;
  }
}
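UDFs are not limited to one argument: the org.apache.spark.sql.api.java package also provides UDF2 through UDF22. As an illustration (this class is not part of the repository), a multiplier that takes the factor as a second column could look like this:
package net.jgp.labs.spark.udf;
import org.apache.spark.sql.api.java.UDF2;
// Hypothetical two-argument UDF: multiplies the first column by the second
public class Multiplier implements UDF2<Integer, Integer, Integer> {
  private static final long serialVersionUID = 1L;
  @Override
  public Integer call(Integer value, Integer factor) throws Exception {
    return value * factor;
  }
}
You would register it the same way, with DataTypes.IntegerType as the return type, and call it with callUDF("multiplier", df.col("value"), df.col("factor")), where "factor" is a hypothetical second column.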
NC Schools by School District
This example downloads a JSON file from the City of Durham's Open Data portal and runs a small analysis on it.
You will need:
- The net.jgp.commons.download.DownloadManager class, which you can get from GitHub at https://github.com/jgperrin/net.jgp.commons.download.git.
- A /Pool directory, which you can easily replace with /tmp if you want.
/**
 * NC schools by school district analysis.
 */
package net.jgp.labs.spark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import net.jgp.commons.download.DownloadManager;
public class ListNCSchoolDistricts {
  public static void main(String[] args) {
    String filename = DownloadManager.getFilename(
        "https://opendurham.nc.gov/explore/dataset/north-carolina-school-performance-data/download/?format=json&timezone=America/New_York");
    System.out.println("File " + filename + " downloaded");
    SparkSession spark = SparkSession.builder().appName("NC Schools").master("local").getOrCreate();
    String fileToAnalyze = "/Pool/" + filename;
    System.out.println("File to analyze: " + fileToAnalyze);
    Dataset<Row> df;
    df = spark.read().option("dateFormat", "yyyy-MM-dd").json(fileToAnalyze);
    df = df.withColumn("district", df.col("fields.district"));
    df = df.groupBy("district").count().orderBy(df.col("district"));
    df.show(150, false);
  }
}
Partial output:
File 644ae7209488a5a34f6ecf6c6aa27178.dl downloaded
File to analyze: /Pool/644ae7209488a5a34f6ecf6c6aa27178.dl
+------------------------------+-----+
|district                      |count|
+------------------------------+-----+
|Alamance-Burlington Schools   |34   |
|Alexander County Schools      |10   |
|Alleghany County Schools      |4    |
|Anson County Schools          |10   |
|Ashe County Schools           |5    |
|Asheboro City Schools         |8    |
|Asheville City Schools        |8    |
|Avery County Schools          |10   |
|Beaufort County Schools       |12   |
|Bertie County Schools         |8    |
|Bladen County Schools         |13   |
|Brunswick County Schools      |18   |
|Buncombe County Schools       |39   |
|Burke County Schools          |25   |
|Cabarrus County Schools       |36   |
|Caldwell County Schools       |24   |
|Camden County Schools         |5    |
|Carteret County Public Schools|16   |
|Caswell County Schools        |6    |
|Catawba County Schools        |27   |
|Chapel Hill-Carrboro Schools  |18   |
...
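From there, the usual Dataset operations apply. For instance, to see the largest districts first, a quick follow-up (not in the original listing) could be:
// Sort the aggregated result by descending school count and show the top 10
df.orderBy(df.col("count").desc()).show(10, false);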
More to Come
More are on their way; use the comments if you have a specific one in mind and I'll see if I can add it.
Updates
- 2016-07-22: First version.
- 2016-07-25: Added 2 UDF examples.
- 2016-07-29: Migration to Spark v2.0.0 started.
- 2016-07-29: NC School example added, parsing JSON into a Dataset.
