
Execute queries in parallel against Snowflake using AWS Glue

Recently, I was working with a customer who wanted to pull large amounts of data from multiple databases, hosted in many locations, into Amazon S3. Running Glue jobs against these data sources in parallel would normally require crawling the databases, which we deemed not feasible. In most cases, we also needed to execute a specific query containing filters and joins. In other words, the solution had to be flexible, reliable, and scalable, and I had to find a way to read the data in parallel using only AWS serverless services so that job runs stayed short and cost stayed manageable.

AWS Glue 2.0 was announced about a year ago and delivers on its promise to be a serverless solution for batch processing, whether ETL or ELT. Glue excels at loading entire tables and lets you leverage Spark processing, functions, and SQL queries, which makes it suitable for large-scale transformations without any infrastructure to manage.

But in many cases, especially in a change data capture scenario, we only need to return a subset of a table, particularly when that table contains billions of records. Doing so not only eases the load on the source database but also shortens the job run time and, consequently, reduces cost.

While Glue offers a native push_down_predicate parameter, it only applies when reading from tables registered in the Glue Data Catalog. Here is an example using the push_down_predicate parameter:

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="testdata",
    table_name="sampletable",
    transformation_ctx="datasource0",
    push_down_predicate="(product_category == 'Video')")

In this article, I will present an approach for running a query against a table in parallel from Glue 2.0 using a custom Snowflake connection.

We will build the Glue job in modules, following these steps:

  1. Instantiate the Spark environment
  2. Configure a connection to Snowflake
  3. Execute the query
  4. Parallelize reads using a partition column
  5. Write to Amazon S3

Instantiate the Spark environment

We start by importing the necessary libraries:

import logging

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

from awsglue.utils import getResolvedOptions

# A basic Python logger, used for the log statements further down
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

After that, we explicitly configure a couple of Spark environment parameters:


#Configuring the Spark job run environment
conf = SparkConf().setAppName('Read Snowflake')

#When writing Parquet to S3, enable the EMRFS S3-optimized committer
conf.set("spark.sql.parquet.fs.optimized.committer.optimization-enabled", "true")

## Use version 2 of the file output committer algorithm
conf.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")

#Instantiating the Spark context and SQL context
sc = SparkContext.getOrCreate(conf=conf)
spark = SQLContext(sc)

Configure a connection to Snowflake

To connect to a database that Glue does not support natively (see the list of natively supported databases in the AWS documentation), we must make a compatible JDBC driver and any other necessary jar files available to the Glue job at runtime. Unlike Glue DataBrew and Glue Studio, we do not need to create a connector or connection ahead of time; we will instead instantiate the connection inside the job.

To connect to a Snowflake warehouse from Glue 2.0, two jar files are needed: a JDBC driver and a Spark connector. I ended up using the snowflake-jdbc-3.2.4 driver and the spark-snowflake_2.11-2.2.6 connector, both of which can be downloaded from the Maven Central repository.

After downloading the jar files, we need to upload them to an S3 bucket that the Glue job can access, then add the S3 paths of both jar files to the job's “Dependent jars path” field, separated by a comma.
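If you define the job with the AWS SDK rather than the console, the same jar paths can be supplied through the --extra-jars special parameter. Below is a minimal boto3 sketch; the job name, IAM role, script location, bucket names, and worker counts are placeholders to replace with your own values.

import boto3

glue = boto3.client("glue")

# Minimal job definition; names, role, and S3 paths below are placeholders
glue.create_job(
    Name="snowflake-parallel-read",
    Role="arn:aws:iam::123456789012:role/MyGlueJobRole",
    GlueVersion="2.0",
    WorkerType="G.1X",
    NumberOfWorkers=10,
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-glue-scripts/read_snowflake.py",
        "PythonVersion": "3",
    },
    DefaultArguments={
        # Equivalent of the "Dependent jars path" field in the console
        "--extra-jars": "s3://my-glue-jars/snowflake-jdbc-3.2.4.jar,s3://my-glue-jars/spark-snowflake_2.11-2.2.6.jar",
    },
)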

The assumption here is that you already have a secret created in AWS Secrets Manager and know how to retrieve the necessary values in the Glue job (a minimal retrieval sketch follows the configuration below). The required values are:

## Here I am using the Snowflake test database Snowflake_Sample_Data to execute my query

sfURL = "yoursnowflakeaccountname.snowflakecomputing.com"
sfUser = "yourdbusername"
sfPassword = "yourdbpassword"
sfDatabase = "SNOWFLAKE_SAMPLE_DATA"
sfSchema = "TPCH_SF1000"
sfWarehouse = "COMPUTE_WH"

sfOptions = {
    "sfURL": sfURL,
    "sfUser": sfUser,
    "sfPassword": sfPassword,
    "sfDatabase": sfDatabase,
    "sfSchema": sfSchema,
    "sfWarehouse": sfWarehouse,
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

sql_query = "select * from Customer where 1=1 limit 100000" #we only want to return 100k rows
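For reference, here is one way the values above could be pulled from Secrets Manager at runtime instead of being hard-coded. This is a sketch only; the secret name snowflake/glue-credentials and its JSON keys are assumptions you would adapt to your own secret.

import json
import boto3

# Fetch the Snowflake credentials from Secrets Manager (secret name is an assumption)
secrets_client = boto3.client("secretsmanager")
secret_value = secrets_client.get_secret_value(SecretId="snowflake/glue-credentials")
secret = json.loads(secret_value["SecretString"])

sfOptions = {
    "sfURL": secret["sfURL"],
    "sfUser": secret["sfUser"],
    "sfPassword": secret["sfPassword"],
    "sfDatabase": secret["sfDatabase"],
    "sfSchema": secret["sfSchema"],
    "sfWarehouse": secret["sfWarehouse"],
}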

Execute the query

reading_data_frame = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
                               .options(**sfOptions) \
                               .option("query",  sql_query) \
                               .load()

This is all great so far, but the problem is that we would be using only one connection to the database and therefore only one executor in the Glue job. After all, the whole appeal of using Glue is scalability without having to worry about infrastructure.

Parallelize reads using a partition column

To utilize all the executors available to our job and read the data in parallel, we need to set up a few options on the Spark reader API:

  • partitionColumn is the name of the column used for partitioning. It must be of integer, date, or timestamp type. The best partition columns are an ID, primary key, row number, or any column whose values are as evenly distributed as possible.
  • lowerBound is the minimum value of the partitionColumn within the query being executed.
  • upperBound is the maximum value of the partitionColumn within the query being executed.
  • numPartitions is the maximum number of partitions used for parallel table reads and, consequently, the number of files written to S3. It is also the number of concurrent connections opened to the database.
    • When setting this value, we ideally want one partition per executor. I use the number of DPUs as a guide, with each DPU providing two executors; one executor is reserved for the driver, so the number of partitions ends up being: (number_of_DPUs*2) - 1
    • For example, a 10 DPU job ends up with 19 partitions, 19 connections to the database, and 19 files written to S3.

In the Customer table we are using in this example, we will partition on the c_CUSTKEY column, which is a unique key column. We first need to get the column's lowerBound and upperBound, then use those two values to execute the query in parallel.


partition_column = "c_CUSTKEY"

# Set this to the DPU capacity allocated to the Glue job (10 is only an example value)
number_of_DPUs = 10
num_of_partitions = (number_of_DPUs*2)-1

# Querying the partition column lowerbound
lowerbound_df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", f"SELECT min({partition_column}) as min_lowerbound FROM ({sql_query}) AS inner_query") \
    .load()
min_lowerbound = lowerbound_df.collect()[0]['MIN_LOWERBOUND']
logger.info(f"min_lowerbound is {min_lowerbound}")


# Querying partition column upperbound
upperbound_df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", f" SELECT max({partition_column}) as max_upperbound FROM ({sql_query}) AS inner_query") \
    .load()
max_upperbound = upperbound_df.collect()[0]['MAX_UPPERBOUND']
logger.info(f"max_upperbound is {max_upperbound}")


# Reading data using the sql query parameter, partition column with lowerbound and upperbound
reading_data_frame = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", sql_query) \
    .option("partitionColumn", partition_column) \
    .option("lowerBound", min_lowerbound) \
    .option("upperBound", max_upperbound) \
    .option("numPartitions", num_of_partitions) \
    .load()

Write to S3

Because the data was read in parallel, and as long as we don't use Spark transformation functions like coalesce or repartition, we will notice very little intra-node communication on the Glue job metrics page. Assuming the partitionColumn is evenly distributed, the written files should be almost identical in size. We will use the Spark writer API in overwrite mode; the other save modes are errorifexists, append, and ignore (see the Spark documentation on save modes).

s3_output_path = "s3://bucket_name/path_to_writing/"

reading_data_frame.write.mode('overwrite').parquet(s3_output_path)
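If you prefer fewer, larger output files rather than one file per partition, a coalesce before the write is one option, at the cost of some write parallelism. A minimal sketch, with the target of four files chosen arbitrarily:

# Collapse the partitions into 4 output files (arbitrary number); this reduces write parallelism
reading_data_frame.coalesce(4).write.mode('overwrite').parquet(s3_output_path)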

Putting it all together

import logging

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

from awsglue.utils import getResolvedOptions

# A basic Python logger, used for the log statements further down
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


#Configuring the Spark job run environment
conf = SparkConf().setAppName('Read Snowflake')

#When writing Parquet to S3, enable the EMRFS S3-optimized committer
conf.set("spark.sql.parquet.fs.optimized.committer.optimization-enabled", "true")

## Use version 2 of the file output committer algorithm
conf.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")

#Instantiating the Spark context and SQL context
sc = SparkContext.getOrCreate(conf=conf)
spark = SQLContext(sc)

## Here I am using the Snowflake test database Snowflake_Sample_Data to execute my query

sfURL = "youraccountname.snowflakecomputing.com"
sfUser = "yourdbusername"
sfPassword = "yourdbpassword"
sfDatabase = "SNOWFLAKE_SAMPLE_DATA"
sfSchema = "TPCH_SF1000"
sfWarehouse = "COMPUTE_WH"

sfOptions = {
    "sfURL": sfURL,
    "sfUser": sfUser,
    "sfPassword": sfPassword,
    "sfDatabase": sfDatabase,
    "sfSchema": sfSchema,
    "sfWarehouse": sfWarehouse,
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

sql_query = "select * from Customer where 1=1 limit 100000" #we only want to return 100k rows



partition_column = "c_CUSTKEY"

# Set this to the DPU capacity allocated to the Glue job (10 is only an example value)
number_of_DPUs = 10
num_of_partitions = (number_of_DPUs*2)-1

# Querying the partition column lowerbound
lowerbound_df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", f"SELECT min({partition_column}) as min_lowerbound FROM ({sql_query}) AS inner_query") \
    .load()
min_lowerbound = lowerbound_df.collect()[0]['MIN_LOWERBOUND']
logger.info(f"min_lowerbound is {min_lowerbound}")


# Querying partition column upperbound
upperbound_df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", f" SELECT max({partition_column}) as max_upperbound FROM ({sql_query}) AS inner_query") \
    .load()
max_upperbound = upperbound_df.collect()[0]['MAX_UPPERBOUND']
logger.info(f"max_upperbound is {max_upperbound}")


# Reading data using the sql query parameter, partition column with lowerbound and upperbound
reading_data_frame = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", sql_query) \
    .option("partitionColumn", partition_column) \
    .option("lowerBound", min_lowerbound) \
    .option("upperBound", max_upperbound) \
    .option("numPartitions", num_of_partitions) \
    .load()


#Writing data to s3 path 

s3_output_path = "s3://bucket_name/path_to_writing/"

reading_data_frame.write.mode('overwrite').parquet(s3_output_path)

Using this approach, you can pass all or some of the parameters used in the job from an AWS Step Functions state machine and run as many of these jobs as you need.
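As a sketch of what that parameterization could look like on the job side, the snippet below resolves the query, partition column, and output path from the job run arguments. The argument names are assumptions; the matching keys would be supplied as --sql_query, --partition_column, and --s3_output_path in the Step Functions Arguments map or in a start-job-run call.

import sys
from awsglue.utils import getResolvedOptions

# Resolve parameters passed to this job run (argument names are assumptions)
args = getResolvedOptions(sys.argv, ['sql_query', 'partition_column', 's3_output_path'])

sql_query = args['sql_query']
partition_column = args['partition_column']
s3_output_path = args['s3_output_path']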

Glue Studio also offers a way to run queries against custom connections, but as of the time of writing this blog post, it is not possible to run Glue Studio jobs from AWS Step Functions or to pass the query dynamically. I will nonetheless cover the Glue Studio approach in a follow-up blog post and compare the two approaches.

If you feel the process is too complex and could use extra help, feel free to contact an AWS expert here at 1Strategy. Reach out to us at info@1strategy.com today; we would love to discuss how we can help you through this and many other cloud-related journeys!
