Organizations are experiencing significant growth in their data assets and are leveraging Snowflake to derive insights that grow their businesses. This data includes structured, semi-structured, and unstructured data arriving in batches or streams.

As you’ve seen in previous posts, data ingestion is the first stage of the data lifecycle. Here, data is collected from internal sources such as databases, CRMs, ERPs, and legacy systems, as well as external ones such as surveys and third-party providers. This step is important because it determines how well the subsequent stages of the data lifecycle work.

In this stage, raw data is extracted from one or more data sources, replicated, and then ingested into a storage location called a stage. Once data is integrated into Snowflake, you can use powerful features such as Snowpark, Data Sharing, and more to derive value from it and deliver that value to reporting tools, partners, and customers.

In this article, I will illustrate data ingestion and integration using Snowflake’s first-party methods, which cover different data pipeline needs from batch to continuous ingestion. These methods include, but are not limited to, INSERT, COPY, Snowpipe, Snowpipe Streaming, and Dynamic Tables.

image Snowflake’s ingestion options.

Batch Ingestion

Snowflake supports ingesting data in multiple formats and compression methods at any file volume. Features such as schema detection and schema evolution simplify data loading directly into structured tables without needing to split, merge, or convert files. First-party mechanisms for batch data ingestion are INSERT and COPY.

INSERT

The INSERT command is the most straightforward ingestion mechanism for bringing in a small amount of data. It updates a table by inserting one or more rows, whose values can be specified explicitly or produced by a query. Below is the syntax of the INSERT statement:

INSERT [ OVERWRITE ] INTO <target_table> [ ( <target_col_name> [ , ... ] ) ]
       {
VALUES ( { <value> | DEFAULT | NULL } [ , ... ] ) [ , ( ... ) ]  |  <query>
       }

You can insert multiple rows using explicitly specified values in a comma-separated list in the VALUES clause:

INSERT INTO employees
  VALUES
  ('Lysandra','Reeves','1-212-759-3751','New York',10018),
  ('Michael','Arnett','1-650-230-8467','San Francisco',94116);

SELECT * FROM employees;

+------------+-----------+----------------+---------------+-------------+
| FIRST_NAME | LAST_NAME | WORKPHONE      | CITY          | POSTAL_CODE |
|------------+-----------+----------------+---------------+-------------|
| Lysandra   | Reeves    | 1-212-759-3751 | New York      | 10018       |
| Michael    | Arnett    | 1-650-230-8467 | San Francisco | 94116       |
+------------+-----------+----------------+---------------+-------------+

You can also insert multiple rows using a SELECT query. For example:

SELECT * FROM contractors;

+------------------+-----------------+----------------+---------------+----------+
| CONTRACTOR_FIRST | CONTRACTOR_LAST | WORKNUM        | CITY          | ZIP_CODE |
|------------------+-----------------+----------------+---------------+----------|
| Bradley          | Greenbloom      | 1-650-445-0676 | San Francisco | 94110    |
| Cole             | Simpson         | 1-212-285-8904 | New York      | 10001    |
| Laurel           | Slater          | 1-650-633-4495 | San Francisco | 94115    |
+------------------+-----------------+----------------+---------------+----------+

INSERT INTO employees(first_name, last_name, workphone, city,postal_code)
  SELECT
    contractor_first,contractor_last,worknum,NULL,zip_code
  FROM contractors
  WHERE CONTAINS(worknum,'650');

SELECT * FROM employees;

+------------+------------+----------------+---------------+-------------+
| FIRST_NAME | LAST_NAME  | WORKPHONE      | CITY          | POSTAL_CODE |
|------------+------------+----------------+---------------+-------------|
| Lysandra   | Reeves     | 1-212-759-3751 | New York      | 10018       |
| Michael    | Arnett     | 1-650-230-8467 | San Francisco | 94116       |
| Bradley    | Greenbloom | 1-650-445-0676 | NULL          | 94110       |
| Laurel     | Slater     | 1-650-633-4495 | NULL          | 94115       |
+------------+------------+----------------+---------------+-------------+

You can also insert multiple JSON objects into a VARIANT column in a table:

INSERT INTO prospects
  SELECT PARSE_JSON(column1)
  FROM VALUES
  ('{
    "_id": "57a37f7d9e2b478c2d8a608b",
    "name": {
      "first": "Lydia",
      "last": "Williamson"
    },
    "company": "Miralinz",
    "email": "lydia.williamson@miralinz.info",
    "phone": "+1 (914) 486-2525",
    "address": "268 Havens Place, Dunbar, Rhode Island, 7725"
  }')
  , ('{
    "_id": "57a37f7d622a2b1f90698c01",
    "name": {
      "first": "Denise",
      "last": "Holloway"
    },
    "company": "DIGIGEN",
    "email": "denise.holloway@digigen.net",
    "phone": "+1 (979) 587-3021",
    "address": "441 Dover Street, Ada, New Mexico, 5922"
  }');

Finally, if you use the OVERWRITE clause with a multi-row insert, the statement truncates the target table and replaces its contents with the rows produced by the VALUES clause (or query).
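
For instance, a minimal sketch reusing the employees table from above (the names and values are illustrative):

-- Replaces all existing rows in employees with just these two rows.
INSERT OVERWRITE INTO employees
  VALUES
  ('Avery','Chen','1-415-555-0199','San Francisco',94105),
  ('Noah','Patel','1-212-555-0147','New York',10001);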

As you can see, the INSERT statement is the simplest way to ingest data into Snowflake. However, it has scalability and error-handling limitations once data sets exceed the single-digit MiB range. For larger data sets, data engineers typically ingest data with an ETL/ELT tool, or preferably use object storage as an intermediate step together with COPY or Snowpipe.

COPY

The COPY command enables loading batches of data from staged files to an existing table. The files must already be staged in one of the following locations:

  • Named internal stage (or table/user stage). Files can be staged using the PUT command.
  • Named external stage that references an external location (Amazon S3, Google Cloud Storage, or Microsoft Azure).
  • External location (Amazon S3, Google Cloud Storage, or Microsoft Azure).

COPY provides more control than INSERT but requires the customer to manage the compute (via settings such as warehouse size and job duration). This command uses a predefined, customer-managed virtual warehouse to read the data from the remote storage, optionally transform its structure, and write it to native Snowflake tables.

These on-the-fly transformations may include:

  • Column reordering
  • Column omission
  • Casts
  • Text truncation

COPY fits nicely in an existing infrastructure where one or more warehouses are managed for size and suspension/resumption to achieve peak price-to-performance of various workloads, such as SELECT queries or data transformations. Here is the syntax of a simple COPY command:

COPY INTO [<namespace>.]<table_name>
     FROM { internalStage | externalStage | externalLocation }
[ FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] ) ]
[ PATTERN = '<regex_pattern>' ]
[ FILE_FORMAT = ( { FORMAT_NAME = '[<namespace>.]<file_format_name>' |
                    TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ]
[ copyOptions ]
[ VALIDATION_MODE = RETURN_<n>_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS ]

Where:

internalStage ::=
    @[<namespace>.]<int_stage_name>[/<path>]
  | @[<namespace>.]%<table_name>[/<path>]
  | @~[/<path>]

For example, to load files from a named internal stage into a table, the command is:

COPY INTO mytable
FROM @my_int_stage;
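
If the files are not yet in the stage, you can first upload them from a local file system with the PUT command (a minimal sketch; the local path and file names are illustrative, and PUT must be run from a client such as SnowSQL):

PUT file:///tmp/load/contacts_*.csv @my_int_stage AUTO_COMPRESS = TRUE;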

You can also load files from a named external stage that you previously created with the CREATE STAGE command, or directly from an external location (Amazon S3, Google Cloud Storage, or Microsoft Azure). For example:

COPY INTO mytable
  FROM s3://mybucket/data/files
  CREDENTIALS=(AWS_KEY_ID='$AWS_ACCESS_KEY_ID' AWS_SECRET_KEY='$AWS_SECRET_ACCESS_KEY')
  STORAGE_INTEGRATION = myint
  ENCRYPTION=(MASTER_KEY = 'eSx...')
  FILE_FORMAT = (FORMAT_NAME = my_csv_format);

COPY INTO mytable
  FROM 'gcs://mybucket/data/files'
  STORAGE_INTEGRATION = myint
  FILE_FORMAT = (FORMAT_NAME = my_csv_format);

COPY INTO mytable
  FROM 'azure://myaccount.blob.core.windows.net/mycontainer/data/files'
  CREDENTIALS=(AZURE_SAS_TOKEN='?sv=2016-05-31&ss=b&srt=sco&sp=rwdl&se=2018-06-27T10:05:50Z&st=2017-06-27T02:05:50Z&spr=https,http&sig=bgqQwoXwxzuD2GJfagRg7VOS8hzNr3QLT7rhS8OFRLQ%3D')
  ENCRYPTION=(TYPE='AZURE_CSE' MASTER_KEY = 'kPx...')
  FILE_FORMAT = (FORMAT_NAME = my_csv_format);

For data load with transformation, the command syntax is as follows:

COPY INTO [<namespace>.]<table_name> [ ( <col_name> [ , <col_name> ... ] ) ]
     FROM ( SELECT [<alias>.]$<file_col_num>[.<element>] [ , [<alias>.]$<file_col_num>[.<element>] ... ]
            FROM { internalStage | externalStage } )
[ FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] ) ]
[ PATTERN = '<regex_pattern>' ]
[ FILE_FORMAT = ( { FORMAT_NAME = '[<namespace>.]<file_format_name>' |
                    TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ]
[ copyOptions ]
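
For instance, a hedged example that reorders columns, omits one, and casts values while loading staged CSV files (the table, stage, and format names are illustrative):

-- Load columns 1, 2, 4, and 5 from the staged files, skipping column 3,
-- while casting the date and price columns on the fly.
COPY INTO home_sales (city, zip, sale_date, price)
  FROM (SELECT t.$1, t.$2, TO_DATE(t.$4, 'YYYY-MM-DD'), t.$5::NUMBER(12,2)
        FROM @my_int_stage t)
  PATTERN = '.*sales.*[.]csv'
  FILE_FORMAT = (FORMAT_NAME = my_csv_format);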

The COPY command relies on a customer-managed warehouse, so there are some elements to consider when choosing the appropriate warehouse size. The most critical aspect is the degree of parallelism: each thread ingests one file at a time. An XS warehouse provides eight threads, and each increase in warehouse size doubles the number of available threads. The simplified conclusion is that, for a sufficiently large number of files, you can expect optimal parallelism at each warehouse size, halving the time to ingest a large batch of files with every upsize step. In real-life scenarios, however, this speedup can be limited by factors such as networking or I/O delays. These factors should be considered for larger ingestion jobs and might require individual benchmarking during the planning phase.

image Parallel loading of files into Snowflake.

There is a fixed, per-file overhead charge for Snowpipe in addition to compute usage costs. As a result, for single-digit MiB or smaller file sizes, Snowpipe can be less cost-effective (in credits/TiB) than COPY, depending on the file arrival rate, the size of the warehouse, and non-COPY use of the Cloud Services Layer. Larger file sizes of at least 100 MiB are typically more efficient.

Generally, we recommend file sizes above 10 MiB, with the 100 to 250 MiB range typically offering the best price/performance, so aggregate smaller data files for batch ingestion. We also recommend not exceeding 5 GiB per file and splitting larger files into smaller ones to take advantage of parallelization. With a very large file, an erroneous record near the end may cause an ingestion job to fail and restart, depending on the ON_ERROR option.

Finally, using the most explicit path allows COPY to list and load data without traversing the entire bucket, thereby saving compute and network usage.
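
For example, pointing COPY at a specific prefix (and optionally a PATTERN) keeps the file listing small; the stage and path below are illustrative:

-- Only files under the 2023/06 prefix are listed and loaded.
COPY INTO mytable
  FROM @my_ext_stage/sales/2023/06/
  FILE_FORMAT = (FORMAT_NAME = my_csv_format);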

Continuous Data Ingestion

This option is designed to load small volumes of data (i.e. micro-batches) and incrementally make them available for analysis. For example, Snowpipe loads data within minutes after files are added to a stage and submitted for ingestion. This ensures users have the latest results as soon as the raw data is available.

Snowpipe

Snowpipe is a serverless service that enables loading data from files as soon as they’re available in a Snowflake stage (locations where data files are stored for loading/unloading). Snowpipe can load data from files in micro-batches rather than executing COPY statements on a schedule. Unlike COPY, which is a synchronous process that returns the load status, Snowpipe file ingestion is asynchronous, and processing status must be observed explicitly.

Snowpipe uses compute resources provided by Snowflake (a serverless compute model). These Snowflake-provided resources are automatically resized and scaled up or down as required, and they are charged and itemized using per-second billing. Data ingestion is charged based upon the actual workloads.
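
For visibility into these serverless charges, you can query Snowpipe usage history; a minimal sketch using the ACCOUNT_USAGE view (which has some latency):

-- Snowpipe credits and files loaded per pipe over the past 7 days.
SELECT pipe_name,
       SUM(credits_used)   AS credits_used,
       SUM(files_inserted) AS files_inserted
  FROM snowflake.account_usage.pipe_usage_history
  WHERE start_time > DATEADD(day, -7, CURRENT_TIMESTAMP())
  GROUP BY pipe_name;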

A pipe is a named, first-class Snowflake object that contains a COPY statement used by Snowpipe. The COPY statement identifies the source location of the data files (i.e., a stage) and a target table and supports the same transformation options as when bulk loading data. All data types are supported, including semi-structured data types such as JSON and Avro.

In addition, data pipelines can leverage Snowpipe to continuously load micro-batches of data into staging tables, which are then transformed and optimized using automated tasks and the change data capture (CDC) information in streams. In most cases, auto-ingest Snowpipe is the preferred approach: it continuously loads new data into the target table by reacting to newly created files in the source bucket.

image Auto-ingest Snowpipe setup.

In the example above, Snowpipe relies on the cloud vendor-specific system for event distribution, such as AWS SQS or SNS, Azure Event Grid, or GCP Pub/Sub. This setup requires corresponding privileges to the cloud account to deliver event notifications from the source bucket to Snowpipe.

The following example creates a stage named mystage in the active schema for the user session. The cloud storage URL includes the path files. The stage references a storage integration named my_storage_int. First, we create the S3 storage integration and the stage:

CREATE STORAGE INTEGRATION my_storage_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::001234567890:role/myrole'
  STORAGE_ALLOWED_LOCATIONS = ('s3://mybucket1/mypath1/', 's3://mybucket2/mypath2/')
  STORAGE_BLOCKED_LOCATIONS = ('s3://mybucket1/mypath1/sensitivedata/', 's3://mybucket2/mypath2/sensitivedata/');
USE SCHEMA snowpipe_db.public;
CREATE STAGE mystage
  URL = 's3://mybucket/load/files'
  STORAGE_INTEGRATION = my_storage_int;

Then, we create a pipe named mypipe in the active schema for the user session. The pipe loads the data from files staged in the mystage stage into the mytable table and subscribes to the SNS topic ARN that propagates the notification:

create pipe snowpipe_db.public.mypipe
  auto_ingest=true
  aws_sns_topic='<sns_topic_arn>'
  as
    copy into snowpipe_db.public.mytable
    from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');
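
Because Snowpipe ingestion is asynchronous, you observe its progress explicitly. For example, using the pipe and table created above:

-- Pending file count, last received message timestamp, and more.
SELECT SYSTEM$PIPE_STATUS('snowpipe_db.public.mypipe');

-- Per-file load history for the target table over the past hour.
SELECT *
  FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'snowpipe_db.public.mytable',
    START_TIME => DATEADD(hour, -1, CURRENT_TIMESTAMP())));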

But in cases where an event service cannot be set up, or an existing data pipeline infrastructure is already in place, a REST API-triggered Snowpipe is a suitable alternative. It is currently the only option if an internal stage is used to store the raw files. Most commonly, the REST API approach is used by ETL/ELT tools that don’t want to put the burden of creating object storage on the end user and instead use a Snowflake-managed internal stage.

image API-triggered Snowpipe setup.

Similar to the auto-ingest setup, you need to create a stage and a storage integration to S3. The pipe is created without the SNS topic ARN and the auto_ingest keyword:

create pipe snowpipe_db.public.mypipe as
    copy into snowpipe_db.public.mytable
    from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

Then, we create a user with key-pair authentication. The user credentials will be used when calling the Snowpipe API endpoints:

use role securityadmin;
create user snowpipeuser 
  login_name = 'snowpipeuser'
  default_role = SYSADMIN
  default_namespace = snowpipe_test.public
  rsa_public_key = '<RSA Public Key value>' ;

Connecting via SnowSQL validates that the user has been successfully created. Use the --private-key-path switch to tell SnowSQL to use key-pair authentication.

snowsql -a sedemo.us-east-1-gov.aws -u snowpipeuser --private-key-path rsa_key.p8

Authentication via the REST endpoint expects a valid JSON Web Token (JWT). These tokens are valid for at most 60 minutes and must be regenerated periodically. If you want to test the REST API using Postman or curl, you must generate one from the RSA key pair.

Once you generate the JWT, the REST endpoint should reference your Snowflake account and the fully qualified pipe name. The call you’re testing is a POST to the insertFiles method.

curl -H 'Accept: application/json' -H "Authorization: Bearer ${TOKEN}" -d @path/to/data.csv https://sedemo.us-east-1-gov.aws.snowflakecomputing.com/v1/data/pipes/snowpipe_db.public.mypipe/insertFiles

The responseCode should be SUCCESS. It’s important to remember that Snowpipe will not ingest the same file twice. The call will succeed, but no data will be ingested. This is by design. To retest, either use a different filename or drop and recreate the table.

The ingestion should already be finished, so you can return to the Snowflake UI and run a select statement on the table.

Snowpipe Streaming

Snowpipe Streaming enables serverless streaming data ingestion directly into Snowflake tables, without staging files (bypassing cloud object storage), with exactly-once and ordered delivery. This architecture results in lower latencies and correspondingly lower costs for loading any volume of data, making it a powerful tool for handling near real-time data streams.

The API is intended to complement Snowpipe, not replace it. Use the Snowpipe Streaming API in streaming scenarios where data is streamed via rows (for example, Apache Kafka topics) instead of writing to files. The API fits into an ingest workflow, including an existing custom Java application that produces or receives records. The API removes the need to create files to load data into Snowflake tables and enables the automatic, continuous loading of data streams into Snowflake as the data becomes available. You also get new functionality, such as exactly-once delivery, ordered ingestion, and error handling with dead-letter queue (DLQ) support.

Snowpipe Streaming is also available for the Snowflake Connector for Kafka, which offers an easy upgrade path to take advantage of the lower latency and lower cost loads.

image Snowpipe Streaming API.

The API ingests rows through one or more channels. A channel represents a logical, named streaming connection to Snowflake for loading data into a table. A single channel maps to exactly one table in Snowflake; however, multiple channels can point to the same table. The Client SDK can open multiple channels to multiple tables; however, the SDK cannot open channels across accounts. The ordering of rows and their corresponding offset tokens are preserved within a channel but not across channels that point to the same table.

Channels are meant to be long-lived while a client actively inserts data and should be reused, as offset token information is retained. Data inside a channel is automatically flushed every 1 second by default, so the channel does not need to be closed explicitly for the data to be flushed.
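
You can inspect the channels that currently exist for a table, along with their latest committed offset tokens, from SQL (assuming the target table already has open channels; the table name is illustrative):

SHOW CHANNELS IN TABLE mydb.public.my_streaming_table;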

image Streaming channels.

Ingesting Kafka topics into Snowflake tables

Files are a common denominator across processes that produce data—whether they’re on-premises or in the cloud. Most ingestion happens in batches, where a file forms a physical and sometimes logical batch. Today, file-based ingestion utilizing COPY or auto-ingest Snowpipe is the primary source for data that is ingested into Snowflake.

Kafka (or its cloud-specific equivalents) provides an additional data collection and distribution infrastructure to write and read streams of records. If event records need to be distributed to multiple sinks—mostly as streams—then such an arrangement makes sense. Stream processing (in contrast to batch processing) typically allows for lower data volumes at more frequent intervals for near real-time latency.

In the case of the Snowflake Connector for Kafka, the same file size consideration mentioned earlier still applies due to its use of Snowpipe for data ingestion. However, there may be a trade-off between the desired maximum latency and a larger file size for cost optimization. The right file size for your application may not fit the above guidance, and that is acceptable as long as the cost implications are measured and considered.

In addition, the amount of memory available on a Kafka Connect cluster node may limit the buffer size and, therefore, the file size. In that case, configuring the timer value (buffer.flush.time) is still a good idea, so that files smaller than the buffer size are less likely.

Two settings, buffer.flush.time and buffer.flush.size, determine the total number of files per minute sent to Snowflake via the Kafka connector, so tuning these parameters is very beneficial for performance. Here’s a look at two examples:

  • If you set buffer.flush.time to 240 seconds instead of 120 seconds without changing anything else, the base files/minute rate is halved (reaching the buffer size before the timer expires will change these calculations).
  • If you increase buffer.flush.size to 100 MB without changing anything else, the base files/minute rate is reduced by a factor of 20 (reaching the maximum buffer size before the maximum buffer time will change these calculations).

For testing this setup locally, we will need:

  • Open-source Apache Kafka 2.13-3.3.1 installed locally,
  • Snowflake Kafka Connector 1.9.1.jar (or a newer version),
  • OpenJDK <= 15.0.2,
  • a Snowflake user for Snowpipe Streaming with an RSA key pair defined as the authentication method.

First, you need to create a separate user to use for Snowpipe Streaming. Remember to replace <YOURPUBLICKEY> with your public key. When doing so, remove the begin/end header lines from the key file (e.g., -----BEGIN PUBLIC KEY-----), but keep the new-line characters.

create user snowpipe_streaming_user password='',  default_role = accountadmin, rsa_public_key='<YOURPUBLICKEY>';
grant role accountadmin  to user snowpipe_streaming_user;

Next, create the database and warehouse you will use later on:

CREATE OR REPLACE DATABASE hol_streaming;
USE DATABASE hol_streaming;
CREATE OR REPLACE WAREHOUSE hol_streaming_wh WITH WAREHOUSE_SIZE = 'XSMALL' MIN_CLUSTER_COUNT = 1 MAX_CLUSTER_COUNT = 1 AUTO_SUSPEND = 60;

Then, let’s open the terminal and run the following commands to download Kafka and Snowflake Kafka connector:

mkdir HOL_kafka
cd HOL_kafka

curl https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz --output kafka_2.13-3.3.1.tgz
tar -xzf kafka_2.13-3.3.1.tgz

cd kafka_2.13-3.3.1/libs
curl https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.9.1/snowflake-kafka-connector-1.9.1.jar --output snowflake-kafka-connector-1.9.1.jar

Create the configuration file config/SF_connect.properties with the following parameters. Remember to replace <YOURACCOUNT> and <YOURPRIVATEKEY> with the corresponding details. Also note that when adding the private key, you need to remove all new-line characters as well as the beginning and ending header lines (e.g., -----BEGIN PRIVATE KEY-----):

name=snowpipe_streaming_ingest
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=1
topics=customer_data_topic
snowflake.topic2table.map=customer_data_topic:customer_data_stream_stg
buffer.count.records=1
buffer.flush.time=10
buffer.size.bytes=20000000
snowflake.url.name=<YOURACCOUNT>.snowflakecomputing.com:443
snowflake.user.name=SNOWPIPE_STREAMING_USER
snowflake.private.key=<YOURPRIVATEKEY>
snowflake.database.name=HOL_STREAMING
snowflake.schema.name=PUBLIC
snowflake.role.name=ACCOUNTADMIN
snowflake.ingestion.method=SNOWPIPE_STREAMING
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

With that out of the way, let’s start everything up. Note that you might get errors in this step if you use JDK >= 15, and you will need a few separate terminal sessions:

Session 1:

bin/zookeeper-server-start.sh config/zookeeper.properties

Session 2:

bin/kafka-server-start.sh config/server.properties

Session 3:

bin/connect-standalone.sh ./config/connect-standalone.properties ./config/SF_connect.properties

Now, open another terminal session (Session 4) and run the kafka-console-producer. This utility allows you to manually enter data into the topic.

bin/kafka-console-producer.sh --topic customer_data_topic --bootstrap-server localhost:9092

Let’s get back to Snowsight and run the following query to generate some sample customer data in JSON format:

SELECT object_construct(*)
  FROM snowflake_sample_data.tpch_sf10.customer limit 200;
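
Copy the JSON rows that Snowsight returns and paste them into the kafka-console-producer session. After the buffer flush interval, the connector should write them to the table mapped in snowflake.topic2table.map; a quick check (the table is created automatically by the connector, and with the JSON converter the payload lands in the RECORD_CONTENT variant column):

SELECT COUNT(*) FROM hol_streaming.public.customer_data_stream_stg;

SELECT RECORD_CONTENT FROM hol_streaming.public.customer_data_stream_stg LIMIT 5;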

As you can see, Snowpipe Streaming is a fantastic new capability that can significantly reduce integration latency and improve pipeline efficiency. It also opens up new opportunities for your business, providing near-real-time insights and operational reporting, among other benefits.

image

Snowpipe Streaming and Dynamic Tables for Real-Time Ingestion

Dynamic tables are the building blocks of declarative data transformation pipelines. They significantly simplify data engineering in Snowflake and provide a reliable, cost-effective, and automated way to transform your data for consumption. Instead of defining data transformation steps as a series of tasks and having to monitor dependencies and scheduling, you define the end state of the transformation using dynamic tables and leave the complex pipeline management to Snowflake.

image Dynamic tables.

Hereafter, we will take you through a scenario of using Snowflake’s Snowpipe Streaming to ingest a simulated stream. Then, we will utilize Dynamic tables to transform and prepare the raw ingested JSON payloads into ready-for-analytics datasets. These are two of Snowflake’s powerful Data Engineering innovations for ingestion and transformation.

The simulated data feed will be stock limit orders, with new, changed, and canceled orders represented as RDBMS transaction logs captured from INSERT, UPDATE, and DELETE database events. These events will be transmitted as JSON payloads and will land in a Snowflake table with a VARIANT data column. This is the same type of stream ingestion typically created by change data capture (CDC) agents that parse a database’s transaction logs, or by the event notification mechanisms of modern applications, but it could simulate any type of stream in any industry. This use case was previously modeled with Snowflake’s Kafka Connector; here, no Kafka is necessary, because a Snowpipe Streaming client can replace the Kafka middleware infrastructure, saving cost and complexity. Once the data has landed, Dynamic Tables, purpose-built Snowflake objects for data engineering, transform the raw data into data ready for insights.

image Snowpipe Streaming API and Dynamic Tables for Change Data Capture.

Our source ‘database’ has stock trades for the Dow Jones Industrials, 30 US stocks; on average, 200M-400M stock trades are executed per day. Our agent will capture limit order transaction events for these 30 stocks: new orders, updates to orders (changes in quantity or the limit price), and canceled orders. For this simulation, there are three new orders for every two updates and one cancellation. The datastream will first reproduce the heavy workload of an initial market-opening session, followed by a more modest continuous flow. Snowflake data consumers want three perspectives on limit orders: a “current” view that filters out stale and canceled orders, a historical table showing every event on the source (in a traditional slowly changing dimension format), and current orders summarized by stock ticker symbol and by long or short position. Latency needs to be minimized; 1-2 minutes would be ideal for the end-to-end process.

More Snowflake capabilities can further enrich your incoming data using Snowflake Data Marketplace data, train and deploy machine learning models, perform fraud detection, and other use cases. We will cover these in future posts.

First, you need to extract the simulator archive, which creates a CDCSimulatorApp directory with many files in it.

From your terminal, navigate to your working directory, then into the extracted CDCSimulatorApp directory, and run these two commands:

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

In Snowflake, create a dedicated role and a user for your streaming application. Run these commands, pasting the public key generated in the previous step (the content of rsa_key.pub):

create role if not exists VHOL_CDC_AGENT;
create or replace user vhol_streaming1 COMMENT="Creating for VHOL";
alter user vhol_streaming1 set rsa_public_key='<Paste Your Public Key Here>';

You will need to edit the snowflake.properties file to match your Snowflake account name (two places):

user=vhol_streaming1
role=VHOL_CDC_AGENT
account=<MY_SNOWFLAKE_ACCOUNT>
warehouse=VHOL_CDC_WH
private_key_file=rsa_key.p8
host=<ACCOUNT_IDENTIFIER>.snowflakecomputing.com
database=VHOL_ENG_CDC
schema=ENG
table=CDC_STREAMING_TABLE
channel_name=channel_1
AES_KEY=O90hS0k9qHdsMDkPe46ZcQ==
TOKEN_KEY=11
DEBUG=FALSE
SHOW_KEYS=TRUE
NUM_ROWS=1000000

Create new roles for this tutorial and grant permissions:

use role ACCOUNTADMIN;
set myname = current_user();
create role if not exists VHOL;
grant role VHOL to user identifier($myname);
grant role VHOL_CDC_AGENT to user vhol_streaming1;

Create a dedicated virtual compute warehouse (size XS), then create the database used throughout this tutorial:

create or replace warehouse VHOL_CDC_WH WAREHOUSE_SIZE = XSMALL, AUTO_SUSPEND = 5, AUTO_RESUME= TRUE;
grant all privileges on warehouse VHOL_CDC_WH to role VHOL;

create database VHOL_ENG_CDC;
use database VHOL_ENG_CDC;
grant ownership on schema PUBLIC to role VHOL;
revoke all privileges on database VHOL_ENG_CDC from role ACCOUNTADMIN;
grant ownership on database VHOL_ENG_CDC to role VHOL;

use role VHOL;
use database VHOL_ENG_CDC;
create schema ENG;
use VHOL_ENG_CDC.ENG;
use warehouse VHOL_CDC_WH;
grant usage on database VHOL_ENG_CDC to role VHOL_CDC_AGENT;
grant usage on schema ENG to role VHOL_CDC_AGENT;
grant usage on database VHOL_ENG_CDC to role PUBLIC;
grant usage on schema PUBLIC to role PUBLIC;

Create a staging/landing table where all incoming data will land initially. Each row will contain a transaction, with the JSON payload stored in a VARIANT column:

create or replace table ENG.CDC_STREAMING_TABLE (RECORD_CONTENT variant);
grant insert on table ENG.CDC_STREAMING_TABLE to role VHOL_CDC_AGENT;

You can run Test.sh to ensure that everything is set correctly. You are now ready to Stream data into Snowflake!

To execute the streaming simulator, run Run_MAX.sh.

./Run_MAX.sh

This should take 10-20 seconds and return:

image

Then, you have 1 million records in the CDC_STREAMING_TABLE table.

image

Each record is a JSON payload received via the Snowpipe Streaming API and stored in the Snowflake table as a row with a VARIANT data field.
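
You can peek at the raw payloads directly with the colon-path syntax; for example (the paths follow the structure used in the dynamic tables below):

SELECT RECORD_CONTENT:transaction:action::varchar          AS action,
       RECORD_CONTENT:transaction:table::varchar           AS source_table,
       RECORD_CONTENT:transaction:committed_at::number     AS committed_at,
       RECORD_CONTENT
  FROM ENG.CDC_STREAMING_TABLE
  LIMIT 5;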

Now, you can create a more finished Dynamic Table sourcing from the landing table that reflects the “CURRENT STATE” of the source table. In this pattern, for each source table, you create a Dynamic Table:

CREATE OR REPLACE DYNAMIC TABLE ENG.LIMIT_ORDERS_CURRENT_DT
LAG = '1 minute'
WAREHOUSE = 'VHOL_CDC_WH'
AS
SELECT * EXCLUDE (score,action) from (  
  SELECT
    RECORD_CONTENT:transaction:primaryKey_tokenized::varchar as orderid_tokenized,
    RECORD_CONTENT:transaction:record_after:orderid_encrypted::varchar as orderid_encrypted,
    TO_TIMESTAMP_NTZ(RECORD_CONTENT:transaction:committed_at::number/1000) as lastUpdated,
    RECORD_CONTENT:transaction:action::varchar as action,
    RECORD_CONTENT:transaction:record_after:client::varchar as client,
    RECORD_CONTENT:transaction:record_after:ticker::varchar as ticker,
    RECORD_CONTENT:transaction:record_after:LongOrShort::varchar as position,
    RECORD_CONTENT:transaction:record_after:Price::number(38,3) as price,
    RECORD_CONTENT:transaction:record_after:Quantity::number(38,3) as quantity,
    RANK() OVER (
        partition by orderid_tokenized order by RECORD_CONTENT:transaction:committed_at::number desc) as score
  FROM ENG.CDC_STREAMING_TABLE 
    WHERE 
        RECORD_CONTENT:transaction:schema::varchar='PROD' AND RECORD_CONTENT:transaction:table::varchar='LIMIT_ORDERS'
) 
WHERE score = 1 and action != 'DELETE';

Warning: if you query the Dynamic Table immediately, you will get a warning that it is not yet ready; you have to wait for the refresh period so the Dynamic Table can be built.

Wait for Lag Period (1 minute), then check the table:

SELECT count(*) FROM LIMIT_ORDERS_CURRENT_DT;

Let’s work with dynamic data going forward and return to the stream simulator to provide a continuous stream. Run Run_Slooow.sh, and the application will stream 10 records/second until you stop the application (using Ctrl-C). If you want more volume, run the Run_Sloow.sh for 100/second or Run_Slow.sh for 1000/second stream rate. Note that the simulator is designed to run only one of these simultaneously (the channel name is configured in the property file).

The first table is streaming nicely, but you can also analyze how orders/records have changed over time and keep a historical record, for example as a Slowly Changing Dimension (SCD). You can do that by adding fields to each record that track and group versions:

CREATE OR REPLACE DYNAMIC TABLE ENG.LIMIT_ORDERS_SCD_DT
LAG = '1 minute'
WAREHOUSE = 'VHOL_CDC_WH'
AS
SELECT * EXCLUDE score from ( SELECT *,
  CASE when score=1 then true else false end as Is_Latest,
  LAST_VALUE(score) OVER (
            partition by orderid_tokenized order by valid_from desc)+1-score as version
  FROM (  
      SELECT
        RECORD_CONTENT:transaction:primaryKey_tokenized::varchar as orderid_tokenized,
        --IFNULL(RECORD_CONTENT:transaction:record_after:orderid_encrypted::varchar,RECORD_CONTENT:transaction:record_before:orderid_encrypted::varchar) as orderid_encrypted,
        RECORD_CONTENT:transaction:action::varchar as action,
        IFNULL(RECORD_CONTENT:transaction:record_after:client::varchar,RECORD_CONTENT:transaction:record_before:client::varchar) as client,
        IFNULL(RECORD_CONTENT:transaction:record_after:ticker::varchar,RECORD_CONTENT:transaction:record_before:ticker::varchar) as ticker,
        IFNULL(RECORD_CONTENT:transaction:record_after:LongOrShort::varchar,RECORD_CONTENT:transaction:record_before:LongOrShort::varchar) as position,
        RECORD_CONTENT:transaction:record_after:Price::number(38,3) as price,
        RECORD_CONTENT:transaction:record_after:Quantity::number(38,3) as quantity,
        RANK() OVER (
            partition by orderid_tokenized order by RECORD_CONTENT:transaction:committed_at::number desc) as score,
        TO_TIMESTAMP_NTZ(RECORD_CONTENT:transaction:committed_at::number/1000) as valid_from,
        TO_TIMESTAMP_NTZ(LAG(RECORD_CONTENT:transaction:committed_at::number/1000,1,null) over 
                         (partition by orderid_tokenized order by RECORD_CONTENT:transaction:committed_at::number desc)) as valid_to
      FROM ENG.CDC_STREAMING_TABLE
      WHERE 
            RECORD_CONTENT:transaction:schema::varchar='PROD' AND RECORD_CONTENT:transaction:table::varchar='LIMIT_ORDERS'
    ))
;

Wait for the lag period (about 1 minute), then recheck the table. You should now see more than the 1,000,000 initial records we loaded.
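
To inspect the history of a single order that has changed, you can query the SCD table by its version fields; a hedged example using the columns defined above:

-- Pick one order that has more than one event and show its full history.
SELECT orderid_tokenized, action, version, is_latest, valid_from, valid_to, price, quantity
  FROM ENG.LIMIT_ORDERS_SCD_DT
  WHERE orderid_tokenized = (
        SELECT orderid_tokenized
          FROM ENG.LIMIT_ORDERS_SCD_DT
          GROUP BY orderid_tokenized
          HAVING COUNT(*) > 1
          LIMIT 1)
  ORDER BY version;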

This data is now ready for public use! To give users access, let’s use views (note: the JSON path syntax is not seen or needed anywhere except in the landing table). For our "Current View" table:

create or replace view PUBLIC.CURRENT_LIMIT_ORDERS_VW
  as select orderid_tokenized, lastUpdated, client, ticker, position, quantity, price
  FROM ENG.LIMIT_ORDERS_CURRENT_DT order by orderid_tokenized;
grant select on view PUBLIC.CURRENT_LIMIT_ORDERS_VW to role PUBLIC;

No need to wait: your consumers can now view and analyze limit orders in real time!

select * from PUBLIC.CURRENT_LIMIT_ORDERS_VW limit 1000;
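
The third perspective mentioned earlier, current orders summarized by ticker symbol and position, can be built the same way by chaining another dynamic table off the current-state table; a minimal sketch (the table name and aggregations are illustrative):

CREATE OR REPLACE DYNAMIC TABLE ENG.LIMIT_ORDERS_SUMMARY_DT
LAG = '1 minute'
WAREHOUSE = 'VHOL_CDC_WH'
AS
SELECT ticker,
       position,
       COUNT(*)      AS open_orders,
       SUM(quantity) AS total_quantity,
       AVG(price)    AS avg_limit_price
  FROM ENG.LIMIT_ORDERS_CURRENT_DT
  GROUP BY ticker, position;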

Summary

Snowflake offers various building blocks for working with both batch and streaming data. There is no one-size-fits-all approach, so it is important to understand the differences to address requirements effectively. In this post, we explored the ingestion options, best practices, and how you can implement each concretely.

image

Regardless of the ingestion method you choose, the key question that remains is how long ingestion takes and how much it costs. Both depend on various factors, including:

  • Size of the file: Core ingestion time scales with the content, so costs tend to be proportional to the number of records and the file size, but without an exact correlation.
  • The amount of pre-processing required: Some ingestion jobs invoke complex UDFs that take significant time per row and occasionally can even run out of memory if the data size is not correctly anticipated.
  • File format, compression, nested structures, etc., all affect how efficiently the data can be decompressed and loaded. An uncompressed file with many columns may take as long to load as a compressed file with few columns but highly nested data structures.

Therefore, it is impossible to answer the time and cost question without measuring it for each specific case.

Finally, as mentioned in previous posts, there are many approaches to data ingestion, but the best practice is to reduce complexity while achieving your business requirements. Batch and Streaming ingestion can work together to provide the simplest and most cost-effective solution to your data pipelines. Streaming ingestion is not meant to replace file-based ingestion but rather to augment it for data-loading scenarios that better fit your business needs.

References