Parallel Processing a Large File in Python
Learn various techniques for reducing data processing time using the multiprocessing, joblib, and tqdm Python packages.
For parallel processing, we divide our task into sub-units. Doing so increases the number of jobs processed by the program and reduces overall processing time.

For example, suppose you are working with a large CSV file and want to modify a single column. We will feed the data as an array to the function, and it will process multiple values in parallel based on the number of available workers. The number of workers depends on the number of cores in your processor.
Note: parallel processing will not improve processing time on a small dataset.
In this blog, we will learn how to reduce processing time on large files using the multiprocessing, joblib, and tqdm Python packages. It is a simple tutorial that can be applied to any file, database, image, video, or audio workload.
Note: we are using the Kaggle notebook for the experiments. The processing time can vary from machine to machine.
Getting Started
We will be using the US Accidents (2016 - 2021) dataset from Kaggle, which consists of 2.8 million records and 47 columns.
We will import multiprocessing, joblib, and tqdm for parallel processing, pandas for data ingestion, and re, nltk, and string for text processing.
```python
# Parallel Computing
import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm

# Data Ingestion
import pandas as pd

# Text Processing
import re
from nltk.corpus import stopwords
import string
```
Before we jump right in, let's set n_workers by doubling cpu_count(). As you can see, we have 8 workers.
```python
n_workers = 2 * mp.cpu_count()

print(f"{n_workers} workers are available")

>>> 8 workers are available
```
In the next step, we will ingest the large CSV file using the pandas read_csv function. Then, we will print out the shape of the dataframe, the names of the columns, and the processing time.
Note: Jupyter’s magic function %%time can display CPU times and wall time at the end of the process.
```python
%%time
file_name = "../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(file_name)

print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")
```
Output
```
Shape:(2845342, 47)

Column Names:
Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
       'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
       'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
       'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
       'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
       'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
       'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
       'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
       'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
       'Astronomical_Twilight'],
      dtype='object')

CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
Wall time: 46.9 s
```
Cleaning the Text
clean_text is a straightforward function for processing and cleaning the text. We will get the English stop words using nltk.corpus and then use them to filter stop words out of each line of text. After that, we will remove special characters and extra spaces from the sentence. It will be the baseline function for measuring processing time across serial, parallel, and batch processing.
```python
def clean_text(text):
    # Remove stop words
    stops = stopwords.words("english")
    text = " ".join([word for word in text.split() if word not in stops])
    # Remove special characters
    text = text.translate(str.maketrans('', '', string.punctuation))
    # Remove the extra spaces
    text = re.sub(' +', ' ', text)
    return text
```
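As a quick sanity check, here is the function applied to a short made-up sentence (the sample string is purely illustrative and not taken from the dataset):

```python
sample = "The cars were stopped at the intersection, and there was a minor delay."

print(clean_text(sample))
# The cars stopped intersection minor delay
```

The lowercase stop words ("were", "at", "the", "and", "there", "was", "a") are removed, the comma and period are stripped, and the re.sub step would collapse any repeated spaces left behind. Note that the capitalized "The" survives because the NLTK stop-word list is lowercase.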
Serial Processing
For serial processing, we can use the pandas .apply() function, but if you want to see a progress bar, you need to activate tqdm for pandas and then use the .progress_apply() function.
We are going to process the 2.8 million records and save the result back to the “Description” column.
```python
%%time
tqdm.pandas()

df['Description'] = df['Description'].progress_apply(clean_text)
```
Output
It took the high-end processor 9 minutes and 5 seconds to serially process 2.8 million rows.
```
100%  2845342/2845342 [09:05<00:00, 5724.25it/s]

CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s
Wall time: 9min 5s
```
Multiprocessing
There are various ways to parallel process a file, and we are going to learn about all of them. multiprocessing is a built-in Python package that is commonly used for parallel processing of large files.
We will create a multiprocessing Pool with 8 workers and use the map function to initiate the process. To display a progress bar, we wrap the input in tqdm.

The map function takes two parts: the first is the function to apply, and the second is the iterable of arguments to apply it to.

Learn more by reading the documentation.
```python
%%time
p = mp.Pool(n_workers)

df['Description'] = p.map(clean_text, tqdm(df['Description']))
```
Output
We have improved our processing time by roughly 2.4X. The processing time dropped from 9 minutes 5 seconds to 3 minutes 51 seconds.
```
100%  2845342/2845342 [02:58<00:00, 135646.12it/s]

CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s
Wall time: 3min 51s
```
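One housekeeping note: the pool above is never explicitly closed. In a notebook that usually goes unnoticed, but in a standalone script you would normally release the workers once the work is done. A minimal sketch of explicit cleanup, assuming the same clean_text and n_workers as above:

```python
p = mp.Pool(n_workers)
df['Description'] = p.map(clean_text, tqdm(df['Description']))

# close() stops the pool from accepting new tasks; join() waits for the workers to exit.
p.close()
p.join()
```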
Parallel
We will now learn about another Python package to perform parallel processing. In this section, we will use joblib’s Parallel and delayed to replicate the map function.
- Parallel requires two arguments: n_jobs = 8 and backend = "multiprocessing".
- Then, we wrap clean_text in the delayed function.
- Finally, we create a loop that feeds one value at a time to the delayed function.
The process below is quite generic, and you can modify your function and array according to your needs. I have used it to process thousands of audio and video files without any issue.
Recommended: add exception handling using try: and except: inside the cleaning step, so a single bad record does not stop the whole run; a sketch of one way to do this follows the function below.
```python
def text_parallel_clean(array):
    result = Parallel(n_jobs=n_workers, backend="multiprocessing")(
        delayed(clean_text)(text)
        for text in tqdm(array)
    )
    return result
```
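As an illustration of the exception-handling recommendation above, here is a hedged sketch (not part of the original code) that falls back to the raw value whenever cleaning fails. The names safe_clean_text and text_parallel_clean_safe are purely illustrative:

```python
def safe_clean_text(text):
    # Fall back to the original value if cleaning raises, e.g. on NaN or other non-string rows.
    try:
        return clean_text(text)
    except Exception:
        return text

def text_parallel_clean_safe(array):
    return Parallel(n_jobs=n_workers, backend="multiprocessing")(
        delayed(safe_clean_text)(text) for text in tqdm(array)
    )
```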
Pass the “Description” column to text_parallel_clean().
```python
%%time
df['Description'] = text_parallel_clean(df['Description'])
```
Output
It took our function 13 seconds longer than the multiprocessing Pool. Even then, Parallel is about 5 minutes faster than serial processing.
```
100%  2845342/2845342 [04:03<00:00, 10514.98it/s]

CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s
Wall time: 4min 4s
```
Parallel Batch Processing
There is a better way to process large files: split them into batches and process the batches in parallel. Let's start by creating a batch function that will run the clean_text function on a single batch of values.
Batch Processing Function
```python
def proc_batch(batch):
    return [
        clean_text(text)
        for text in batch
    ]
```
Splitting the File into Batches
The function below will split the file into multiple batches based on the number of workers. In our case, we get 8 batches.
```python
def batch_file(array, n_workers):
    file_len = len(array)
    batch_size = round(file_len / n_workers)
    batches = [
        array[ix:ix+batch_size]
        for ix in tqdm(range(0, file_len, batch_size))
    ]
    return batches

batches = batch_file(df['Description'], n_workers)

>>> 100%  8/8 [00:00<00:00, 280.01it/s]
```
Running Parallel Batch Processing
Finally, we will use Parallel and delayed to process batches.
Note: To get back a single list of values, we have to flatten the per-batch outputs with a list comprehension, as shown below.
```python
%%time
batch_output = Parallel(n_jobs=n_workers, backend="multiprocessing")(
    delayed(proc_batch)(batch)
    for batch in tqdm(batches)
)

df['Description'] = [j for i in batch_output for j in i]
```
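If you prefer a named helper to the nested list comprehension, the standard library's itertools.chain performs the same flattening; this is just an equivalent alternative, not part of the original code:

```python
from itertools import chain

# Flatten the per-batch lists into one flat list of cleaned descriptions.
df['Description'] = list(chain.from_iterable(batch_output))
```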