All posts by Caius Brindescu

How Etleap and Amazon Redshift Serverless optimize costs for ETL

Post Syndicated from Caius Brindescu original https://aws.amazon.com/blogs/big-data/how-etleap-and-amazon-redshift-serverless-optimize-costs-for-etl/

Amazon Redshift Serverless lets you avoid managing infrastructure while only paying for what you use. Etleap provides data integration software that is natively built on AWS. It’s an AWS Advanced Technology Partner with the AWS Data & Analytics Competency and Amazon Redshift Service Ready designation.

In this post, we share how you can minimize the usage of resources for some workload patterns and maximize savings while seamlessly managing data pipelines. We illustrate an example of how Redshift Serverless and Etleap’s load synchronization feature can reduce active Redshift Serverless time, further optimizing extract, transform, and load (ETL) costs.

Introduction to Redshift Serverless

Redshift Serverless makes it easy to run and scale analytics in seconds without the need to set up and manage data warehouse clusters. With Redshift Serverless, you pay for the compute only when the data warehouse is in use. This is ideal when it’s difficult to predict compute needs such as variable workloads, periodic workloads with idle time, and steady-state workloads with spikes. As your demand evolves with new workloads and more concurrent users, Redshift Serverless automatically provisions the right compute resources, and your data warehouse scales seamlessly and automatically.

You can create a Redshift Serverless data warehouse either using the default settings or custom settings. Redshift Serverless creates a default workgroup and associates that to the default namespace. You can also create multiple Redshift Serverless endpoints per AWS account and Region using namespaces and workgroups.

A namespace is a collection of database objects and users, with properties such as database name and password, permissions, and encryption and security. The following screenshot shows an example of a namespace configuration on the Redshift Serverless console.

Namespace-Amazon Redshift Serverless

A workgroup is a collection of compute resources, which includes network and security settings. Workgroup configuration allows you to create a private or public serverless endpoint that you can use to connect with your applications. The following screenshot shows an example workgroup on the Redshift Serverless console.

Workgroup - Amazon Redshift Serverless

When the Redshift Serverless endpoint is available, choose Query data to launch the Amazon Redshift Query Editor v2 to create database objects, load data, and analyze and visualize data. You can also connect to Redshift Serverless endpoints using your preferred SQL client tools via Amazon Redshift JDBC/ODBC drivers.

With Redshift Serverless, you pay separately for the compute and storage you use. Compute capacity is measured in Redshift Processing Units (RPUs), and you pay for the workloads in RPU-hours with a minimum charge of 60 seconds, metered on a per-second basis. Data lake queries are also part of the same RPU-hours, and Redshift Serverless doesn’t charge separately for the per-TB based pricing of Amazon Redshift Spectrum. The default base capacity is 128 RPUs, but you can adjust it from 32 RPUs to 512 RPUs in units of 8 using the Redshift Serverless console. For storage, you pay for data stored in Amazon Redshift-managed storage and storage used for manual snapshots, similar to what you would pay with Amazon Redshift provisioned RA3 instances.

To control your costs, you can specify usage limits and define actions that Amazon Redshift automatically takes if those limits are reached. You can specify usage limits in RPU-hours and associated with a daily, weekly, or monthly duration. Setting higher usage limits can improve the overall throughput of the system, especially for workloads that need to handle high concurrency while maintaining consistently high performance.

Why Etleap customers need Redshift Serverless

Etleap gives customers robust and flexible pipelines without the hassle of coding and managing infrastructure. Redshift Serverless has a similar benefit, letting you run Amazon Redshift without worrying about provisioning and maintaining data warehouse.

With the close Etleap-AWS integration, you can get started working with multiple data sources in Redshift Serverless in minutes.

Redshift Serverless can also reduce users’ costs because it automatically scales data warehouse capacity up and down to match usage and only charges when the serverless instance is active. ETL workloads are often batch-based and characterized by spikes, so the dynamic scaling of Redshift Serverless reduces unnecessary costs.

