Building a Web Analytics System Using Kafka and Spark Streaming in Google Cloud
The aim of this project is to create a data pipeline that receives hits from a website through a Flask REST API. The REST API publishes the data to Kafka topics, which we subscribe to from a Google Dataproc cluster. We then use Spark Streaming to read the data from the Kafka topic and push it into Google BigQuery.
STEP 1 – Pushing data into Kafka topics from the REST API endpoint
Here is the JavaScript snippet that I put on the website, followed by the Flask API code.
<script>
// Generate a unique Id for the user
function uuidv4() {
    return ([1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g, c =>
        (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16)
    );
}

function getCookie(cname) {
    var name = cname + "=";
    var decodedCookie = decodeURIComponent(document.cookie);
    var ca = decodedCookie.split(';');
    for (var i = 0; i < ca.length; i++) {
        var c = ca[i];
        while (c.charAt(0) == ' ') {
            c = c.substring(1);
        }
        if (c.indexOf(name) == 0) {
            return c.substring(name.length, c.length);
        }
    }
    return "";
}

function setCookie(cname, cvalue) {
    var d = new Date();
    d.setTime(d.getTime() + (30 * 60 * 1000));
    var expires = "expires=" + d.toUTCString();
    document.cookie = cname + "=" + cvalue + ";" + expires + ";path=/";
}

// Check whether the user already has an Id.
var retrievedUiud = localStorage.getItem('Uiud');
var Uiud;
if (retrievedUiud) {
    Uiud = retrievedUiud;
} else {
    Uiud = uuidv4();
    localStorage.setItem('Uiud', Uiud);
}

window.onload = function() {
    // The session Id is unique for every session; the cookie expires after 30 minutes of inactivity.
    var sessionId;
    if (getCookie("sessionId") != "") {
        sessionId = getCookie("sessionId");
        setCookie("sessionId", sessionId);
    } else {
        sessionId = Math.floor((Math.random() * 1000000) + 1);
        setCookie("sessionId", sessionId);
    }

    // Work out the referrer and whether this pageview is a landing page.
    var referrer = document.referrer;
    var landingPage;
    if (referrer == "") {
        referrer = "direct";
        landingPage = 1;
    } else if (referrer.indexOf(window.location.hostname) == -1) {
        landingPage = 1;
        referrer = document.referrer;
    } else {
        landingPage = 0;
        referrer = "direct";
    }

    // Send the data to the REST API
    var data = {
        "timestamp": Date.now(),
        "lastPageVisited": encodeURI(document.referrer),
        "pageUrl": encodeURI(window.location.href),
        "pageTitle": encodeURI(document.title),
        "eventType": "Pageview",
        "landingPage": landingPage,
        "referrer": encodeURI(referrer),
        "uiud": Uiud,
        "sessionId": sessionId
    };
    var url = "http://35.192.211.4:5000/api/webeventspub";
    data = JSON.stringify(data);
    const other_params = {
        headers: { "content-type": "application/json; charset=UTF-8" },
        body: data,
        method: "POST",
        mode: "cors"
    };
    fetch(url, other_params);
    console.log(data);
};
</script>
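If you want to test the endpoint without the browser snippet, you can post a sample event to it directly. Below is a minimal sketch using the Python requests library; the URL and field names come from the snippet above, while the field values are made-up placeholders.

# Post one sample pageview to the API (sketch; values are placeholders)
import time
import requests

event = {
    "timestamp": int(time.time() * 1000),   # milliseconds, same as Date.now() in the snippet
    "lastPageVisited": "https://example.com/",
    "pageUrl": "https://example.com/some-page/",
    "pageTitle": "Some Page",
    "eventType": "Pageview",
    "landingPage": 1,
    "referrer": "direct",
    "uiud": "test-uiud-1234",
    "sessionId": "123456"
}

response = requests.post("http://35.192.211.4:5000/api/webeventspub", json=event)
print(response.status_code, response.json())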
Here is the code for the Flask API; the Kafka producer lives in resources/webevents.py. Here is the code for that file.
import json
from flask_restful import Resource, reqparse
from flask import Flask, jsonify, request
from flask_jwt_extended import jwt_required
from kafka import KafkaProducer
import datetime


class WebEvents(Resource):
    parser = reqparse.RequestParser()
    parser.add_argument('pageUrl',
                        type=str,
                        required=True,
                        help='This cannot be empty')
    parser.add_argument('lastPageVisited',
                        type=str,
                        required=True,
                        help='This cannot be empty')
    parser.add_argument('timestamp',
                        type=lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime('%Y-%m-%d %H:%M:%S'),
                        required=True,
                        help='Input wasn\'t valid!')
    parser.add_argument('eventType',
                        type=str,
                        required=True,
                        help='This cannot be empty')
    parser.add_argument('uiud',
                        type=str,
                        required=True,
                        help='This cannot be empty')
    parser.add_argument('sessionId',
                        type=str,
                        required=True,
                        help='This cannot be empty')
    parser.add_argument('referrer',
                        type=str,
                        required=True,
                        help='This cannot be empty')
    parser.add_argument('landingPage',
                        type=str,
                        required=True,
                        help='This cannot be empty')

    # The producer serializes each event as JSON before publishing it to Kafka.
    producer = KafkaProducer(bootstrap_servers=['10.128.0.28:9092'],
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))

    def post(self):
        data = WebEvents.parser.parse_args()
        page_url = data.pageUrl
        last_page_visited = data.lastPageVisited
        timestamp = data.timestamp
        eventType = data.eventType
        uiud = data.uiud
        # Publish the parsed event to the "webevent" topic.
        WebEvents.producer.send("webevent", value=data)
        print(data)
        return {'id': timestamp}, 200
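The resource above still has to be registered on the /api/webeventspub route that the JavaScript snippet posts to. That wiring is not shown here, so the following is only a sketch of what the app file could look like; the resources.webevents import matches the file path mentioned above, and the CORS setup is an assumption based on the cross-origin fetch in the snippet.

# app.py (sketch, not the original file) - expose the WebEvents resource on the API route
from flask import Flask
from flask_restful import Api
from flask_cors import CORS  # assumed, since the browser posts cross-origin
from resources.webevents import WebEvents

app = Flask(__name__)
CORS(app)  # allow the website's origin to call the API
api = Api(app)

api.add_resource(WebEvents, '/api/webeventspub')

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)  # port 5000, as used in the JavaScript snippet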
In the case of Dataproc, the bootstrap servers are the worker nodes; Kafka listens on port 9092 by default, and you can connect to it using the internal IP of the worker nodes.
One thing I struggled with at this point was that, for some reason, Kafka was not running on my Dataproc cluster.
Here are some commands that could be useful.
Start the Kafka server – sudo bin/kafka-server-start.sh config/server.properties
Check the list of brokers – bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
Get the list of topics – bin/kafka-topics.sh --zookeeper localhost:2181 --list
Create a Kafka topic – bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
Check the data that has been published to a Kafka topic – kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --property print.key=true --property key.separator=,
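You can run the same check from Python with kafka-python, the library the producer uses. This is a quick sketch, assuming you run it from a machine that can reach the broker; the broker address and topic name are the ones used in the producer code above.

# Consume a few messages from the "webevent" topic to verify the producer works (sketch)
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "webevent",
    bootstrap_servers=["10.128.0.28:9092"],  # broker address from the producer config
    auto_offset_reset="earliest",            # start from the beginning of the topic
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    consumer_timeout_ms=10000                # stop iterating if nothing arrives for 10 seconds
)

for message in consumer:
    print(message.offset, message.value)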
STEP 2 – Writing the Spark Streaming script to read the data from the Kafka topic
After you have published the data to Kafka, the next step is to write a Spark Structured Streaming job. Here is the complete script.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
import datetime

conf = SparkConf(). \
    setAppName("Streaming Data"). \
    setMaster("yarn-client")

# Set up the Spark context, SQL context and Spark session
sc = SparkContext(conf=conf)
sqlcontext = SQLContext(sc)
spark = SparkSession \
    .builder \
    .appName("webevent-analysis") \
    .getOrCreate()
# Read data from the Kafka topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "cluster-e977-w-0:9092") \
    .option("subscribe", "webevent") \
    .load() \
    .selectExpr("CAST(value as STRING)")

# Create a DataFrame from the data
df_parsed = df.select("value")

schema = StructType(
    [
        StructField('timestamp', TimestampType(), True),
        StructField('lastPageVisited', StringType(), True),
        StructField('pageUrl', StringType(), True),
        StructField('eventType', StringType(), True),
        StructField('uiud', StringType(), True),
        StructField('sessionId', StringType(), True),
        StructField('referrer', StringType(), True),
        StructField('landingPage', StringType(), True),
    ]
)
df_streaming_visits = df_parsed.withColumn("data", from_json("value",schema)).select(col('data.*'))
# Add a watermark for late data and apply aggregations to the DataFrame
df_pageviews = df_streaming_visits \
    .withWatermark("timestamp", "200 seconds") \
    .groupBy(df_streaming_visits.uiud,
             df_streaming_visits.sessionId,
             window(df_streaming_visits.timestamp, "1800 seconds")) \
    .agg(
        collect_list(df_streaming_visits.pageUrl).alias("Pages"),
        collect_list(df_streaming_visits.referrer).alias("referrer"),
        collect_list(df_streaming_visits.landingPage).alias("LandingPageCode"),
        collect_list(df_streaming_visits.timestamp).alias("sessionTimestamps"),
        count(df_streaming_visits.eventType).alias("Pageviews")
    )
# For each micro-batch, this function writes the aggregated data to Cloud Storage as Avro files
bucket_name = "gs://webevents"  # set this to your bucket; webevents is the bucket used elsewhere in this post

def foreach_batch_function(df, epoch_id):
    df.show()
    df.coalesce(1).write \
        .format("avro") \
        .mode("append") \
        .option("checkpointLocation", bucket_name + "/spark-agg-checkpoints/") \
        .option("path", bucket_name + "/stateless_aggregations/") \
        .save()
query = ( df_pageviews.writeStream.trigger(processingTime = "180 seconds")
.foreachBatch(foreach_batch_function)
.outputMode("append")
.format("console")
.start()
)
query.awaitTermination()
Here are some important points to understand from the above code.
- Structured Streaming is built on top of the Spark SQL engine, which is why the script creates a SparkSession / SQL context before reading the stream.
- The first part of the code is pretty self-explanatory: you are simply subscribing to a Kafka topic and reading the data.
- Each Kafka record arrives as a key and a binary value, so you need to cast the value to a string before you can parse it. https://medium.com/analytics-vidhya/spark-select-and-select-expr-deep-dive-d63ef5e04c87
- Next, you convert the JSON string into a DataFrame with from_json and the schema defined above. Since DataFrames are immutable, every operation returns a new DataFrame (see the sketch after this list).
- Watermarking is how Spark deals with late data: it tells the engine how long to keep waiting for late events before a window is finalized.
- We group by uiud, sessionId and a 30-minute window, and aggregate the remaining columns as lists or counts.
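To see the value-to-columns conversion in isolation, here is a small batch sketch (not the streaming job itself) that parses one hard-coded JSON string with the same schema. The sample values are made up; the timestamp uses the '%Y-%m-%d %H:%M:%S' format that the Flask API produces.

# Batch sketch of the from_json parsing used in the streaming script
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.appName("from-json-sketch").getOrCreate()

schema = StructType([
    StructField('timestamp', TimestampType(), True),
    StructField('lastPageVisited', StringType(), True),
    StructField('pageUrl', StringType(), True),
    StructField('eventType', StringType(), True),
    StructField('uiud', StringType(), True),
    StructField('sessionId', StringType(), True),
    StructField('referrer', StringType(), True),
    StructField('landingPage', StringType(), True),
])

sample = '{"timestamp": "2020-06-01 10:00:00", "lastPageVisited": "direct", "pageUrl": "https://example.com/", "eventType": "Pageview", "uiud": "abc", "sessionId": "42", "referrer": "direct", "landingPage": "1"}'

# Parse the JSON string into typed columns, exactly as the streaming job does for each Kafka value.
df = spark.createDataFrame([(sample,)], ["value"])
df.withColumn("data", from_json("value", schema)).select(col("data.*")).show(truncate=False)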
Here is the final command you need to use to execute the code.
gcloud dataproc jobs submit pyspark teststreaming1.py --cluster=cluster-add1 --region=us-central1 --jars=gs://webevents/jars/spark-streaming-kafka-0-10-assembly_2.11-2.4.5.jar,gs://webevents/jars/spark-sql-kafka-0-10_2.11-2.4.5.jar --properties spark.jars.packages=org.apache.spark:spark-avro_2.11:2.4.2
Make sure that the jar files you add match the Spark version on your Dataproc cluster, and that you selected the required initialization actions (for Kafka) when creating the Dataproc cluster.

Once you execute the job you should see the aggregated output in the command line, and the Avro files will show up in Google Cloud Storage.
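To confirm that the Avro files are actually landing, you can list the output prefix with the google-cloud-storage client. A small sketch, assuming the output goes to the webevents bucket under the stateless_aggregations/ prefix used in the streaming script:

# List the Avro files written by the streaming job (bucket and prefix assumed from this post)
from google.cloud import storage

client = storage.Client()
for blob in client.list_blobs("webevents", prefix="stateless_aggregations/"):
    print(blob.name, blob.size, blob.updated)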

STEP 3 – Writing the data to Google BigQuery
For the final step, we will use a Cloud Function to load the data from Cloud Storage into BigQuery.
# Cloud Function
"""Import Avro files into BigQuery."""
import os
from google.cloud import bigquery


def update_user_cart(data, context):
    client = bigquery.Client()
    bucketname = data['bucket']
    filename = data['name']
    timeCreated = data['timeCreated']

    dataset_id = "analytics_streaming_data"
    dataset_ref = client.dataset(dataset_id)

    # Append the Avro file to the sessions table, autodetecting the schema.
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    job_config.autodetect = True
    job_config.ignore_unknown_values = True
    job_config.source_format = bigquery.SourceFormat.AVRO

    uri = 'gs://%s/%s' % (bucketname, filename)
    load_job = client.load_table_from_uri(uri, dataset_ref.table('sessions'), job_config=job_config)
    print("Starting job {}".format(load_job.job_id))

    load_job.result()  # Wait for the load job to complete.
    print("Job finished")

    destination_table = client.get_table(dataset_ref.table('sessions'))
    print('Loaded {} rows.'.format(destination_table.num_rows))
This Cloud Function loads the data into BigQuery, and you can set up a trigger so that whenever new files are added to the bucket they are automatically pushed to BigQuery.
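Once the function has run, you can sanity-check the sessions table directly from Python. A minimal sketch, assuming the dataset and table names used in the function above and the column names produced by the aggregation in Step 2:

# Query the sessions table to confirm rows are being loaded (sketch)
from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT uiud, sessionId, Pageviews
    FROM `analytics_streaming_data.sessions`
    ORDER BY Pageviews DESC
    LIMIT 10
"""
for row in client.query(query).result():
    print(row.uiud, row.sessionId, row.Pageviews)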
