In this post, we look at how to read data from files using Spark Structured Streaming and store the output in a Hive table.

Build a Streaming App
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object StructuredStreamingSaveToHive {
  def main(args: Array[String]): Unit = {
    println("Structured Streaming Demo")
    val conf = new SparkConf().setAppName("Spark Structured Streaming").setMaster("local[*]")
    val spark = SparkSession.builder.config(conf).getOrCreate()
    println("Spark Session created")

    // Schema for the incoming CSV files
    val schema = StructType(Array(StructField("empId", StringType), StructField("empName", StringType)))

    // Create an "inputDir" directory under the C: drive; Spark streams any CSV files dropped into it
    val streamDF = spark.readStream.option("header", "true").schema(schema).csv("C:\\inputDir")

    // Append each micro-batch as CSV files under "hivelocation";
    // the checkpoint directory tracks which input files have already been processed
    val query = streamDF.writeStream.outputMode(OutputMode.Append()).format("csv")
      .option("path", "hivelocation").option("checkpointLocation", "location1").start()

    query.awaitTermination()
  }
}
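The sink above writes plain CSV files, which we will later expose through an external Hive table. If you prefer to write each micro-batch straight into a Hive-managed table instead, Spark 2.4's foreachBatch sink can reuse the regular batch writer. A minimal sketch, assuming the SparkSession was built with enableHiveSupport() and that the table name "emp_stream" and checkpoint directory "location2" are placeholders of your choosing:

import org.apache.spark.sql.DataFrame

// Alternative sink: append each micro-batch to a Hive-managed table.
// Requires SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
val hiveQuery = streamDF.writeStream
  .outputMode(OutputMode.Append())
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write.mode("append").saveAsTable("emp_stream") // hypothetical table name
  }
  .option("checkpointLocation", "location2")
  .start()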
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FuturexMiscSparkScala</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.3</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>
- Keep the C:\inputDir directory empty initially.
- Start the program; it will wait for files to arrive in the input directory.
- Then copy each of the files (file1, file2, file3, shown below) into the C:\inputDir directory one at a time and watch the output appear in the "hivelocation" directory under your project root folder (a quick read-back sketch follows the sample files).
file1.txt
empId,empName
1,Chris
2,Neil
file2.txt
empId,empName
3,John
4,Paul
file3.txt
empId,empName
5,Kathy
6,Ana
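To double-check what the sink wrote, the output directory can be read back as a regular batch DataFrame. A quick sketch, reusing the schema from the app (the streaming CSV sink writes no header row by default):

// Sanity check: read the streamed output back as a batch DataFrame
val outputDF = spark.read.schema(schema).csv("hivelocation")
outputDF.show()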
You can then create an external Hive table pointing at the "hivelocation" directory and watch the data populate incrementally as each file is processed.
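A sketch of that table definition, issued through spark.sql on a Hive-enabled session; the table name "emp" and the absolute LOCATION path are assumptions you should adapt to your own project root:

// Hypothetical external table over the streaming output directory.
// Requires a SparkSession created with enableHiveSupport().
spark.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS emp (empId STRING, empName STRING)
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    |LOCATION 'file:///C:/your/project/root/hivelocation'""".stripMargin)

spark.sql("SELECT * FROM emp").show()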
To learn more about the Spark Scala Coding Framework and Best Practices, check out our Udemy course: https://www.udemy.com/course/spark-scala-coding-best-practices-data-pipeline/?referralCode=DBA026944F73C2D356CF