Building a Web Analytics System Using Kafka and Spark Streaming in Google Cloud

The aim of this project is to create a data pipeline that receives hits from a website through a Flask REST API.

The REST API publishes the data to Kafka topics, which we subscribe to from a Google Dataproc cluster.

Then we use Spark Streaming to read the data from the Kafka topic and push it into Google BigQuery.

STEP 1 – Pushing data into Kafka topics from the REST API endpoints

Here is the JavaScript snippet that I put on the website, followed by the Flask API code.

<script>
  // Generate a unique Id for the user
  function uuidv4() {
    return ([1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g, c =>
      (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16)
    );
  }

  function getCookie(cname) {
    var name = cname + "=";
    var decodedCookie = decodeURIComponent(document.cookie);
    var ca = decodedCookie.split(';');
    for (var i = 0; i < ca.length; i++) {
      var c = ca[i];
      while (c.charAt(0) == ' ') {
        c = c.substring(1);
      }
      if (c.indexOf(name) == 0) {
        return c.substring(name.length, c.length);
      }
    }
    return "";
  }

  function setCookie(cname, cvalue) {
    var d = new Date();
    d.setTime(d.getTime() + (30 * 60 * 1000));
    var expires = "expires=" + d.toUTCString();
    document.cookie = cname + "=" + cvalue + ";" + expires + ";path=/";
  }

  // See if the user is already registered.
  var retrievedUiud = localStorage.getItem('Uiud');
  var Uiud;
  if (retrievedUiud) {
    Uiud = retrievedUiud;
  } else {
    Uiud = uuidv4();
    localStorage.setItem('Uiud', Uiud);
  }

  window.onload = function() {
    // Session Id is unique for every session; the cookie expires after 30 minutes of inactivity.
    var sessionId;
    if (getCookie("sessionId") != "") {
      sessionId = getCookie("sessionId");
      setCookie("sessionId", sessionId);
    } else {
      sessionId = Math.floor((Math.random() * 1000000) + 1);
      setCookie("sessionId", sessionId);
    }

    var referrer = document.referrer;
    var landingPage;
    if (referrer == "") {
      referrer = "direct";
      landingPage = 1;
    } else if (referrer.indexOf(window.location.hostname) == -1) {
      // External referrer: this is a landing page.
      landingPage = 1;
      referrer = document.referrer;
    } else {
      // Internal navigation: not a landing page.
      landingPage = 0;
      referrer = "direct";
    }

    // Send the data to the REST API.
    var data = {
      "timestamp": Date.now(),
      "lastPageVisited": encodeURI(document.referrer),
      "pageUrl": encodeURI(window.location.href),
      "pageTitle": encodeURI(document.title),
      "eventType": "Pageview",
      "landingPage": landingPage,
      "referrer": encodeURI(referrer),
      "uiud": Uiud,
      "sessionId": sessionId
    };
    var url = "http://35.192.211.4:5000/api/webeventspub";
    data = JSON.stringify(data);
    const other_params = {
      headers: { "content-type": "application/json; charset=UTF-8" },
      body: data,
      method: "POST",
      mode: "cors"
    };
    fetch(url, other_params);
    console.log(data);
  };
</script>

Here is the code for the Flask API. The Kafka producer lives in resources/webevents.py, and here is the code for that file.

import json
from flask_restful import Resource, reqparse
from flask import Flask, jsonify, request
from flask_jwt_extended import jwt_required
from kafka import KafkaProducer
import datetime

class WebEvents(Resource):
	parser = reqparse.RequestParser()

	parser.add_argument('pageUrl',
						type=str,
						required=True,
						help='This cannot be empty')

	parser.add_argument('lastPageVisited',
						type=str,
						required=True,
						help='This cannot be empty')
	
	parser.add_argument('timestamp',
						type=lambda x:datetime.datetime.fromtimestamp(x/1000.0).strftime('%Y-%m-%d %H:%M:%S'),
						required=True, 
						help='Input wasn\'t valid!')

	parser.add_argument('eventType',
						type=str,
						required=True,
						help='This cannot be empty')

	parser.add_argument('uiud',
						type=str,
						required=True,
						help='This cannot be empty')

	parser.add_argument('sessionId',
						type=str,
						required=True,
						help='This cannot be empty')

	parser.add_argument('referrer',
						type=str,
						required=True,
						help='This cannot be empty')

	parser.add_argument('landingPage',
						type=str,
						required=True,
						help='This cannot be empty')


	# The producer serializes each event dict to JSON before sending it to Kafka.
	producer = KafkaProducer(bootstrap_servers=['10.128.0.28:9092'],
							 value_serializer=lambda v: json.dumps(v).encode('utf-8'))

	def post(self):
		data = WebEvents.parser.parse_args()
		timestamp = data.timestamp
		# Publish the parsed event to the "webevent" topic.
		WebEvents.producer.send("webevent", value=data)
		print(data)
		return {'id': timestamp}, 200
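
The resource above still needs to be registered with a Flask app. Here is a minimal sketch of what that wiring might look like (the file name app.py and the host/port are assumptions; the route matches the URL the JavaScript snippet posts to):

# app.py – minimal sketch, not the exact file from the project
from flask import Flask
from flask_restful import Api
from resources.webevents import WebEvents

app = Flask(__name__)
api = Api(app)

# Route matches the endpoint used by the JavaScript snippet.
api.add_resource(WebEvents, '/api/webeventspub')

if __name__ == '__main__':
    # Listen on all interfaces so the browser can reach the API from outside the VM.
    app.run(host='0.0.0.0', port=5000)

Since the browser calls the API with mode "cors", you may also need to enable CORS headers, for example with the flask-cors extension.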

In the case of Dataproc, the bootstrap servers are the worker nodes. Kafka listens on port 9092 by default, and you can connect to the brokers using the internal IPs of the worker nodes.
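
If you need the internal IPs of the workers, something like the following should do (look at the INTERNAL_IP column for the worker VMs):

List the cluster VMs and their internal IPs – gcloud compute instances list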

One thing I struggled with at this point was that, for some reason, the Kafka broker was not running on my Dataproc cluster.

Here are some commands that could be useful.

Start the Kafka server – sudo bin/kafka-server-start.sh config/server.properties

Check the list of brokers – ./bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

Get the list of topics – ./bin/kafka-topics.sh --zookeeper localhost:2181 --list

Create a Kafka topic – ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

Check the data that has been published to a Kafka topic – kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --property print.key=true --property key.separator=,
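
To check Step 1 end to end, you can post a test event straight to the API and watch it arrive with the console consumer on the webevent topic. A minimal example, assuming you run it on the machine where the Flask API is listening (otherwise substitute the API's address); the field values are just placeholders:

curl -X POST http://localhost:5000/api/webeventspub -H "Content-Type: application/json" -d '{"timestamp": 1577836800000, "lastPageVisited": "direct", "pageUrl": "https://example.com/", "eventType": "Pageview", "landingPage": "1", "uiud": "test-user", "sessionId": "12345", "referrer": "direct"}'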

STEP 2 – Writing a Spark Streaming script to read the data from the Kafka cluster

After you have published the data to Kafka, the next step is to write a Spark Streaming script. Here is the complete script.

from pyspark import SparkConf,SparkContext 
from pyspark.sql import SparkSession,SQLContext 
from pyspark.sql.functions import *
from pyspark.sql.types import * 
import time 
import datetime 

conf = SparkConf(). \
setAppName("Streaming Data"). \
setMaster("yarn-client")

#Setup a Spark Session
sc = SparkContext(conf=conf)
sqlcontext = SQLContext(sc)
spark = SparkSession \
        .builder \
        .appName("webevent-analysis") \
        .getOrCreate()

# Read data from the Kafka Topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers","cluster-e977-w-0:9092") \
    .option("subscribe","webevent") \
    .load() \
    .selectExpr("CAST(value as STRING)")

# Create a DataFrame From the Data
df_parsed = df.select("value")
schema = StructType(
                [
                
                StructField('timestamp', TimestampType(), True),
                StructField('lastPageVisited', StringType(), True),
                StructField('pageUrl', StringType(), True),
                StructField('eventType', StringType(), True),
                StructField('uiud', StringType(), True),
                StructField('sessionId', StringType(), True),
                StructField('referrer', StringType(), True),
                StructField('landingPage', StringType(), True),
                ]
        )

df_streaming_visits = df_parsed.withColumn("data", from_json("value",schema)).select(col('data.*'))

# Add a watermark for late data and apply aggregations to the dataframe
df_pageviews = df_streaming_visits \
        .withWatermark("timestamp", "200 seconds") \
        .groupBy(df_streaming_visits.uiud,
                 df_streaming_visits.sessionId,
                 window(df_streaming_visits.timestamp, "1800 seconds")) \
        .agg(
            collect_list(df_streaming_visits.pageUrl).alias("Pages"),
            collect_list(df_streaming_visits.referrer).alias("referrer"),
            collect_list(df_streaming_visits.landingPage).alias("LandingPageCode"),
            collect_list(df_streaming_visits.timestamp).alias("sessionTimestamps"),
            count(df_streaming_visits.eventType).alias("Pageviews")
        )

# GCS bucket used for checkpoints and output; replace with your own bucket.
bucket_name = "gs://<your-bucket>"

# For each micro-batch we run this function, which appends the aggregated data to Cloud Storage as Avro files.
def foreach_batch_function(df, epoch_id):
    df.show()
    df.coalesce(1).write \
      .format("avro") \
      .mode("append") \
      .option("checkpointLocation", bucket_name + "/spark-agg-checkpoints/") \
      .option("path", bucket_name + "/stateless_aggregations/") \
      .save()

query = ( df_pageviews.writeStream.trigger(processingTime = "180 seconds")  
          .foreachBatch(foreach_batch_function)
          .outputMode("append") 
          .format("console") 
          .start()
          )
query.awaitTermination() 

Here are some important points to understand from the above code.

  1. Structured Streaming builds stream processing on top of the Spark SQL engine, which is why you need a SQL context / SparkSession when you use it.
  2. The first part of the code is pretty self-explanatory: you are simply subscribing to a Kafka topic and reading the data.
  3. From the Kafka topic you get a key and a value, so depending on your use case you need to cast them as strings. https://medium.com/analytics-vidhya/spark-select-and-select-expr-deep-dive-d63ef5e04c87
  4. Next, you convert the JSON string into a DataFrame. Since DataFrames are immutable, each operation returns a new DataFrame.
  5. Watermarking is used to deal with late-arriving data in Spark.
  6. We group by multiple columns and aggregate the required columns as lists or counts.

Here is the final command you need to use to execute the code.

gcloud dataproc jobs submit pyspark teststreaming1.py --cluster=cluster-add1 --region=us-central1 --jars=gs://webevents/jars/spark-streaming-kafka-0-10-assembly_2.11-2.4.5.jar,gs://webevents/jars/spark-sql-kafka-0-10_2.11-2.4.5.jar --properties spark.jars.packages=org.apache.spark:spark-avro_2.11:2.4.2

Make sure that the jar files you add match the Spark version on your Dataproc cluster, and that you included the required initialization actions (such as the Kafka one) when creating the Dataproc cluster.
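
For reference, a cluster with the Kafka initialization action can be created with something along these lines; the init-action path and image version are assumptions, so check the Dataproc initialization-actions repository for the current location and pick the image version that matches your Spark jars:

gcloud dataproc clusters create cluster-add1 --region=us-central1 --image-version=1.4 --initialization-actions=gs://goog-dataproc-initialization-actions-us-central1/kafka/kafka.sh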


Once you execute the code, you should be able to see the output on the command line, and the files will show up in Google Cloud Storage.
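
To confirm the Avro files are being written, you can list the output folder (the bucket name is whatever you set as bucket_name in the script):

gsutil ls gs://<your-bucket>/stateless_aggregations/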


STEP 3 – Writing the data to Google BigQuery

For the final step, we will use Cloud Functions to send the data from Cloud Storage to BigQuery.

# Cloud Function

"""Import avro file into BigQuery."""

from google.cloud import bigquery

def update_user_cart(data, context):
    # Triggered when a new object lands in the Cloud Storage bucket.
    client = bigquery.Client()

    bucketname = data['bucket']
    filename = data['name']
    timeCreated = data['timeCreated']

    dataset_id = "analytics_streaming_data"
    dataset_ref = client.dataset(dataset_id)

    # Append the Avro file to the sessions table, autodetecting the schema.
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    job_config.autodetect = True
    job_config.ignore_unknown_values = True
    job_config.source_format = bigquery.SourceFormat.AVRO

    uri = 'gs://%s/%s' % (bucketname, filename)
    load_job = client.load_table_from_uri(uri, dataset_ref.table('sessions'), job_config=job_config)
    print("Starting Job {}".format(load_job.job_id))

    load_job.result()  # Wait for the load to complete.
    print("Job Finished")

    destination_table = client.get_table(dataset_ref.table('sessions'))
    print('Loaded {} rows.'.format(destination_table.num_rows))

This Cloud Function loads the data into BigQuery, and you can set it up with a Cloud Storage trigger so that new files added to the bucket are pushed to BigQuery automatically.
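
A deployment command along these lines should wire the function to the bucket's object-finalize event; the bucket name and runtime are assumptions, so adjust them to your setup:

gcloud functions deploy update_user_cart --runtime=python37 --entry-point=update_user_cart --trigger-resource=<your-bucket> --trigger-event=google.storage.object.finalize

Once a few files have loaded, a quick sanity check in BigQuery:

bq query --use_legacy_sql=false 'SELECT COUNT(*) FROM analytics_streaming_data.sessions'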