The following diagram illustrates this solution architecture.

Etleap Integration with Amazon Redshift Serverless

Etleap uses Amazon Database Migration Service (AWS DMS), Amazon EMR, and Amazon Simple Storage Service (Amazon S3) to process data from databases, files, applications, and streams into Redshift Serverless.

Optimize costs for Redshift Serverless

One of the main sources of cost savings when using Redshift Serverless comes from its auto-pausing feature. When a Redshift Serverless instance is idle, it will auto-pause and you aren’t charged during this period of inactivity.

However, high frequency ETL pipelines (such as those from streams or CDC sources) can constantly resume the Redshift Serverless instance, negating the cost benefit. To maximize the advantages of the auto-pausing feature of Redshift Serverless, Etleap provides the option of load synchronization. As shown in the following figure, this reduces the number of load batches, thereby lowering active Redshift Serverless instance time and cost.

Etleap Load Synchronization

It sometimes makes sense to maximize the frequency of data ingestion, but not all use cases justify the higher cost of an always-on Amazon Redshift instance. Etleap users can set their load frequency at a cost-efficient once-per-hour or as frequently as every 5 minutes.

Amazon Redshift users typically run some SQL transformations after data is loaded in the warehouse. Etleap’s models feature lets you define the SQL transformations and their dependencies and control when these transformations are run. As with data loading, however, if these aren’t designed thoughtfully, there is a risk that models will trigger updates that unnecessarily wake up an idle Redshift Serverless instance, negating the cost savings of the Redshift Serverless auto-pausing feature.

To avoid this, Etleap schedules the models to update immediately after all the dependent tables have been updated. This maximizes the instance usage while it’s awake and allows it to pause when the loads and updates have completed.

Cost savings example

Let’s illustrate the cost savings benefits of Redshift Serverless by means of an example. A customer has set a 1-hour load synchronization schedule and has 100 pipelines and 10 models. Although by default Redshift Serverless has a provisioned base capacity of 128 RPUs, a provisioned base capacity of 32 RPUs is sufficient for the load requirements of this example. A typical average load time for Etleap customers into Amazon Redshift is 6 seconds. In Etleap, we perform a maximum of five loads at a time to avoid overloading the Redshift Serverless instance.

Here is an example of how the sequence would work for the pipelines:

  1. When the hourly schedule triggers, Etleap begins the extraction and transformation of source data for all pipelines with new data to process.
  2. After all the pipelines have finished extraction and transformation, Etleap begins to load the data into Amazon Redshift. This resumes the serverless instance. At an average of 6 seconds per load and five loads running in parallel, it takes 120 seconds to load all the pipelines (100 / 5 pipeline cycles * 6 seconds each).
  3. When the load is complete, Etleap triggers the model updates. A typical model in Etleap takes about 130 seconds to update. As with loads, Etleap limits models to five simultaneous updates to reduce the load on the Redshift Serverless instance. Therefore, updating all 10 models takes 260 seconds of total instance run time (130 seconds * 10/5 model cycles).
  4. At this point, you’re being charged for 380 seconds of active workload, and Redshift Serverless will become idle after some time.

Additionally, Etleap runs daily vacuum operations on applicable tables to minimize storage and improve query efficiency. The length of this process depends on the tables and the number of updates and deletes. For a customer with this amount of pipeline volume, 20 minutes is a typical length of time to vacuum the tables, adding that much daily runtime for the instance.

This results in a total daily runtime of 172 minutes ((380 seconds * 24 daily cycles / 60) + 20 minutes), which translates into a cost of $34.40 per day for a 32 RPU serverless instance. This is 88% lower cost than a comparable Amazon Redshift provisioned environment without the benefits of Etleap and Redshift Serverless: an always-on provisioned Amazon Redshift cluster with similar performance (1 year reserved instance pricing for 16 ra3.xlplus nodes running 24 hours/day).

Other ETL optimizations on Etleap using Redshift Serverless

