
Creating a data pipeline with Azure and Snowflake

What is Snowflake?

Snowflake is a cloud-based data warehousing platform that enables users to efficiently store, organize, and analyze vast amounts of structured and semi-structured data. It is designed to be highly scalable and flexible, allowing quick and easy access to query data. It offers a comprehensive set of tools for data integration, data management, and data warehousing, and includes advanced features such as data sharing and data exchange, which allows sharing and exchanging data with other organizations or individuals.

What is a data pipeline?

A data pipeline is a set of processes and systems that are used to extract data from various sources, transform the data into a format that is suitable for analysis, and load the data into Snowflake for storage and querying. In the context of this article, a data pipeline may involve extracting data from external sources such as databases, flat files, ERP, CRM and other kind of systems which are located on-prem and not in the cloud. Using some form of ETL (extract, transform, load), the data is transformed and cleaned and then loaded into Snowflake for storage and querying using SQL. Snowflake also provides a range of tools and features to help with building and managing data pipelines.

Prerequisites

We need a couple of things before we get started. The GitHub repo is also available here.

Required

  • An Azure subscription and the Azure CLI (az)
  • Terraform
  • A Snowflake account
  • Python 3 with the azure-identity and azure-storage-blob packages

Optional

  • SnowSQL to interact with your Snowflake account from your PC

Defining the Azure infrastructure as code

To create a blob storage container with the current az CLI user as a Storage Blob Data Contributor, create the following two Terraform files:

hcl
main.tf
terraform {
  required_providers {
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "3.30.0"
    }
  }
}

provider "azurerm" {
  features {}
  skip_provider_registration = true
}

resource "azurerm_resource_group" "lab" {
  name     = var.resource_group_name
  location = var.azure_region
}

resource "azurerm_storage_account" "lab" {
  name                     = var.azurerm_storage_account_name
  resource_group_name      = azurerm_resource_group.lab.name
  location                 = azurerm_resource_group.lab.location
  account_tier             = "Standard"
  account_replication_type = "LRS"
}

resource "azurerm_storage_container" "lab" {
  name                  = var.azurerm_storage_container_name
  storage_account_name  = azurerm_storage_account.lab.name
  container_access_type = "private"
}

data "azurerm_client_config" "current" {
}

resource "azurerm_role_assignment" "lab" {
  scope                = azurerm_storage_account.lab.id
  role_definition_name = "Storage Blob Data Contributor"
  principal_id         = data.azurerm_client_config.current.object_id
}
hcl
vars.tf
variable "azure_region" {
description = "Azure region for all resources."
type = string
default = "eastus2"
}
variable "resource_group_name" {
type = string
}
variable "azurerm_storage_account_name" {
type = string
}
variable "azurerm_storage_container_name" {
type = string
}

Then initialize and check that everything looks OK:

bash
terraform init
terraform plan

Apply the changes to create the infrastructure:

bash
terraform apply

You will be asked what the resource group, storage account, and container should be named. You are free to choose whatever names you like.

Note down the storage account's id (the output line starting with id=) as well as the storage container's URL (the output line starting with id=https://). We will use this information in the following steps.

Linking the external staging area to Snowflake

Now that our staging area is set up on Azure's side, we need the tenant ID to register the Azure account with Snowflake. You can use the az CLI to get it quickly:

bash
az account list

Use the tenantId listed under your active Azure subscription.

Now we need to authenticate against Azure. To create an Azure storage integration, either sign in to Snowflake and use a worksheet, or use SnowSQL. I'm using a worksheet (signed in to Snowflake) for the next two Snowflake SQL queries; for the rest I will be using SnowSQL (you can use the .snowsql/config file to authenticate with your main account, or use environment variables/CLI flags for the new user).
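As an aside, if you would rather script the Snowflake side entirely, the same statements can also be run from Python with the snowflake-connector-python package. A minimal sketch, with the account identifier and credentials as placeholders:

python
import snowflake.connector

# Placeholders: substitute your own account identifier and credentials
conn = snowflake.connector.connect(
    account="your_account_identifier",
    user="your_user",
    password="your_password",
    role="ACCOUNTADMIN",
)
cur = conn.cursor()
# Quick sanity check; any of the SQL statements below can be run the same way
cur.execute("select current_account(), current_role()")
print(cur.fetchone())
conn.close()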

sql
use role accountadmin; -- only accountadmin has this privilege by default; it can be granted to other roles as needed
CREATE STORAGE INTEGRATION AzureStorage
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'AZURE'
ENABLED = TRUE
AZURE_TENANT_ID = 'your_tenant_id'
STORAGE_ALLOWED_LOCATIONS = ('azure://your_storage_account.blob.core.windows.net/your_storage_container/');

You can limit the allowed paths with STORAGE_ALLOWED_LOCATIONS or block paths with STORAGE_BLOCKED_LOCATIONS. Here, use the URL of the storage container (that we got from Terraform) with azure:// as the prefix.

Execute the following query to get the consent URL:

sql
desc storage integration AzureStorage;

Copy and paste the AZURE_CONSENT_URL in the browser and click Accept when prompted.

Now use the Azure CLI to look up the service principal, using the part of AZURE_MULTI_TENANT_APP_NAME before the underscore as the display name. In my case:

bash
az ad sp list --display-name "hcttivsnowflakepacint" --query "[].{displayName:displayName, id:id}"

To set the role with the CLI, do

bash
az role assignment create --assignee "id" --role "Storage Blob Data Contributor" --scope "/subscriptions/some_number/resourceGroups/your_resource_group/providers/Microsoft.Storage/storageAccounts/your_storage_account"

where the id comes from the az ad sp list command and the scope is the storage account's id from the Terraform output we generated earlier.

You can also alter the allowed locations within the storage container if needed. I only need one location, which I'll call staging:

sql
ALTER STORAGE INTEGRATION AzureStorage SET
STORAGE_ALLOWED_LOCATIONS = ('azure://your_storage_account.blob.core.windows.net/your_storage_container/staging');

Setting up Snowflake for loading data

Now we need a role and, preferably, a new user to use the staging area. We also create a new database and schema to load our data into, as well as the warehouse that will do the compute work. Finally, we create the external stage that Snowflake loads data from.

Note that we are switching roles when issuing commands that require higher or lower privileges. In my opinion this is good practice: the user should always try to use a role with the least privileges required.

sql
set my_role = 'rawdataloader';
set my_username = 'loader';
set my_warehouse = 'loaddw';
set my_database = 'mydb';
set my_schema = 'raw';
set my_storage_integration = 'AzureStorage';
set my_external_staging = 'ExternalStaging';
-- set the password
set my_password = 'my_super_secure_password';
begin;
use role securityadmin;
create role if not exists identifier($my_role);
grant role identifier($my_role) to role SYSADMIN;
-- create user
create user if not exists identifier($my_username)
password = $my_password
default_role = $my_role
default_warehouse = $my_warehouse;
grant role identifier($my_role) to user identifier($my_username);
use role sysadmin;
-- create Warehouse
create warehouse if not exists identifier($my_warehouse)
warehouse_size = xsmall
warehouse_type = standard
auto_suspend = 60
auto_resume = true
initially_suspended = true;
-- create database
create database if not exists identifier($my_database);
-- grant warehouse access
grant USAGE
on warehouse identifier($my_warehouse)
to role identifier($my_role);
-- grant database access
grant OWNERSHIP
on database identifier($my_database)
to role identifier($my_role);
commit;
begin;
USE DATABASE identifier($my_database);
-- create schema for data
CREATE SCHEMA IF NOT EXISTS identifier($my_schema);
commit;
begin;
-- grant schema access
grant OWNERSHIP
on schema identifier($my_schema)
to role identifier($my_role);
--grant OWNERSHIP on schema public to role identifier($my_role); we will use another role for the ownership
commit;
begin;
grant create stage on schema identifier($my_schema) to role identifier($my_role);
use role securityadmin;
grant usage on integration identifier($my_storage_integration) to role identifier($my_role);
commit;
begin;
use role identifier($my_role);
USE DATABASE identifier($my_database);
USE SCHEMA identifier($my_schema);
create stage identifier($my_external_staging)
storage_integration = AzureStorage
url = 'azure://your_storage_account.blob.core.windows.net/your_storage_container/staging';
commit;

To test, I can list the files within the stage:

sql
list @ExternalStaging;

There should be nothing in there for now. Let's create a random file and upload it to our container. From the command line, do the following, entering your newly created storage account and storage container names as variables:

bash
touch randomfile
export your_storage_account=your_storage_account
export your_storage_container=your_storage_container
az storage blob upload -f randomfile --account-name $your_storage_account --container-name $your_storage_container --auth-mode login --name staging/randomfile

Now, listing again in Snowflake, the file should show up:

sql
list @ExternalStaging;

Cool, everything looks good. Let's create our loading file formats; I usually use CSV and Parquet files for loading.

sql
create or replace file format my_csv_format
type = csv
field_delimiter = ','
skip_header = 1
null_if = ('NULL', 'null')
empty_field_as_null = true
compression = AUTO
FIELD_OPTIONALLY_ENCLOSED_BY = '0x22'
;
create or replace file format my_parquet_format
type = parquet;

I defined the CSV format to skip the header row, and the FIELD_OPTIONALLY_ENCLOSED_BY option tells Snowflake that my fields are enclosed by the " character (0x22 is its hex code).
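To illustrate what this format expects, here is a minimal sketch that writes a CSV in that shape: a header row, comma-delimited, every field enclosed in double quotes, and the string NULL for missing values. The column names and rows are made up purely for illustration.

python
import csv

# Hypothetical rows purely for illustration; the column names are made up
rows = [
    {"ID": "1", "NAME": "widget", "AMOUNT": "19.99"},
    {"ID": "2", "NAME": "gadget", "AMOUNT": "NULL"},  # 'NULL' becomes SQL NULL via null_if
]

with open("sample.csv", "w", newline="") as f:
    # QUOTE_ALL wraps every field in " (hex 0x22), matching FIELD_OPTIONALLY_ENCLOSED_BY
    writer = csv.DictWriter(f, fieldnames=["ID", "NAME", "AMOUNT"], quoting=csv.QUOTE_ALL)
    writer.writeheader()  # header row that skip_header = 1 tells Snowflake to ignore
    writer.writerows(rows)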

Creating the data pipeline from on-premise to Snowflake

In order to use the data we are gathering from different kinds of on-premise systems (be it ERP, CRM, or PDM systems, or flat files on a network share), we need an application running on a VM that collects the data and pushes the files to the staging area. To do this, we need to create an Azure service principal with the Storage Blob Data Contributor role.

If our data loader could run within the resource group, we could use a managed identity (for example, if we were using web APIs and the like), but for an application that has to run on-prem, a service principal is the nicest option.

To create a service principal for our dataloader

bash
az ad sp create-for-rbac --name dataloader

Note this information, as we are going to use it as environment variables for the app that uploads data to our storage container.

To see all our service principal accounts (in case you need to remove it later):

bash
az ad sp list --show-mine --output table

In order for our new service principal to use the storage account, we also need to assign it the Storage Blob Data Contributor role. For this we need the appId from the previous step and the storage account id from the Terraform output.

bash
az role assignment create --assignee $appID --role "Storage Blob Data Contributor" --scope $storage_account_id

So it looks like

bash
az role assignment create --assignee "yourappid" --role "Storage Blob Data Contributor" --scope "/subscriptions/some_number/resourceGroups/your_resource_group/providers/Microsoft.Storage/storageAccounts/your_storage_account"

Now we can start creating a small upload application with our newly created service principal. You can create an env.sh file with the following (only use it locally; don't commit it to version control):

bash
export AZURE_CLIENT_ID="appID"
export AZURE_TENANT_ID="tenant"
export AZURE_CLIENT_SECRET="password"

I created a small Python class to upload files to the staging area:

python
az.py
import os
from pathlib import Path

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient


class Az:
    """Scans a folder and uploads all its files to a path inside the container."""

    def __init__(self, account_url="https://your_storage_account.blob.core.windows.net",
                 container_name="your_storage_container"):
        default_credential = DefaultAzureCredential()
        service_client = BlobServiceClient(account_url, credential=default_credential)
        self.client = service_client.get_container_client(container_name)

    def list_files(self, local_path):
        return os.listdir(local_path)

    def upload(self, source, dest):
        '''
        Upload a file or directory to a path inside the container
        '''
        if os.path.isdir(source):
            self.upload_dir(source, dest)
        else:
            self.upload_file(source, dest)

    def upload_file(self, source, dest):
        '''
        Upload a single file to a path inside the container
        '''
        print(f'Uploading {source} to {dest}')
        with open(source, 'rb') as data:
            try:
                self.client.upload_blob(name=dest, data=data)  # , overwrite=True
            except Exception as e:
                print(e)

    def upload_dir(self, source, dest):
        '''
        Upload the files of a directory to a path inside the storage container
        '''
        prefix = '' if dest == '' else dest + '/'
        for root, dirs, files in os.walk(source):
            for name in files:
                dir_part = os.path.relpath(root, source)
                dir_part = '' if dir_part == '.' else dir_part + '/'
                file_path = os.path.join(root, name)
                blob_path = prefix + dir_part + name
                self.upload_file(file_path, blob_path)

    def ls_dirs(self, path, recursive=False):
        '''
        List directories under a path, optionally recursively
        '''
        if not path == '' and not path.endswith('/'):
            path += '/'
        blob_iter = self.client.list_blobs(name_starts_with=path)
        dirs = []
        for blob in blob_iter:
            relative_dir = os.path.dirname(os.path.relpath(blob.name, path))
            if relative_dir and (recursive or '/' not in relative_dir) and relative_dir not in dirs:
                dirs.append(relative_dir)
        return dirs

    def ls_files(self, path, recursive=False):
        '''
        List files under a path, optionally recursively
        '''
        if not path == '' and not path.endswith('/'):
            path += '/'
        blob_iter = self.client.list_blobs(name_starts_with=path)
        files = []
        for blob in blob_iter:
            relative_path = os.path.relpath(blob.name, path)
            if recursive or '/' not in relative_path:
                files.append(relative_path)
        return files

    def ls_files_sorted(self, path):
        '''
        List files under a path, newest first (sorted by blob creation time)
        '''
        if not path == '' and not path.endswith('/'):
            path += '/'
        blob_iter = self.client.list_blobs(name_starts_with=path)
        sorted_by_creation = sorted(blob_iter, key=lambda x: x.creation_time, reverse=True)
        files = []
        for blob in sorted_by_creation:
            files.append(blob.name)
        return files

    def download_first_match(self, paths, search, dest):
        '''
        Download the first blob whose name contains `search` into a local directory
        '''
        match = next(f for f in paths if search in f)
        Path(dest).mkdir(parents=True, exist_ok=True)
        with open(os.path.join(dest, os.path.basename(match)), "wb") as my_blob:
            download_stream = self.client.download_blob(match)
            my_blob.write(download_stream.readall())

Using it to upload the files in an example directory to the staging area:

python
az = Az(account_url="https://your_storage_account.blob.core.windows.net",
        container_name="your_storage_container")
az.upload_dir("./example_dir", "staging")

To schedule the data loading, you could use Apache Airflow and store the environment variables as secrets in Airflow, or you could use cron/systemd for scheduled jobs on Linux and Task Scheduler on Windows systems.
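For example, an Airflow DAG for this could look roughly like the sketch below. It assumes the Az class from az.py is importable on the Airflow worker and that the AZURE_* variables are available to the task; the schedule and paths are illustrative.

python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from az import Az  # assumes az.py from above is importable on the worker


def upload_to_staging():
    # Placeholders: substitute your storage account and container names
    az = Az(account_url="https://your_storage_account.blob.core.windows.net",
            container_name="your_storage_container")
    az.upload_dir("./example_dir", "staging")


with DAG(
    dag_id="upload_to_snowflake_staging",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",  # illustrative schedule
    catchup=False,
) as dag:
    PythonOperator(task_id="upload_files", python_callable=upload_to_staging)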

When I execute the upload script, all the files within the directory are uploaded to the staging area.

Now with SnowSQL (or in a Snowflake worksheet) you can list the stage again and see that the files are there.

You can also query the files directly with Snowflake.
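For example, reusing the connector approach from earlier with the loader user created above, a direct query against the staged file might look like the sketch below (the account identifier is a placeholder; sales_data_sample.csv is the file used in the COPY example that follows):

python
import snowflake.connector

# Placeholders: account identifier; the loader user/role/warehouse were created above
conn = snowflake.connector.connect(
    account="your_account_identifier",
    user="loader",
    password="my_super_secure_password",
    role="rawdataloader",
    warehouse="loaddw",
    database="mydb",
    schema="raw",
)
cur = conn.cursor()
# Query the staged CSV directly, without loading it into a table first
cur.execute(
    "select t.$1, t.$2, t.$6 "
    "from @ExternalStaging/sales_data_sample.csv "
    "(file_format => 'my_csv_format') t "
    "limit 5"
)
for row in cur:
    print(row)
conn.close()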

To load the data into a table, create a target table and use the copy into command with a small transformation for the pesky American-style dates.

sql
-- Create the target table
create or replace table SalesExample (
ORDERNUMBER int,
QUANTITYORDERED decimal,
PRICEEACH decimal,
ORDERLINENUMBER int,
SALES decimal,
ORDERDATE timestamp_ntz
);
-- Convert the staged CSV column data to the specified data types before loading it into the destination table
copy into SalesExample
from (
select t.$1, t.$2, t.$3, t.$4, t.$5, to_timestamp_ntz(t.$6,'mm/dd/yyyy hh24:mi') from @ExternalStaging/sales_data_sample.csv t
)
file_format = 'my_csv_format';

And now we can use SQL to query the loaded table.

In a follow-up post, I will show how to use dbt to create transformations on the data that was loaded into Snowflake, and how to create roles for data visualization (using Power BI, for example). This post's repo is found here.

© 2022 Jacques du Preez. All rights reserved.