Customer 360

Chetanya-Patil
3 min read · Nov 27, 2021


The following data pipeline covers the steps to import data from MySQL to HDFS using Sqoop, load that data into Spark from HDFS, and store the results back into HDFS. However, as per the requirement, we have to put the data in the form of a table, and the table data should remain available even after the Spark job terminates.

Problem Statement: Find the top 10 customers (based on total purchase amount) from the TX state using Spark.

Steps of execution:

1. Importing data from MySQL and one file to HDFS

2. Spark Execution

3. Sink the results into a Hive table

The single flat file we use here contains a multi-character delimiter, which makes it challenging to turn the records into a structured form that can be used for the analysis.
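To make the delimiter challenge concrete, here is a minimal sketch of how one such record can be split with a regular expression. The sample line and the exact layout (a space, a tab and a comma acting together as the separator) are assumptions based on the regex used in the Spark code later in this post.

// Minimal sketch: splitting one multi-character-delimited orders record.
// The layout order_id<space>order_date<tab>customer_id<comma>order_status
// is an assumption; adjust the regex to match the real file.
object DelimiterSketch extends App {
  val myregex = """^(\S+) (\S+)\t(\S+),(\S+)""".r

  val sampleLine = "1 2013-07-25\t11599,CLOSED" // hypothetical record

  sampleLine match {
    case myregex(orderId, orderDate, customerId, status) =>
      println(s"order_id=$orderId, date=$orderDate, customer_id=$customerId, status=$status")
    case _ =>
      println("line does not match the expected layout")
  }
}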

Importing data from MySQL to HDFS using Sqoop

The solution requires the “customers” and “order_items” tables to be imported from MySQL, along with a text file of “orders”. The imported table data will be stored in Parquet format.

#order_items table: Parquet format
sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table order_items \
--warehouse-dir /user/cloudera/Project_Input \
--as-parquetfile

#customers table: Parquet format
sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table customers \
--warehouse-dir /user/cloudera/Project_Input \
--as-parquetfile

#The orders file is loaded directly into HDFS using
hadoop fs -put '/home/cloudera/Downloads/orders.csv' '/user/cloudera/sparkInput/orders.csv'
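Before running the Spark job, it can be worth sanity-checking the imported data from spark-shell. The snippet below is a minimal sketch: it assumes the Parquet directories land under the Sqoop --warehouse-dir used above (/user/cloudera/Project_Input/<table>), and it uses the spark session that spark-shell provides.

// Sanity check from spark-shell: the paths are an assumption based on the
// --warehouse-dir used in the Sqoop commands above.
val customersCheck = spark.read.parquet("/user/cloudera/Project_Input/customers")
customersCheck.printSchema()
println(customersCheck.count())

val orderItemsCheck = spark.read.parquet("/user/cloudera/Project_Input/order_items")
orderItemsCheck.printSchema()
println(orderItemsCheck.count())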

Spark Execution

All the required data is now in HDFS. This data will be used as the source in Spark to get the desired results as per the problem statement.

Spark Code
==========

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{desc, sum}

object MainCode extends App {

  Logger.getLogger("org").setLevel(Level.ERROR)

  // The orders file uses a multi-character delimiter layout:
  // order_id<space>order_date<tab>customer_id<comma>order_status
  val myregex = """^(\S+) (\S+)\t(\S+),(\S+)""".r

  case class Order(order_id: Int, order_customer_id: Int, order_status: String)

  def parser(line: String): Order = line match {
    case myregex(order_id, _, customer_id, order_status) =>
      Order(order_id.toInt, customer_id.toInt, order_status)
  }

  val sparkConf = new SparkConf()
  sparkConf.set("spark.app.name", "projectapplication")
  sparkConf.set("spark.master", "local[2]")

  val spark = SparkSession.builder()
    .config(sparkConf)
    .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._

  // Parse the multi-character-delimited orders file that was put into HDFS
  val ordersdf = spark.sparkContext
    .textFile("/user/cloudera/sparkInput/orders.csv")
    .map(parser)
    .toDF()
    .cache()

  // customers and order_items were imported by Sqoop in Parquet format
  val customers = spark.read
    .format("parquet")
    .load("/user/cloudera/Project_Input/customers")

  val order_items = spark.read
    .format("parquet")
    .load("/user/cloudera/Project_Input/order_items")

  // Keep only the customers from the TX state
  val tx_customers = customers.filter(customers("customer_state") === "TX")

  // customers -> orders -> order_items
  val tx_customer_oi = tx_customers
    .join(ordersdf, tx_customers("customer_id") === ordersdf("order_customer_id"), "inner")
    .join(order_items, ordersdf("order_id") === order_items("order_item_order_id"), "inner")

  // Total purchase amount per customer, top 10 by amount
  val top10_tx_customers = tx_customer_oi
    .groupBy("customer_id", "customer_fname", "customer_state")
    .agg(sum("order_item_subtotal").as("total_purchase_amt"))
    .orderBy(desc("total_purchase_amt"))
    .limit(10)

  // Persist the result as a Hive table so it survives after the job ends
  spark.sql("create database if not exists customer360_Output")

  top10_tx_customers.write
    .partitionBy("customer_state")
    .bucketBy(4, "customer_id")
    .mode(SaveMode.Overwrite)
    .saveAsTable("customer360_Output.finalOutput")

  spark.stop()
}

The requirement was that the data should end up in the form of a table, and that the table should remain present even after we terminate the Spark job.

If we relied on Spark's default catalog (the “Spark metastore”) to hold the table's metadata, that metadata would vanish as soon as the job terminates.

Given this requirement, we store the table's metadata in the Hive metastore instead, where it persists.

Even if we terminate the Spark job, the metadata of that table is still there. There is no need to worry: the metadata lives in the Hive metastore (a database of its own), and the actual data sits in HDFS.

To use the Hive metastore, we have to enable Hive support while creating the Spark session and add the required jar files.
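As a quick illustration of that persistence, a completely separate Spark application (or spark-shell session) started after the job above has finished can still read the table back. This is a minimal sketch; the table name is the one created by the job above, and it assumes the same Hive metastore is reachable.

import org.apache.spark.sql.SparkSession

object ReadBackTable extends App {
  // A new session, started after the original job has terminated.
  // Hive support is enabled so the catalog is backed by the Hive metastore.
  val spark = SparkSession.builder()
    .appName("verify-persisted-table")
    .master("local[2]")
    .enableHiveSupport()
    .getOrCreate()

  // The table is still visible: its metadata lives in the Hive metastore
  // and its data lives in HDFS.
  spark.table("customer360_Output.finalOutput").show()

  spark.stop()
}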

The data we use for this comes from the retail_db database (the “customers” and “order_items” tables) together with the “orders” file described above.

Check that the table is present in the customer360_Output database in Hive:

show databases;

use customer360_Output;

show tables;

select * from finalOutput limit 10;   -- final output