Etleap natively supports Redshift Serverless by updating its ETL solution to ensure you can continue to seamlessly ingest diverse data sources.

Redshift Serverless offers new system views that are used for tracking and managing ingestion, and Etleap utilizes these new system views to natively handle tracking ingestion loads and vacuuming operations in their platform. For example, Etleap uses sys_query_history to determine which loads are in progress or complete, and thereby helps avoid double loading a batch.

Redshift Serverless automatically initiates optimizations such as sort and vacuum in the background and doesn’t charge for these automatic optimizations. As a best practice, after Etleap load synchronization, Etleap periodically runs the vacuum function on applicable tables, which reduces storage and improves query performance. Etleap uses the vacuum_sort_benefit column in svv_table_info, which provides the statistics for each table, informing which would benefit from vacuuming.

Summary

In this post, we described how Redshift Serverless frees you from managing data warehouse infrastructure and reduces costs. In particular, we illustrated a data integration pattern where Etleap can ensure further cost savings through its load synchronization feature by optimally choosing a cost-efficient once-per-hour load frequency. Although this proves to be an optimal solution for uses cases where you prefer cost efficiency over real-time data insights, Etleap also allows you to set the load frequency as low as 5 minutes for use cases where near-real-time data insights are important.

Start using Redshift Serverless to run and scale analytics without having to manage data warehouse infrastructure and take advantage of further cost savings through Etleap’s load synchronization feature. To get started with Etleap, start a free trial  or request a tailored demo.


About the Authors

Caius Brindescu is an engineer at Etleap with over 4 years of experience in developing ETL software. In addition to development work, he helps customers make the most out of Etleap and Amazon Redshift. He holds a PhD from Oregon State University and one AWS certification (Big Data – Specialty).

Maneesh Sharma is a Senior Database Engineer at AWS with more than a decade of experience designing and implementing large-scale data warehouse and analytics solutions. He collaborates with various Amazon Redshift Partners and customers to drive better integration.

Sathisan Vannadil is a Senior Partner Solutions Architect at Amazon Web Services (AWS). His primary focus is on helping independent software vendor (ISV) partners design and build solutions at scale on AWS. Prior to AWS, Sathisan held diverse technical positions and has over 20 years of experience in the field of data and analytics.

Integrate Etleap with Amazon Redshift Streaming Ingestion (preview) to make data available in seconds

Post Syndicated from Caius Brindescu original https://aws.amazon.com/blogs/big-data/integrate-etleap-with-amazon-redshift-streaming-ingestion-preview-to-make-data-available-in-seconds/

Amazon Redshift is a fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using SQL and your extract, transform, and load (ETL), business intelligence (BI), and reporting tools. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads.

Etleap is an AWS Advanced Technology Partner with the AWS Data & Analytics Competency and Amazon Redshift Service Ready designation. Etleap ETL removes the headaches experienced building data pipelines. A cloud-native platform that seamlessly integrates with AWS infrastructure, Etleap ETL consolidates data without the need for coding. Automated issue detection pinpoints problems so data teams can stay focused on business initiatives, not data pipelines.

In this post, we show how Etleap customers are integrating with the new streaming ingestion feature in Amazon Redshift (currently in limited preview) to load data directly from Amazon Kinesis Data Streams. This reduces load times from minutes to seconds and helps you gain faster data insights.

Amazon Redshift streaming ingestion with Kinesis Data Streams

Traditionally, you had to use Amazon Kinesis Data Firehose to land your stream into Amazon Simple Storage Service (Amazon S3) files and then employ a COPY command to move the data into Amazon Redshift. This method incurs latencies in the order of minutes.

Now, the native streaming ingestion feature in Amazon Redshift lets you ingest data directly from Kinesis Data Streams. The new feature enables you to ingest hundreds of megabytes of data per second and query it at exceptionally low latency—in many cases only 10 seconds after entering the data stream.

Configure Amazon Redshift streaming ingestion with SQL queries

