Oops! Something went wrong while submitting the form.
We use cookies to improve your browsing experience on our website, to show you personalised content and to analize our website traffic. By browsing our website, you consent to our use of cookies. Read privacy policy.
The amount of data in our world has been exploding exponentially day by day. Processing and analyzing this Big Data has become key in the current age to make informed, data-driven decisions. Spark is a unified distributed data processing engine used for Big Data. Spark can be used to process Big Data in an efficient manner. Spark lets you process Big Data faster by splitting the work into chunks and assigning those chunks to computation resources across nodes. It can handle up to petabytes of data, which is millions of gigabytes of data. It processes all its data in memory, which makes it faster.
We talked about processing Big Data in Spark, but we know spark doesn’t store any data like other file systems. So, to process data in Spark, we must read data from different data sources, clean or process the data, and again store this data in one of the target data sources. Data sources can be files, APIs, databases, or streams.
Database management systems have been present for a decade. Many applications generate huge amounts of data and store data in database management systems. And a lot of times, we need to connect spark to the database and process that data.
In this blog, we are going to discuss how to use spark to read from and write to databases in parallel. Our focus will be on reading/writing data from/to the database using different methods, which will help us read/write TeraBytes of data in an efficient manner.
Reading / Writing data from/to Database using Spark:
To read data or write data from/to the database, we will need to perform a few basic steps regardless of any programming language or framework we are using. What follows is an overview of the steps to read data from databases.
Step 1: Register Driver or Use Connector
Get the respective driver of your database and register the driver, or use the connector to connect to the database.
Step 2: Make a connection
Next, the driver or connector makes a connection to the database.
Step 3: Run query statement
Using the connection created in the previous step, execute the query, which will return the result.
Step 4: Process result
For the result, we got in the previous step, process it as per your requirement.
This dataset contains details of COVID patients across all states. It has different information such as State, Confirmed, Recovered, Deceased, Other, Tested, and Date.
You can load this dataset in any of the databases you work with and can try out the entire discussion practically.
The following image shows ten records of the entire dataset.
Spark provides an API to read data from a database and is very simple to use. First of all, we will need to create a Spark session. Then add the driver to Spark. It can be added through the program itself, or we can add it using shell also.
The first line of code imports the SparkSession class. This is the entry point to programming Spark with the Dataset and DataFrame API
From the fifth to the ninth line of the above code, we are creating a spark session on a local system with four cores, which will be used for interaction with our spark application. We specify the name for our application using appName(), which in our case, is 'Databases.' This app name will be shown on Webb UI for our cluster. Next, we can specify any configurations for the spark application using config(). In our case, we have specified the configuration of the driver for the Postgres database, which will be used to create a connection with the Postgres database. You can specify the driver of any of the available databases.
To connect to the database, we must have a hostname, port, database name, username, and password with us. Those details are in 10 through 16 lines of the above code.
Refer to the code lines from 19 to 28 in the above snippet. Up until now, we have had our Spark session and all the information that we need to connect to the database. Using the Spark Read API, we read the data from the database. This will create a connection to the Postgres database from one of the cores that we have allocated for the Spark application. And using this connection, it will read the data into the table_data_df dataframe. Even if we have multiple cores for our application, it will still create only one connection from one of the cores. The rest of the cores will not be utilized. While we will discuss how to utilize all cores, our first focus is here.
Refer to the code lines from 29 to 38 in the above snippet. We have the data now, so let's try to write it to the database. Using the Spark Write API, we will write data to the database. This will also create only one connection to the database from one of the cores that we have allocated for the Spark application. Even if we have more cores for the application, it still uses only one core with the above code.
As promised in the last section, we will discuss how we can optimize for resource utilization. In the last section, we had only one connection, utilizing very limited resources and causing resources to be idle or unused. To get over this, the Spark Read and Write API has a way by providing a few extra attributes. And those are partitionColumn, lowerBound, upperBound. These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. For each partition, there will be an individual core with its own connection performing the reads or writes. Thus, making the database operation in parallel.
This is an efficient way of reading and writing data from databases in spark rather than just doing it with one partition.
Partitions are decided by the Spark API in the following way.
Let’s consider an example where:
lowerBound: 0
upperBound: 1000
numPartitions: 10
Stride is equal to 100, and partitions correspond to the following queries:
SELECT * FROM table WHERE partitionColumn BETWEEN 0 AND 100
SELECT * FROM table WHERE partitionColumn BETWEEN 100 AND 200
...
...
SELECT * FROM table WHERE partitionColumn > 9000
BETWEEN here is exclusive on the upper bound.
Now we have data in multiple partitions. Each executor can have one or more partitions based on cluster configuration. Suppose we have 10 cores and 10 partitions. One partition of data can be fetched from one executor using one core. So, 10 partitions of data can be fetched from 10 executors. Each of these executors will create the connection to the database and will read the data.
Note- lowerbound and upperbound does not filter the data. It just helps spark to decide the stride of data.
partitionColumn must be a numeric, date, or timestamp column from the table
Also, there are some attributes that can be used during the write operation to optimize the write operation. One of the attributes is “batchsize”. The JDBC batch size, which determines how many rows to insert per round trip. This can help the performance of JDBC drivers. This option applies only to writing. One more attribute called “truncate” can be helpful to optimize the write operation. This is a JDBC writer-related option. When SaveMode.Overwrite is enabled, it causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient and prevents the table metadata (e.g., indices) from being removed.
We have seen how to read and write data in Spark. Spark is not the only way to connect with databases, right? There are multiple ways we can access databases and try to achieve parallel read-writes. We will discuss this in further sections. We will mainly focus on reading and writing it from python.
To integrate Postgres with Python, we have different libraries or adopters that we can use. But Psycopg is the widely used adopter. First off all, you will need to install the Psycopg2 library. Psycopg2 is a slightly updated version of the Psycopg adapter. You install it using pip or any way you are comfortable with.
To connect with the Postgres database, we need hostname, port, database name, username, and password. We are storing all these details as attributes in class. The create connection method will form a connection with the Postgres database using the connect() method of psycopg2 module. This method will return a connection object. In the read method, we call this connection method and get a connection object. Using this connection object, we create a cursor. This cursor is bound to have a connection with the database for its lifetime and execute all the commands or queries on the database. Using this query object, we execute a read query on the database. Then the data returned by the executing read query can be fetched using the fetchall() method. Then we close the connection.
To run the program, we have specified details of database and query. Next, we create an object of PostgresDbClient and call the read method from class PostgresDbClient. This read method will return as data and we are converting this data into relational format using pandas.
This implementation is very straightforward: this program creates one process in our system and fetches all the data using system resources, CPU, memory, etc. The drawback of this approach is that suppose this program uses 30 percent CPU and memory resources out of 100%, then the remaining 70% of resources are idle. We can maximize this usage by other means like multithreading or multiprocessing.
In the previous section, we discussed the drawback of a single process and single-thread implementation. Let's get started with how to maximize resource usage. Before getting into multithreading, let’s understand a few basic but important concepts.
What is a process?
When you execute any program, the operating system loads it in memory and then starts executing the program. This instance of the program being executed is called a process. Computing and memory resources are associated with each process separately.
What is a thread?
A thread is a sequential flow of execution. A process is also a thread. Usually, the process is called a main thread. Unlike a process, the same computing and memory resources can be shared with multiple threads.
What is multithreading?
This is when a process has multiple threads, along with the main thread, and these threads run independently but concurrently using the same computing and memory resources associated with the process. Such a program is called a multithreaded program or process. Multithreading uses resources very efficiently, which results in maximizing performance.
What is multiprocessing?
When multiple processes run independently, with separate resources associated with each process, it is called multiprocessing. Multiprocessing is achieved with multiple processors running separate processes on each processor.
Let's get back to our program. Here you can see we have a connection and read method. These two methods are exactly the same as from the previous section. Here, we have one new function, which is get_thread(). Be careful, as a method belongs to the class, and afunction, it is not part of this class. So, this get_thred() function is global and acts as a wrapper function for calling the read method from the class PostgresDbClient. This is because we can’t create threads using class methods. Don't get confused if you don't understand it, as it is just how we write the code.
To run the program, we have specified the Postgres database details and queries. In the previous approach, we fetched all the data from the table with one thread only. In this approach, the plan is to fetch one day of data using one thread so that we can maximize resource utilization. Here, each query reads one day’s worth of data from the table using one thread. Having 5 queries will fetch 5 days of data, and 5 threads will be running concurrently.
To create a thread in Python, we will need to use the Thread() method from the threading library. We need to pass the function that we want to run and arguments of that function. The thread() object will create a new thread and return its object. The thread has been created but has not yet started. To start this thread, we will need to use the start() method. In our program, we are starting 5 threads. If you try executing this entire program multiple times, you will end up with different results. Some data will fetch prior, and some will fetch later. And at the time of the next execution, this order will be different again. This is because resource handling is done by the operating system. Depending on what the OS thinks about which thread to give what resources, the output is generated. If you want to know how this is done, you will need to go deep into operating systems concepts.
In our use case, we are just printing the data to the console. To store the data, there are multiple ways. One simple way is to define the global variable and store the result in it, but we will need to achieve synchronization as multiple threads might access the global variable, which can lead to race conditions. Another way is to extend the thread class to your custom class, and you can define a class variable—and you can use this variable to save the data. Again, here, you will need to make sure you are achieving synchronization.
So, whenever you want to store the data in a variable by any available method, you will need to achieve synchronization. So, synchronization will lead to the sequential execution of threads. And this sequential processing is not what we are looking for. To avoid synchronization, we can directly write the data to the target—so that when the thread reads the data, the same thread will write data again back to the target database. This way, we can avoid synchronization and store the data in the database for future use. This function can look as below, where db_client.write(data) is a function that writes the data to a database.
def get_thread(thread_id, db_client, query):
print(f"Starting thread id {thread_id}")
data = pd.DataFrame(db_client.read(query))
print(f"Thread {thread_id} data ", data, sep="\n")
Note that in this blog, we have used a password as a hardcoded string, which is definitely not the way to define passwords. We should use secrets, .env files, etc., as input for passwords. We do not hardcode passwords in the production environment.
Conclusion
After going through the above blog, you might have gotten more familiar with how to perform read and write operations on databases using spark, python, and multithreading concepts. You also know now what are multi processes and what multithreading is. You are now also able to analyze the best way to carry out read-and-write operations on a database based on your requirements.
In general, if you have a small amount of data, you can use a simple python approach to read and write data. If you have a relatively high amount of data, then you can use a multi-threaded approach or a single-partition Spark approach. If you have a huge amount of data, and where reading millions of records per second is a requirement, then you can use the Spark multi-partition approach. In the end, it's just mostly personal preference, and using which approach depends on your requirements and availability of resources.
Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.
Parallelizing Heavy Read and Write Queries to SQL Datastores using Spark and more!
The amount of data in our world has been exploding exponentially day by day. Processing and analyzing this Big Data has become key in the current age to make informed, data-driven decisions. Spark is a unified distributed data processing engine used for Big Data. Spark can be used to process Big Data in an efficient manner. Spark lets you process Big Data faster by splitting the work into chunks and assigning those chunks to computation resources across nodes. It can handle up to petabytes of data, which is millions of gigabytes of data. It processes all its data in memory, which makes it faster.
We talked about processing Big Data in Spark, but we know spark doesn’t store any data like other file systems. So, to process data in Spark, we must read data from different data sources, clean or process the data, and again store this data in one of the target data sources. Data sources can be files, APIs, databases, or streams.
Database management systems have been present for a decade. Many applications generate huge amounts of data and store data in database management systems. And a lot of times, we need to connect spark to the database and process that data.
In this blog, we are going to discuss how to use spark to read from and write to databases in parallel. Our focus will be on reading/writing data from/to the database using different methods, which will help us read/write TeraBytes of data in an efficient manner.
Reading / Writing data from/to Database using Spark:
To read data or write data from/to the database, we will need to perform a few basic steps regardless of any programming language or framework we are using. What follows is an overview of the steps to read data from databases.
Step 1: Register Driver or Use Connector
Get the respective driver of your database and register the driver, or use the connector to connect to the database.
Step 2: Make a connection
Next, the driver or connector makes a connection to the database.
Step 3: Run query statement
Using the connection created in the previous step, execute the query, which will return the result.
Step 4: Process result
For the result, we got in the previous step, process it as per your requirement.
This dataset contains details of COVID patients across all states. It has different information such as State, Confirmed, Recovered, Deceased, Other, Tested, and Date.
You can load this dataset in any of the databases you work with and can try out the entire discussion practically.
The following image shows ten records of the entire dataset.
Spark provides an API to read data from a database and is very simple to use. First of all, we will need to create a Spark session. Then add the driver to Spark. It can be added through the program itself, or we can add it using shell also.
The first line of code imports the SparkSession class. This is the entry point to programming Spark with the Dataset and DataFrame API
From the fifth to the ninth line of the above code, we are creating a spark session on a local system with four cores, which will be used for interaction with our spark application. We specify the name for our application using appName(), which in our case, is 'Databases.' This app name will be shown on Webb UI for our cluster. Next, we can specify any configurations for the spark application using config(). In our case, we have specified the configuration of the driver for the Postgres database, which will be used to create a connection with the Postgres database. You can specify the driver of any of the available databases.
To connect to the database, we must have a hostname, port, database name, username, and password with us. Those details are in 10 through 16 lines of the above code.
Refer to the code lines from 19 to 28 in the above snippet. Up until now, we have had our Spark session and all the information that we need to connect to the database. Using the Spark Read API, we read the data from the database. This will create a connection to the Postgres database from one of the cores that we have allocated for the Spark application. And using this connection, it will read the data into the table_data_df dataframe. Even if we have multiple cores for our application, it will still create only one connection from one of the cores. The rest of the cores will not be utilized. While we will discuss how to utilize all cores, our first focus is here.
Refer to the code lines from 29 to 38 in the above snippet. We have the data now, so let's try to write it to the database. Using the Spark Write API, we will write data to the database. This will also create only one connection to the database from one of the cores that we have allocated for the Spark application. Even if we have more cores for the application, it still uses only one core with the above code.
As promised in the last section, we will discuss how we can optimize for resource utilization. In the last section, we had only one connection, utilizing very limited resources and causing resources to be idle or unused. To get over this, the Spark Read and Write API has a way by providing a few extra attributes. And those are partitionColumn, lowerBound, upperBound. These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. For each partition, there will be an individual core with its own connection performing the reads or writes. Thus, making the database operation in parallel.
This is an efficient way of reading and writing data from databases in spark rather than just doing it with one partition.
Partitions are decided by the Spark API in the following way.
Let’s consider an example where:
lowerBound: 0
upperBound: 1000
numPartitions: 10
Stride is equal to 100, and partitions correspond to the following queries:
SELECT * FROM table WHERE partitionColumn BETWEEN 0 AND 100
SELECT * FROM table WHERE partitionColumn BETWEEN 100 AND 200
...
...
SELECT * FROM table WHERE partitionColumn > 9000
BETWEEN here is exclusive on the upper bound.
Now we have data in multiple partitions. Each executor can have one or more partitions based on cluster configuration. Suppose we have 10 cores and 10 partitions. One partition of data can be fetched from one executor using one core. So, 10 partitions of data can be fetched from 10 executors. Each of these executors will create the connection to the database and will read the data.
Note- lowerbound and upperbound does not filter the data. It just helps spark to decide the stride of data.
partitionColumn must be a numeric, date, or timestamp column from the table
Also, there are some attributes that can be used during the write operation to optimize the write operation. One of the attributes is “batchsize”. The JDBC batch size, which determines how many rows to insert per round trip. This can help the performance of JDBC drivers. This option applies only to writing. One more attribute called “truncate” can be helpful to optimize the write operation. This is a JDBC writer-related option. When SaveMode.Overwrite is enabled, it causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient and prevents the table metadata (e.g., indices) from being removed.
We have seen how to read and write data in Spark. Spark is not the only way to connect with databases, right? There are multiple ways we can access databases and try to achieve parallel read-writes. We will discuss this in further sections. We will mainly focus on reading and writing it from python.
To integrate Postgres with Python, we have different libraries or adopters that we can use. But Psycopg is the widely used adopter. First off all, you will need to install the Psycopg2 library. Psycopg2 is a slightly updated version of the Psycopg adapter. You install it using pip or any way you are comfortable with.
To connect with the Postgres database, we need hostname, port, database name, username, and password. We are storing all these details as attributes in class. The create connection method will form a connection with the Postgres database using the connect() method of psycopg2 module. This method will return a connection object. In the read method, we call this connection method and get a connection object. Using this connection object, we create a cursor. This cursor is bound to have a connection with the database for its lifetime and execute all the commands or queries on the database. Using this query object, we execute a read query on the database. Then the data returned by the executing read query can be fetched using the fetchall() method. Then we close the connection.
To run the program, we have specified details of database and query. Next, we create an object of PostgresDbClient and call the read method from class PostgresDbClient. This read method will return as data and we are converting this data into relational format using pandas.
This implementation is very straightforward: this program creates one process in our system and fetches all the data using system resources, CPU, memory, etc. The drawback of this approach is that suppose this program uses 30 percent CPU and memory resources out of 100%, then the remaining 70% of resources are idle. We can maximize this usage by other means like multithreading or multiprocessing.
In the previous section, we discussed the drawback of a single process and single-thread implementation. Let's get started with how to maximize resource usage. Before getting into multithreading, let’s understand a few basic but important concepts.
What is a process?
When you execute any program, the operating system loads it in memory and then starts executing the program. This instance of the program being executed is called a process. Computing and memory resources are associated with each process separately.
What is a thread?
A thread is a sequential flow of execution. A process is also a thread. Usually, the process is called a main thread. Unlike a process, the same computing and memory resources can be shared with multiple threads.
What is multithreading?
This is when a process has multiple threads, along with the main thread, and these threads run independently but concurrently using the same computing and memory resources associated with the process. Such a program is called a multithreaded program or process. Multithreading uses resources very efficiently, which results in maximizing performance.
What is multiprocessing?
When multiple processes run independently, with separate resources associated with each process, it is called multiprocessing. Multiprocessing is achieved with multiple processors running separate processes on each processor.
Let's get back to our program. Here you can see we have a connection and read method. These two methods are exactly the same as from the previous section. Here, we have one new function, which is get_thread(). Be careful, as a method belongs to the class, and afunction, it is not part of this class. So, this get_thred() function is global and acts as a wrapper function for calling the read method from the class PostgresDbClient. This is because we can’t create threads using class methods. Don't get confused if you don't understand it, as it is just how we write the code.
To run the program, we have specified the Postgres database details and queries. In the previous approach, we fetched all the data from the table with one thread only. In this approach, the plan is to fetch one day of data using one thread so that we can maximize resource utilization. Here, each query reads one day’s worth of data from the table using one thread. Having 5 queries will fetch 5 days of data, and 5 threads will be running concurrently.
To create a thread in Python, we will need to use the Thread() method from the threading library. We need to pass the function that we want to run and arguments of that function. The thread() object will create a new thread and return its object. The thread has been created but has not yet started. To start this thread, we will need to use the start() method. In our program, we are starting 5 threads. If you try executing this entire program multiple times, you will end up with different results. Some data will fetch prior, and some will fetch later. And at the time of the next execution, this order will be different again. This is because resource handling is done by the operating system. Depending on what the OS thinks about which thread to give what resources, the output is generated. If you want to know how this is done, you will need to go deep into operating systems concepts.
In our use case, we are just printing the data to the console. To store the data, there are multiple ways. One simple way is to define the global variable and store the result in it, but we will need to achieve synchronization as multiple threads might access the global variable, which can lead to race conditions. Another way is to extend the thread class to your custom class, and you can define a class variable—and you can use this variable to save the data. Again, here, you will need to make sure you are achieving synchronization.
So, whenever you want to store the data in a variable by any available method, you will need to achieve synchronization. So, synchronization will lead to the sequential execution of threads. And this sequential processing is not what we are looking for. To avoid synchronization, we can directly write the data to the target—so that when the thread reads the data, the same thread will write data again back to the target database. This way, we can avoid synchronization and store the data in the database for future use. This function can look as below, where db_client.write(data) is a function that writes the data to a database.
def get_thread(thread_id, db_client, query):
print(f"Starting thread id {thread_id}")
data = pd.DataFrame(db_client.read(query))
print(f"Thread {thread_id} data ", data, sep="\n")
Note that in this blog, we have used a password as a hardcoded string, which is definitely not the way to define passwords. We should use secrets, .env files, etc., as input for passwords. We do not hardcode passwords in the production environment.
Conclusion
After going through the above blog, you might have gotten more familiar with how to perform read and write operations on databases using spark, python, and multithreading concepts. You also know now what are multi processes and what multithreading is. You are now also able to analyze the best way to carry out read-and-write operations on a database based on your requirements.
In general, if you have a small amount of data, you can use a simple python approach to read and write data. If you have a relatively high amount of data, then you can use a multi-threaded approach or a single-partition Spark approach. If you have a huge amount of data, and where reading millions of records per second is a requirement, then you can use the Spark multi-partition approach. In the end, it's just mostly personal preference, and using which approach depends on your requirements and availability of resources.
Velotio Technologies is an outsourced software product development partner for top technology startups and enterprises. We partner with companies to design, develop, and scale their products. Our work has been featured on TechCrunch, Product Hunt and more.
We have partnered with our customers to built 90+ transformational products in areas of edge computing, customer data platforms, exascale storage, cloud-native platforms, chatbots, clinical trials, healthcare and investment banking.
Since our founding in 2016, our team has completed more than 90 projects with 220+ employees across the following areas:
Building web/mobile applications
Architecting Cloud infrastructure and Data analytics platforms