Apache Spark is an open source data processing framework that performs analytic operations on Big Data in a distributed environment. It began as an academic project started by Matei Zaharia at UC Berkeley’s AMPLab in 2009. Apache Spark was created on top of a cluster management tool known as Mesos. It was later modified and upgraded so that it can work in a cluster-based environment with distributed processing.
We will be using Maven to create a sample project for the demonstration. To create the project, execute the following command in a directory that you will use as a workspace:
mvn archetype:generate -DgroupId=com.journaldev.sparkdemo -DartifactId=JD-Spark-WordCount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
If you are running Maven for the first time, the generate command will take a few seconds to complete because Maven has to download all the required plugins and artifacts. Once you have created the project, feel free to open it in your favourite IDE. The next step is to add the appropriate Maven dependencies to the project. Here is the pom.xml file with the appropriate dependencies:
<dependencies>
<!-- Import Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.journaldev.sparkdemo.WordCounter</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>install</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
As this is a Maven-based project, there is no need to install and set up Apache Spark on your machine. When we run this project, a runtime instance of Apache Spark is started, and once the program has finished executing, it is shut down. Finally, to see all the JARs that are added to the project through this dependency, we can run a simple Maven command that prints the complete dependency tree of the project:
mvn dependency:tree
When we run this command, it will show us the following Dependency Tree:
shubham:JD-Spark-WordCount shubham$ mvn dependency:tree
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.journaldev:java-word-count:jar:1.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 41, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] -------------------< com.journaldev:java-word-count >-------------------
[INFO] Building java-word-count 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ java-word-count ---
[INFO] com.journaldev:java-word-count:jar:1.0-SNAPSHOT
[INFO] +- org.apache.spark:spark-core_2.11:jar:1.4.0:compile
[INFO] | +- com.twitter:chill_2.11:jar:0.5.0:compile
[INFO] | | \- com.esotericsoftware.kryo:kryo:jar:2.21:compile
[INFO] | | +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile
[INFO] | | +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] | | \- org.objenesis:objenesis:jar:1.2:compile
[INFO] | +- com.twitter:chill-java:jar:0.5.0:compile
[INFO] | +- org.apache.hadoop:hadoop-client:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-common:jar:2.2.0:compile
[INFO] | | | +- commons-cli:commons-cli:jar:1.2:compile
[INFO] | | | +- org.apache.commons:commons-math:jar:2.1:compile
[INFO] | | | +- xmlenc:xmlenc:jar:0.52:compile
[INFO] | | | +- commons-io:commons-io:jar:2.1:compile
[INFO] | | | +- commons-logging:commons-logging:jar:1.1.1:compile
[INFO] | | | +- commons-lang:commons-lang:jar:2.5:compile
[INFO] | | | +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] | | | | +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] | | | | +- commons-digester:commons-digester:jar:1.8:compile
[INFO] | | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] | | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] | | | +- org.codehaus.jackson:jackson-core-asl:jar:1.8.8:compile
[INFO] | | | +- org.apache.avro:avro:jar:1.7.4:compile
[INFO] | | | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] | | | +- org.apache.hadoop:hadoop-auth:jar:2.2.0:compile
[INFO] | | | \- org.apache.commons:commons-compress:jar:1.4.1:compile
[INFO] | | | \- org.tukaani:xz:jar:1.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-hdfs:jar:2.2.0:compile
[INFO] | | | \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile
[INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.2.0:compile
[INFO] | | | +- org.apache.hadoop:hadoop-mapreduce-client-common:jar:2.2.0:compile
[INFO] | | | | +- org.apache.hadoop:hadoop-yarn-client:jar:2.2.0:compile
[INFO] | | | | | +- com.google.inject:guice:jar:3.0:compile
[INFO] | | | | | | +- javax.inject:javax.inject:jar:1:compile
[INFO] | | | | | | \- aopalliance:aopalliance:jar:1.0:compile
[INFO] | | | | | +- com.sun.jersey.jersey-test-framework:jersey-test-framework-grizzly2:jar:1.9:compile
[INFO] | | | | | | +- com.sun.jersey.jersey-test-framework:jersey-test-framework-core:jar:1.9:compile
[INFO] | | | | | | | +- javax.servlet:javax.servlet-api:jar:3.0.1:compile
[INFO] | | | | | | | \- com.sun.jersey:jersey-client:jar:1.9:compile
[INFO] | | | | | | \- com.sun.jersey:jersey-grizzly2:jar:1.9:compile
[INFO] | | | | | | +- org.glassfish.grizzly:grizzly-https:jar:2.1.2:compile
[INFO] | | | | | | | \- org.glassfish.grizzly:grizzly-framework:jar:2.1.2:compile
[INFO] | | | | | | | \- org.glassfish.gmbal:gmbal-api-only:jar:3.0.0-b023:compile
[INFO] | | | | | | | \- org.glassfish.external:management-api:jar:3.0.0-b012:compile
[INFO] | | | | | | +- org.glassfish.grizzly:grizzly-http-server:jar:2.1.2:compile
[INFO] | | | | | | | \- org.glassfish.grizzly:grizzly-rcm:jar:2.1.2:compile
[INFO] | | | | | | +- org.glassfish.grizzly:grizzly-http-servlet:jar:2.1.2:compile
[INFO] | | | | | | \- org.glassfish:javax.servlet:jar:3.1:compile
[INFO] | | | | | +- com.sun.jersey:jersey-json:jar:1.9:compile
[INFO] | | | | | | +- org.codehaus.jettison:jettison:jar:1.1:compile
[INFO] | | | | | | | \- stax:stax-api:jar:1.0.1:compile
[INFO] | | | | | | +- com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile
[INFO] | | | | | | | \- javax.xml.bind:jaxb-api:jar:2.2.2:compile
[INFO] | | | | | | | \- javax.activation:activation:jar:1.1:compile
[INFO] | | | | | | +- org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile
[INFO] | | | | | | \- org.codehaus.jackson:jackson-xc:jar:1.8.3:compile
[INFO] | | | | | \- com.sun.jersey.contribs:jersey-guice:jar:1.9:compile
[INFO] | | | | \- org.apache.hadoop:hadoop-yarn-server-common:jar:2.2.0:compile
[INFO] | | | \- org.apache.hadoop:hadoop-mapreduce-client-shuffle:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-yarn-api:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.2.0:compile
[INFO] | | | \- org.apache.hadoop:hadoop-yarn-common:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-jobclient:jar:2.2.0:compile
[INFO] | | \- org.apache.hadoop:hadoop-annotations:jar:2.2.0:compile
[INFO] | +- org.apache.spark:spark-launcher_2.11:jar:1.4.0:compile
[INFO] | +- org.apache.spark:spark-network-common_2.11:jar:1.4.0:compile
[INFO] | +- org.apache.spark:spark-network-shuffle_2.11:jar:1.4.0:compile
[INFO] | +- org.apache.spark:spark-unsafe_2.11:jar:1.4.0:compile
[INFO] | +- net.java.dev.jets3t:jets3t:jar:0.7.1:compile
[INFO] | | +- commons-codec:commons-codec:jar:1.3:compile
[INFO] | | \- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] | +- org.apache.curator:curator-recipes:jar:2.4.0:compile
[INFO] | | +- org.apache.curator:curator-framework:jar:2.4.0:compile
[INFO] | | | \- org.apache.curator:curator-client:jar:2.4.0:compile
[INFO] | | +- org.apache.zookeeper:zookeeper:jar:3.4.5:compile
[INFO] | | | \- jline:jline:jar:0.9.94:compile
[INFO] | | \- com.google.guava:guava:jar:14.0.1:compile
[INFO] | +- org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:compile
[INFO] | +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] | +- org.apache.commons:commons-math3:jar:3.4.1:compile
[INFO] | +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] | +- org.slf4j:slf4j-api:jar:1.7.10:compile
[INFO] | +- org.slf4j:jul-to-slf4j:jar:1.7.10:compile
[INFO] | +- org.slf4j:jcl-over-slf4j:jar:1.7.10:compile
[INFO] | +- log4j:log4j:jar:1.2.17:compile
[INFO] | +- org.slf4j:slf4j-log4j12:jar:1.7.10:compile
[INFO] | +- com.ning:compress-lzf:jar:1.0.3:compile
[INFO] | +- org.xerial.snappy:snappy-java:jar:1.1.1.7:compile
[INFO] | +- net.jpountz.lz4:lz4:jar:1.2.0:compile
[INFO] | +- org.roaringbitmap:RoaringBitmap:jar:0.4.5:compile
[INFO] | +- commons-net:commons-net:jar:2.2:compile
[INFO] | +- org.spark-project.akka:akka-remote_2.11:jar:2.3.4-spark:compile
[INFO] | | +- org.spark-project.akka:akka-actor_2.11:jar:2.3.4-spark:compile
[INFO] | | | \- com.typesafe:config:jar:1.2.1:compile
[INFO] | | +- io.netty:netty:jar:3.8.0.Final:compile
[INFO] | | +- org.spark-project.protobuf:protobuf-java:jar:2.5.0-spark:compile
[INFO] | | \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
[INFO] | +- org.spark-project.akka:akka-slf4j_2.11:jar:2.3.4-spark:compile
[INFO] | +- org.scala-lang:scala-library:jar:2.11.6:compile
[INFO] | +- org.json4s:json4s-jackson_2.11:jar:3.2.10:compile
[INFO] | | \- org.json4s:json4s-core_2.11:jar:3.2.10:compile
[INFO] | | +- org.json4s:json4s-ast_2.11:jar:3.2.10:compile
[INFO] | | \- org.scala-lang:scalap:jar:2.11.0:compile
[INFO] | | \- org.scala-lang:scala-compiler:jar:2.11.0:compile
[INFO] | | +- org.scala-lang.modules:scala-xml_2.11:jar:1.0.1:compile
[INFO] | | \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.1:compile
[INFO] | +- com.sun.jersey:jersey-server:jar:1.9:compile
[INFO] | | \- asm:asm:jar:3.1:compile
[INFO] | +- com.sun.jersey:jersey-core:jar:1.9:compile
[INFO] | +- org.apache.mesos:mesos:jar:shaded-protobuf:0.21.1:compile
[INFO] | +- io.netty:netty-all:jar:4.0.23.Final:compile
[INFO] | +- com.clearspring.analytics:stream:jar:2.7.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-core:jar:3.1.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-jvm:jar:3.1.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-json:jar:3.1.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-databind:jar:2.4.4:compile
[INFO] | | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.4.0:compile
[INFO] | | \- com.fasterxml.jackson.core:jackson-core:jar:2.4.4:compile
[INFO] | +- com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.4.4:compile
[INFO] | | +- org.scala-lang:scala-reflect:jar:2.11.2:compile
[INFO] | | \- com.thoughtworks.paranamer:paranamer:jar:2.6:compile
[INFO] | +- org.apache.ivy:ivy:jar:2.4.0:compile
[INFO] | +- oro:oro:jar:2.0.8:compile
[INFO] | +- org.tachyonproject:tachyon-client:jar:0.6.4:compile
[INFO] | | \- org.tachyonproject:tachyon:jar:0.6.4:compile
[INFO] | +- net.razorvine:pyrolite:jar:4.4:compile
[INFO] | +- net.sf.py4j:py4j:jar:0.8.2.1:compile
[INFO] | \- org.spark-project.spark:unused:jar:1.0.0:compile
[INFO] \- junit:junit:jar:4.11:test
[INFO] \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.987 s
[INFO] Finished at: 2018-04-07T15:50:34+05:30
[INFO] ------------------------------------------------------------------------
With just two declared dependencies, Maven pulled in all the transitive dependencies the project requires, including the Scala libraries, because Apache Spark itself is written in Scala.
As we’re going to create a Word Counter program, we will create a sample input file named input.txt in the root directory of our project. You can put any content inside it; we use the following text:
Hello, my name is Shubham and I am author at JournalDev . JournalDev is a great website to ready
great lessons about Java, Big Data, Python and many more Programming languages.
Big Data lessons are difficult to find but at JournalDev , you can find some excellent
pieces of lessons written on Big Data.
Feel free to use any text in this file.
Before we start working on the code, here is the project structure we will have once all the code has been added:
[Figure: Project Structure]
Now, we’re ready to start writing our program. When you start working with Big Data programs, imports can create a lot of confusion. To avoid this, here are all the imports we will use in our project:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
Next, here is the structure of our class which we will be using:
package com.journaldev.sparkdemo;
...imports...
public class WordCounter {
private static void wordCount(String fileName) {
...
}
public static void main(String[] args) {
...
}
}
All the logic will lie inside the wordCount method. We will start by creating an object of the SparkConf class. An object of this class is used to set various Spark parameters as key-value pairs for the program. We provide just two simple parameters:
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");
The master is set to local, which means that Spark runs locally inside the same JVM with a single worker thread instead of connecting to a cluster. The app name is just a way to provide Spark with application metadata. Now, we can construct a Spark Context object with this configuration object:
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
Spark treats every resource it processes as an RDD (Resilient Distributed Dataset), a data structure that organises the data into partitions so that it can be analysed efficiently in parallel. We will now load the input file into a JavaRDD object:
JavaRDD<String> inputFile = sparkContext.textFile(fileName);
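We will now use Java 8 lambda expressions to process the JavaRDD and split each line of the file into separate words. With the Spark 1.4.0 dependency declared above, the function passed to flatMap returns an Iterable; from Spark 2.0 onwards it must return an Iterator, so on newer versions append .iterator() to the Arrays.asList(...) call: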
JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));
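To see what this step produces, the following sketch applies the same split(" ") call to two lines using plain Java streams instead of Spark, so it runs without any Spark dependency. The class name SplitSketch and the method splitOnSpaces are made up for this illustration and are not part of the project. It also shows a side effect worth knowing about: splitting on a single space keeps punctuation attached to the word, so "Data." and "Data" end up as different words.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the tutorial project: mirrors the
// flatMap step with java.util.stream so it can run without Spark.
final class SplitSketch {

    // Splits every line on a single space and flattens the pieces
    // into one list, as the flatMap call does for the RDD.
    static List<String> splitOnSpaces(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Big Data lessons", "written on Big Data.");
        // Prints [Big, Data, lessons, written, on, Big, Data.]
        System.out.println(splitOnSpaces(lines));
    }
}
```

If punctuation should not be part of a word, one option is to split on a regular expression such as "\\W+" instead of a single space.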
Next, we use Spark’s mapToPair(...) method with a Java 8 lambda to turn every word into a (word, 1) pair, and reduceByKey(...) to add up the values for each word, which gives the (word, count) pairs we present as output:
JavaPairRDD<String, Integer> countData = wordsFromFile.mapToPair(t -> new Tuple2<String, Integer>(t, 1)).reduceByKey((x, y) -> x + y);
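The pairing and reducing logic can be sketched on a single machine with a plain Map, which may make it easier to see what Spark computes across partitions. This is only an illustration in standard Java, not Spark code; the class name CountSketch and the method countWords are assumptions made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the tutorial project: reproduces the
// mapToPair + reduceByKey logic on one machine with a plain Map.
final class CountSketch {

    static Map<String, Integer> countWords(List<String> words) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : words) {
            // Each word contributes 1 (the mapToPair step); values for the
            // same key are combined with a sum (the reduceByKey step).
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Big", "Data", "lessons", "Big", "Data.");
        System.out.println(countWords(words));
    }
}
```

Spark applies the same summing function, but it first combines values within each partition and then merges the partial sums, which is why the function passed to reduceByKey has to be associative.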
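Now, we can save the result as text. Spark writes it into a directory named CountData, with one part file per partition; the call fails if that directory already exists, so delete it before re-running the program: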
countData.saveAsTextFile("CountData");
Finally, we can provide the entry point to our program with the main() method:
public static void main(String[] args) {
if (args.length == 0) {
System.out.println("No files provided.");
System.exit(0);
}
wordCount(args[0]);
}
The complete file looks like:
package com.journaldev.sparkdemo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
public class WordCounter {
private static void wordCount(String fileName) {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
JavaRDD<String> inputFile = sparkContext.textFile(fileName);
JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));
JavaPairRDD<String, Integer> countData = wordsFromFile.mapToPair(t -> new Tuple2<String, Integer>(t, 1)).reduceByKey((x, y) -> x + y);
countData.saveAsTextFile("CountData");
}
public static void main(String[] args) {
if (args.length == 0) {
System.out.println("No files provided.");
System.exit(0);
}
wordCount(args[0]);
}
}
We will now move forward to run this program using Maven itself.
To run the application, go inside the root directory of the program and execute the following command:
mvn exec:java -Dexec.mainClass=com.journaldev.sparkdemo.WordCounter -Dexec.args="input.txt"
In this command, we provide Maven with the fully-qualified name of the main class and the name of the input file. Once the command has finished executing, a new directory named CountData is created in our project:
[Figure: Project Output Directory]
When we open the file named part-00000 inside that directory, we can see the word counts:
[Figure: Word Counter Output]
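Each line of the part file is the string form of one Tuple2, which looks like (word,count). If you want to read the results back into a Java program, a line can be parsed roughly as follows. This is a minimal sketch in plain Java; the class name OutputLineSketch and the method parseLine are hypothetical, and it assumes every line has exactly that shape.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical helper, not part of the tutorial project: parses one line
// of the saved output, assuming the "(word,count)" form of Tuple2.toString().
final class OutputLineSketch {

    static Map.Entry<String, Integer> parseLine(String line) {
        // Drop the surrounding parentheses.
        String body = line.substring(1, line.length() - 1);
        // The word itself may contain commas (for example "Hello,"),
        // so the separator is the last comma on the line.
        int comma = body.lastIndexOf(',');
        String word = body.substring(0, comma);
        int count = Integer.parseInt(body.substring(comma + 1));
        return new SimpleEntry<>(word, count);
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> entry = parseLine("(Hello,,1)");
        System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
}
```

Using the last comma as the separator matters here because the program splits on spaces only, so words such as "Hello," keep their trailing comma.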
In this lesson, we saw how we can use Apache Spark in a Maven-based project to make a simple but effective Word counter program. Read more Big Data Posts to gain deeper knowledge of available Big Data tools and processing frameworks.
Download Spark WordCounter Project: JD-Spark-WordCount
Hi, part-00000 is created but not as text file and also its empty. Please suggest what might have gone wrong. P.S.: I am running this on windows.
- Priya
i get the error java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
- KB
``` JavaRDD wordsFromFile = inputFile.flatMap( fileContent -> Arrays.asList(fileContent.split(" "))); ``` should have a `.iterator()` at last. ``` JavaRDD wordsFromFile = inputFile.flatMap( fileContent -> Arrays.asList(fileContent.split(" ")).iterator()); ```
- Dennis
i m getting this error: No suitable method found for mapToPair() method like JavaRDD… argument mismatch, cannot be converted to PairFunction
- nitish kumar