Amazon Redshift streaming ingestion uses SQL to connect with one or more Kinesis data streams simultaneously. In this section, we walk through the steps to configure streaming ingestion.

Create an external schema

We begin by creating an external schema referencing Kinesis using syntax adapted from Redshift’s support for Federated Queries:

CREATE EXTERNAL SCHEMA MySchema
FROM Kinesis
IAM_ROLE { default | 'iam-role-arn' };

This external schema command creates an object inside Amazon Redshift that acts as a proxy to Kinesis Data Streams. Specifically, to the collection of data streams that are accessible via the AWS Identity and Access Management (IAM) role. You can use either the default Amazon Redshift cluster IAM role or a specified IAM role that has been attached to the cluster previously.

Create a materialized view

You can use Amazon Redshift materialized views to materialize a point-in-time view of a Kinesis data stream, as accumulated up to the time it is queried. The following command creates a materialized view over a stream from the previously defined schema:

CREATE MATERIALIZED VIEW MyView AS
SELECT *
FROM MySchema.MyStream;

Note the use of the dot syntax to pick out the particular stream desired. The attributes of the stream include a timestamp field, partition key, sequence number, and a VARBYTE data payload.

Although the previous materialized view definition simply performs a SELECT *, more sophisticated processing is possible, for instance, applying filtering conditions or shredding JSON data into columns. To demonstrate, consider the following Kinesis data stream with JSON payloads:

{
 “player” : “alice 127”,
 “region” : “us-west-1”,
 “action” : “entered shop”,
}

To demonstrate this, write a materialized view that shreds the JSON into columns, focusing only on the entered shop action:

CREATE MATERIALIZED VIEW ShopEntrances AS
SELECT ApproximateArrivalTimestamp, SequenceNumber,
   json_extract_path_text(from_varbyte(Data, 'utf-8'), 'player') as Player,
   json_extract_path_text(from_varbyte(Data, 'utf-8'), 'region') as Region
FROM MySchema.Actions
WHERE json_extract_path_text(from_varbyte(Data, 'utf-8'), 'action') = 'entered shop';

On the Amazon Redshift leader node, the view definition is parsed and analyzed. On success, it is added to the system catalogs. No further communication with Kinesis Data Streams occurs until the initial refresh.

Refresh the materialized view

The following command pulls data from Kinesis Data Streams into Amazon Redshift:

REFRESH MATERIALIZED VIEW MyView;

You can initiate it manually (via the SQL preceding command) or automatically via a scheduled query. In either case, it uses the IAM role associated with the stream. Each refresh is incremental and massively parallel, storing its progress in each Kinesis shard in the system catalogs so as to be ready for the next round of refresh.

With this process, you can now query near-real-time data from your Kinesis data stream through Amazon Redshift.

Use Amazon Redshift streaming ingestion with Etleap

Etleap pulls data from databases, applications, file stores, and event streams, and transforms it before loading it into an AWS data repository. Data ingestion pipelines typically process batches every 5–60 minutes, so when you query your data in Amazon Redshift, it’s at least 5 minutes out of date. For many use cases, such as ad hoc queries and BI reporting, this latency time is acceptable.

But what about when your team demands more up-to-date data? An example is operational dashboards, where you need to track KPIs in near-real time. Amazon Redshift load times are bottlenecked by COPY commands that move data from Amazon S3 into Amazon Redshift, as mentioned earlier.

This is where streaming ingestion comes in: by staging the data in Kinesis Data Streams rather than Amazon S3, Etleap can reduce data latency in Amazon Redshift to less than 10 seconds. To preview this feature, we ingest data from SQL databases such as MySQL and Postgres that support change data capture (CDC). The data flow is shown in the following diagram.

Etleap manages the end-to-end data flow through AWS Database Migration Service (AWS DMS) and Kinesis Data Streams, and creates and schedules Amazon Redshift queries, providing up-to-date data.

