Creating Your First Data Pipeline in Google Cloud with Apache Sqoop and Apache Airflow
Welcome to the first steps of creating your first data pipeline. By the end of this guide, you will have a data pipeline that takes data from MySQL, does some preprocessing on it, and then stores it in Google BigQuery.
In this exercise, we will create an imaginary pipeline for sales data that lives in a transactional database, in this case Cloud SQL. The exercise is based on the following Kaggle challenge: https://www.kaggle.com/c/competitive-data-science-predict-future-sales/data
Here are the steps that we will be taking to set up this data pipeline.
a) Create a Cloud SQL (MySQL) instance in Google Cloud and upload the dataset to it.
b) Create a pipeline to move the data from Cloud SQL into Google BigQuery.
Creating a SQL Database and Uploading Data
For our first step, we will create a Cloud SQL database on Google Cloud, upload all the files we got from Kaggle into Google Cloud Storage, and then create tables based on those files.
Once the MySQL instance is created, we will create SQL tables for the dataset.
After your SQL instance is created, you can import the CSV datasets from Google Cloud Storage and create tables inside MySQL.
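If you prefer the command line to the console, here is a rough sketch of creating the instance and staging the Kaggle files. The instance name matches the one used later in this guide, while the tier, region, bucket path and file name are example values you should adjust.

# Create a MySQL 5.7 Cloud SQL instance (tier and region are example values).
gcloud sql instances create testserver \
  --database-version=MYSQL_5_7 \
  --tier=db-n1-standard-1 \
  --region=us-central1

# Copy the CSV files downloaded from Kaggle into a Cloud Storage bucket.
gsutil cp sales_train.csv gs://airflow12/data/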
Once your database is created, you will need to connect to it using a Compute Engine instance or Google Cloud Shell to create the sales table.
gcloud sql connect {name of your server} --user=root --quiet
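Before creating the table, you will also need a database to hold it; the Sqoop job later in this guide connects to one named Sales. A quick way to create it from the shell (using the instance name from the previous command):

gcloud sql databases create Sales --instance={name of your server}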
Here is the command to create the sales table:
CREATE TABLE `sales` (
`date` DATE NOT NULL,
`date_block_num` int NOT NULL,
`shop_id` int NOT NULL,
`item_id` int NOT NULL,
`item_price` DECIMAL(9,3) NOT NULL,
`item_cnt_day` DECIMAL(9,3) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
Once the sales table is created, you can import data into it using the Google Cloud console.
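If you would rather script the import than click through the console, here is a rough sketch using the gcloud CLI, assuming the training CSV was staged at the bucket path used earlier:

# Import the CSV from Cloud Storage into the sales table of the Sales database.
gcloud sql import csv {name of your server} gs://airflow12/data/sales_train.csv \
  --database=Sales \
  --table=sales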

Create an Airflow Script to Create the Pipeline
In this example, we will use Airflow to get the data from the MySQL instance using Sqoop, store it in Cloud Storage, and finally load it into Google BigQuery.
Here are the steps your script needs to cover:
a) Create a Dataproc cluster that can use Sqoop to connect to Cloud SQL
In the first step, we will create a Dataproc cluster with 1 master node and 2 worker nodes. We set the hive:hive.metastore.warehouse.dir property to change the default storage directory for Hive data to Cloud Storage; this way the data persists even after the Dataproc cluster is deleted.
Another important point to note is the init_actions_uris and service_account_scopes settings, which are added so that the cluster can communicate with Cloud SQL.
create_cluster = DataprocClusterCreateOperator(
    task_id='create_sqoop_cluster',
    cluster_name="ephemeral-spark-cluster",
    master_machine_type='n1-standard-1',
    worker_machine_type='n1-standard-1',
    init_actions_uris=["gs://dataproc-initialization-actions/cloud-sql-proxy/cloud-sql-proxy.sh"],
    num_workers=2,
    region='us-central1',
    zone='us-central1-a',
    service_account_scopes=["https://www.googleapis.com/auth/sqlservice.admin"],
    properties={"hive:hive.metastore.warehouse.dir": BUCKET + "/hive-warehouse"},
    metadata={"additional-cloud-sql-instances": instance_name + "=tcp:3307", "enable-cloud-sql-hive-metastore": "false"},
    image_version="1.2"
)
b) Execute a bash script to copy the data to Google Cloud Storage in Avro format
Here is the command that we use. Some important points to look at here are:
-Dmapreduce.output.basename="sales" – This sets the first part of the names of all the files that will be added to Cloud Storage.
--target-dir $target_dir – Where the data will be stored in GCS.
--password-file – Use a password file stored in GCS rather than putting the password in the command (a sketch for staging this file follows the script below).
--as-avrodatafile – The format in which the files need to be added to GCS.
bucket="gs://airflow12"
pwd_file=$bucket/passwords/password.txt
table_name="sales"
target_dir=$bucket/sqoop_output
# Simple table import - Text Format :
gcloud dataproc jobs submit hadoop \
--cluster=$1 --region=us-central1 \
--class=org.apache.sqoop.Sqoop \
--jars=$bucket/sqoop_jars/sqoop_sqoop-1.4.7.jar,$bucket/sqoop_jars/sqoop_avro-tools-1.8.2.jar,file:///usr/share/java/mysql-connector-java-5.1.42.jar \
-- import \
-Dmapreduce.job.user.classpath.first=true \
-Dmapreduce.output.basename="sales" \
--driver com.mysql.jdbc.Driver \
--connect="jdbc:mysql://localhost:3307/Sales" \
--username=root --password-file=$pwd_file \
--split-by item_id \
--table $table_name \
-m 4 \
--target-dir $target_dir \
--as-avrodatafile
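The script above assumes the password file and the Sqoop jars already exist in the bucket. Here is a minimal staging sketch, assuming you have downloaded the jars locally and using the object names the script expects:

# Write the MySQL root password to GCS without a trailing newline.
echo -n 'your-mysql-root-password' | gsutil cp - gs://airflow12/passwords/password.txt
# Upload the Sqoop and Avro tools jars under the names referenced in the script.
gsutil cp sqoop-1.4.7.jar gs://airflow12/sqoop_jars/sqoop_sqoop-1.4.7.jar
gsutil cp avro-tools-1.8.2.jar gs://airflow12/sqoop_jars/sqoop_avro-tools-1.8.2.jar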
After this step executes within the Airflow script, you can see the files in Google Cloud Storage.
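To confirm the export worked, you can list the target directory (the bucket path comes from the script above):

gsutil ls gs://airflow12/sqoop_output/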

c) Upload the data from Google Cloud Storage into Google BigQuery
The final part of the Airflow script loads the data from GCS into BigQuery.
bq_load_flight_delays = GoogleCloudStorageToBigQueryOperator(
    task_id="bq_load_flight_delays",
    bucket=BUCKET_NAME,  # bucket name without the gs:// prefix
    source_objects=["sqoop_output/*.avro"],
    destination_project_dataset_table=project_id + ".data_analysis.flights_delays",
    autodetect=True,
    source_format="AVRO",
    create_disposition="CREATE_IF_NEEDED",
    skip_leading_rows=0,
    write_disposition="WRITE_APPEND",
    max_bad_records=0
)
Here is the complete Airflow Script.
from datetime import datetime, timedelta
from airflow import models
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator, DataprocClusterDeleteOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.utils.trigger_rule import TriggerRule

BUCKET_NAME = "airflow12"          # bare bucket name, used by the GCS-to-BigQuery operator
BUCKET = "gs://" + BUCKET_NAME     # full GCS URI, used for the Hive warehouse directory
instance_name = "testserver"
project_id = "phrasal-bond-274216"

DEFAULT_DAG_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email_on_failure': False,
    'email_on_retry': False,
    'project_id': project_id
}
with DAG('sqoop_pipeline',
         default_args=DEFAULT_DAG_ARGS,
         # The schedule belongs on the DAG itself rather than in default_args.
         schedule_interval="30 2 * * *") as dag:  # Here we are using dag as context.

    create_cluster = DataprocClusterCreateOperator(
        task_id='create_sqoop_cluster',
        cluster_name="ephemeral-spark-cluster",
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-2',
        init_actions_uris=["gs://dataproc-initialization-actions/cloud-sql-proxy/cloud-sql-proxy.sh"],
        num_workers=2,
        region='us-central1',
        zone='us-central1-a',
        service_account_scopes=["https://www.googleapis.com/auth/sqlservice.admin"],
        properties={"hive:hive.metastore.warehouse.dir": BUCKET + "/hive-warehouse"},
        metadata={"additional-cloud-sql-instances": instance_name + "=tcp:3307", "enable-cloud-sql-hive-metastore": "false"},
        image_version="1.2"
    )

    sqoop_inc_import = BashOperator(
        task_id='sqoop_incremental_import',
        bash_command="bash /home/airflow/gcs/plugins/sqoop-table-imports.sh ephemeral-spark-cluster"
    )

    bq_load_flight_delays = GoogleCloudStorageToBigQueryOperator(
        task_id="bq_load_flight_delays",
        bucket=BUCKET_NAME,  # bucket name without the gs:// prefix
        source_objects=["sqoop_output/*.avro"],
        destination_project_dataset_table=project_id + ".data_analysis.flights_delays",
        autodetect=True,
        source_format="AVRO",
        create_disposition="CREATE_IF_NEEDED",
        skip_leading_rows=0,
        write_disposition="WRITE_APPEND",
        max_bad_records=0
    )

    delete_cluster = DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name="ephemeral-spark-cluster",
        region='us-central1',
        trigger_rule=TriggerRule.ALL_DONE
    )

    create_cluster.set_downstream(sqoop_inc_import)
    sqoop_inc_import.set_downstream(bq_load_flight_delays)
    bq_load_flight_delays.set_downstream(delete_cluster)
Create the Airflow Environment and Upload the Bash Script
The next step is to create a new Cloud Composer environment. Just look for Composer in the Google Cloud console; it is fairly simple to set up.
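If you prefer to script this step too, an environment can be created with gcloud; here is a minimal sketch with an assumed environment name:

gcloud composer environments create sqoop-pipeline-env --location us-central1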
Once you have your environment running, click on the DAGs icon. This will take you to the GCS bucket associated with the Airflow environment.

Upload your Airflow DAG to this folder.
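You can also push the DAG file from the command line; the environment name and local file name below are assumptions:

gcloud composer environments storage dags import \
  --environment sqoop-pipeline-env \
  --location us-central1 \
  --source sqoop_pipeline.py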

Go back one level and find the plugins folder; this is where you need to upload your bash script.
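The same can be done for the bash script, again assuming the environment name used above:

gcloud composer environments storage plugins import \
  --environment sqoop-pipeline-env \
  --location us-central1 \
  --source sqoop-table-imports.sh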

Once you have uploaded your DAG, you are ready to run the pipeline.
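You can either wait for the scheduler to pick up the DAG or trigger it manually; for an Airflow 1.x Composer environment, a manual trigger looks roughly like this (the environment name is an assumption, the DAG id matches the script):

gcloud composer environments run sqoop-pipeline-env \
  --location us-central1 \
  trigger_dag -- sqoop_pipeline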

The first step will create the Dataproc cluster.
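You can confirm the cluster came up from the shell as well:

gcloud dataproc clusters list --region=us-central1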

At the end, you will be able to see your data in BigQuery.
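A quick row count with the bq CLI is an easy sanity check; the project, dataset and table names follow the ones used in the DAG:

bq query --use_legacy_sql=false \
  'SELECT COUNT(*) AS row_count FROM `phrasal-bond-274216.data_analysis.flights_delays`'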
