All posts by Poulomi Dasgupta

Best practices to implement near-real-time analytics using Amazon Redshift Streaming Ingestion with Amazon MSK

Post Syndicated from Poulomi Dasgupta original https://aws.amazon.com/blogs/big-data/best-practices-to-implement-near-real-time-analytics-using-amazon-redshift-streaming-ingestion-with-amazon-msk/

Amazon Redshift is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, straightforward, and secure analytics at scale. Tens of thousands of customers rely on Amazon Redshift to analyze exabytes of data and run complex analytical queries, making it the most widely used cloud data warehouse. You can run and scale analytics in seconds on all your data, without having to manage your data warehouse infrastructure.

You can use the Amazon Redshift Streaming Ingestion capability to update your analytics databases in near-real time. Amazon Redshift streaming ingestion simplifies data pipelines by letting you create materialized views directly on top of data streams. With this capability in Amazon Redshift, you can use Structured Query Language (SQL) to connect to and directly ingest data from data streams, such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams, and pull data directly to Amazon Redshift.

In this post, we discuss the best practices to implement near-real-time analytics using Amazon Redshift streaming ingestion with Amazon MSK.

Overview of solution

We walk through an example pipeline to ingest data from an MSK topic into Amazon Redshift using Amazon Redshift streaming ingestion. We also show how to unnest JSON data using dot notation in Amazon Redshift. The following diagram illustrates our solution architecture.

The process flow consists of the following steps:

  1. Create a streaming materialized view in your Redshift cluster to consume live streaming data from the MSK topics.
  2. Use a stored procedure to implement change data capture (CDC) using the unique combination of Kafka Partition and Kafka Offset at the record level for the ingested MSK topic.
  3. Create a user-facing table in the Redshift cluster and use dot notation to unnest the JSON document from the streaming materialized view into data columns of the table. You can continuously load fresh data by calling the stored procedure at regular intervals.
  4. Establish connectivity between an Amazon QuickSight dashboard and Amazon Redshift to deliver visualization and insights.

As part of this post, we also discuss the following topics:

  • Steps to configure cross-account streaming ingestion from Amazon MSK to Amazon Redshift
  • Best practices to achieve optimized performance from streaming materialized views
  • Monitoring techniques to track failures in Amazon Redshift streaming ingestion

Prerequisites

You must have the following:

Considerations while setting up your MSK topic

Keep in mind the following considerations when configuring your MSK topic:

  • Make sure that the name of your MSK topic is no longer than 128 characters.
  • As of this writing, MSK records containing compressed data can’t be directly queried in Amazon Redshift. Amazon Redshift doesn’t support any native decompression methods for client-side compressed data in an MSK topic.
  • Follow best practices while setting up your MSK cluster.
  • Review the streaming ingestion limitations for any other considerations.

Set up streaming ingestion

To set up streaming ingestion, complete the following steps:

  1. Set up the AWS Identity and Access Management (IAM) role and trust policy required for streaming ingestion. For instructions, refer to Setting up IAM and performing streaming ingestion from Kafka.
  2. Make sure that data is flowing into your MSK topic using Amazon CloudWatch metrics (for example, BytesOutPerSec).
  3. Launch the query editor v2 from the Amazon Redshift console or use your preferred SQL client to connect to your Redshift cluster for the next steps. The following steps were run in query editor v2.
  4. Create an external schema to map to the MSK cluster. Replace your IAM role ARN and the MSK cluster ARN in the following statement:
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  'iam-role-arn' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn';
    

  5. Optionally, if your topic names are case sensitive, you need to set enable_case_sensitive_identifier to true to be able to access them in Amazon Redshift. You can set it at the session, user, or cluster level:
    SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;

  6. Create a materialized view to consume the stream data from the MSK topic:
    CREATE MATERIALIZED VIEW Orders_Stream_MV AS
    SELECT kafka_partition, 
     kafka_offset, 
     refresh_time,
     JSON_PARSE(kafka_value) as Data
    FROM custschema."ORDERTOPIC"
    WHERE CAN_JSON_PARSE(kafka_value);
    

The metadata column kafka_value that arrives from Amazon MSK is stored in VARBYTE format in Amazon Redshift. For this post, you use the JSON_PARSE function to convert kafka_value to a SUPER data type. You also use the CAN_JSON_PARSE function in the filter condition to skip invalid JSON records and guard against errors due to JSON parsing failures. We discuss how to store the invalid data for future debugging later in this post.

  7. Refresh the streaming materialized view, which triggers Amazon Redshift to read from the MSK topic and load data into the materialized view:
    REFRESH MATERIALIZED VIEW Orders_Stream_MV;

You can also set your streaming materialized view to use auto refresh capabilities. This will automatically refresh your materialized view as data arrives in the stream. See CREATE MATERIALIZED VIEW for instructions to create a materialized view with auto refresh.
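For example, a minimal sketch of the same materialized view defined with auto refresh enabled could look like the following (assuming the external schema and topic created earlier; verify the exact AUTO REFRESH options in the CREATE MATERIALIZED VIEW documentation):

CREATE MATERIALIZED VIEW Orders_Stream_MV
AUTO REFRESH YES
AS
SELECT kafka_partition,
    kafka_offset,
    refresh_time,
    JSON_PARSE(kafka_value) as Data
FROM custschema."ORDERTOPIC"
WHERE CAN_JSON_PARSE(kafka_value);

With auto refresh enabled, you no longer need to run REFRESH MATERIALIZED VIEW manually, although Amazon Redshift prioritizes user workloads over auto refresh, so the exact refresh timing is not guaranteed.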

Unnest the JSON document

The following is a sample of a JSON document that was ingested from the MSK topic to the Data column of SUPER type in the streaming materialized view Orders_Stream_MV:

{
   "EventType":"Orders",
   "OrderID":"103",
   "CustomerID":"C104",
   "CustomerName":"David Smith",
   "OrderDate":"2023-09-02",
   "Store_Name":"Store-103",
   "ProductID":"P004",
   "ProductName":"Widget-X-003",
   "Quatity":"5",
   "Price":"2500",
   "OrderStatus":"Initiated"
}

Use dot notation as shown in the following code to unnest your JSON payload:

SELECT 
    data."OrderID"::INT4 as OrderID
    ,data."ProductID"::VARCHAR(36) as ProductID
    ,data."ProductName"::VARCHAR(36) as ProductName
    ,data."CustomerID"::VARCHAR(36) as CustomerID
    ,data."CustomerName"::VARCHAR(36) as CustomerName
    ,data."Store_Name"::VARCHAR(36) as Store_Name
    ,data."OrderDate"::TIMESTAMPTZ as OrderDate
    ,data."Quatity"::INT4 as Quatity
    ,data."Price"::DOUBLE PRECISION as Price
    ,data."OrderStatus"::VARCHAR(36) as OrderStatus
    ,"kafka_partition"::BIGINT  
    ,"kafka_offset"::BIGINT
FROM orders_stream_mv;

The following screenshot shows what the result looks like after unnesting.

If you have arrays in your JSON document, consider unnesting your data using PartiQL statements in Amazon Redshift. For more information, refer to the section Unnest the JSON document in the post Near-real-time analytics using Amazon Redshift streaming ingestion with Amazon Kinesis Data Streams and Amazon DynamoDB.

Incremental data load strategy

Complete the following steps to implement an incremental data load:

  1. Create a table called Orders in Amazon Redshift, which end-users will use for visualization and business analysis:
    CREATE TABLE public.Orders (
        orderid integer ENCODE az64,
        productid character varying(36) ENCODE lzo,
        productname character varying(36) ENCODE lzo,
        customerid character varying(36) ENCODE lzo,
        customername character varying(36) ENCODE lzo,
        store_name character varying(36) ENCODE lzo,
        orderdate timestamp with time zone ENCODE az64,
        quatity integer ENCODE az64,
        price double precision ENCODE raw,
        orderstatus character varying(36) ENCODE lzo
    ) DISTSTYLE AUTO;
    

Next, you create a stored procedure called SP_Orders_Load to implement CDC from a streaming materialized view and load into the final Orders table. You use the combination of Kafka_Partition and Kafka_Offset available in the streaming materialized view as system columns to implement CDC. The combination of these two columns will always be unique within an MSK topic, which makes sure that none of the records are missed during the process. The stored procedure contains the following components:

  • To use case-sensitive identifiers, set enable_case_sensitive_identifier to true at either the session, user, or cluster level.
  • Refresh the streaming materialized view manually if auto refresh is not enabled.
  • Create an audit table called Orders_Streaming_Audit if it doesn’t exist to keep track of the last offset for a partition that was loaded into the Orders table during the last run of the stored procedure.
  • Unnest and insert only new or changed data into a staging table called Orders_Staging_Table, reading from the streaming materialized view Orders_Stream_MV, where Kafka_Offset is greater than the last processed Kafka_Offset recorded in the audit table Orders_Streaming_Audit for the Kafka_Partition being processed.
  • When loading for the first time using this stored procedure, there will be no data in the Orders_Streaming_Audit table and all the data from Orders_Stream_MV will get loaded into the Orders table.
  • Insert only business-relevant columns to the user-facing Orders table, selecting from the staging table Orders_Staging_Table.
  • Insert the max Kafka_Offset for every loaded Kafka_Partition into the audit table Orders_Streaming_Audit.

We have added the intermediate staging table Orders_Staging_Table in this solution to help with debugging and traceability in case of unexpected failures. Skipping the staging step and loading directly into the final table from Orders_Stream_MV can provide lower latency, depending on your use case.

  2. Create the stored procedure with the following code:
    CREATE OR REPLACE PROCEDURE SP_Orders_Load()
        AS $$
        BEGIN
    
        SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;
        REFRESH MATERIALIZED VIEW Orders_Stream_MV;
    
        --create an audit table if not exists to keep track of Max Offset per Partition that was loaded into Orders table  
    
        CREATE TABLE IF NOT EXISTS Orders_Streaming_Audit
        (
        "kafka_partition" BIGINT,
        "kafka_offset" BIGINT
        )
        SORTKEY("kafka_partition", "kafka_offset"); 
    
        DROP TABLE IF EXISTS Orders_Staging_Table;  
    
    --Insert only newly available data into the staging table from the streaming view, based on the max offset for new/existing partitions
    --When loading for the first time (no data in the Orders_Streaming_Audit table), all the data gets loaded from the streaming view
    CREATE TABLE Orders_Staging_Table as 
    SELECT 
    data."OrderID"::INT4 as OrderID
    ,data."ProductID"::VARCHAR(36) as ProductID
    ,data."ProductName"::VARCHAR(36) as ProductName
    ,data."CustomerID"::VARCHAR(36) as CustomerID
    ,data."CustomerName"::VARCHAR(36) as CustomerName
    ,data."Store_Name"::VARCHAR(36) as Store_Name
    ,data."OrderDate"::TIMESTAMPTZ as OrderDate
    ,data."Quatity"::INT4 as Quatity
    ,data."Price"::DOUBLE PRECISION as Price
    ,data."OrderStatus"::VARCHAR(36) as OrderStatus
        , s."kafka_partition"::BIGINT , s."kafka_offset"::BIGINT
        FROM Orders_Stream_MV s
        LEFT JOIN (
        SELECT
        "kafka_partition",
        MAX("kafka_offset") AS "kafka_offset"
        FROM Orders_Streaming_Audit
        GROUP BY "kafka_partition"
        ) AS m
        ON nvl(s."kafka_partition",0) = nvl(m."kafka_partition",0)
        WHERE
        m."kafka_offset" IS NULL OR
        s."kafka_offset" > m."kafka_offset";
    
    --Insert only business-relevant columns into the final table, selecting from the staging table
        Insert into Orders 
        SELECT 
        OrderID
        ,ProductID
        ,ProductName
        ,CustomerID
        ,CustomerName
        ,Store_Name
        ,OrderDate
        ,Quatity
        ,Price
        ,OrderStatus
        FROM Orders_Staging_Table;
    
    --Insert the max kafka_offset for every loaded Kafka partition into the audit table 
        INSERT INTO Orders_Streaming_Audit
        SELECT kafka_partition, MAX(kafka_offset)
        FROM Orders_Staging_Table
        GROUP BY kafka_partition;   
    
        END;
        $$ LANGUAGE plpgsql;
    

  3. Run the stored procedure to load data into the Orders table:
    call SP_Orders_Load();

  4. Validate the data in the Orders table; see the sample validation queries following this list.
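The following is a minimal validation sketch using the objects created above, comparing the rows loaded into the Orders table with the offsets recorded in the audit table:

--row count in the user-facing table
SELECT COUNT(*) AS order_rows FROM Orders;

--highest offset loaded per partition, as tracked by the audit table
SELECT kafka_partition, MAX(kafka_offset) AS last_loaded_offset
FROM Orders_Streaming_Audit
GROUP BY kafka_partition
ORDER BY kafka_partition;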

Establish cross-account streaming ingestion

If your MSK cluster belongs to a different account, complete the following steps to create IAM roles to set up cross-account streaming ingestion. Let’s assume the Redshift cluster is in account A and the MSK cluster is in account B, as shown in the following diagram.

Complete the following steps:

  1. In account B, create an IAM role called MyRedshiftMSKRole that allows Amazon Redshift (account A) to communicate with the MSK cluster (account B) named MyTestCluster. Depending on whether your MSK cluster uses IAM authentication or unauthenticated access to connect, you need to create an IAM role with one of the following policies:
    • An IAM policy for Amazon MSK when using unauthenticated access:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }

    • An IAM policy for Amazon MSK when using IAM authentication:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKIAMpolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka-cluster:ReadData",
                      "kafka-cluster:DescribeTopic",
                      "kafka-cluster:Connect"
                  ],
                  "Resource": [
                      "arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1",
                      "arn:aws:kafka:us-east-1:0123456789:topic/MyTestCluster/*"
                  ]
              },
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }
      

The resource section in the preceding example gives access to all topics in the MyTestCluster MSK cluster. If you need to restrict the IAM role to specific topics, you need to replace the topic resource with a more restrictive resource policy.

  2. After you create the IAM role in account B, take note of the IAM role ARN (for example, arn:aws:iam::0123456789:role/MyRedshiftMSKRole).
  3. In account A, create a Redshift customizable IAM role called MyRedshiftRole that Amazon Redshift will assume when connecting to Amazon MSK. The role should have a policy like the following, which allows the Amazon Redshift IAM role in account A to assume the Amazon MSK role in account B:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "RedshiftMSKAssumePolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::0123456789:role/MyRedshiftMSKRole"        
           }
        ]
    }
    

  4. Take note of the role ARN for the Amazon Redshift IAM role (for example, arn:aws:iam::9876543210:role/MyRedshiftRole).
  5. Go back to account B and add this role in the trust policy of the IAM role arn:aws:iam::0123456789:role/MyRedshiftMSKRole to allow account B to trust the IAM role from account A. The trust policy should look like the following code:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Principal": {
            "AWS": "arn:aws:iam::9876543210:role/MyRedshiftRole"
          }
        }
      ]
    } 
    

  6. Sign in to the Amazon Redshift console as account A.
  7. Launch the query editor v2 or your preferred SQL client and run the following statements to access the MSK topic in account B. To map to the MSK cluster, create an external schema using role chaining by specifying the IAM role ARNs, separated by a comma with no spaces around it. The role attached to the Redshift cluster comes first in the chain.
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  
    'arn:aws:iam::9876543210:role/MyRedshiftRole,arn:aws:iam::0123456789:role/MyRedshiftMSKRole' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn'; --replace with ARN of MSK cluster 
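To confirm that the external schema was created with the chained roles, you can optionally query the SVV_EXTERNAL_SCHEMAS system view (a minimal check; the columns available may vary by Amazon Redshift version):

SELECT schemaname, databasename
FROM SVV_EXTERNAL_SCHEMAS
WHERE schemaname = 'custschema';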
    

Performance considerations

Keep in mind the following performance considerations:

  • Keep the streaming materialized view simple and move transformations like unnesting, aggregation, and case expressions to a later step—for example, by creating another materialized view on top of the streaming materialized view.
  • Consider creating only one streaming materialized view in a single Redshift cluster or workgroup for a given MSK topic. Creation of multiple materialized views per MSK topic can slow down the ingestion performance because each materialized view becomes a consumer for that topic and shares the Amazon MSK bandwidth for that topic. Live streaming data in a streaming materialized view can be shared across multiple Redshift clusters or Redshift Serverless workgroups using data sharing.
  • While defining your streaming materialized view, avoid using Json_Extract_Path_Text to pre-shred data, because Json_extract_path_text operates on the data row by row, which significantly impacts ingestion throughput. It is preferable to land the data as is from the stream and then shred it later.
  • Where possible, consider skipping the sort key in the streaming materialized view to accelerate the ingestion speed. When a streaming materialized view has a sort key, a sort operation will occur with every batch of ingested data from the stream. Sorting has a performance overhead depending on the sort key data type, number of sort key columns, and amount of data ingested in each batch. This sorting step can increase the latency before the streaming data is available to query. You should weigh which is more important: latency on ingestion or latency on querying the data.
  • For optimized performance of the streaming materialized view and to reduce storage usage, occasionally purge data from the materialized view using delete, truncate, or alter table append (see the example following this list).
  • If you need to ingest multiple MSK topics in parallel into Amazon Redshift, start with a smaller number of streaming materialized views and keep adding more materialized views to evaluate the overall ingestion performance within a cluster or workgroup.
  • Increasing the number of nodes in a Redshift provisioned cluster or the base RPU of a Redshift Serverless workgroup can help boost the ingestion performance of a streaming materialized view. For optimal performance, you should aim to have as many slices in your Redshift provisioned cluster as there are partitions in your MSK topic, or 8 RPU for every four partitions in your MSK topic.
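The following is a minimal sketch of the purge approach mentioned in the preceding list, assuming a 7-day retention window for raw stream data (adjust the window and schedule to your requirements):

--remove raw stream records older than 7 days from the streaming materialized view
DELETE FROM Orders_Stream_MV
WHERE refresh_time < DATEADD(day, -7, GETDATE());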

Monitoring techniques

Records in the topic that exceed the size of the target materialized view column at the time of ingestion will be skipped. Records that are skipped by the materialized view refresh will be logged in the SYS_STREAM_SCAN_ERRORS system table.

Errors that occur when processing a record due to a calculation or a data type conversion or some other logic in the materialized view definition will result in the materialized view refresh failure until the offending record has expired from the topic. To avoid these types of issues, test the logic of your materialized view definition carefully; otherwise, land the records into the default VARBYTE column and process them later.

The following are available monitoring views:

  • SYS_MV_REFRESH_HISTORY – Use this view to gather information about the refresh history of your streaming materialized views. The results include the refresh type, such as manual or auto, and the status of the most recent refresh. The following query shows the refresh history for a streaming materialized view:
    select mv_name, refresh_type, status, duration  from SYS_MV_REFRESH_HISTORY where mv_name='mv_store_sales'

  • SYS_STREAM_SCAN_ERRORS – Use this view to check the reason why a record failed to load via streaming ingestion from an MSK topic. As of this writing, when ingesting from Amazon MSK, this view only logs errors when the record is larger than the materialized view column size. This view will also show the unique identifier (offset) of the MSK record in the position column. The following query shows the error code and error reason when a record exceeded the maximum size limit:
    select mv_name, external_schema_name, stream_name, record_time, query_id, partition_id, "position", error_code, error_reason
    from SYS_STREAM_SCAN_ERRORS where mv_name='test_mv' and external_schema_name ='streaming_schema';
    

  • SYS_STREAM_SCAN_STATES – Use this view to monitor the number of records scanned at a given record_time. This view also tracks the offset of the last record read in the batch. The following query shows topic data for a specific materialized view:
    select mv_name,external_schema_name,stream_name,sum(scanned_rows) total_records,
    sum(scanned_bytes) total_bytes 
    from SYS_STREAM_SCAN_STATES where mv_name='test_mv' and external_schema_name ='streaming_schema' group by 1,2,3;
    

  • SYS_QUERY_HISTORY – Use this view to check the overall metrics for a streaming materialized view refresh. This will also log errors in the error_message column for errors that don’t show up in SYS_STREAM_SCAN_ERRORS. The following query shows the error causing the refresh failure of a streaming materialized view:
    select  query_id, query_type, status, query_text, error_message from sys_query_history where status='failed' and start_time>='2024-02-03 03:18:00' order by start_time desc
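In addition to these system views, you can track how far the incremental load has progressed by comparing the audit table maintained by the stored procedure with the latest offsets available in the streaming materialized view. The following is a minimal sketch using the objects created earlier in this post:

--latest offset ingested into the materialized view vs. last offset loaded into Orders, per partition
SELECT s.kafka_partition,
    MAX(s.kafka_offset) AS latest_mv_offset,
    MAX(a.kafka_offset) AS last_loaded_offset,
    MAX(s.kafka_offset) - MAX(a.kafka_offset) AS offset_lag
FROM Orders_Stream_MV s
LEFT JOIN Orders_Streaming_Audit a
    ON s.kafka_partition = a.kafka_partition
GROUP BY s.kafka_partition
ORDER BY s.kafka_partition;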

Additional considerations for implementation

You can optionally create another materialized view on top of a streaming materialized view to unnest and precompute results for end-users. This approach eliminates the need to store the results in a final table using a stored procedure.
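The following is a minimal sketch of that approach, reusing the unnesting logic shown earlier; the view name Orders_Unnested_MV is only an example, and depending on the query, such a dependent materialized view may require manual or scheduled rather than incremental refresh:

CREATE MATERIALIZED VIEW Orders_Unnested_MV AS
SELECT
    data."OrderID"::INT4 as OrderID,
    data."ProductID"::VARCHAR(36) as ProductID,
    data."ProductName"::VARCHAR(36) as ProductName,
    data."CustomerID"::VARCHAR(36) as CustomerID,
    data."CustomerName"::VARCHAR(36) as CustomerName,
    data."Store_Name"::VARCHAR(36) as Store_Name,
    data."OrderDate"::TIMESTAMPTZ as OrderDate,
    data."Quatity"::INT4 as Quatity,
    data."Price"::DOUBLE PRECISION as Price,
    data."OrderStatus"::VARCHAR(36) as OrderStatus,
    kafka_partition,
    kafka_offset
FROM Orders_Stream_MV;

Refresh it after the streaming materialized view is refreshed (REFRESH MATERIALIZED VIEW Orders_Unnested_MV;) and point end-user queries or dashboards at it.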

In this post, you use the CAN_JSON_PARSE function to guard against any errors to more successfully ingest data—in this case, the streaming records that can’t be parsed are skipped by Amazon Redshift. However, if you want to keep track of your error records, consider storing them in a column using the following SQL when creating the streaming materialized view:

CREATE MATERIALIZED VIEW Orders_Stream_MV AS 
SELECT
kafka_partition, 
kafka_offset, 
refresh_time, 
case when CAN_JSON_PARSE(kafka_value) = true then json_parse(kafka_value) end Data,
case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end Invalid_Data
FROM custschema."ORDERTOPIC";
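You can then inspect the skipped records from the Invalid_Data column, for example by converting the raw VARBYTE value to text. The following is a minimal sketch that assumes the payload is UTF-8 encoded:

SELECT kafka_partition,
    kafka_offset,
    refresh_time,
    FROM_VARBYTE(Invalid_Data, 'utf-8') AS invalid_payload
FROM Orders_Stream_MV
WHERE Invalid_Data IS NOT NULL;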

You can also consider unloading data from the view SYS_STREAM_SCAN_ERRORS into an Amazon Simple Storage Service (Amazon S3) bucket and get alerts by sending a report via email using Amazon Simple Notification Service (Amazon SNS) notifications whenever a new S3 object is created.

Lastly, based on your data freshness requirement, you can use Amazon EventBridge to schedule the jobs in your data warehouse to call the aforementioned SP_Orders_Load stored procedure on a regular basis. EventBridge does this at fixed intervals, and you may need to have a mechanism (for example, an AWS Step Functions state machine) to monitor if the previous call to the procedure completed. For more information, refer to Creating an Amazon EventBridge rule that runs on a schedule. You can also refer to Accelerate orchestration of an ELT process using AWS Step Functions and Amazon Redshift Data API. Another option is to use Amazon Redshift query editor v2 to schedule the refresh. For details, refer to Scheduling a query with query editor v2.

Conclusion

In this post, we discussed best practices to implement near-real-time analytics using Amazon Redshift streaming ingestion with Amazon MSK. We showed you an example pipeline to ingest data from an MSK topic into Amazon Redshift using streaming ingestion. We also showed a reliable strategy to perform incremental streaming data load into Amazon Redshift using Kafka Partition and Kafka Offset. Additionally, we demonstrated the steps to configure cross-account streaming ingestion from Amazon MSK to Amazon Redshift and discussed performance considerations for optimized ingestion rate. Lastly, we discussed monitoring techniques to track failures in Amazon Redshift streaming ingestion.

If you have any questions, leave them in the comments section.


About the Authors

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Adekunle Adedotun is a Sr. Database Engineer with Amazon Redshift service. He has been working on MPP databases for 6 years with a focus on performance tuning. He also provides guidance to the development team for new and existing service features.

Near-real-time analytics using Amazon Redshift streaming ingestion with Amazon Kinesis Data Streams and Amazon DynamoDB

Post Syndicated from Poulomi Dasgupta original https://aws.amazon.com/blogs/big-data/near-real-time-analytics-using-amazon-redshift-streaming-ingestion-with-amazon-kinesis-data-streams-and-amazon-dynamodb/

Amazon Redshift is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, easy, and secure analytics at scale. Tens of thousands of customers rely on Amazon Redshift to analyze exabytes of data and run complex analytical queries, making it the most widely used cloud data warehouse. You can run and scale analytics in seconds on all your data without having to manage your data warehouse infrastructure.

You can use the Amazon Redshift streaming ingestion capability to update your analytics databases in near-real time. Amazon Redshift streaming ingestion simplifies data pipelines by letting you create materialized views directly on top of data streams. With this capability in Amazon Redshift, you can use SQL (Structured Query Language) to connect to and directly ingest data from data streams, such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams, and pull data directly to Amazon Redshift.

In this post, we discuss a solution that uses Amazon Redshift streaming ingestion to provide near-real-time analytics.

Overview of solution

We walk through an example pipeline to ingest data from an Amazon DynamoDB source table in near-real time using Kinesis Data Streams in combination with Amazon Redshift streaming ingestion. We also walk through using PartiQL in Amazon Redshift to unnest nested JSON documents and build fact and dimension tables that are used in your data warehouse refresh. The solution uses Kinesis Data Streams to capture item-level changes from an application DynamoDB table.

As shown in the following reference architecture, DynamoDB table data changes are streamed into Amazon Redshift through Kinesis Data Streams and Amazon Redshift streaming ingestion for near-real-time analytics dashboard visualization using Amazon QuickSight.

The process flow includes the following steps:

  1. Create a Kinesis data stream and turn on the data stream from DynamoDB to capture item-level changes in your DynamoDB table.
  2. Create a streaming materialized view in your Amazon Redshift cluster to consume live streaming data from the data stream.
  3. The streaming data gets ingested as a JSON payload. Use a combination of a PartiQL statement and dot notation to unnest the JSON document into data columns of a staging table in Amazon Redshift.
  4. Create fact and dimension tables in the Amazon Redshift cluster and keep loading the latest data at regular intervals from the staging table using transformation logic.
  5. Establish connectivity between a QuickSight dashboard and Amazon Redshift to deliver visualization and insights.

Prerequisites

You must have the following:

Set up a Kinesis data stream

To configure your Kinesis data stream, complete the following steps:

  1. Create a Kinesis data stream called demo-data-stream. For instructions, refer to Step 1 in Set up streaming ETL pipelines.

Configure the stream to capture changes from the DynamoDB table.

  2. On the DynamoDB console, choose Tables in the navigation pane.
  3. Open your table.
  4. On the Exports and streams tab, choose Turn on under Amazon Kinesis data stream details.

  5. For Destination Kinesis data stream, choose demo-data-stream.
  6. Choose Turn on stream.

Item-level changes in the DynamoDB table should now be flowing to the Kinesis data stream.

  7. To verify if the data is entering the stream, on the Kinesis Data Streams console, open demo-data-stream.
  8. On the Monitoring tab, find the PutRecord success – average (Percent) and PutRecord – sum (Bytes) metrics to validate record ingestion.

Set up streaming ingestion

To set up streaming ingestion, complete the following steps:

  1. Set up the AWS Identity and Access Management (IAM) role and trust policy required for streaming ingestion. For instructions, refer to Steps 1 and 2 in Getting started with streaming ingestion from Amazon Kinesis Data Streams.
  2. Launch the Query Editor v2 from the Amazon Redshift console or use your preferred SQL client to connect to your Amazon Redshift cluster for the next steps.
  3. Create an external schema:
CREATE EXTERNAL SCHEMA demo_schema
FROM KINESIS
IAM_ROLE { default | 'iam-role-arn' };
  4. To use case-sensitive identifiers, set enable_case_sensitive_identifier to true at either the session or cluster level.
  5. Create a materialized view to consume the stream data and store stream records in semi-structured SUPER format:
CREATE MATERIALIZED VIEW demo_stream_vw AS
    SELECT approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    json_parse(kinesis_data) as payload    
    FROM demo_schema."demo-data-stream";
  6. Refresh the view, which triggers Amazon Redshift to read from the stream and load data into the materialized view:
REFRESH MATERIALIZED VIEW demo_stream_vw;

You can also set your streaming materialized view to use auto refresh capabilities. This will automatically refresh your materialized view as data arrives in the stream. See CREATE MATERIALIZED VIEW for instructions on how to create a materialized view with auto refresh.
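To confirm that records are arriving, you can query the materialized view directly. The following is a minimal check using the view defined above:

--most recent records ingested from the Kinesis data stream
SELECT approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    payload
FROM demo_stream_vw
ORDER BY approximate_arrival_timestamp DESC
LIMIT 10;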

Unnest the JSON document

The following is a sample of a JSON document that was ingested from the Kinesis data stream to the payload column of the streaming materialized view demo_stream_vw:

{
  "awsRegion": "us-east-1",
  "eventID": "6d24680a-6d12-49e2-8a6b-86ffdc7306c1",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "sample-dynamoDB",
  "dynamodb": {
    "ApproximateCreationDateTime": 1657294923614,
    "Keys": {
      "pk": {
        "S": "CUSTOMER#CUST_123"
      },
      "sk": {
        "S": "TRANSACTION#2022-07-08T23:59:59Z#CUST_345"
      }
    },
    "NewImage": {
      "completionDateTime": {
        "S": "2022-07-08T23:59:59Z"
      },
      "OutofPockPercent": {
        "N": 50.00
      },
      "calculationRequirements": {
        "M": {
          "dependentIds": {
            "L": [
              {
                "M": {
                  "sk": {
                    "S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_567"
                  },
                  "pk": {
                    "S": "CUSTOMER#CUST_123"
                  }
                }
              },
              {
                "M": {
                  "sk": {
                    "S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_890"
                  },
                  "pk": {
                    "S": "CUSTOMER#CUST_123"
                  }
                }
              }
            ]
          }
        }
      },
      "Event": {
        "S": "SAMPLE"
      },
      "Provider": {
        "S": "PV-123"
      },
      "OutofPockAmount": {
        "N": 1000
      },
      "lastCalculationDateTime": {
        "S": "2022-07-08T00:00:00Z"
      },
      "sk": {
        "S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_567"
      },
      "OutofPockMax": {
        "N": 2000
      },
      "pk": {
        "S": "CUSTOMER#CUST_123"
      }
    },
    "SizeBytes": 694
  },
  "eventSource": "aws:dynamodb"
}

We can use dot notation to unnest the JSON document, but we also need a PartiQL statement to handle arrays where applicable. For example, in the preceding JSON document, there is an array under the element:

"dynamodb"."NewImage"."calculationRequirements"."M"."dependentIds"."L".

The following SQL query uses a combination of dot notation and a PartiQL statement to unnest the JSON document:

select 
substring(a."payload"."dynamodb"."Keys"."pk"."S"::varchar, position('#' in "payload"."dynamodb"."Keys"."pk"."S"::varchar)+1) as Customer_ID,
substring(a."payload"."dynamodb"."Keys"."sk"."S"::varchar, position('#TRANSACTION' in "payload"."dynamodb"."Keys"."sk"."S"::varchar)+1) as Transaction_ID,
substring(b."M"."sk"."S"::varchar, position('#CUSTOMER' in b."M"."sk"."S"::varchar)+1) Dependent_ID,
a."payload"."dynamodb"."NewImage"."OutofPockMax"."N"::int as OutofPocket_Max,
a."payload"."dynamodb"."NewImage"."OutofPockPercent"."N"::decimal(5,2) as OutofPocket_Percent,
a."payload"."dynamodb"."NewImage"."OutofPockAmount"."N"::int as OutofPock_Amount,
a."payload"."dynamodb"."NewImage"."Provider"."S"::varchar as Provider,
a."payload"."dynamodb"."NewImage"."completionDateTime"."S"::timestamptz as Completion_DateTime,
a."payload"."eventName"::varchar Event_Name,
a.approximate_arrival_timestamp
from demo_stream_vw a
left outer join a."payload"."dynamodb"."NewImage"."calculationRequirements"."M"."dependentIds"."L" b on true;

The query unnests the JSON document to the following result set.

Precompute the result set using a materialized view

Optionally, to precompute and store the unnested result set from the preceding query, you can create a materialized view and schedule it to refresh at regular intervals. In this post, we maintain the preceding unnested data in a materialized view called mv_demo_super_unnest, which will be refreshed at regular intervals and used for further processing.
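The definition of mv_demo_super_unnest is not shown in this post; a minimal sketch, wrapping the unnesting query from the previous section, could look like the following. A materialized view with this query shape typically supports only a full recompute on refresh; if that doesn't suit your use case, you can instead keep the logic in a regular view or a table loaded by the stored procedure.

CREATE MATERIALIZED VIEW mv_demo_super_unnest AS
select 
substring(a."payload"."dynamodb"."Keys"."pk"."S"::varchar, position('#' in a."payload"."dynamodb"."Keys"."pk"."S"::varchar)+1) as Customer_ID,
substring(a."payload"."dynamodb"."Keys"."sk"."S"::varchar, position('#TRANSACTION' in a."payload"."dynamodb"."Keys"."sk"."S"::varchar)+1) as Transaction_ID,
substring(b."M"."sk"."S"::varchar, position('#CUSTOMER' in b."M"."sk"."S"::varchar)+1) as Dependent_ID,
a."payload"."dynamodb"."NewImage"."OutofPockMax"."N"::int as OutofPocket_Max,
a."payload"."dynamodb"."NewImage"."OutofPockPercent"."N"::decimal(5,2) as OutofPocket_Percent,
a."payload"."dynamodb"."NewImage"."OutofPockAmount"."N"::int as OutofPock_Amount,
a."payload"."dynamodb"."NewImage"."Provider"."S"::varchar as Provider,
a."payload"."dynamodb"."NewImage"."completionDateTime"."S"::timestamptz as Completion_DateTime,
a."payload"."eventName"::varchar as Event_Name,
a.approximate_arrival_timestamp
from demo_stream_vw a
left outer join a."payload"."dynamodb"."NewImage"."calculationRequirements"."M"."dependentIds"."L" b on true;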

To capture the latest data from the DynamoDB table, the Amazon Redshift streaming materialized view needs to be refreshed at regular intervals, and then the incremental data should be transformed and loaded into the final fact and dimension table. To avoid reprocessing the same data, a metadata table can be maintained at Amazon Redshift to keep track of each ELT process with status, start time, and end time, as explained in the following section.

Maintain an audit table in Amazon Redshift

The following is a sample DDL of a metadata table that is maintained for each process or job:

create table MetaData_ETL
(
JobName varchar(100),
StartDate timestamp, 
EndDate timestamp, 
Status varchar(50)
);

The following is a sample initial entry of the metadata audit table that can be maintained at job level. The insert statement is the initial entry for the ELT process to load the Customer_Transaction_Fact table:

insert into MetaData_ETL 
values
('Customer_Transaction_Fact_Load', current_timestamp, current_timestamp,'Ready' );

Build a fact table with the latest data

In this post, we demonstrate the loading of a fact table using specific transformation logic. We are skipping the dimension table load, which uses similar logic.

As a prerequisite, create the fact and dimension tables in a preferred schema. In the following example, we create the fact table Customer_Transaction_Fact in Amazon Redshift:

CREATE TABLE public.Customer_Transaction_Fact (
Transaction_ID character varying(500),
Customer_ID character varying(500),
OutofPocket_Percent numeric(5,2),
OutofPock_Amount integer,
OutofPocket_Max integer,
Provider character varying(500),
completion_datetime timestamp
);

Transform data using a stored procedure

We load this fact table from the unnested data using a stored procedure. For more information, refer to Creating stored procedures in Amazon Redshift.

Note that in this sample use case, we are using transformation logic to identify and load the latest value of each column for a customer transaction.

The stored procedure contains the following components:

  • In the first step of the stored procedure, the job entry in the MetaData_ETL table is updated to change the status to Running and StartDate as the current timestamp, which indicates that the fact load process is starting.
  • Refresh the materialized view mv_demo_super_unnest, which contains the unnested data.
  • In the following example, we load the fact table Customer_Transaction_Fact using the latest data from the streaming materialized view based on the column approximate_arrival_timestamp, which is available as a system column in the streaming materialized view. The value of approximate_arrival_timestamp is set when a Kinesis data stream successfully receives and stores a record.
  • The following logic in the stored procedure checks if the approximate_arrival_timestamp in mv_demo_super_unnest is greater than the EndDate timestamp in the MetaData_ETL audit table, so that it can only process the incremental data.
  • Additionally, while loading the fact table, we identify the latest non-null value of each column for every Transaction_ID depending on the order of the approximate_arrival_timestamp column, using the rank and min functions.
  • The transformed data is loaded into the intermediate staging table Customer_Transaction_Fact_Stg.
  • The impacted records with the same Transaction_ID values are deleted from the Customer_Transaction_Fact table and reloaded from the staging table.
  • In the last step of the stored procedure, the job entry in the MetaData_ETL table is updated to change the status to Complete and EndDate as the current timestamp, which indicates that the fact load process has completed successfully.

See the following code:

CREATE OR REPLACE PROCEDURE SP_Customer_Transaction_Fact()
AS $$
BEGIN

set enable_case_sensitive_identifier to true;

--Update metadata audit table entry to indicate that the fact load process is running
update MetaData_ETL
set status = 'Running',
StartDate = getdate()
where JobName = 'Customer_Transaction_Fact_Load';

refresh materialized view mv_demo_super_unnest;

drop table if exists Customer_Transaction_Fact_Stg;

--Create latest record by Merging records based on approximate_arrival_timestamp
create table Customer_Transaction_Fact_Stg as
select 
m.Transaction_ID,
min(case when m.rank_Customer_ID =1 then m.Customer_ID end) Customer_ID,
min(case when m.rank_OutofPocket_Percent =1 then m.OutofPocket_Percent end) OutofPocket_Percent,
min(case when m.rank_OutofPock_Amount =1 then m.OutofPock_Amount end) OutofPock_Amount,
min(case when m.rank_OutofPocket_Max =1 then m.OutofPocket_Max end) OutofPocket_Max,
min(case when m.rank_Provider =1 then m.Provider end) Provider,
min(case when m.rank_Completion_DateTime =1 then m.Completion_DateTime end) Completion_DateTime
from
(
select *,
rank() over(partition by Transaction_ID order by case when mqp.Customer_ID is not null then 1 end, approximate_arrival_timestamp desc ) rank_Customer_ID,
rank() over(partition by Transaction_ID order by case when mqp.OutofPocket_Percent is not null then 1 end, approximate_arrival_timestamp desc ) rank_OutofPocket_Percent,
rank() over(partition by Transaction_ID order by case when mqp.OutofPock_Amount is not null then 1 end, approximate_arrival_timestamp  desc )  rank_OutofPock_Amount,
rank() over(partition by Transaction_ID order by case when mqp.OutofPocket_Max is not null then 1 end, approximate_arrival_timestamp desc ) rank_OutofPocket_Max,
rank() over(partition by Transaction_ID order by case when mqp.Provider is not null then 1 end, approximate_arrival_timestamp  desc ) rank_Provider,
rank() over(partition by Transaction_ID order by case when mqp.Completion_DateTime is not null then 1 end, approximate_arrival_timestamp desc )  rank_Completion_DateTime
from mv_demo_super_unnest mqp
where upper(mqp.event_Name) <> 'REMOVE' and mqp.approximate_arrival_timestamp > (select mde.EndDate from MetaData_ETL mde where mde.JobName = 'Customer_Transaction_Fact_Load') 
) m
group by m.Transaction_ID 
order by m.Transaction_ID
;

--Delete only impacted Transaction_ID from Fact table
delete from Customer_Transaction_Fact  
where Transaction_ID in ( select mqp.Transaction_ID from Customer_Transaction_Fact_Stg mqp);

--Insert latest records from staging table to Fact table
insert into Customer_Transaction_Fact
select * from Customer_Transaction_Fact_Stg; 

--Update metadata audit table entry to indicate that the fact load process is completed
update MetaData_ETL
set status = 'Complete',
EndDate = getdate()
where JobName = 'Customer_Transaction_Fact_Load';
END;
$$ LANGUAGE plpgsql;
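A typical manual run, using the objects defined above, would then be:

--run one incremental load and confirm that the audit entry was updated
CALL SP_Customer_Transaction_Fact();

SELECT JobName, StartDate, EndDate, Status
FROM MetaData_ETL
WHERE JobName = 'Customer_Transaction_Fact_Load';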

Additional considerations for implementation

There are several additional capabilities that you could utilize to modify this solution to meet your needs. Many customers utilize multiple AWS accounts, and it’s common that the Kinesis data stream may be in a different AWS account than the Amazon Redshift data warehouse. If this is the case, you can utilize an Amazon Redshift IAM role that assumes a role in the Kinesis data stream AWS account in order to read from the data stream. For more information, refer to Cross-account streaming ingestion for Amazon Redshift.

Another common use case is that you need to schedule the refresh of your Amazon Redshift data warehouse jobs so that the data warehouse’s data is continuously updated. To do this, you can utilize Amazon EventBridge to schedule the jobs in your data warehouse to run on a regular basis. For more information, refer to Creating an Amazon EventBridge rule that runs on a schedule. Another option is to use Amazon Redshift Query Editor v2 to schedule the refresh. For details, refer to Scheduling a query with query editor v2.

If you have a requirement to load data from a DynamoDB table with existing data, refer to Loading data from DynamoDB into Amazon Redshift.

For more information on Amazon Redshift streaming ingestion capabilities, refer to Real-time analytics with Amazon Redshift streaming ingestion.

Clean up

To avoid unnecessary charges, clean up any resources that you built as part of this architecture that are no longer in use. This includes dropping the materialized view, stored procedure, external schema, and tables created as part of this post. Additionally, make sure you delete the DynamoDB table and delete the Kinesis data stream.

Conclusion

After following the solution in this post, you’re now able to build near-real-time analytics using Amazon Redshift streaming ingestion. We showed how you can ingest data from a DynamoDB source table using a Kinesis data stream in order to refresh your Amazon Redshift data warehouse. With the capabilities presented in this post, you should be able to increase the refresh rate of your Amazon Redshift data warehouse in order to provide the most up-to-date data in your data warehouse for your use case.


About the authors

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Matt Nispel is an Enterprise Solutions Architect at AWS. He has more than 10 years of experience building cloud architectures for large enterprise companies. At AWS, Matt helps customers rearchitect their applications to take full advantage of the cloud. Matt lives in Minneapolis, Minnesota, and in his free time enjoys spending time with friends and family.

Dan Dressel is a Senior Analytics Specialist Solutions Architect at AWS. He is passionate about databases, analytics, machine learning, and architecting solutions. In his spare time, he enjoys spending time with family, nature walking, and playing foosball.

Accelerate orchestration of an ELT process using AWS Step Functions and Amazon Redshift Data API

Post Syndicated from Poulomi Dasgupta original https://aws.amazon.com/blogs/big-data/accelerate-orchestration-of-an-elt-process-using-aws-step-functions-and-amazon-redshift-data-api/

Extract, Load, and Transform (ELT) is a modern design strategy where raw data is first loaded into the data warehouse and then transformed with familiar Structured Query Language (SQL) semantics leveraging the power of massively parallel processing (MPP) architecture of the data warehouse. When you use an ELT pattern, you can also use your existing SQL workload while migrating from your on-premises data warehouse to Amazon Redshift. This eliminates the need to rewrite relational and complex SQL workloads into a new framework. With Amazon Redshift, you can load, transform, and enrich your data efficiently using familiar SQL with advanced and robust SQL support, simplicity, and seamless integration with your existing SQL tools. When you adopt an ELT pattern, a fully automated and highly scalable workflow orchestration mechanism will help to minimize the operational effort that you must invest in managing the pipelines. It also ensures the timely and accurate refresh of your data warehouse.

AWS Step Functions is a low-code, serverless, visual workflow service where you can orchestrate complex business workflows with an event-driven framework and easily develop repeatable and dependent processes. It can ensure that the long-running, multiple ELT jobs run in a specified order and complete successfully instead of manually orchestrating those jobs or maintaining a separate application.

Amazon DynamoDB is a fast, flexible NoSQL database service for single-digit millisecond performance at any scale.

This post explains how to use AWS Step Functions, Amazon DynamoDB, and Amazon Redshift Data API to orchestrate the different steps in your ELT workflow and process data within the Amazon Redshift data warehouse.

Solution overview

In this solution, we will orchestrate an ELT process using AWS Step Functions. As part of the ELT process, we will refresh the dimension and fact tables at regular intervals from staging tables, which ingest data from the source. We will maintain the current state of the ELT process (e.g., Running or Ready) in an audit table that will be maintained at Amazon DynamoDB. AWS Step Functions allows you to directly call the Data API from a state machine, reducing the complexity of running the ELT pipeline. For loading the dimensions and fact tables, we will be using Amazon Redshift Data API from AWS Lambda. We will use Amazon EventBridge for scheduling the state machine to run at a desired interval based on the customer’s SLA.

For a given ELT process, we will set up a JobID in a DynamoDB audit table and set the JobState as “Ready” before the state machine runs for the first time. The state machine performs the following steps:

  1. The first step in the Step Functions workflow is to pass the JobID as input to the process; the JobID is configured as 101 by default in Step Functions and DynamoDB via the CloudFormation template.
  2. The next step is to fetch the current JobState for the given JobID by querying the DynamoDB audit table from a Lambda function.
  3. If JobState is “Running,” then it indicates that the previous iteration is not completed yet, and the process should end.
  4. If the JobState is “Ready,” then it indicates that the previous iteration was completed successfully and the process is ready to start. So, the next step will be to update the DynamoDB audit table to change the JobState to “Running” and JobStart to the current time for the given JobID, using the DynamoDB API within a Lambda function.
  5. The next step will be to start the dimension table load from the staging table data within Amazon Redshift, using the Amazon Redshift Data API from a Lambda function. To achieve that, we can either call a stored procedure or run a series of SQL statements synchronously using the Amazon Redshift Data API within the Lambda function.
  6. In a typical data warehouse, multiple dimension tables are loaded in parallel at the same time before the fact table gets loaded. Using Parallel flow in Step Functions, we will load two dimension tables at the same time using Amazon Redshift Data API within a Lambda function.
  7. Once the load is completed for both the dimension tables, we will load the fact table as the next step using Amazon Redshift Data API within a Lambda function.
  8. As the load completes successfully, the last step would be to update the DynamoDB audit table to change the JobState to “Ready” and JobEnd to the current time for the given JobID, using the DynamoDB API within a Lambda function.

Components and dependencies

The following architecture diagram highlights the end-to-end solution using AWS services:


Before diving deeper into the code, let’s look at the components first:

  • AWS Step Functions – You can orchestrate a workflow by creating a State Machine to manage failures, retries, parallelization, and service integrations.
  • Amazon EventBridge – You can run your state machine on a daily schedule by creating a Rule in Amazon EventBridge.
  • AWS Lambda – You can use a Lambda function to call the Amazon Redshift Data API or the DynamoDB API.
  • Amazon DynamoDB – Amazon DynamoDB is a fully managed, serverless, key-value NoSQL database designed to run high-performance applications at any scale. DynamoDB is extremely efficient in running updates, which improves the performance of metadata management for customers with strict SLAs.
  • Amazon Redshift – Amazon Redshift is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, easy, and secure analytics at scale.
  • Amazon Redshift Data API – You can access your Amazon Redshift database using the built-in Amazon Redshift Data API. Using this API, you can access Amazon Redshift data with web services–based applications, including AWS Lambda.
  • DynamoDB API – You can access your Amazon DynamoDB tables from a Lambda function by importing boto3.

Prerequisites

To complete this walkthrough, you must have the following prerequisites:

  1. An AWS account.
  2. An Amazon Redshift cluster.
  3. An Amazon Redshift customizable IAM service role with the following policies:
    • AmazonS3ReadOnlyAccess
    • AmazonRedshiftFullAccess
  4. The above IAM role associated with the Amazon Redshift cluster.

Deploy the CloudFormation template

To set up the ETL orchestration demo, the steps are as follows:

  1. Sign in to the AWS Management Console.
  2. Click on Launch Stack.

  3. Click Next.
  4. Enter a suitable name in Stack name.
  5. Provide the information for the Parameters as detailed in the following table.
CloudFormation template parameter | Allowed values | Description
RedshiftClusterIdentifier | Amazon Redshift cluster identifier | Enter the Amazon Redshift cluster identifier
DatabaseUserName | Database user name in the Amazon Redshift cluster | Amazon Redshift database user name that has access to run the SQL script
DatabaseName | Amazon Redshift database name | Name of the primary Amazon Redshift database where the SQL script will be run
RedshiftIAMRoleARN | Valid IAM role ARN attached to the Amazon Redshift cluster | AWS IAM role ARN associated with the Amazon Redshift cluster


  6. Click Next. On the next page, accept the default values and click Next. On the last page, check the box to acknowledge that resources might be created and click on Create stack.
  7. Monitor the progress of the stack creation and wait until it is complete.
  8. The stack creation should complete within approximately 5 minutes.
  9. Navigate to the Amazon Redshift console.
  10. Launch Amazon Redshift query editor v2 and connect to your cluster.
  11. Browse to the database name provided in the parameters while creating the CloudFormation template (for example, dev), expand the public schema, and expand Tables. You should see the tables as shown below.
  12. Validate the sample data by running the following SQL query and confirm that the row counts match those in the preceding screenshot.
select 'customer',count(*) from public.customer
union all
select 'fact_yearly_sale',count(*) from public.fact_yearly_sale
union all
select 'lineitem',count(*) from public.lineitem
union all
select 'nation',count(*) from public.nation
union all
select 'orders',count(*) from public.orders
union all
select 'supplier',count(*) from public.supplier

Run the ELT orchestration

  1. After you deploy the CloudFormation template, navigate to the stack detail page. On the Resources tab, choose the link for DynamoDBETLAuditTable to be redirected to the DynamoDB console.
  2. Navigate to Tables and click on the table name beginning with <stackname>-DynamoDBETLAuditTable. In this demo, the stack name is DemoETLOrchestration, so the table name will begin with DemoETLOrchestration-DynamoDBETLAuditTable.
  3. It will expand the table. Click on Explore table items.
  4. Here you can see the current status of the job, which will be in Ready status.
  5. Navigate again to the stack detail page on the CloudFormation console. On the Resources tab, choose the link for RedshiftETLStepFunction to be redirected to the Step Functions console.
  6. Click Start Execution. When it successfully completes, all steps will be marked as green.
  7. While the job is running, navigate back to DemoETLOrchestration-DynamoDBETLAuditTable on the DynamoDB console. You will see JobState as Running with the JobStart time.
  8. After Step Functions completes, JobState will be changed to Ready with JobStart and JobEnd times.

Handling failure

In the real world, the ELT process can sometimes fail due to unexpected data anomalies or object-related issues. In that case, the Step Functions execution will also fail, with the failed step marked in red as shown in the screenshot below:

Once you identify and fix the issue, follow these steps to restart the state machine:

  1. Navigate to the DynamoDB table beginning with DemoETLOrchestration-DynamoDBETLAuditTable. Click on Explore table items and select the row with the specific JobID for the failed job.
  2. Go to Action and select Edit item to modify the JobState to Ready as shown below:
  3. Follow steps 5 and 6 under the “Run the ELT orchestration” section to restart execution of the step function.

Validate the ELT orchestration

The step function loads the dimension tables public.supplier and public.customer and the fact table public.fact_yearly_sale. To validate the orchestration, the process steps are as follows:

  1. Navigate to the Amazon Redshift console.
  2. Launch Amazon Redshift query editor v2 and connect to your cluster.
  3. Browse to the database name provided in the parameters while creating the CloudFormation template (for example, dev) and the public schema.
  4. Validate the data loaded by Step Functions by running the following SQL query and confirm that the row counts match as follows:
select 'customer',count(*) from public.customer
union all
select 'fact_yearly_sale',count(*) from public.fact_yearly_sale
union all
select 'supplier',count(*) from public.supplier


Schedule the ELT orchestration

To schedule the Step Functions state machine, complete the following steps:

  1. Navigate to the Amazon EventBridge console and choose Create rule.
    Event Bridge 1
  2. For Name, enter a meaningful name, for example, Trigger-Redshift-ELTStepFunction.
  3. For Event bus, choose default.
  4. For Rule type, select Schedule.
  5. Choose Next.
    Event Bridge 2
  6. Under Schedule pattern, select A schedule that runs at a regular rate, such as every 10 minutes.
  7. For Rate expression, enter 5 for Value and choose Minutes for Unit.
  8. Choose Next.
    Event Bridge 3
  9. For Target types, choose AWS service.
  10. For Select a target, choose Step Functions state machine.
  11. For State machine, choose the state machine created by the CloudFormation template.
  12. For Execution role, select Create a new role for this specific resource.
  13. Choose Next.
    Event Bridge 4
  14. Review the rule parameters and choose Create rule.

After the rule is created, it automatically triggers the state machine every 5 minutes to perform ELT processing in Amazon Redshift.
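If you prefer to create the schedule programmatically, the following minimal sketch creates an equivalent rule with boto3. The state machine ARN and the IAM role that EventBridge uses to start executions are placeholders (the console flow above creates that role for you); the role must allow the states:StartExecution action.

    import boto3

    events = boto3.client("events")

    # Create (or update) a rule on the default event bus that fires every 5 minutes
    events.put_rule(
        Name="Trigger-Redshift-ELTStepFunction",
        ScheduleExpression="rate(5 minutes)",
        State="ENABLED",
    )

    # Point the rule at the state machine created by the CloudFormation template
    events.put_targets(
        Rule="Trigger-Redshift-ELTStepFunction",
        Targets=[
            {
                "Id": "RedshiftELTStepFunction",
                "Arn": "arn:aws:states:us-east-1:123456789012:stateMachine:RedshiftETLStepFunction",  # placeholder
                "RoleArn": "arn:aws:iam::123456789012:role/EventBridgeStepFunctionRole",  # placeholder
            }
        ],
    )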

Clean up

Please note that the resources created by the CloudFormation stack incur costs while they are running. To avoid incurring future charges, delete the resources you created as part of the CloudFormation stack by navigating to the AWS CloudFormation console, selecting the stack, and choosing Delete.
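As an alternative to the console, the following one-line sketch deletes the stack with boto3, assuming the stack name used in this demo.

    import boto3

    # Deleting the stack removes the resources it created
    boto3.client("cloudformation").delete_stack(StackName="DemoETLOrchestration")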

Conclusion

In this post, we described how to implement a modern, serverless, highly scalable, and cost-effective ELT workflow orchestration process in Amazon Redshift using AWS Step Functions, Amazon DynamoDB, and the Amazon Redshift Data API. As an alternative, you can use Amazon Redshift itself for metadata management instead of Amazon DynamoDB. In this demo, a single job entry in DynamoDB is updated for each run, but you can also modify the solution to maintain a separate audit table with the history of each run for each job, which helps with debugging and historical tracking. Step Functions manages failures, retries, parallelization, service integrations, and observability so your developers can focus on higher-value business logic. Step Functions can also integrate with Amazon SNS to send notifications when the workflow fails or succeeds. Refer to the AWS Step Functions documentation to implement the notification mechanism.


About the Authors

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Raks Khare is an Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers architect data analytics solutions at scale on the AWS platform.

Tahir Aziz is an Analytics Solution Architect at AWS. He has worked on building data warehouses and big data solutions for over 13 years. He loves to help customers design end-to-end analytics solutions on AWS. Outside of work, he enjoys traveling and cooking.

Cross-account streaming ingestion for Amazon Redshift

Post Syndicated from Poulomi Dasgupta original https://aws.amazon.com/blogs/big-data/cross-account-streaming-ingestion-for-amazon-redshift/

As the most widely used and fastest cloud data warehouse, Amazon Redshift makes it simple and cost-effective to analyze all your data quickly and securely using standard SQL and your existing ETL (extract, transform, and load), business intelligence (BI), and reporting tools. Tens of thousands of customers use Amazon Redshift to analyze exabytes of data per day and power analytics workloads such as BI, predictive analytics, and real-time streaming analytics without having to manage the data warehouse infrastructure. You can also gain up to three times better price performance with Amazon Redshift than with other cloud data warehouses.

We are continuously innovating and releasing new features of Amazon Redshift for our customers, enabling the implementation of a wide range of data use cases and meeting requirements with performance and scale. One of the features recently announced is Amazon Redshift Streaming Ingestion for Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK), which lets you experience performance at scale by ingesting real-time streaming data. Amazon Redshift with Kinesis Data Streams is fully managed and runs your streaming applications without requiring infrastructure management. You can use SQL to connect to and directly ingest data from multiple Kinesis data streams simultaneously with low latency and high bandwidth, allowing you to derive insights in seconds instead of minutes.

Previously, loading data from a streaming service like Kinesis Data Streams into Amazon Redshift involved several steps. These included connecting the stream to Amazon Kinesis Data Firehose and waiting for Kinesis Data Firehose to stage the data in Amazon Simple Storage Service (Amazon S3), using batches of varying sizes at varying-length buffer intervals. After that, Kinesis Data Firehose triggered a COPY command to load the data from Amazon S3 to a table in Amazon Redshift.

Rather than including preliminary staging in Amazon S3, streaming ingestion provides low-latency, high-speed ingestion of stream data from Kinesis Data Streams into an Amazon Redshift materialized view.

In this post, we walk through cross-account Amazon Redshift streaming ingestion by creating a Kinesis data stream in one account, and generating and loading streaming data into Amazon Redshift in a second account within the same Region using role chaining.

Solution overview

The following diagram illustrates our solution architecture.

We demonstrate the following steps to perform cross-account streaming ingestion for Amazon Redshift:

  1. Create a Kinesis data stream in Account-1.
  2. Create an AWS Identity and Access Management (IAM) role in Account-1 to read the data stream, following AWS best practices for least-privilege permissions.
  3. Create an Amazon Redshift – Customizable IAM service role in Account-2 to assume the IAM role.
  4. Create an Amazon Redshift cluster in Account-2 and attach the IAM role.
  5. Modify the trust relationship of the Kinesis Data Streams IAM role in Account-1 so that the Amazon Redshift IAM role can assume it.
  6. Create an external schema using IAM role chaining.
  7. Create a materialized view for high-speed ingestion of stream data.
  8. Refresh the materialized view and start querying.

Account-1 setup

Complete the following steps in Account-1:

  1. Create a Kinesis data stream called my-data-stream. For instructions, refer to Step 1 in Set up streaming ETL pipelines.
  2. Send records to this data stream from an open-source API that continuously generates random user data. For instructions, refer to Steps 2 and 3 in Set up streaming ETL pipelines. (A minimal producer sketch also follows this list.)
  3. To verify that data is entering the stream, navigate to the Amazon Kinesis console, choose Data streams, choose my-data-stream, and open the Monitoring tab.
  4. Find the PutRecord success – average (Percent) and PutRecord – sum (Bytes) metrics to validate record ingestion.

    Next, we create an IAM policy called KinesisStreamPolicy in Account-1.
  5. On the IAM console, choose Policies in the navigation pane.
  6. Choose Create policy.
  7. Create a policy called KinesisStreamPolicy and add the following JSON to your policy (provide the AWS account ID for Account-1):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ReadStream",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStreamSummary",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                    "kinesis:DescribeStream"
                ],
                "Resource": "arn:aws:kinesis:*:<Account-1>:stream/*"
            },
            {
                "Sid": "ListStream",
                "Effect": "Allow",
                "Action": [
                    "kinesis:ListStreams",
                    "kinesis:ListShards"
                ],
                "Resource": "*"
            }
        ]
    }

  8. In the navigation pane, choose Roles.
  9. Choose Create role.
  10. Select AWS service and choose Kinesis.
  11. Create a new role called KinesisStreamRole.
  12. Attach the policy KinesisStreamPolicy.
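For reference, the following is a minimal producer sketch (Python with boto3) for step 2. It assumes the randomuser.me API as the open-source source of random user data and the us-east-1 Region; any JSON payload with similar fields works.

    import json
    import time

    import boto3
    import requests

    kinesis = boto3.client("kinesis", region_name="us-east-1")  # assumed Region

    while True:
        # Fetch one random user record (randomuser.me is assumed here)
        user = requests.get("https://randomuser.me/api/").json()["results"][0]

        # Write the record to the stream; the email serves as the partition key
        kinesis.put_record(
            StreamName="my-data-stream",
            Data=json.dumps(user),
            PartitionKey=user["email"],
        )
        time.sleep(1)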

Account-2 setup

Complete the following steps in Account-2:

  1. Sign in to the Amazon Redshift console in Account-2.
  2. Create an Amazon Redshift cluster.
  3. On the IAM console, choose Policies in the navigation pane.
  4. Choose Create policy.
  5. Create a policy RedshiftStreamPolicy and add the following JSON (provide the AWS account ID for Account-1):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "StmtStreamRole",
                "Effect": "Allow",
                "Action": [
                    "sts:AssumeRole"
                ],
                "Resource": "arn:aws:iam::<Account-1>:role/KinesisStreamRole"
            }
        ]
    }

  6. In the navigation pane, choose Roles.
  7. Choose Create role.
  8. Select AWS service, choose Redshift, and then choose the Redshift – Customizable use case.
  9. Create a role called RedshiftStreamRole.
  10. Attach the policy RedshiftStreamPolicy to the role.

Set up trust relationship

To set up the trust relationship, complete the following steps:

  1. Sign in to the IAM console as Account-1.
  2. In the navigation pane, choose Roles.
  3. Edit the IAM role KinesisStreamRole and modify its trust relationship as follows (provide the AWS account ID for Account-2; a programmatic sketch follows the policy):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<Account-2>:role/RedshiftStreamRole"
                },
                "Action": "sts:AssumeRole"
            }        
        ]
    }
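If you manage IAM with code, you can apply the same trust policy with a single call. The following is a minimal sketch with boto3, run with credentials for Account-1; replace <Account-2> with the actual account ID.

    import json
    import boto3

    iam = boto3.client("iam")  # credentials for Account-1

    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": "arn:aws:iam::<Account-2>:role/RedshiftStreamRole"},
                "Action": "sts:AssumeRole",
            }
        ],
    }

    # Replace the trust relationship on the Kinesis-side role
    iam.update_assume_role_policy(
        RoleName="KinesisStreamRole",
        PolicyDocument=json.dumps(trust_policy),
    )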

Set up streaming ingestion

To set up streaming ingestion, complete the following steps:

  1. Sign in to the Amazon Redshift console as Account-2.
  2. Launch the Query Editor v2 or your preferred SQL client and run the following statements to access the data stream my-data-stream in Account-1.
  3. Create an external schema using role chaining (replace the IAM role ARNs with your own; the two ARNs must be separated by a comma with no spaces around it):
    CREATE EXTERNAL SCHEMA schema_stream
    FROM KINESIS
    IAM_ROLE 'arn:aws:iam::<Account-2>:role/RedshiftStreamRole,
    arn:aws:iam::<Account-1>:role/KinesisStreamRole';

  4. Create a materialized view to consume the stream data and store stream records in semi-structured SUPER format:
    CREATE MATERIALIZED VIEW my_stream_vw AS
        SELECT approximatearrivaltimestamp,
        partitionkey,
        shardid,
        sequencenumber,
        json_parse(from_varbyte(data, 'utf-8')) as payload    
        FROM schema_stream."my-data-stream";

  5. Refresh the view, which triggers Amazon Redshift to read from the stream and load data into the materialized view:
    REFRESH MATERIALIZED VIEW my_stream_vw;

  6. Query data in the materialized view using the dot notation:
    SELECT payload.name.first, payload.name.last, payload.name.title,
    payload.dob.date as dob, payload.cell, payload.location.city, payload.email
    FROM my_stream_vw;

You can now view the results, as shown in the following screenshot.
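To keep the materialized view current without running the REFRESH statement manually, you can issue it on a schedule, for example through the Amazon Redshift Data API. The following is a minimal sketch; the cluster identifier, database name, and secret ARN are placeholders for your Account-2 cluster.

    import boto3

    redshift_data = boto3.client("redshift-data")

    # Submit the refresh asynchronously against the cluster in Account-2 (placeholders below)
    response = redshift_data.execute_statement(
        ClusterIdentifier="my-redshift-cluster",
        Database="dev",
        SecretArn="arn:aws:secretsmanager:us-east-1:123456789012:secret:redshift-credentials",
        Sql="REFRESH MATERIALIZED VIEW my_stream_vw;",
    )

    # Check the statement status (for example, STARTED, FINISHED, or FAILED)
    print(redshift_data.describe_statement(Id=response["Id"])["Status"])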

Conclusion

In this post, we discussed how to set up two AWS accounts to enable cross-account Amazon Redshift streaming ingestion. It's simple to get started, and you can perform rich analytics on streaming data right within Amazon Redshift using familiar SQL.

For information about how to set up Amazon Redshift streaming ingestion using Kinesis Data Streams in a single account, refer to Real-time analytics with Amazon Redshift streaming ingestion.


About the authors

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Raks Khare is an Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers architect data analytics solutions at scale on the AWS platform.