AWS DMS consumes the replication logs from the source, and produces insert, update, and delete events. These events are written to a Kinesis data stream that has multiple shards in order to handle the event load. Etleap transforms these events according to user-specified rules, and writes them to another data stream. Finally, a sequence of Amazon Redshift commands load data from the stream into a destination table. This procedure takes less than 10 seconds in real-world scenarios.

Configure Amazon Redshift streaming ingestion with Etleap

Previously, we explored how data in Kinesis Data Streams can be accessed in Amazon Redshift using SQL queries. In this section, we see how Etleap uses the streaming ingestion feature to mirror a table from MySQL into Amazon Redshift, and the end-to-end latency we can achieve.

Etleap customers that are part of the Streaming Ingestion Preview Program can ingest data into Amazon Redshift directly from an Etleap-managed Kinesis data stream. All pipelines from a CDC-enabled source automatically use this feature.

The destination table in Amazon Redshift is Type 1, a mirror of the table in the source database.

For example, say you want to mirror a MySQL table in Amazon Redshift. The table represents the online shopping carts that users have open. In this case, low latency is critical so that the platform marketing strategists can instantly identify abandoned carts and high demand items.

The cart table has the following structure:

CREATE TABLE cart (
id int PRIMARY KEY AUTO_INCREMENT, 
user_id INT,
current_price DECIMAL(6,2),
no_items INT,
checked_out TINY_INT(1),
update_date TIMESTAMP
);

Changes from the source table are captured using AWS DMS and then sent to Etleap via a Kinesis data stream. Etleap transforms these records and writes them to another data stream using the following structure:

{
            "id": 8322,
            "user_id": 443,
            "current_price": 22.98,
            "no_items": 3,
            "checked_out": 0,
            "update_date": "2021-11-05 23:11",
            "op": "U"
}

The structure encodes the row that was modified or inserted, as well as the operation type (represented by the op column), which can have three values: I (insert), U (update) or D (delete).

This information is then materialized in Amazon Redshift from the data stream:

CREATE EXTERNAL SCHEMA etleap_stream
FROM KINESIS
IAM_ROLE '<redacted>';

CREATE MATERIALIZED VIEW cart_staging
DISTSTYLE KEY
	DISTKEY(id)
	SORTKEY(etleap_sequence_no)
AS SELECT
	CAST(PartitionKey as bigint) AS etleap_sequence_no,
	CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'id') as bigint) AS id,
	JSON_PARSE(FROM_VARBYTE(Data, 'utf-8')) AS Data
FROM etleap_stream."cart";

In the materialized view, we expose the following columns:

  • PartitionKey represents an Etleap sequence number, to ensure that updates are processed in the correct order.
  • We shred the primary keys of the table (id in the preceding example) from the payload, using them as a distribution key to improve the update performance.
  • The Data column is parsed out into a SUPER type from the JSON object in the stream. This is shredded into the corresponding columns in the cart table when the data is inserted.

With this staging materialized view, Etleap then updates the destination table (cart) that has the following schema:

CREATE TABLE cart ( 
id BIGINT PRIMARY KEY,
user_id BIGINT,
current_price DECIMAL(6,2),
no_items INT,
checked_out BOOLEAN,
update_date VARCHAR(64)
)
DISTSTYLE key
distkey(id);

To update the table, Etleap runs the following queries, selecting only the changed rows from the staging materialized view, and applies them to the cart table:

BEGIN;

REFRESH MATERIALIZED VIEW cart_staging;

UPDATE _etleap_si SET end_sequence_no = (
	SELECT COALESCE(MIN(etleap_sequence_no), (SELECT MAX(etleap_sequence_no) FROM cart_staging)) FROM 
	(
		SELECT 
			etleap_sequence_no, 
			LEAD(etleap_sequence_no, 1) OVER (ORDER BY etleap_sequence_no) - etleap_sequence_no AS diff
		FROM cart_staging 
		WHERE etleap_sequence_no > (SELECT start_sequence_no FROM _etleap_si WHERE table_name = 'cart')
	)
	WHERE diff > 1
) WHERE table_name = 'cart';



DELETE FROM cart
WHERE id IN (
	SELECT id
	FROM cart_staging
	WHERE etleap_sequence_no > (SELECT start_sequence_no FROM _etleap_si WHERE table_name = 'cart') 
	AND etleap_sequence_no <= (SELECT end_sequence_no FROM _etleap_si WHERE table_name = 'cart')
);

INSERT INTO cart
SELECT 
	DISTINCT(id),
	CAST(Data."timestamp" as timestamp),
	CAST(Data.payload as varchar(256)),
	CAST(Data.etleap_sequence_no as bigint) from
  	(SELECT id, 
  	JSON_PARSE(LAST_VALUE(JSON_SERIALIZE(Data)) OVER (PARTITION BY id ORDER BY etleap_sequence_no ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)) AS Data
   	FROM cart_staging
	WHERE etleap_sequence_no > (SELECT start_sequence_no FROM _etleap_si WHERE table_name = 'cart') 
	AND etleap_sequence_no <= (SELECT end_sequence_no FROM _etleap_si WHERE table_name = 'cart'
AND Data.op != 'D')
);


UPDATE _etleap_si SET start_sequence_no = end_sequence_no WHERE table_name = 'cart';

COMMIT;

We run the following sequence of queries:

  1. Refresh the cart_staging materialized view to get new records from the cart stream.
  2. Delete all records from the cart table that were updated or deleted since the last time we ran the update sequence.
  3. Insert all the updated and newly inserted records from the cart_staging materialized view into the cart table.
  4. Update the _etleap_si bookkeeping table with the current position. Etleap uses this table to optimize the query in the staging materialized view.

This update sequence runs continuously to minimize end-to-end latency. To measure performance, we simulated the change stream from a database table that has up to 100,000 inserts, updates, and deletes. We tested target table sizes of up to 1.28 billion rows. Testing was done on a 2-node ra3.xlplus Amazon Redshift cluster and a Kinesis data stream with 32 shards.

The following figure shows how long the update sequence takes on average over 5 runs in different scenarios. Even in the busiest scenario (100,000 changes to a 1.28 billion row table), the sequence takes just over 10 seconds to run. In our experiment, the refresh time was independent of the delta size, and took 3.7 seconds with a standard deviation of 0.4 seconds.

This shows that the update process can keep up with source database tables that have 1 billion rows and 10,000 inserts, updates, and deletes per second.

Summary

In this post, you learned about the native streaming ingestion feature in Amazon Redshift and how it achieves latency in seconds, while ingesting data from Kinesis Data Streams into Amazon Redshift. You also learned about the architecture of Amazon Redshift with the streaming ingestion feature enabled, how to configure it using SQL commands, and use the capability in Etleap.

To learn more about Etleap, take a look at the Etleap ETL on AWS Quick Start, or visit their listing on AWS Marketplace.


About the Authors

Caius Brindescu is an engineer at Etleap with over 3 years of experience in developing ETL software. In addition to development work, he helps customers make the most out of Etleap and Amazon Redshift. He holds a PhD from Oregon State University and one AWS certification (Big Data – Specialty).

Todd J. Green is a Principal Engineer with AWS Redshift. Before joining Amazon, TJ worked at innovative database startups including LogicBlox and RelationalAI, and was an Assistant Professor of Computer Science at UC Davis. He received his PhD in Computer Science from UPenn. In his career as a researcher, TJ won a number of awards, including the 2017 ACM PODS Test-of-Time Award.

Maneesh Sharma is a Senior Database Engineer with Amazon Redshift. He works and collaborates with various Amazon Redshift Partners to drive better integration. In his spare time, he likes running, playing ping pong, and exploring new travel destinations.

Jobin George is a Big Data Solutions Architect with more than a decade of experience designing and implementing large-scale big data and analytics solutions. He provides technical guidance, design advice, and thought leadership to some of the key AWS customers and big data partners.