Tag Archives: AWS Big Data

Optimize your Amazon Redshift query performance with automated materialized views

Post Syndicated from Adam Gatt original https://aws.amazon.com/blogs/big-data/optimize-your-amazon-redshift-query-performance-with-automated-materialized-views/

Amazon Redshift is a fast, fully managed cloud data warehouse database that makes it cost-effective to analyze your data using standard SQL and business intelligence tools. Amazon Redshift allows you to analyze structured and semi-structured data and seamlessly query data lakes and operational databases, using AWS designed hardware and automated machine learning (ML)-based tuning to deliver top-tier price-performance at scale.

Although Amazon Redshift provides excellent price performance out of the box, it offers additional optimizations that can improve this performance and allow you to achieve even faster query response times from your data warehouse.

For example, you can physically tune tables in a data model to minimize the amount of data scanned and distributed within a cluster, which speeds up operations such as table joins and range-bound scans. Amazon Redshift now automates this tuning with the automatic table optimization (ATO) feature.

Another optimization for reducing query runtime is to precompute query results in the form of a materialized view. Materialized views store precomputed query results that future similar queries can use. This improves query performance because many computation steps can be skipped and the precomputed results returned directly. Unlike a simple cache, many materialized views can be incrementally refreshed when DML changes are applied on the underlying (base) tables and can be used by other similar queries, not just the query used to create the materialized view.

Amazon Redshift introduced materialized views in March 2020. In June 2020, support for external tables was added. With these releases, you could use materialized views on both local and external tables to deliver low-latency performance by using precomputed views in your queries. However, this approach required you to know which materialized views were available on the cluster and whether they were up to date.

In November 2020, materialized view automatic refresh and query rewrite features were added. With materialized view-aware automatic rewriting, data analysts get the benefit of materialized views for their queries and dashboards without having to query the materialized view directly. The analyst may not even be aware the materialized views exist. The auto rewrite feature enables this by rewriting queries to use materialized views without the query needing to explicitly reference them. In addition, auto refresh keeps materialized views up to date when base table data is changed, and there are available cluster resources for the materialized view maintenance.

However, materialized views still have to be manually created, monitored, and maintained by data engineers or DBAs. To reduce this overhead, Amazon Redshift has introduced the Automated Materialized View (AutoMV) feature, which goes one step further and automatically creates materialized views for queries with common recurring joins and aggregations.

This post explains what materialized views are, how manual materialized views work and the benefits they provide, and what’s required to build and maintain manual materialized views to achieve performance improvements and optimization. Then we explain how this is greatly simplified with the new automated materialized view feature.

Manually create materialized views

A materialized view is a database object that stores precomputed query results in a materialized (persisted) dataset. Similar queries can use the precomputed results from the materialized view and skip the expensive tasks of reading the underlying tables and performing joins and aggregates, thereby improving the query performance.

For example, you can improve the performance of a dashboard by materializing the results of its queries into a materialized view or multiple materialized views. When the dashboard is opened or refreshed, it can use the precomputed results from the materialized view instead of rereading the base tables and reprocessing the queries. By creating a materialized view once and querying it multiple times, redundant processing can be avoided, improving query performance and freeing up resources for other processing on the database.

To demonstrate this, we use the following query, which returns daily order and sales numbers. It joins two tables and aggregates at the day level.

SET enable_result_cache_for_session TO OFF;

SELECT o.o_orderdate AS order_date
      ,SUM(l.l_extendedprice) AS ext_price_total
FROM orders o
INNER JOIN lineitem l
   ON o.o_orderkey = l.l_orderkey
WHERE o.o_orderdate >= '1997-01-01'
AND   o.o_orderdate < '1998-01-01'
GROUP BY o.o_orderdate
ORDER BY 1;

At the top of the query, we set enable_result_cache_for_session to OFF. This setting disables the results cache, so we can see the full processing runtime each time we run the query. Unlike a materialized view, the results cache is a simple cache that stores the results of a single query in memory. It can’t be used by other similar queries, isn’t updated when the base tables are modified, and because it isn’t persisted, can be aged out of memory by more frequently used queries.

When we run this query on a 10-node ra3.4xl cluster with the TPC-H 3 TB dataset, it returns in approximately 20 seconds. If we need to run this query or similar queries more than once, we can create a materialized view with the CREATE MATERIALIZED VIEW command and query the materialized view object directly, which has the same structure as a table:

CREATE MATERIALIZED VIEW mv_daily_sales
AS
SELECT o.o_orderdate AS order_date
      ,SUM(l.l_extendedprice) AS ext_price_total
FROM orders o
INNER JOIN lineitem l
   ON o.o_orderkey = l.l_orderkey
WHERE o.o_orderdate >= '1997-01-01'
AND   o.o_orderdate < '1998-01-01'
GROUP BY o.o_orderdate;

SELECT order_date
      ,ext_price_total
FROM   mv_daily_sales
ORDER BY 1;

Because the join and aggregations have been precomputed, it runs in approximately 900 milliseconds, a performance improvement of 96%.

As we have just shown, you can query the materialized view directly; however, Amazon Redshift can automatically rewrite a query to use one or more materialized views. The query rewrite feature transparently rewrites the query as it’s being run to retrieve precomputed results from a materialized view. This process is automatically triggered on eligible and up-to-date materialized views, if the query contains the same base tables and joins, and has similar aggregations as the materialized view.

For example, if we rerun the sales query, because it’s eligible for rewriting, it’s automatically rewritten to use the mv_daily_sales materialized view. We start with the original query:

SELECT o.o_orderdate AS order_date
      ,SUM(l.l_extendedprice) AS ext_price_total
FROM orders o
INNER JOIN lineitem l
   ON o.o_orderkey = l.l_orderkey
WHERE o.o_orderdate >= '1997-01-01'
AND   o.o_orderdate < '1998-01-01'
GROUP BY o.o_orderdate
ORDER BY 1;

Internally, the query is rewritten to the following SQL and run. This process is completely transparent to the user.

SELECT order_date
      ,ext_price_total
FROM   mv_daily_sales
ORDER BY 1;

The rewriting can be confirmed by looking at the query’s explain plan:

EXPLAIN SELECT o.o_orderdate AS order_date
      ,SUM(l.l_extendedprice) AS ext_price_total
FROM orders o
INNER JOIN lineitem l
   ON o.o_orderkey = l.l_orderkey
WHERE o.o_orderdate >= '1997-01-01'
AND   o.o_orderdate < '1998-01-01'
GROUP BY o.o_orderdate;

+------------------------------------------------------------------------------------------------+
|QUERY PLAN                                                                                      |
+------------------------------------------------------------------------------------------------+
|XN HashAggregate  (cost=5.47..5.97 rows=200 width=31)                                           |
|  ->  XN Seq Scan on mv_tbl__mv_daily_sales__0 derived_table1  (cost=0.00..3.65 rows=365 width=31)|
+------------------------------------------------------------------------------------------------+

The plan shows the query has been rewritten and has retrieved the results from the mv_daily_sales materialized view, not the query’s base tables: orders and lineitem.

Other queries that use the same base tables and level of aggregation, or a level of aggregation derived from the materialized view’s level, are also rewritten. For example:

EXPLAIN SELECT date_trunc('month', o.o_orderdate) AS order_month
      ,SUM(l.l_extendedprice) AS ext_price_total
FROM orders o
INNER JOIN lineitem l
   ON o.o_orderkey = l.l_orderkey
WHERE o.o_orderdate >= '1997-01-01'
AND   o.o_orderdate < '1998-01-01'
GROUP BY order_month;

+------------------------------------------------------------------------------------------------+
|QUERY PLAN                                                                                      |
+------------------------------------------------------------------------------------------------+
|XN HashAggregate  (cost=7.30..10.04 rows=365 width=19)                                          |
|  ->  XN Seq Scan on mv_tbl__mv_daily_sales__0 derived_table1  (cost=0.00..5.47 rows=365 width=19)|
+------------------------------------------------------------------------------------------------+

If data in the orders or lineitem table changes, mv_daily_sales becomes stale; this means the materialized view isn’t reflecting the state of its base tables. If we update a row in lineitem and check the stv_mv_info system table, we can see the is_stale flag is set to t (true):

UPDATE lineitem
SET l_extendedprice = 5000
WHERE l_orderkey = 2362252519
AND l_linenumber = 1;

SELECT name
      ,is_stale
FROM stv_mv_info
WHERE name = 'mv_daily_sales';

+--------------+--------+
|name          |is_stale|
+--------------+--------+
|mv_daily_sales|t       |
+--------------+--------+

We can now manually refresh the materialized view using the REFRESH MATERIALIZED VIEW statement:

REFRESH MATERIALIZED VIEW mv_daily_sales;

SELECT name
      ,is_stale
FROM stv_mv_info
WHERE name = 'mv_daily_sales';

+--------------+--------+
|name          |is_stale|
+--------------+--------+
|mv_daily_sales|f       |
+--------------+--------+

There are two types of materialized view refresh: full and incremental. A full refresh reruns the underlying SQL statement and rebuilds the whole materialized view. An incremental refresh only updates specific rows affected by the source data change. To see if a materialized view is eligible for incremental refreshes, view the state column in the stv_mv_info system table. A state of 0 indicates the materialized view will be fully refreshed, and a state of 1 indicates the materialized view will be incrementally refreshed.

SELECT name
      ,state
FROM stv_mv_info
WHERE name = 'mv_daily_sales';

+--------------+--------+
|name          |state   |
+--------------+--------+
|mv_daily_sales|       1|
+--------------+--------+

You can schedule manual refreshes on the Amazon Redshift console if you need to refresh a materialized view at fixed periods, such as once per hour. For more information, refer to Scheduling a query on the Amazon Redshift console.

As well as the ability to do a manual refresh, Amazon Redshift can also automatically refresh materialized views. The auto refresh feature intelligently determines when to refresh the materialized view, and if you have multiple materialized views, which order to refresh them in. Amazon Redshift considers the benefit of refreshing a materialized view (how often the materialized view is used, what performance gain the materialized view provides) and the cost (resources required for the refresh, current system load, available system resources).

This intelligent refreshing has a number of benefits. Because not all materialized views are equally important, deciding when and in which order to refresh materialized views on a large system is a complex task for a DBA to solve. Also, the DBA needs to consider other workloads running on the system, and try to ensure the latency of critical workloads is not increased by the effect of refreshing materialized views. The auto refresh feature helps remove the need for a DBA to do these difficult and time-consuming tasks.

You can set a materialized view to be automatically refreshed in the CREATE MATERIALIZED VIEW statement with the AUTO REFRESH YES parameter:

CREATE MATERIALIZED VIEW mv_daily_sales
AUTO REFRESH YES
AS
SELECT ...

Now when the source data of the materialized view changes, the materialized view is automatically refreshed. We can view the status of the refresh in the svl_mv_refresh_status system table. For example:

UPDATE lineitem
SET l_extendedprice = 6000
WHERE l_orderkey = 2362252519
AND l_linenumber = 1;

SELECT mv_name
      ,starttime
      ,endtime
      ,status
      ,refresh_type
FROM svl_mv_refresh_status
WHERE mv_name = 'mv_daily_sales';

+--------------+--------------------------+--------------------------+---------------------------------------------+------------+
|mv_name       |starttime                 |endtime                   |status                                       |refresh_type|
+--------------+--------------------------+--------------------------+---------------------------------------------+------------+
|mv_daily_sales|2022-05-06 14:07:24.857074|2022-05-06 14:07:33.342346|Refresh successfully updated MV incrementally|Auto        |
+--------------+--------------------------+--------------------------+---------------------------------------------+------------+

To remove a materialized view, we use the DROP MATERIALIZED VIEW command:

DROP MATERIALIZED VIEW mv_daily_sales;

Now that you’ve seen what materialized views are, their benefits, and how they are created, used, and removed, let’s discuss the drawbacks. Designing and implementing a set of materialized views to help improve overall query performance on a database requires a skilled resource to perform several involved and time-consuming tasks:

  • Analyzing queries run on the system
  • Identifying which queries are run regularly and provide business benefit
  • Prioritizing the identified queries
  • Determining if the performance improvement is worth creating a materialized view and storing the dataset
  • Physically creating and refreshing the materialized views
  • Monitoring the usage of the materialized views
  • Dropping materialized views that are rarely or never used or can’t be refreshed due to the structure of base tables changing

Significant skill, effort, and time is required to design and create materialized views that provide an overall benefit. Also, ongoing monitoring is needed to identify poorly designed or underutilized materialized views that are occupying resources without providing gains.

Amazon Redshift now has a feature to automate this process, Automated Materialized Views (AutoMVs). We explain how AutoMVs work and how to use them on your cluster in the following sections.

Automatically create materialized views

When the AutoMV feature is enabled on an Amazon Redshift cluster (it’s enabled by default), Amazon Redshift monitors recently run queries and identifies any that could have their performance improved by a materialized view. Expensive parts of the query, such as aggregates and joins that can be persisted into materialized views and reused by future queries, are then extracted from the main query and any subqueries. The extracted query parts are then rewritten into create materialized view statements (candidate materialized views) and stored for further processing.

The candidate materialized views are not just one-to-one copies of queries; extra processing is applied to create generalized materialized views that can be used by queries similar to the original query. In the following example, the result set is limited by the filters o_orderpriority = '1-URGENT' and l_shipmode ='AIR'. Therefore, a materialized view built from this result set could only serve queries selecting that limited range of data.

SELECT o.o_orderdate
      ,SUM(l.l_extendedprice)
FROM orders o
INNER JOIN lineitem l
   ON o.o_orderkey = l.l_orderkey
WHERE o.o_orderpriority = '1-URGENT'
AND   l.l_shipmode ='AIR'
GROUP BY o.o_orderdate;

Amazon Redshift uses many techniques to create generalized materialized views; one of these techniques is called predicate elevation. To apply predicate elevation to this query, the filtered columns o_orderpriority and l_shipmode are moved into the GROUP BY clause, thereby storing the full range of data in the materialized view, which allows similar queries to use the same materialized view. This approach is driven by dashboard-like workloads that often issue identical queries with different filter predicates.

SELECT o.o_orderdate
      ,o.o_orderpriority
      ,l.l_shipmode
      ,SUM(l.l_extendedprice)
FROM orders o
INNER JOIN lineitem l
   ON o.o_orderkey = l.l_orderkey
GROUP BY o.o_orderdate
        ,o.o_orderpriority
        ,l.l_shipmode;

In the next processing step, ML algorithms are applied to calculate which of the candidate materialized views provides the best performance benefit and system-wide performance optimization. The algorithms follow similar logic to the auto refresh feature mentioned previously. For each candidate materialized view, Amazon Redshift calculates a benefit, which corresponds to the expected performance improvement should the materialized view be materialized and used in the workload. In addition, it calculates a cost corresponding to the system resources required to create and maintain the candidate. Existing manual materialized views are also considered; an AutoMV will not be created if a manual materialized view already exists that covers the same scope, and manual materialized views have auto refresh priority over AutoMVs.

The list of materialized views is then sorted in order of overall cost-benefit, taking into consideration workload management (WLM) query priorities, with materialized views related to queries on a higher priority queue ordered before materialized views related to queries on a lower priority queue. After the list of materialized views has been fully sorted, they’re automatically created and populated in the background in the prioritized order.

The created AutoMVs are then monitored by a background process that checks their activity, such as how often they have been queried and refreshed. If the process determines that an AutoMV is not being used or refreshed, for example due to the base table’s structure changing, it is dropped.

Example

To demonstrate this process in action, we use the following query taken from the 3 TB Cloud DW Benchmark, a performance testing benchmark derived from TPC-H. You can load the benchmark data into your cluster and follow along with the example.

SET enable_result_cache_for_session TO OFF;

SELECT /* TPC-H Q12 */
       l_shipmode
     , SUM(CASE
              WHEN o_orderpriority = '1-URGENT'
                 OR o_orderpriority = '2-HIGH'
                 THEN 1
              ELSE 0
   END) AS high_line_count
     , SUM(CASE
              WHEN o_orderpriority <> '1-URGENT'
                 AND o_orderpriority <> '2-HIGH'
                 THEN 1
              ELSE 0
   END) AS low_line_count
FROM orders
   , lineitem
WHERE o_orderkey = l_orderkey
AND l_shipmode IN ('MAIL', 'SHIP')
AND l_commitdate < l_receiptdate
AND l_shipdate < l_commitdate
AND l_receiptdate >= DATE '1994-01-01'
AND l_receiptdate < DATEADD(YEAR, 1, CAST('1994-01-01' AS DATE))
GROUP BY l_shipmode
ORDER BY l_shipmode;

We run the query three times and then wait for 30 minutes. On a 10-node ra3.4xl cluster, the query runs in approximately 8 seconds.

During the 30 minutes, Amazon Redshift assesses the benefit of materializing candidate AutoMVs. It computes a sorted list of candidate materialized views and creates the most beneficial ones with incremental refresh, auto refresh, and query rewrite enabled. When the query or similar queries run, they’re automatically and transparently rewritten to use one or more of the created AutoMVs.

Going forward, if data in the base tables is modified (that is, the AutoMV becomes stale), an incremental refresh runs automatically, inserting, updating, and deleting rows in the AutoMV to bring its data up to date.

Rerunning the query shows that it runs in approximately 800 milliseconds, a performance improvement of 90%. We can confirm the query is using the AutoMV by checking the explain plan:

EXPLAIN SELECT /* TPC-H Q12 */
       l_shipmode
     , SUM(CASE
              WHEN o_orderpriority = '1-URGENT'
                 OR o_orderpriority = '2-HIGH'
                 THEN 1
              ELSE 0
   END) AS high_line_count
     , SUM(CASE
              WHEN o_orderpriority <> '1-URGENT'
                 AND o_orderpriority <> '2-HIGH'
                 THEN 1
              ELSE 0
   END) AS low_line_count
FROM orders
   , lineitem
WHERE o_orderkey = l_orderkey
AND l_shipmode IN ('MAIL', 'SHIP')
AND l_commitdate < l_receiptdate
AND l_shipdate < l_commitdate
AND l_receiptdate >= DATE '1994-01-01'
AND l_receiptdate < DATEADD(YEAR, 1, CAST('1994-01-01' AS DATE))
GROUP BY l_shipmode
ORDER BY l_shipmode;

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|QUERY PLAN                                                                                                                                                           |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|XN Merge  (cost=1000000000354.23..1000000000354.23 rows=1 width=30)                                                                                                  |
|  Merge Key: derived_table1.grvar_1                                                                                                                                  |
|  ->  XN Network  (cost=1000000000354.23..1000000000354.23 rows=1 width=30)                                                                                          |
|        Send to leader                                                                                                                                               |
|        ->  XN Sort  (cost=1000000000354.23..1000000000354.23 rows=1 width=30)                                                                                       |
|              Sort Key: derived_table1.grvar_1                                                                                                                       |
|              ->  XN HashAggregate  (cost=354.21..354.22 rows=1 width=30)                                                                                            |
|                    ->  XN Seq Scan on mv_tbl__auto_mv_2000__0 derived_table1  (cost=0.00..349.12 rows=679 width=30)                                                 |
|                          Filter: ((grvar_2 < '1995-01-01'::date) AND (grvar_2 >= '1994-01-01'::date) AND ((grvar_1 = 'SHIP'::bpchar) OR (grvar_1 = 'MAIL'::bpchar)))|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+

To demonstrate how AutoMVs can also improve the performance of similar queries, we change some of the filters on the original query. In the following example, we change the filter on l_shipmode from IN ('MAIL', 'SHIP') to IN ('TRUCK', 'RAIL', 'AIR'), and change the filter on l_receiptdate to the first 6 months of the previous year. The query runs in approximately 900 milliseconds and, looking at the explain plan, we confirm it’s using the AutoMV:

EXPLAIN SELECT /* TPC-H Q12 modified */
       l_shipmode
     , SUM(CASE
              WHEN o_orderpriority = '1-URGENT'
                 OR o_orderpriority = '2-HIGH'
                 THEN 1
              ELSE 0
   END) AS high_line_count
     , SUM(CASE
              WHEN o_orderpriority <> '1-URGENT'
                 AND o_orderpriority <> '2-HIGH'
                 THEN 1
              ELSE 0
   END) AS low_line_count
FROM orders
   , lineitem
WHERE o_orderkey = l_orderkey
AND l_shipmode IN ('TRUCK', 'RAIL', 'AIR')
AND l_commitdate < l_receiptdate
AND l_shipdate < l_commitdate
AND l_receiptdate >= DATE '1993-01-01'
AND l_receiptdate < DATE '1993-07-01'
GROUP BY l_shipmode
ORDER BY l_shipmode;

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|QUERY PLAN                                                                                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|XN Merge  (cost=1000000000396.30..1000000000396.31 rows=1 width=30)                                                                                                                                |
|  Merge Key: derived_table1.grvar_1                                                                                                                                                                |
|  ->  XN Network  (cost=1000000000396.30..1000000000396.31 rows=1 width=30)                                                                                                                        |
|        Send to leader                                                                                                                                                                             |
|        ->  XN Sort  (cost=1000000000396.30..1000000000396.31 rows=1 width=30)                                                                                                                     |
|              Sort Key: derived_table1.grvar_1                                                                                                                                                     |
|              ->  XN HashAggregate  (cost=396.29..396.29 rows=1 width=30)                                                                                                                          |
|                    ->  XN Seq Scan on mv_tbl__auto_mv_2000__0 derived_table1  (cost=0.00..392.76 rows=470 width=30)                                                                               |
|                          Filter: ((grvar_2 < '1993-07-01'::date) AND (grvar_2 >= '1993-01-01'::date) AND ((grvar_1 = 'AIR'::bpchar) OR (grvar_1 = 'RAIL'::bpchar) OR (grvar_1 = 'TRUCK'::bpchar)))|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

The AutoMV feature is transparent to users and is fully system managed. Therefore, unlike manual materialized views, AutoMVs are not visible to users and can’t be queried directly. They also don’t appear in any system tables like stv_mv_info or svl_mv_refresh_status.

Finally, if the AutoMV hasn’t been used for some time by the workload, it’s automatically dropped and the storage released. When we rerun the query after this, the runtime returns to the original 8 seconds because the query is now using the base tables. This can be confirmed by examining the explain plan.

This example illustrates that the AutoMV feature reduces the effort and time required to create and maintain materialized views.

Performance tests and results

To see how well AutoMVs work in practice, we ran tests using the 1 TB and 3 TB versions of the Cloud DW benchmark derived from TPC-H. The test consists of a power run script of 22 queries, run three times with the results cache turned off. We ran the tests on two different clusters, a 4-node ra3.4xlarge and a 2-node ra3.16xlarge, at concurrencies of 1 and 5.

The Cloud DW benchmark is derived from the TPC-H benchmark. It isn’t comparable to published TPC-H results, because the results of our tests don’t fully comply with the specification.

The following table shows our results.

Suite   Scale   Cluster               Concurrency   Number of Queries   Elapsed Secs – AutoMV Off   Elapsed Secs – AutoMV On   % Improvement
TPC-H   1 TB    4-node ra3.4xlarge    1             66                  1046                        913                        13%
TPC-H   1 TB    4-node ra3.4xlarge    5             330                 3592                        3191                       11%
TPC-H   3 TB    2-node ra3.16xlarge   1             66                  1707                        1510                       12%
TPC-H   3 TB    2-node ra3.16xlarge   5             330                 6971                        5650                       19%

The AutoMV feature improved query performance by up to 19% without any manual intervention.

Summary

In this post, we first presented manual materialized views, their various features, and how to take advantage of them. We then looked into the effort and time required to design, create, and maintain materialized views to provide performance improvements in a data warehouse.

Next, we discussed how AutoMVs help overcome these challenges and seamlessly provide performance improvements for SQL queries and dashboards. We went deeper into the details of how AutoMVs work and discussed how ML algorithms determine which materialized views to create based on the predicted performance improvement and overall benefit they will provide compared to the cost required to create and maintain them. Then we covered some of the internal processing logic such as how predicate elevation creates generalized materialized views that can be used by a range of queries, not just the original query that triggered the materialized view creation.

Finally, we showed the results of a performance test on an industry benchmark where the AutoMV feature improved performance by up to 19%.

As we have demonstrated, automated materialized views provide performance improvements to a data warehouse without requiring any manual effort or specialized expertise. They transparently work in the background, optimizing your workload performance and automatically adapting when your workloads change.

Automated materialized views are enabled by default. We encourage you to monitor any performance improvements they have on your current clusters. If you’re new to Amazon Redshift, try the Getting Started tutorial and use the free trial to create and provision your first cluster and experiment with the feature.


About the Authors

Adam Gatt is a Senior Specialist Solution Architect for Analytics at AWS. He has over 20 years of experience in data and data warehousing and helps customers build robust, scalable and high-performance analytics solutions in the cloud.

Rahul Chaturvedi is an Analytics Specialist Solutions Architect at AWS. Prior to this role, he was a Data Engineer at Amazon Advertising and Prime Video, where he helped build petabyte-scale data lakes for self-serve analytics.

Use the AWS Glue connector to read and write Apache Iceberg tables with ACID transactions and perform time travel

Post Syndicated from Tomohiro Tanaka original https://aws.amazon.com/blogs/big-data/use-the-aws-glue-connector-to-read-and-write-apache-iceberg-tables-with-acid-transactions-and-perform-time-travel/

Nowadays, many customers have built their data lakes as the core of their data analytic systems. In a typical use case of data lakes, many concurrent queries run to retrieve consistent snapshots of business insights by aggregating query results. A large volume of data constantly comes from different data sources into the data lakes. There is also a common demand to reflect the changes occurring in the data sources into the data lakes. This means that not only inserts but also updates and deletes need to be replicated into the data lakes.

Apache Iceberg provides the capability of ACID transactions on your data lakes, which allows concurrent queries to add or delete records isolated from any existing queries with read-consistency for queries. Iceberg is an open table format designed for large analytic workloads on huge datasets. You can perform ACID transactions against your data lakes by using simple SQL expressions. It also enables time travel, rollback, hidden partitioning, and schema evolution changes, such as adding, dropping, renaming, updating, and reordering columns.

AWS Glue is one of the key elements to building data lakes. It extracts data from multiple sources and ingests your data to your data lake built on Amazon Simple Storage Service (Amazon S3) using both batch and streaming jobs. To expand the accessibility of your AWS Glue extract, transform, and load (ETL) jobs to Iceberg, AWS Glue provides an Apache Iceberg connector. The connector allows you to build Iceberg tables on your data lakes and run Iceberg operations such as ACID transactions, time travel, rollbacks, and so on from your AWS Glue ETL jobs.

In this post, we give an overview of how to set up the Iceberg connector for AWS Glue and configure the relevant resources to use Iceberg with AWS Glue jobs. We also demonstrate how to run typical Iceberg operations on AWS Glue interactive sessions with an example use case.

Apache Iceberg connector for AWS Glue

With the Apache Iceberg connector for AWS Glue, you can take advantage of the following Iceberg capabilities:

  • Basic operations on Iceberg tables – This includes creating Iceberg tables in the AWS Glue Data Catalog and inserting, updating, and deleting records with ACID transactions in the Iceberg tables
  • Inserting and updating records – You can run UPSERT (update and insert) queries for your Iceberg table
  • Time travel on Iceberg tables – You can read a specific version of an Iceberg table from table snapshots that Iceberg manages
  • Rollback of table versions – You can revert an Iceberg table back to a specific version of the table

Iceberg offers additional useful capabilities such as hidden partitioning; schema evolution with add, drop, update, and rename support; automatic data compaction; and more. For more details about Iceberg, refer to the Apache Iceberg documentation.

Next, we demonstrate how the Apache Iceberg connector for AWS Glue works for each Iceberg capability based on an example use case.

Overview of example customer scenario

Let’s assume that an ecommerce company sells products on their online platform. Customers can buy products and write reviews to each product. Customers can add, update, or delete their reviews at any time. The customer reviews are an important source for analyzing customer sentiment and business trends.

In this scenario, we have the following teams in our organization:

  • Data engineering team – Responsible for building and managing data platforms.
  • Data analyst team – Responsible for analyzing customer reviews and creating business reports. This team queries the reviews daily, creates a business intelligence (BI) report, and shares it with the sales team.
  • Customer support team – Responsible for replying to customer inquiries. This team queries the reviews when they get inquiries about the reviews.

Our solution has the following requirements:

  • Query scalability is important because the website is huge.
  • Individual customer reviews can be added, updated, and deleted.
  • The data analyst team needs to use both notebooks and ad hoc queries for their analysis.
  • The customer support team sometimes needs to view the history of the customer reviews.
  • Customer reviews can always be added, updated, and deleted, even while one of the teams is querying the reviews for analysis. This means that any result in a query isn’t affected by uncommitted customer review write operations.
  • Any changes in customer reviews that are made by the organization’s various teams need to be reflected in BI reports and query results.

In this post, we build a data lake of customer review data on top of Amazon S3. To meet these requirements, we introduce Apache Iceberg to enable adding, updating, and deleting records; ACID transactions; and time travel queries. We also use an AWS Glue Studio notebook to integrate and query the data at scale. First, we set up the connector so we can create an AWS Glue connection for Iceberg.

Set up the Apache Iceberg connector and create the Iceberg connection

We first set up the Apache Iceberg connector for AWS Glue so we can use Apache Iceberg with AWS Glue jobs. In this section, we subscribe to the connector in AWS Marketplace and create an AWS Glue connection for it. Complete the following steps:

  1. Navigate to the Apache Iceberg connector for AWS Glue page in AWS Marketplace.
  2. Choose Continue to Subscribe.

  3. Review the information under Terms and Conditions, and choose Accept Terms to continue.

  4. When the subscription is complete, choose Continue to Configuration.

  5. For Fulfillment option, choose Glue 3.0. (1.0 and 2.0 are also available options.)
  6. For Software version, choose the latest software version.

As of this writing, 0.12.0-2 is the latest version of the Apache Iceberg connector for AWS Glue.

  7. Choose Continue to Launch.

  8. Choose Usage instructions.
  9. Choose Activate the Glue connector from AWS Glue Studio.

You’re redirected to AWS Glue Studio.

  10. For Name, enter a name for your connection (for example, iceberg-connection).

  11. Choose Create connection and activate connector.

A message appears that the connection was successfully added, and the connection is now visible on the AWS Glue Studio console.

Configure resources and permissions

We use a provided AWS CloudFormation template to set up Iceberg configuration for AWS Glue. AWS CloudFormation creates the following resources:

  • An S3 bucket to store an Iceberg configuration file and actual data
  • An AWS Lambda function to generate an Iceberg configuration file based on parameters provided by a user for the CloudFormation template, and to clean up the resources created through this post
  • AWS Identity and Access Management (IAM) roles and policies with necessary permissions
  • An AWS Glue database in the Data Catalog to register Iceberg tables

To deploy the CloudFormation template, complete the following steps:

  1. Choose Launch Stack:


  2. For DynamoDBTableName, enter a name for an Amazon DynamoDB table that is created automatically when AWS Glue creates an Iceberg table.

This table is used for an AWS Glue job to obtain a commit lock to avoid concurrently modifying records in Iceberg tables. For more details about commit locking, refer to DynamoDB for Commit Locking. Note that you shouldn’t specify the name of an existing table.

  3. For IcebergDatabaseName, enter a name for the AWS Glue database that is created in the Data Catalog and used for registering Iceberg tables.
  4. Choose Next.

  5. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  6. Choose Create stack.

Start an AWS Glue Studio notebook to use Apache Iceberg

After you launch the CloudFormation stack, you create an AWS Glue Studio notebook to perform Iceberg operations. Complete the following steps:

  1. Download the Jupyter notebook file.
  2. On the AWS Glue console, choose Jobs in the navigation pane.
  3. Under Create job, select Jupyter Notebook.

  4. Select Upload and edit an existing notebook and upload iceberg-with-glue.ipynb.

  5. Choose Create.
  6. For Job name, enter a name.
  7. For IAM role, choose IcebergConnectorGlueJobRole, which was created via the CloudFormation template.
  8. Choose Start notebook job.

The process takes a few minutes to complete, after which you can see an AWS Glue Studio notebook view.

  9. Choose Save to save the notebook.

Set up the Iceberg configuration

To set up the Iceberg configuration, complete the following steps:

  1. Run the following cells with multiple options (magics). Note that you set your connection name for the %connections magic in the cell.

For more information, refer to Configuring AWS Glue Interactive Sessions for Jupyter and AWS Glue Studio notebooks.
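
For illustration, such a cell might contain interactive session magics like the following; the connection name and sizing values are examples, not the exact notebook contents:

%connections iceberg-connection
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5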

A message Session <session-id> has been created appears when your AWS Glue Studio notebook is ready.

In the last cell in this section, you load your Iceberg configuration, which you specified when launching the CloudFormation stack. The Iceberg configuration includes a warehouse path for Iceberg actual data, a DynamoDB table name for commit locking, a database name for your Iceberg tables, and more.

To load the configuration, set the S3 bucket name that was created via the CloudFormation stack.

  2. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  3. Choose the stack you created.
  4. On the Outputs tab, copy the S3 bucket name.

  5. Set the S3 bucket name as the S3_BUCKET parameter in your notebook.

  6. Run the cell and load the Iceberg configuration that you set.

Initialize the job with Iceberg configurations

In this section, we continue running cells to initialize a SparkSession.

  1. Set an Iceberg warehouse path and a DynamoDB table name for Iceberg commit locking from the user_config parameter.
  2. Initialize a SparkSession by setting the Iceberg configurations.
  3. With the SparkSession object, create SparkContext and GlueContext objects.

The following screenshot shows the relevant section in the notebook.

We provide the details of each parameter that you configure for the SparkSession in the appendix of this post.
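
As a rough sketch (not the exact notebook cell), the initialization based on those parameters might look like the following; catalog_name, warehouse_path, and dynamodb_table are placeholders for the values loaded from the Iceberg configuration.

from awsglue.context import GlueContext
from pyspark.sql import SparkSession

# Placeholder values; in the notebook these come from the loaded Iceberg configuration
catalog_name = 'glue_catalog'
warehouse_path = 's3://your-bucket/data/'
dynamodb_table = 'myGlueLockTable'

# Build a SparkSession with the Iceberg configurations described in the appendix of this post
spark = SparkSession.builder \
    .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.warehouse', warehouse_path) \
    .config(f'spark.sql.catalog.{catalog_name}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
    .config(f'spark.sql.catalog.{catalog_name}.lock-impl', 'org.apache.iceberg.aws.glue.DynamoLockManager') \
    .config(f'spark.sql.catalog.{catalog_name}.lock.table', dynamodb_table) \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.session.timeZone', 'UTC') \
    .getOrCreate()

# Create SparkContext and GlueContext objects from the SparkSession
sc = spark.sparkContext
glueContext = GlueContext(sc)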

For this post, we demonstrate setting the Spark configuration for Iceberg. You can also set the configuration as AWS Glue job parameters. For more information, refer to the Usage Information section in the Iceberg connector product page.

Use case walkthrough

To walk through our use case, we use two tables: acr_iceberg and acr_iceberg_report. The acr_iceberg table contains the customer review data, and the acr_iceberg_report table contains BI analysis results based on that data. All changes to acr_iceberg also impact acr_iceberg_report. The acr_iceberg_report table needs to be updated daily, right before sharing business reports with stakeholders.

To demonstrate this use case, we walk through the following typical steps:

  1. A data engineering team registers the acr_iceberg and acr_iceberg_report tables in the Glue Data Catalog.
  2. Customers (ecommerce users) add reviews to products in the Industrial_Supplies category. These reviews are added to the Iceberg table.
  3. A customer requests to update their reviews. We simulate updating the customer review in the acr_iceberg table.
  4. We reflect the customer’s request of the updated review in acr_iceberg into acr_iceberg_report.
  5. We revert the customer’s request of the updated review for the customer review table acr_iceberg, and reflect the reversion in acr_iceberg_report.

1. Create Iceberg tables of customer reviews and BI reports

In this step, the data engineering team creates the acr_iceberg Iceberg table for the customer reviews data (based on the Amazon Customer Reviews Dataset) and the acr_iceberg_report Iceberg table for BI reports.

Create the acr_iceberg table for customer reviews

The following code initially extracts the Amazon customer reviews, which are stored in a public S3 bucket. Then it creates an Iceberg table of the customer reviews and loads these reviews into your specified S3 bucket (created via CloudFormation stack). Note that the script loads partial datasets to avoid taking a lot of time to load the data.

# Loading the dataset and creating an Iceberg table. This will take about 3-5 minutes.
spark.read \
    .option('basePath', INPUT_BASE_PATH) \
    .parquet(*INPUT_CATEGORIES) \
    .writeTo(f'{CATALOG}.{DATABASE}.{TABLE}') \
    .tableProperty('format-version', '2') \
    .create()

Regarding the tableProperty parameter, we specify format version 2 to make the table version compatible with Amazon Athena. For more information about Athena support for Iceberg tables, refer to Considerations and limitations. To learn more about the difference between Iceberg table versions 1 and 2, refer to Appendix E: Format version changes.

Let’s run the following cells. Running the second cell takes around 3–5 minutes.

After you run the cells, the acr_iceberg table is available in your specified database in the Glue Data Catalog.

You can also see the actual data and metadata of the Iceberg table in the S3 bucket that is created through the CloudFormation stack. Iceberg creates the table and writes actual data and relevant metadata that includes table schema, table version information, and so on. See the following objects in your S3 bucket:

$ aws s3 ls 's3://your-bucket/data/' --recursive
YYYY-MM-dd hh:mm:ss   83616660 data/iceberg_blog_default.db/acr_iceberg/data/00000-44-c2983230-c43a-4f4a-9b89-1f7c13e59645-00001.parquet
YYYY-MM-dd hh:mm:ss   83247771 
...
YYYY-MM-dd hh:mm:ss       5134 data/iceberg_blog_default.db/acr_iceberg/metadata/00000-bc5d3ea2-280f-4e28-a71f-4c2b749ed637.metadata.json
YYYY-MM-dd hh:mm:ss     116950 data/iceberg_blog_default.db/acr_iceberg/metadata/411308cd-1f4d-4535-9444-f6b56a56697f-m0.avro
YYYY-MM-dd hh:mm:ss       3821 data/iceberg_blog_default.db/acr_iceberg/metadata/snap-6122957686233868728-1-411308cd-1f4d-4535-9444-f6b56a56697f.avro

The job tries to create a DynamoDB table, which you specified in the CloudFormation stack (in the following screenshot, its name is myGlueLockTable), if it doesn’t exist already. As we discussed earlier, the DynamoDB table is used for commit locking for Iceberg tables.

Create the acr_iceberg_report Iceberg table for BI reports

The data engineer team also creates the acr_iceberg_report table for BI reports in the Glue Data Catalog. This table initially has the following records.

comment_count   avg_star            product_category
1240            4.20729367860598    Camera
95              4.80167540490342    Industrial_Supplies
663             3.80123467540571    PC

To create the table, run the following cell.
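
That cell isn’t reproduced in this archive; a minimal sketch of what it might contain, reusing the CATALOG and DATABASE values from the loaded configuration, is:

# Build a DataFrame with the initial BI report records and create the Iceberg table from it
initial_report = spark.createDataFrame(
    [
        (1240, 4.20729367860598, 'Camera'),
        (95, 4.80167540490342, 'Industrial_Supplies'),
        (663, 3.80123467540571, 'PC'),
    ],
    ['comment_count', 'avg_star', 'product_category']
)

initial_report.writeTo(f'{CATALOG}.{DATABASE}.acr_iceberg_report') \
    .tableProperty('format-version', '2') \
    .create()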

The two Iceberg tables have been created. Let’s check the acr_iceberg table records by running a query.

Determine the average star rating for each product category by querying the Iceberg table

You can see the Iceberg table records by using a SELECT statement. In this section, we query the acr_iceberg table to simulate viewing the current BI report data by running an ad hoc query.

Run the following cell in the notebook to get the aggregated number of customer comments and mean star rating for each product_category.
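
The cell itself isn’t shown here; a query along the following lines produces that aggregation (a sketch, reusing the CATALOG and DATABASE values from the configuration):

# Aggregate the number of comments and the mean star rating for each product category
spark.sql(f"""
    SELECT product_category,
           COUNT(*)         AS comment_count,
           AVG(star_rating) AS avg_star
    FROM {CATALOG}.{DATABASE}.acr_iceberg
    GROUP BY product_category
    ORDER BY product_category
""").show()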

The cell output has the following results.

Another way to query Iceberg tables is to use Amazon Athena (when you use Athena with Iceberg tables, you need to set up the Iceberg environment) or Amazon EMR.

2. Add customer reviews in the Iceberg table

In this section, customers add comments for some products in the Industrial Supplies product category, and we add these comments to the acr_iceberg table. To demonstrate this scenario, we create a Spark DataFrame based on the following new customer reviews and then add them to the table with an INSERT statement.

marketplace | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | review_date | year | product_category
US | 12345689 | ISB35E4556F144 | I00EDBY7X8 | 989172340 | plastic containers | 5 | 0 | 0 | N | Y | Five Stars | Great product! | 2022-02-01 | 2022 | Industrial_Supplies
US | 78901234 | IS4392CD4C3C4 | I00D7JFOPC | 952000001 | battery tester | 3 | 0 | 0 | N | Y | nice one, but it broke some days later | nope | 2022-02-01 | 2022 | Industrial_Supplies
US | 12345123 | IS97B103F8B24C | I002LHA74O | 818426953 | spray bottle | 2 | 1 | 1 | N | N | Two Stars | the bottle isn’t as big as pictured. | 2022-02-01 | 2022 | Industrial_Supplies
US | 23000093 | ISAB4268D46F3X | I00ARPLCGY | 562945918 | 3d printer | 5 | 3 | 3 | N | Y | Super great | very useful | 2022-02-01 | 2022 | Industrial_Supplies
US | 89874312 | ISAB4268137V2Y | I80ARDQCY | 564669018 | circuit board | 4 | 0 | 0 | Y | Y | Great, but a little bit expensive | you should buy this, but note the price | 2022-02-01 | 2022 | Industrial_Supplies

Run the following cells in the notebook to insert the customer comments to the Iceberg table. The process takes about 1 minute.
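
As a sketch of what those cells might do (not the exact notebook code), you could build a DataFrame from the rows in the table above and insert it into the Iceberg table; only the first record is spelled out here, and column types may need casting to match the table schema.

from pyspark.sql import Row

# One of the new reviews from the table above; the remaining four rows follow the same pattern
new_reviews = [
    Row(marketplace='US', customer_id='12345689', review_id='ISB35E4556F144',
        product_id='I00EDBY7X8', product_parent='989172340',
        product_title='plastic containers', star_rating=5, helpful_votes=0,
        total_votes=0, vine='N', verified_purchase='Y',
        review_headline='Five Stars', review_body='Great product!',
        review_date='2022-02-01', year=2022, product_category='Industrial_Supplies'),
    # ... the other four reviews from the table above ...
]

spark.createDataFrame(new_reviews).createOrReplaceTempView('new_reviews')

# INSERT the new reviews into the customer reviews Iceberg table
spark.sql(f"""
    INSERT INTO {CATALOG}.{DATABASE}.acr_iceberg
    SELECT * FROM new_reviews
""")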

Run the next cell to see that the Industrial_Supplies product category has been added, with 5 under comment_count.

3. Update a customer review in the Iceberg table

In the previous section, we added new customer reviews to the acr_iceberg Iceberg table. In this section, a customer requests an update of their review. Specifically, customer 78901234 requests the following updates to review ID IS4392CD4C3C4:

  • Change star_rating from 3 to 5
  • Update review_headline from "nice one, but it broke some days later" to "very good"

We update the customer review by running an UPDATE query in the following cell.
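
A sketch of such an UPDATE statement, based on the requested changes (the exact cell and the customer_id data type may differ):

# Update the star rating and headline of review IS4392CD4C3C4 from customer 78901234
spark.sql(f"""
    UPDATE {CATALOG}.{DATABASE}.acr_iceberg
    SET star_rating = 5,
        review_headline = 'very good'
    WHERE customer_id = '78901234'
      AND review_id = 'IS4392CD4C3C4'
""")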

We can review the updated record by running the next cell.

Also, when you run this cell for the reporting table, you can see the updated avg_star column value for the Industrial_Supplies product category. Specifically, the avg_star value has been updated from 3.8 to 4.2 as a result of the star_rating changing from 3 to 5:

4. Reflect changes in the customer reviews table in the BI report table with a MERGE INTO query

In this section, we reflect the changes in the acr_iceberg table into the BI report table acr_iceberg_report. To do so, we run the MERGE INTO query and combine the two tables based on the condition of the product_category column in each table. This query works as follows:

  • When the product_category value in both tables matches, the query combines the two records, summing comment_count and averaging avg_star
  • When there is no matching product_category, the query inserts a new record

This MERGE INTO operation is also referred to as an UPSERT (update and insert).

Run the following cell to reflect the update of customer reviews in the acr_iceberg table into the acr_iceberg_report BI table.
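
A sketch of a MERGE INTO statement that matches the behavior described above (summing comment_count and averaging avg_star on a match, inserting otherwise); the actual notebook cell may differ:

# Merge the aggregated customer reviews into the BI report table
spark.sql(f"""
    MERGE INTO {CATALOG}.{DATABASE}.acr_iceberg_report AS report
    USING (
        SELECT product_category,
               COUNT(*)         AS comment_count,
               AVG(star_rating) AS avg_star
        FROM {CATALOG}.{DATABASE}.acr_iceberg
        GROUP BY product_category
    ) AS agg
    ON report.product_category = agg.product_category
    WHEN MATCHED THEN UPDATE SET
        report.comment_count = report.comment_count + agg.comment_count,
        report.avg_star      = (report.avg_star + agg.avg_star) / 2
    WHEN NOT MATCHED THEN INSERT *
""")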

After the MERGE INTO query is complete, you can see the updated acr_iceberg_report table by running the following cell.

The MERGE INTO query performed the following changes:

  • In the Camera, Industrial_Supplies, and PC product categories, each comment_count is the sum between the initial value of the acr_iceberg_report table and the aggregated table value. For example, in the Industrial_Supplies product category row, the comment_count 100 is calculated by 95 (in the initial version of acr_iceberg_report) + 5 (in the aggregated report table).
  • In addition to comment_count, the avg_star in the Camera, Industrial_Supplies, or PC product category row is also computed by averaging between each avg_star value in acr_iceberg_report and in the aggregated table.
  • In other product categories, each comment_count and avg_star is the same as each value in the aggregated table, which means that each value in the aggregated table is inserted into the acr_iceberg_report table.

5. Roll back the Iceberg tables and reflect changes in the BI report table

In this section, the customer who requested the update of the review now requests to revert the updated review.

Iceberg keeps track of table versions as operations are applied to its tables. We can see the information for each table version by inspecting the table metadata, and we can also time travel or roll back a table to an old version.
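
For example, time travel to an older snapshot can be done with the snapshot-id read option; the snapshot ID below is a placeholder to replace with one of your own:

# Read the customer reviews table as of a specific snapshot (time travel)
old_version_df = spark.read \
    .format('iceberg') \
    .option('snapshot-id', 1234567890123456789) \
    .load(f'{CATALOG}.{DATABASE}.acr_iceberg')

old_version_df.show()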

To complete the customer request to revert the updated review, we need to revert the table version of acr_iceberg to the earlier version when we first added the reviews. Additionally, we need to update the acr_iceberg_report table to reflect the rollback of the acr_iceberg table version. Specifically, we need to perform the following three steps to complete these operations:

  1. Check the history of table changes of acr_iceberg and acr_iceberg_report to get each table snapshot.
  2. Roll back acr_iceberg to the version when we first inserted records, and also roll back the acr_iceberg_report table to the initial version to reflect the customer review update.
  3. Merge the acr_iceberg table with the acr_iceberg_report table again.

Get the metadata of each report table

As a first step, we check table versions by inspecting the table. Run the following cells.
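
Those cells aren’t reproduced here; one way to inspect the versions is through Iceberg’s history and snapshots metadata tables, for example:

# Show the operation history and snapshots of the customer reviews table
spark.sql(f"SELECT * FROM {CATALOG}.{DATABASE}.acr_iceberg.history").show(truncate=False)
spark.sql(f"SELECT * FROM {CATALOG}.{DATABASE}.acr_iceberg.snapshots").show(truncate=False)

# Do the same for the BI report table
spark.sql(f"SELECT * FROM {CATALOG}.{DATABASE}.acr_iceberg_report.history").show(truncate=False)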

Now you can see the following table versions in acr_iceberg and acr_iceberg_report:

  • acr_iceberg has three versions:
    • The oldest one is the initial version of this table, which shows the append operation
    • The second oldest one is the record insertion, which shows the append operation
    • The latest one is the update, which shows the overwrite operation
  • acr_iceberg_report has two versions:
    • The oldest one is the initial version of this table, which shows the append operation
    • The other one is from the MERGE INTO query in the previous section, which shows the overwrite operation

As shown in the following screenshot, we roll back the acr_iceberg table to the version in which the records were inserted, based on the customer's revert request. We also roll back the acr_iceberg_report table to its initial version to discard the MERGE INTO operation from the previous section.

Roll back the acr_iceberg and acr_iceberg_report tables

Based on your snapshot IDs, you can roll back each table version:

  • For acr_iceberg, use the second-oldest snapshot_id (in this example, 5440744662350048750) and replace <Type snapshot_id in acr_iceberg table> in the following cell with this snapshot_id.
  • For the acr_iceberg_report table, use the initial snapshot_id (in this example, 7958428388396549892) and replace <Type snapshot_id in acr_iceberg_report table> in the following cell with this snapshot_id.

After you specify the snapshot_id for each rollback query, run the following cells.
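
A sketch of what those rollback cells might look like, using Iceberg's rollback_to_snapshot procedure (the snapshot IDs are the example values above; substitute your own):

# Roll back acr_iceberg to the snapshot taken when the reviews were first inserted
spark.sql(f"CALL {CATALOG}.system.rollback_to_snapshot('{DATABASE}.acr_iceberg', 5440744662350048750)")

# Roll back acr_iceberg_report to its initial snapshot
spark.sql(f"CALL {CATALOG}.system.rollback_to_snapshot('{DATABASE}.acr_iceberg_report', 7958428388396549892)")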

When this step is complete, you can see the previous and current snapshot IDs of each table.

Each Iceberg table has been reverted to the specific version now.

Reflect changes in acr_iceberg into acr_iceberg_report again

We reflect the acr_iceberg table reversion into the current acr_iceberg_report table. To complete this, run the following cell.

After you rerun the MERGE INTO query, run the following cell to see the new table records. When we compare the table records, we observe that the avg_star value for Industrial_Supplies is lower than the avg_star value in the previous version of the table.

You have now reflected the customer's request to revert their updated review in the BI report table. Specifically, you can see the updated avg_star value for the Industrial_Supplies product category.

Clean up

To clean up all resources that you created, delete the CloudFormation stack.

Conclusion

In this post, we walked through using the Apache Iceberg connector with AWS Glue ETL jobs. We created an Iceberg table built on Amazon S3, and ran queries such as reading the Iceberg table data, inserting a record, merging two tables, and time travel.

The operations for the Iceberg table that we demonstrated in this post aren’t all of the operations Iceberg supports. Refer to the Apache Iceberg documentation for information about more operations.

Appendix: Spark configurations to use Apache Iceberg on AWS Glue

As we mentioned earlier, the notebook sets up a Spark configuration to integrate Iceberg with AWS Glue. The following table shows what each parameter defines.

Spark configuration key | Value | Description
spark.sql.catalog.{CATALOG} | org.apache.iceberg.spark.SparkCatalog | Specifies a Spark catalog interface that communicates with Iceberg tables.
spark.sql.catalog.{CATALOG}.warehouse | {WAREHOUSE_PATH} | A warehouse path for jobs to write Iceberg metadata and actual data.
spark.sql.catalog.{CATALOG}.catalog-impl | org.apache.iceberg.aws.glue.GlueCatalog | The implementation of the Spark catalog class to communicate between Iceberg tables and the AWS Glue Data Catalog.
spark.sql.catalog.{CATALOG}.io-impl | org.apache.iceberg.aws.s3.S3FileIO | Used for Iceberg to communicate with Amazon S3.
spark.sql.catalog.{CATALOG}.lock-impl | org.apache.iceberg.aws.glue.DynamoLockManager | Used for Iceberg to manage table locks.
spark.sql.catalog.{CATALOG}.lock.table | {DYNAMODB_TABLE} | A DynamoDB table name to store table locks.
spark.sql.extensions | org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions | The implementation that enables Spark to run Iceberg-specific SQL commands.
spark.sql.session.timeZone | UTC | Sets the time zone of the Spark environment to UTC for Iceberg time travel queries. The epoch time is in the UTC time zone.

About the Author

Tomohiro Tanaka is a Cloud Support Engineer at Amazon Web Services. He builds Glue connectors such as Apache Iceberg connector and TPC-DS connector. He’s passionate about helping customers build data lakes using ETL workloads. In his free time, he also enjoys coffee breaks with his colleagues and making coffee at home.

Best practices for right-sizing your Apache Kafka clusters to optimize performance and cost

Post Syndicated from Steffen Hausmann original https://aws.amazon.com/blogs/big-data/best-practices-for-right-sizing-your-apache-kafka-clusters-to-optimize-performance-and-cost/

Apache Kafka is well known for its performance and tunability to optimize for various use cases. But sometimes it can be challenging to find the right infrastructure configuration that meets your specific performance requirements while minimizing the infrastructure cost.

This post explains how the underlying infrastructure affects Apache Kafka performance. We discuss strategies on how to size your clusters to meet your throughput, availability, and latency requirements. Along the way, we answer questions like “when does it make sense to scale up vs. scale out?” We end with guidance on how to continuously verify the size of your production clusters.

We use performance tests to illustrate and explain the effect and trade-off of different strategies to size your cluster. But as usual, it’s important to not just blindly trust benchmarks you happen to find on the internet. We therefore not only show how to reproduce the results, but also explain how to use a performance testing framework to run your own tests for your specific workload characteristics.

Sizing Apache Kafka clusters

From an infrastructure perspective, the most common resource bottlenecks for clusters are the Amazon EC2 network throughput of the brokers, the storage throughput, and the network throughput between brokers and the storage backend for brokers using network attached storage such as Amazon Elastic Block Store (Amazon EBS).

The remainder of the post explains how the sustained throughput limit of a cluster not only depends on the storage and network throughput limits of the brokers, but also on the number of brokers and consumer groups as well as the replication factor r. We derive the following formula (referred to as Equation 1 throughout this post) for the theoretical sustained throughput limit tcluster given the infrastructure characteristics of a specific cluster:

max(tcluster) <= min{
  max(tstorage) * #brokers/r,
  max(tEBSnetwork) * #brokers/r,
  max(tEC2network) * #brokers/(#consumer groups + r-1)
}

For production clusters, it’s a best practice to target the actual throughput at 80% of its theoretical sustained throughput limit. Consider, for instance, a three-node cluster with m5.12xlarge brokers, a replication factor of 3, EBS volumes with a baseline throughput of 1000 MB/sec, and two consumer groups consuming from the tip of the topic. Taking all these parameters into account, the sustained throughput absorbed by the cluster should target 800 MB/sec.

However, this throughput calculation is merely providing an upper bound for workloads that are optimized for high throughput scenarios. Regardless of how you configure your topics and the clients reading from and writing into these topics, the cluster can’t absorb more throughput. For workloads with different characteristics, like latency-sensitive or compute-intensive workloads, the actual throughput that can be absorbed by a cluster while meeting these additional requirements is often smaller.

To find the right configuration for your workload, you need to work backward from your use case and determine the appropriate throughput, availability, durability, and latency requirements. Then, use Equation 1 to obtain the initial sizing of your cluster based on your throughput, durability, and storage requirements. Verify this initial cluster sizing by running performance tests and then fine-tune the cluster size, cluster configuration, and client configuration to meet your other requirements. Lastly, add additional capacity for production clusters so they can still ingest the expected throughput even if the cluster is running at reduced capacity, for instance, during maintenance, scaling, or loss of a broker. Depending on your workload, you may even consider adding enough spare capacity to withstand an event affecting all brokers of an entire Availability Zone.

The remainder of the post dives deeper into the aspects of cluster sizing. The most important aspects are as follows:

  • There is often a choice between either scaling out or scaling up to increase the throughput and performance of a cluster. Smaller brokers give you smaller capacity increments and have a smaller blast radius if they become unavailable. But having many small brokers increases the time it takes for operations that require a rolling update of brokers to complete, and increases the likelihood of failure.
  • All traffic that producers are sending into a cluster is persisted to disk. Therefore, the underlying throughput of the storage volume can become the bottleneck of the cluster. In this case, it makes sense to either increase the volume throughput if possible or to add more volumes to the cluster.
  • All data persisted on EBS volumes traverses the network. Amazon EBS-optimized instances come with dedicated capacity for Amazon EBS I/O, but the dedicated Amazon EBS network can still become the bottleneck of the cluster. In this case, it makes sense to scale up brokers, because larger brokers have higher Amazon EBS network throughput.
  • The more consumer groups that are reading from the cluster, the more data that egresses over the Amazon Elastic Compute Cloud (Amazon EC2) network of the brokers. Depending on the broker type and size, the Amazon EC2 network can become the bottleneck of the cluster. In that case, it makes sense to scale up brokers, because larger brokers have higher Amazon EC2 network throughput.
  • For p99 put latencies, there is a substantial performance impact of enabling in-cluster encryption. Scaling up the brokers of a cluster can substantially reduce the p99 put latency compared to smaller brokers.
  • When consumers fall behind or need to reprocess historic data, the requested data may no longer reside in memory, and brokers need to fetch data from the storage volume. This causes non-sequential I/O reads. When using EBS volumes, it also causes additional network traffic to the volume. Using larger brokers with more memory or enabling compression can mitigate this effect.
  • Using the burst capabilities of your cluster is a very powerful way to absorb sudden throughput spikes without scaling your cluster, which takes time to complete. Burst capacity also helps in response to operational events. For instance, when brokers are undergoing maintenance or partitions need to be rebalanced within the cluster, they can use the burst performance to complete the operation faster.
  • Monitor or alarm on important infrastructure-related cluster metrics such as BytesInPerSec, ReplicationBytesInPerSec, BytesOutPerSec, and ReplicationBytesOutPerSec to receive notification when the current cluster size is no longer optimal for the current workload.

The remainder of the post provides additional context and explains the reasoning behind these recommendations.

Understanding Apache Kafka performance bottlenecks

Before we start talking about performance bottlenecks from an infrastructure perspective, let’s revisit how data flows within a cluster.

For this post, we assume that producers and consumers are behaving well and according to best practices, unless explicitly stated differently. For example, we assume the producers are evenly balancing the load between brokers, brokers host the same number of partitions, there are enough partitions to ingest the throughput, consumers consume directly from the tip of the stream, and so on. The brokers are receiving the same load and are doing the same work. We therefore just focus on Broker 1 in the following diagram of a data flow within a cluster.

Data flow within a Kafka cluster

The producers send an aggregate throughput of tcluster into the cluster. As the traffic evenly spreads across brokers, Broker 1 receives an incoming throughput of tcluster/3. With a replication factor of 3, Broker 1 replicates the traffic it directly receives to the two other brokers (the blue lines). Likewise, Broker 1 receives replication traffic from two brokers (the red lines). Each consumer group consumes the traffic that is directly produced into Broker 1 (the green lines). All traffic that arrives in Broker 1 from producers and replication traffic from other brokers is eventually persisted to storage volumes attached to the broker.

Accordingly, the throughput of the storage volume and the broker network are both tightly coupled with the overall cluster throughput and warrant a closer look.

Storage backend throughput characteristics

Apache Kafka has been designed to utilize large sequential I/O operations when writing data to disk. Producers are only ever appending data to the tip of the log, causing sequential writes. Moreover, Apache Kafka is not synchronously flushing to disk. Instead, Apache Kafka is writing to the page cache, leaving it up to the operating system to flush pages to disk. This results in large sequential I/O operations, which optimizes disk throughput.

Because we assume that consumers are reading from the tip of the topic, reads are served from the page cache, and the broker can drive the full throughput of the volume without being limited by IOPS. For many practical purposes, the performance of EBS volumes is therefore throughput bound rather than I/O bound.

The ingress throughput of the storage backend depends on the data that producers are sending directly to the broker plus the replication traffic the broker is receiving from its peers. For an aggregated throughput produced into the cluster of tcluster and a replication factor of r, the throughput received by the broker storage is as follows:

tstorage = tcluster/#brokers + tcluster/#brokers * (r-1)
        = tcluster/#brokers * r

Therefore, the sustained throughput limit of the entire cluster is bound by the following:

max(tcluster) <= max(tstorage) * #brokers/r

AWS offers different options for block storage: instance storage and Amazon EBS. Instance storage is located on disks that are physically attached to the host computer, whereas Amazon EBS is network attached storage.

Instance families that come with instance storage achieve high IOPS and disk throughput. For instance, Amazon EC2 I3 instances include NVMe SSD-based instance storage optimized for low latency, very high random I/O performance, and high sequential read throughput. However, the volumes are tied to brokers. Their characteristics, in particular their size, only depend on the instance family, and the volume size can’t be adapted. Moreover, when a broker fails and needs to be replaced, the storage volume is lost. The replacement broker then needs to replicate the data from other brokers. This replication causes additional load on the cluster in addition to the reduced capacity from the broker loss.

In contrast, the characteristics of EBS volumes can be adapted while they’re in use. You can use these capabilities to automatically scale broker storage over time rather than provisioning storage for peak or adding additional brokers. Some EBS volume types, such as gp3, io2, and st1, also allow you to adapt the throughput and IOPS characteristics of existing volumes. Moreover, the lifecycle of EBS volumes is independent of the broker—if a broker fails and needs to be replaced, the EBS volume can be reattached to the replacement broker. This avoids most of the otherwise required replication traffic.

Using EBS volumes is therefore often a good choice for many common Apache Kafka workloads. They provide more flexibility and enable faster scaling and recovery operations.

Amazon EBS throughput characteristics

When using Amazon EBS as the storage backend, there are several volume types to choose from. The throughput characteristics of the different volume types range between 128 MB/sec and 4000 MB/sec (for more information, refer to Amazon EBS volume types). You can even choose to attach multiple volumes to a broker to increase the throughput beyond what can be delivered by a single volume.

However, Amazon EBS is network attached storage. All data a broker is writing to an EBS volume needs to traverse the network to the Amazon EBS backend. Newer generation instance families, like the M5 family, are Amazon EBS-optimized instances with dedicated capacity for Amazon EBS I/O. But there are limits on the throughput and the IOPS that depend on the size of the instance and not only on the volume size. The dedicated capacity for Amazon EBS provides a higher baseline throughput and IOPS for larger instances. The capacity ranges between 81 MB/sec and 2375 MB/sec. For more information, refer to Supported instance types.

When using Amazon EBS for storage, we can adapt the formula for the cluster sustained throughput limit to obtain a tighter upper bound:

max(tcluster) <= min{
  max(tstorage) * #brokers/r,
  max(tEBSnetwork) * #brokers/r
}

Amazon EC2 network throughput

So far, we have only considered network traffic to the EBS volume. But replication and the consumer groups also cause Amazon EC2 network traffic out of the broker. The traffic that producers are sending into a broker is replicated to r-1 brokers. Moreover, every consumer group reads the traffic that a broker ingests. Therefore, the overall outgoing network traffic is as follows:

tEC2network = tcluster/#brokers * #consumer groups + tcluster/#brokers * (r-1)
            = tcluster/#brokers * (#consumer groups + r-1)

Taking this traffic into account finally gives us a reasonable upper bound for the sustained throughput limit of the cluster, which we have already seen in Equation 1:

max(tcluster) <= min{
  max(tstorage) * #brokers/r,
  max(tEBSnetwork) * #brokers/r,
  max(tEC2network) * #brokers/(#consumer groups + r-1)
}

For production workloads, we recommend keeping the actual throughput of your workload below 80% of the theoretical sustained throughput limit as it’s determined by this formula. Furthermore, we assume that all data producers send into the cluster is eventually read by at least one consumer group. Because there is then at least one consumer group, the Amazon EC2 network traffic out of a broker is always at least as high as the traffic into the broker. We can therefore ignore data traffic into brokers as a potential bottleneck.

With Equation 1, we can verify if a cluster with a given infrastructure can absorb the throughput required for our workload under ideal conditions. For more information about the Amazon EC2 network bandwidth of m5.8xlarge and larger instances, refer to Amazon EC2 Instance Types. You can also find the Amazon EBS bandwidth of m5.4xlarge instances on the same page. Smaller instances use credit-based systems for Amazon EC2 network bandwidth and the Amazon EBS bandwidth. For the Amazon EC2 network baseline bandwidth, refer to Network performance. For the Amazon EBS baseline bandwidth, refer to Supported instance types.

Right-size your cluster to optimize for performance and cost

So, what do we take from this? Most importantly, keep in mind that these results only indicate the sustained throughput limit of a cluster under ideal conditions. These results can give you a general number for the expected sustained throughput limit of your clusters. But you must run your own experiments to verify these results for your specific workload and configuration.

However, we can draw a few conclusions from this throughput estimation: adding brokers increases the sustained cluster throughput. Similarly, decreasing the replication factor increases the sustained cluster throughput. Adding more than one consumer group may reduce the sustained cluster throughput if the Amazon EC2 network becomes the bottleneck.

Let’s run a couple of experiments to get empirical data on practical sustained cluster throughput that also accounts for producer put latencies. For these tests, we keep the throughput within the recommended 80% of the sustained throughput limit of clusters. When running your own tests, you may notice that clusters can even deliver higher throughput than what we show.

Measure Amazon MSK cluster throughput and put latencies

To create the infrastructure for the experiments, we use Amazon Managed Streaming for Apache Kafka (Amazon MSK). Amazon MSK provisions and manages highly available Apache Kafka clusters that are backed by Amazon EBS storage. The following discussion therefore also applies to clusters that have not been provisioned through Amazon MSK, if backed by EBS volumes.

The experiments are based on the kafka-producer-perf-test.sh and kafka-consumer-perf-test.sh tools that are included in the Apache Kafka distribution. The tests use six producers and two consumer groups with six consumers each that are concurrently reading and writing from the cluster. As mentioned before, we make sure that clients and brokers are behaving well and according to best practices: producers are evenly balancing the load between brokers, brokers host the same number of partitions, consumers consume directly from the tip of the stream, producers and consumers are over-provisioned so that they don’t become a bottleneck in the measurements, and so on.

We use clusters that have their brokers deployed to three Availability Zones. Moreover, replication is set to 3 and acks is set to all to achieve a high durability of the data that is persisted in the cluster. We also configured a batch.size of 256 kB or 512 kB and set linger.ms to 5 milliseconds, which reduces the overhead of ingesting small batches of records and therefore optimizes throughput. The number of partitions is adjusted to the broker size and cluster throughput.
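For reference, the following hedged Python sketch shows client settings equivalent to this test configuration; the actual tests use kafka-producer-perf-test.sh, and the broker address is a placeholder.

from kafka import KafkaProducer  # kafka-python client

producer = KafkaProducer(
    bootstrap_servers=["b-1.example-cluster.kafka.us-east-1.amazonaws.com:9092"],
    acks="all",         # wait for all in-sync replicas for high durability
    batch_size=262144,  # 256 kB batches reduce the overhead of small batches
    linger_ms=5,        # wait up to 5 ms to fill a batch
)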

The configuration for brokers larger than m5.2xlarge has been adapted according to the guidance of the Amazon MSK Developer Guide. In particular when using provisioned throughput, it’s essential to optimize the cluster configuration accordingly.

The following figure compares put latencies for three clusters with different broker sizes. For each cluster, the producers are running roughly a dozen individual performance tests with different throughput configurations. Initially, the producers produce a combined throughput of 16 MB/sec into the cluster and gradually increase the throughput with every individual test. Each individual test runs for 1 hour. For instances with burstable performance characteristics, credits are depleted before starting the actual performance measurement.

Comparing throughput and put latencies of different broker sizes

For brokers with more than 334 GB of storage, we can assume the EBS volume has a baseline throughput of 250 MB/sec. The Amazon EBS network baseline throughput is 81.25, 143.75, 287.5, and 593.75 MB/sec for the different broker sizes (for more information, see Supported instance types). The Amazon EC2 network baseline throughput is 96, 160, 320, and 640 MB/sec (for more information, see Network performance). Note that this only considers the sustained throughput; we discuss burst performance in a later section.

For a three-node cluster with replication 3 and two consumer groups, the recommended ingress throughput limits as per Equation 1 are as follows.

Broker size Recommended sustained throughput limit
m5.large 58 MB/sec
m5.xlarge 96 MB/sec
m5.2xlarge 192 MB/sec
m5.4xlarge 200 MB/sec

Even though the m5.4xlarge brokers have twice the number of vCPUs and memory compared to m5.2xlarge brokers, the cluster sustained throughput limit barely increases when scaling the brokers from m5.2xlarge to m5.4xlarge. That’s because with this configuration, the EBS volume used by brokers becomes a bottleneck. Remember that we’ve assumed a baseline throughput of 250 MB/sec for these volumes. For a three-node cluster and replication factor of 3, each broker needs to write the same traffic to the EBS volume as is sent to the cluster itself. And because 80% of the baseline throughput of the EBS volume is 200 MB/sec, the recommended sustained throughput limit of the cluster with m5.4xlarge brokers is 200 MB/sec.

The next section describes how you can use provisioned throughput to increase the baseline throughput of EBS volumes and therefore increase the sustained throughput limit of the entire cluster.

Increase broker throughput with provisioned throughput

From the previous results, you can see that from a pure throughput perspective there is little benefit to increasing the broker size from m5.2xlarge to m5.4xlarge with the default cluster configuration. The baseline throughput of the EBS volume used by brokers limits their throughput. However, Amazon MSK recently launched the ability to provision storage throughput up to 1000 MB/sec. For self-managed clusters you can use gp3, io2, or st1 volume types to achieve a similar effect. Depending on the broker size, this can substantially increase the overall cluster throughput.
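For a self-managed cluster on gp3 volumes, the volume throughput can be raised in place without detaching the volume. The following boto3 sketch uses placeholder values; for Amazon MSK, configure provisioned throughput through the MSK console or API instead.

import boto3

# Raise the provisioned throughput of a broker's gp3 data volume (placeholder values).
ec2 = boto3.client("ec2")
ec2.modify_volume(
    VolumeId="vol-0123456789abcdef0",
    Throughput=500,  # MB/s; gp3 supports up to 1000 MB/s
    Iops=5000,
)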

The following figure compares the cluster throughput and put latencies of different broker sizes and different provisioned throughput configurations.

Comparing max sustained throughput of different brokers with and without provisioned throughput

For a three-node cluster with replication 3 and two consumer groups, the recommended ingress throughput limits as per Equation 1 are as follows.

Broker size Provisioned throughput configuration Recommended sustained throughput limit
m5.4xlarge None 200 MB/sec
m5.4xlarge 480 MB/sec 384 MB/sec
m5.8xlarge 850 MB/sec 680 MB/sec
m5.12xlarge 1000 MB/sec 800 MB/sec
m5.16xlarge 1000 MB/sec 800 MB/sec

The provisioned throughput configuration was carefully chosen for the given workload. With two consumer groups consuming from the cluster, it doesn’t make sense to increase the provisioned throughput of m5.4xlarge brokers beyond 480 MB/sec. The Amazon EC2 network, not the EBS volume throughput, restricts the recommended sustained throughput limit of the cluster to 384 MB/sec. But for workloads with a different number of consumers, it can make sense to further increase or decrease the provisioned throughput configuration to match the baseline throughput of the Amazon EC2 network.

Scale out to increase cluster write throughput

Scaling out the cluster naturally increases the cluster throughput. But how does this affect performance and cost? Let’s compare the throughput of two different clusters: a three-node m5.4xlarge and a six-node m5.2xlarge cluster, as shown in the following figure. The storage size for the m5.4xlarge cluster has been adapted so that both clusters have the same total storage capacity and therefore the cost for these clusters is identical.

Comparing throughput of different cluster configurations

The six-node cluster has almost double the throughput of the three-node cluster and substantially lower p99 put latencies. Just looking at the ingress throughput of the cluster, it can make sense to scale out rather than scale up if you need more than 200 MB/sec of throughput. The following table summarizes the recommended sustained throughput limits for different numbers of brokers and broker sizes.

Number of brokers m5.large m5.2xlarge m5.4xlarge
3 58 MB/sec 192 MB/sec 200 MB/sec
6 115 MB/sec 384 MB/sec 400 MB/sec
9 173 MB/sec 576 MB/sec 600 MB/sec

In this case, we could have also used provisioned throughput to increase the throughput of the cluster. Compare, for instance, the sustained throughput limit of the six-node m5.2xlarge cluster in the preceding figure with that of the three-node m5.4xlarge cluster with provisioned throughput from the earlier example. The sustained throughput limit of both clusters is identical, because the aggregate Amazon EC2 network bandwidth, which usually grows proportionally with the broker size, is the same for both configurations.

Scale up to increase cluster read throughput

The more consumer groups are reading from the cluster, the more data egresses over the Amazon EC2 network of the brokers. Larger brokers have a higher network baseline throughput (up to 25 Gb/sec) and can therefore support more consumer groups reading from the cluster.

The following figure compares how latency and throughput changes for the different number of consumer groups for a three-node m5.2xlarge cluster.

Comparing the max sustained throughput of a cluster for different number of consumer groups

As demonstrated in this figure, increasing the number of consumer groups reading from a cluster decreases its sustained throughput limit. The more consumer groups that are reading from the cluster, the more data needs to egress from the brokers over the Amazon EC2 network. The following table summarizes the recommended sustained throughput limits for different numbers of consumer groups and broker sizes.

Consumer groups m5.large m5.2xlarge m5.4xlarge
0 65 MB/sec 200 MB/sec 200 MB/sec
2 58 MB/sec 192 MB/sec 200 MB/sec
4 38 MB/sec 128 MB/sec 200 MB/sec
6 29 MB/sec 96 MB/sec 192 MB/sec

The broker size determines the Amazon EC2 network throughput, and there is no way to increase it other than scaling up. Accordingly, to scale the read throughput of the cluster, you either need to scale up brokers or increase the number of brokers.

Balance broker size and number of brokers

When sizing a cluster, you often have the choice to either scale out or scale up to increase the throughput and performance of a cluster. Assuming storage size is adjusted accordingly, the cost of those two options is often identical. So when should you scale out or scale up?

Using smaller brokers allows you to scale the capacity in smaller increments. Amazon MSK enforces that brokers are evenly balanced across all configured Availability Zones. You can therefore only add a number of brokers that are a multiple of the number of Availability Zones. For instance, if you add three brokers to a three-node m5.4xlarge cluster with provisioned throughput, you increase the recommended sustained cluster throughput limit by 100%, from 384 MB/sec to 768 MB/sec. However, if you add three brokers to a six-node m5.2xlarge cluster, you increase the recommended cluster throughput limit by 50%, from 384 MB/sec to 576 MB/sec.

Having too few very large brokers also increases the blast radius in case a single broker is down for maintenance or because of failure of the underlying infrastructure. For instance, for a three-node cluster, a single broker corresponds to 33% of the cluster capacity, whereas it’s only 17% for a six-node cluster. When provisioning clusters to best practices, you have added enough spare capacity to not impact your workload during these operations. But for larger brokers, you may need to add more spare capacity than required because of the larger capacity increments.

However, the more brokers are part of the cluster, the longer it takes for maintenance and update operations to complete. The service applies these changes sequentially to one broker at a time to minimize impact to the availability of the cluster. When provisioning clusters to best practices, you have added enough spare capacity to not impact your workload during these operations. But the time it takes to complete the operation is still something to consider because you need to wait for one operation to complete before you can run another one.

You need to find a balance that works for your workload. Small brokers are more flexible because they give you smaller capacity increments. But having too many small brokers increases the time it takes for maintenance operations to complete and increases the likelihood of failure. Clusters with fewer, larger brokers complete update operations faster. But they come with larger capacity increments and a higher blast radius in case of broker failure.

Scale up for CPU intensive workloads

So far, we have focused on the network throughput of brokers. But there are other factors that determine the throughput and latency of the cluster. One of them is encryption. Apache Kafka has several layers where encryption can protect data in transit and at rest: encryption of the data stored on the storage volumes, encryption of traffic between brokers, and encryption of traffic between clients and brokers.

Amazon MSK always encrypts your data at rest. You can specify the AWS Key Management Service (AWS KMS) customer master key (CMK) that you want Amazon MSK to use to encrypt your data at rest. If you don’t specify a CMK, Amazon MSK creates an AWS managed CMK for you and uses it on your behalf. For data that is in-flight, you can choose to enable encryption of data between producers and brokers (in-transit encryption), between brokers (in-cluster encryption), or both.

Turning on in-cluster encryption forces the brokers to encrypt and decrypt individual messages. Therefore, sending messages over the network can no longer take advantage of the efficient zero copy operation. This results in additional CPU and memory bandwidth overhead.

The following figure shows the performance impact for these options for three-node clusters with m5.large and m5.2xlarge brokers.

Comparing put latencies for different encryption settings and broker sizes

For p99 put latencies, there is a substantial performance impact of enabling in-cluster encryption. As shown in the preceding graphs, scaling up brokers can mitigate the effect. The p99 put latency at 52 MB/sec throughput of an m5.large cluster with in-transit and in-cluster encryption is above 200 milliseconds (red and green dashed line in the left graph). Scaling the cluster to m5.2xlarge brokers brings down the p99 put latency at the same throughput to below 15 milliseconds (red and green dashed line in the right graph).

There are other factors that can increase CPU requirements. Compression as well as log compaction can also impact the load on clusters.

Scale up for a consumer not reading from the tip of the stream

We have designed the performance tests such that consumers are always reading from the tip of the topic. This effectively means that brokers can serve the reads from consumers directly from memory, not causing any read I/O to Amazon EBS. In contrast to all other sections of the post, we drop this assumption to understand how consumers that have fallen behind can impact cluster performance. The following diagram illustrates this design.

Illustration of consumers reading from the page cache and storage

When a consumer falls behind or needs to recover from failure it reprocesses older messages. In that case, the pages holding the data may no longer reside in the page cache, and brokers need to fetch the data from the EBS volume. That causes additional network traffic to the volume and non-sequential I/O reads. This can substantially impact the throughput of the EBS volume.

In an extreme case, a backfill operation can reprocess the complete history of events. In that case, the operation not only causes additional I/O to the EBS volume, it also loads a lot of pages holding historic data into the page cache, effectively evicting pages that are holding more recent data. Consequently, consumers that are slightly behind the tip of the topic and would usually read directly from the page cache may now cause additional I/O to the EBS volume because the backfill operation has evicted the page they need to read from memory.

One option to mitigate these scenarios is to enable compression. By compressing the raw data, brokers can keep more data in the page cache before it’s evicted from memory. However, keep in mind that compression requires more CPU resources. If you can’t enable compression or if enabling compression can’t mitigate this scenario, you can also increase the size of the page cache by increasing the memory available to brokers by scaling up.

Use burst performance to accommodate traffic spikes

So far, we’ve been looking at the sustained throughput limit of clusters. That’s the throughput the cluster can sustain indefinitely. For streaming workloads, it’s important to understand baseline the throughput requirements and size accordingly. However, the Amazon EC2 network, Amazon EBS network, and Amazon EBS storage system are based on a credit system; they provide a certain baseline throughput and can burst to a higher throughput for a certain period based on the instance size. This directly translates to the throughput of MSK clusters. MSK clusters have a sustained throughput limit and can burst to a higher throughput for short periods.

The blue line in the following graph shows the aggregate throughput of a three-node m5.large cluster with two consumer groups. During the entire experiment, producers are trying to send data as quickly as possible into the cluster. So, although 80% of the sustained throughput limit of the cluster is around 58 MB/sec, the cluster can burst to a throughput well above 200 MB/sec for almost half an hour.

Throughput of a fully saturated cluster over time

Think of it this way: When configuring the underlying infrastructure of a cluster, you’re basically provisioning a cluster with a certain sustained throughput limit. Given the burst capabilities, the cluster can then instantaneously absorb much higher throughput for some time. For instance, if the average throughput of your workload is usually around 50 MB/sec, the three-node m5.large cluster in the preceding graph can ingress more than four times its usual throughput for roughly half an hour. And that’s without any changes required. This burst to a higher throughput is completely transparent and doesn’t require any scaling operation.

This is a very powerful way to absorb sudden throughput spikes without scaling your cluster, which takes time to complete. Moreover, the additional capacity also helps in response to operational events. For instance, when brokers are undergoing maintenance or partitions need to be rebalanced within the cluster, they can use burst performance to get brokers online and back in sync more quickly. The burst capacity is also very valuable to quickly recover from operational events that affect an entire Availability Zone and cause a lot of replication traffic in response to the event.

Monitoring and continuous optimization

So far, we have focused on the initial sizing of your cluster. But after you determine the correct initial cluster size, the sizing efforts shouldn’t stop. It’s important to keep reviewing your workload after it’s running in production to know if the broker size is still appropriate. Your initial assumptions may no longer hold in practice, or your design goals might have changed. After all, one of the great benefits of cloud computing is that you can adapt the underlying infrastructure through an API call.

As we have mentioned before, the throughput of your production clusters should target 80% of their sustained throughput limit. When the underlying infrastructure is starting to experience throttling because it has exceeded the throughput limit for too long, you need to scale up the cluster. Ideally, you would even scale the cluster before it reaches this point. By default, Amazon MSK exposes three metrics that indicate when this throttling is applied to the underlying infrastructure:

  • BurstBalance – Indicates the remaining balance of I/O burst credits for EBS volumes. If this metric starts to drop, consider increasing the size of the EBS volume to increase the volume baseline performance. If Amazon CloudWatch isn’t reporting this metric for your cluster, your volumes are larger than 5.3 TB and no longer subject to burst credits.
  • CPUCreditBalance – Only relevant for brokers of the T3 family and indicates the amount of available CPU credits. When this metric starts to drop, brokers are consuming CPU credits to burst beyond their CPU baseline performance. Consider changing the broker type to the M5 family.
  • TrafficShaping – A high-level metric indicating the number of packets dropped due to exceeding network allocations. Finer detail is available when the PER_BROKER monitoring level is configured for the cluster. Scale up brokers if this metric is elevated during your typical workloads.

In the previous example, we saw the cluster throughput drop substantially after network credits were depleted and traffic shaping was applied. Even if we didn’t know the maximum sustained throughput limit of the cluster, the TrafficShaping metric in the following graph clearly indicates that we need to scale up the brokers to avoid further throttling on the Amazon EC2 network layer.

Throttling of the broker network correlates with the cluster throughput drop

Amazon MSK exposes additional metrics that help you understand whether your cluster is over- or under-provisioned. As part of the sizing exercise, you have determined the sustained throughput limit of your cluster. You can monitor or even create alarms on the BytesInPerSec, ReplicationBytesInPerSec, BytesOutPerSec, and ReplicationBytesOutPerSec metrics of the cluster to receive notification when the current cluster size is no longer optimal for the current workload characteristics. Likewise, you can monitor the CPUIdle metric and alarm when your cluster is under- or over-provisioned in terms of CPU utilization.
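As one hedged example, the following boto3 sketch creates an alarm that fires when ingress approaches a sustained throughput limit determined during sizing. The cluster name and threshold are placeholders; depending on your monitoring level, you may prefer to alarm per broker (adding the Broker ID dimension) or aggregate the per-broker metrics with metric math.

import boto3

# Alarm when average ingress over 15 minutes exceeds the sized limit (placeholder values).
cloudwatch = boto3.client("cloudwatch")
cloudwatch.put_metric_alarm(
    AlarmName="msk-bytes-in-near-sustained-limit",
    Namespace="AWS/Kafka",
    MetricName="BytesInPerSec",
    Dimensions=[{"Name": "Cluster Name", "Value": "my-msk-cluster"}],
    Statistic="Average",
    Period=300,
    EvaluationPeriods=3,
    Threshold=58 * 1024 * 1024,  # for example, 58 MB/sec expressed in bytes/sec
    ComparisonOperator="GreaterThanThreshold",
)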

Those are only the most relevant metrics to monitor the size of your cluster from an infrastructure perspective. You should also monitor the health of the cluster and the entire workload. For further guidance on monitoring clusters, refer to Best Practices.

A framework for testing Apache Kafka performance

As mentioned before, you must run your own tests to verify if the performance of a cluster matches your specific workload characteristics. We have published a performance testing framework on GitHub that helps automate the scheduling and visualization of many tests. We have been using the same framework to generate the graphs that we have been discussing in this post.

The framework is based on the kafka-producer-perf-test.sh and kafka-consumer-perf-test.sh tools that are part of the Apache Kafka distribution. It builds automation and visualization around these tools.

For smaller brokers that are subject to burst capabilities, you can also configure the framework to first generate excess load over an extended period to deplete networking, storage, or storage network credits. After the credit depletion completes, the framework runs the actual performance test. This is important for measuring the performance that clusters can sustain indefinitely, rather than peak performance, which can only be sustained for some time.

To run your own test, refer to the GitHub repository, where you can find the AWS Cloud Development Kit (AWS CDK) template and additional documentation on how to configure, run, and visualize the results of the performance tests.

Conclusion

We’ve discussed various factors that contribute to the performance of Apache Kafka from an infrastructure perspective. Although we’ve focused on Apache Kafka, we also learned about Amazon EC2 networking and Amazon EBS performance characteristics.

To find the right size for your clusters, work backward from your use case to determine the throughput, availability, durability, and latency requirements.

Start with an initial sizing of your cluster based on your throughput, storage, and durability requirements. Scale out or use provisioned throughput to increase the write throughput of the cluster. Scale up to increase the number of consumers that can consume from the cluster. Scale up to facilitate in-transit or in-cluster encryption and consumers that aren’t reading from the tip of the stream.

Verify this initial cluster sizing by running performance tests and then fine-tune the cluster size and configuration to match other requirements, such as latency. Add additional capacity for production clusters so that they can withstand the maintenance or loss of a broker. Depending on your workload, you may even consider adding enough spare capacity to withstand an event affecting an entire Availability Zone. Finally, keep monitoring your cluster metrics and resize the cluster in case your initial assumptions no longer hold.


About the Author

Steffen Hausmann is a Principal Streaming Architect at AWS. He works with customers around the globe to design and build streaming architectures so that they can get value from analyzing their streaming data. He holds a doctorate degree in computer science from the University of Munich and in his free time, he tries to lure his daughters into tech with cute stickers he collects at conferences.

Unify log aggregation and analytics across compute platforms

Post Syndicated from Hari Ohm Prasath original https://aws.amazon.com/blogs/big-data/unify-log-aggregation-and-analytics-across-compute-platforms/

Our customers want to make sure their users have the best experience running their application on AWS. To make this happen, you need to monitor and fix software problems as quickly as possible. Doing this gets challenging with the growing volume of data needing to be quickly detected, analyzed, and stored. In this post, we walk you through an automated process to aggregate and monitor logging-application data in near-real time, so you can remediate application issues faster.

This post shows how to unify and centralize logs across different computing platforms. With this solution, you can unify logs from Amazon Elastic Compute Cloud (Amazon EC2), Amazon Elastic Container Service (Amazon ECS), Amazon Elastic Kubernetes Service (Amazon EKS), Amazon Kinesis Data Firehose, and AWS Lambda using agents, log routers, and extensions. We use Amazon OpenSearch Service (successor to Amazon Elasticsearch Service) with OpenSearch Dashboards to visualize and analyze the logs, collected across different computing platforms to get application insights. You can deploy the solution using the AWS Cloud Development Kit (AWS CDK) scripts provided as part of the solution.

Customer benefits

A unified aggregated log system provides the following benefits:

  • A single point of access to all the logs across different computing platforms
  • A way to define and standardize the transformations of logs before they get delivered to downstream systems like Amazon Simple Storage Service (Amazon S3), Amazon OpenSearch Service, Amazon Redshift, and other services
  • The ability to use Amazon OpenSearch Service to quickly index, and OpenSearch Dashboards to search and visualize, logs from routers, applications, and other devices

Solution overview

In this post, we use the following services to demonstrate log aggregation across different compute platforms:

  • Amazon EC2 – A web service that provides secure, resizable compute capacity in the cloud. It’s designed to make web-scale cloud computing easier for developers.
  • Amazon ECS – A web service that makes it easy to run, scale, and manage Docker containers on AWS, designed to make the Docker experience easier for developers.
  • Amazon EKS – A managed Kubernetes service that makes it easy to run, scale, and manage containerized applications on AWS.
  • Kinesis Data Firehose – A fully managed service that makes it easy to stream data to Amazon S3, Amazon Redshift, or Amazon OpenSearch Service.
  • Lambda – A compute service that lets you run code without provisioning or managing servers. It’s designed to make web-scale cloud computing easier for developers.
  • Amazon OpenSearch Service – A fully managed service that makes it easy for you to perform interactive log analytics, real-time application monitoring, website search, and more.

The following diagram shows the architecture of our solution.

The architecture uses various log aggregation tools, such as log agents, log routers, and Lambda extensions, to collect logs from multiple compute platforms and deliver them to Kinesis Data Firehose. Kinesis Data Firehose streams the logs to Amazon OpenSearch Service. Log records that fail to be persisted in Amazon OpenSearch Service are written to Amazon S3. To scale this architecture, each compute platform streams its logs to a different Firehose delivery stream, which is added as a separate index and rotated every 24 hours.

The following sections demonstrate how the solution is implemented on each of these computing platforms.

Amazon EC2

The Kinesis agent collects and streams logs from the applications running on EC2 instances to Kinesis Data Firehose. The agent is a standalone Java software application that offers an easy way to collect and send data to Kinesis Data Firehose. The agent continuously monitors files and sends logs to the Firehose delivery stream.

BDB-1742-Ec2

The AWS CDK script provided as part of this solution deploys a simple PHP application that generates logs under the /etc/httpd/logs directory on the EC2 instance. The Kinesis agent is configured via /etc/aws-kinesis/agent.json to collect data from access_logs and error_logs, and stream them periodically to Kinesis Data Firehose (ec2-logs-delivery-stream).

Because Amazon OpenSearch Service expects data in JSON format, you can add a call to a Lambda function to transform the log data to JSON format within Kinesis Data Firehose before streaming to Amazon OpenSearch Service. The following is a sample input for the data transformer:

46.99.153.40 - - [29/Jul/2021:15:32:33 +0000] "GET / HTTP/1.1" 200 173 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36"

The following is our output:

{
    "logs" : "46.99.153.40 - - [29/Jul/2021:15:32:33 +0000] \"GET / HTTP/1.1\" 200 173 \"-\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36\"",
}

We can enhance the Lambda function to extract the timestamp, HTTP, and browser information from the log data, and store them as separate attributes in the JSON document.
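A minimal sketch of such a transformer follows; the function deployed by the AWS CDK script may differ in detail. It implements the Kinesis Data Firehose transformation contract of returning each recordId together with a result and base64-encoded data.

import base64
import json

def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        raw_line = base64.b64decode(record["data"]).decode("utf-8").strip()
        transformed = {"logs": raw_line}
        # The timestamp, HTTP verb, status code, and user agent could be parsed
        # out of raw_line here and stored as separate attributes.
        payload = json.dumps(transformed) + "\n"
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(payload.encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}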

Amazon ECS

In the case of Amazon ECS, we use FireLens to send logs directly to Kinesis Data Firehose. FireLens is a container log router for Amazon ECS and AWS Fargate that gives you the extensibility to use the breadth of services at AWS or partner solutions for log analytics and storage.

BDB-1742-ECS

The architecture hosts FireLens as a sidecar that collects logs from the main container running an httpd application and sends them to Kinesis Data Firehose, which streams them to Amazon OpenSearch Service. The AWS CDK script provided as part of this solution deploys an httpd container hosted behind an Application Load Balancer. The httpd logs are pushed to Kinesis Data Firehose (ecs-logs-delivery-stream) through the FireLens log router.

Amazon EKS

With the recent announcement of Fluent Bit support for Amazon EKS, you no longer need to run a sidecar to route container logs from Amazon EKS pods running on Fargate. With the new built-in logging support, you can select a destination of your choice to send the records to. Amazon EKS on Fargate uses a version of Fluent Bit for AWS, an upstream conformant distribution of Fluent Bit managed by AWS.

BDB-1742-EKS

The AWS CDK script provided as part of this solution deploys an NGINX container hosted behind an internal Application Load Balancer. The NGINX container logs are pushed to Kinesis Data Firehose (eks-logs-delivery-stream) through the Fluent Bit plugin.

Lambda

For Lambda functions, you can send logs directly to Kinesis Data Firehose using the Lambda extension. You can also prevent the records from being written to Amazon CloudWatch.

BDB-1742-Lambda

After deployment, the workflow is as follows:

  1. On startup, the extension subscribes to receive logs for the platform and function events. A local HTTP server is started inside the external extension, which receives the logs.
  2. The extension buffers the log events in a synchronized queue and writes them to Kinesis Data Firehose via PUT records.
  3. Kinesis Data Firehose delivers the logs to the configured downstream systems.
  4. The logs are indexed in Amazon OpenSearch Service.

The Firehose delivery stream name gets specified as an environment variable (AWS_KINESIS_STREAM_NAME).

For this solution, because we’re only focusing on collecting the run logs of the Lambda function, the data transformer of the Kinesis Data Firehose delivery stream filters out the records of type function ("type":"function") before sending it to Amazon OpenSearch Service.

The following is a sample input for the data transformer:

[
   {
      "time":"2021-07-29T19:54:08.949Z",
      "type":"platform.start",
      "record":{
         "requestId":"024ae572-72c7-44e0-90f5-3f002a1df3f2",
         "version":"$LATEST"
      }
   },
   {
      "time":"2021-07-29T19:54:09.094Z",
      "type":"platform.logsSubscription",
      "record":{
         "name":"kinesisfirehose-logs-extension-demo",
         "state":"Subscribed",
         "types":[
            "platform",
            "function"
         ]
      }
   },
   {
      "time":"2021-07-29T19:54:09.096Z",
      "type":"function",
      "record":"2021-07-29T19:54:09.094Z\tundefined\tINFO\tLoading function\n"
   },
   {
      "time":"2021-07-29T19:54:09.096Z",
      "type":"platform.extension",
      "record":{
         "name":"kinesisfirehose-logs-extension-demo",
         "state":"Ready",
         "events":[
            "INVOKE",
            "SHUTDOWN"
         ]
      }
   },
   {
      "time":"2021-07-29T19:54:09.097Z",
      "type":"function",
      "record":"2021-07-29T19:54:09.097Z\t024ae572-72c7-44e0-90f5-3f002a1df3f2\tINFO\tvalue1 = value1\n"
   },   
   {
      "time":"2021-07-29T19:54:09.098Z",
      "type":"platform.runtimeDone",
      "record":{
         "requestId":"024ae572-72c7-44e0-90f5-3f002a1df3f2",
         "status":"success"
      }
   }
]
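A hedged sketch of that filtering transformer follows; the function deployed by the AWS CDK script may differ in detail. It keeps the entries of type function and drops records that contain only platform events.

import base64
import json

def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        entries = json.loads(base64.b64decode(record["data"]))
        function_logs = [e for e in entries if e.get("type") == "function"]
        if not function_logs:
            output.append({"recordId": record["recordId"],
                           "result": "Dropped",
                           "data": record["data"]})
            continue
        payload = "\n".join(json.dumps(e) for e in function_logs) + "\n"
        output.append({"recordId": record["recordId"],
                       "result": "Ok",
                       "data": base64.b64encode(payload.encode("utf-8")).decode("utf-8")})
    return {"records": output}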

Prerequisites

To implement this solution, you need the following prerequisites:

Build the code

Check out the AWS CDK code by running the following command:

mkdir unified-logs && cd unified-logs
git clone https://github.com/aws-samples/unified-log-aggregation-and-analytics .

Build the lambda extension by running the following command:

cd lib/computes/lambda/extensions
chmod +x extension.sh
./extension.sh
cd ../../../../

Make sure to replace the default AWS Region specified as the value of the firehose.endpoint attribute in lib/computes/ec2/ec2-startup.sh.

Build the code by running the following command:

yarn install && npm run build

Deploy the code

If you’re running AWS CDK for the first time, run the following command to bootstrap the AWS CDK environment (provide your AWS account ID and AWS Region):

cdk bootstrap \
    --cloudformation-execution-policies arn:aws:iam::aws:policy/AdministratorAccess \
    aws://<AWS Account Id>/<AWS_REGION>

You only need to bootstrap the AWS CDK one time (skip this step if you have already done this).

Run the following command to deploy the code:

cdk deploy --require-approval never

You get the following output:

 ✅  CdkUnifiedLogStack

Outputs:
CdkUnifiedLogStack.ec2ipaddress = xx.xx.xx.xx
CdkUnifiedLogStack.ecsloadbalancerurl = CdkUn-ecsse-PY4D8DVQLK5H-xxxxx.us-east-1.elb.amazonaws.com
CdkUnifiedLogStack.ecsserviceLoadBalancerDNS570CB744 = CdkUn-ecsse-PY4D8DVQLK5H-xxxx.us-east-1.elb.amazonaws.com
CdkUnifiedLogStack.ecsserviceServiceURL88A7B1EE = http://CdkUn-ecsse-PY4D8DVQLK5H-xxxx.us-east-1.elb.amazonaws.com
CdkUnifiedLogStack.eksclusterClusterNameCE21A0DB = ekscluster92983EFB-d29892f99efc4419bc08534a3d253160
CdkUnifiedLogStack.eksclusterConfigCommand515C0544 = aws eks update-kubeconfig --name ekscluster92983EFB-d29892f99efc4419bc08534a3d253160 --region us-east-1 --role-arn arn:aws:iam::xxx:role/CdkUnifiedLogStack-clustermasterroleCD184EDB-12U2TZHS28DW4
CdkUnifiedLogStack.eksclusterGetTokenCommand3C33A2A5 = aws eks get-token --cluster-name ekscluster92983EFB-d29892f99efc4419bc08534a3d253160 --region us-east-1 --role-arn arn:aws:iam::xxx:role/CdkUnifiedLogStack-clustermasterroleCD184EDB-12U2TZHS28DW4
CdkUnifiedLogStack.elasticdomainarn = arn:aws:es:us-east-1:xxx:domain/cdkunif-elasti-rkiuv6bc52rp
CdkUnifiedLogStack.s3bucketname = cdkunifiedlogstack-logsfailederrcapturebucket0bcc-xxxxx
CdkUnifiedLogStack.samplelambdafunction = CdkUnifiedLogStack-LambdatransformerfunctionFA3659-c8u392491FrW

Stack ARN:
arn:aws:cloudformation:us-east-1:xxxx:stack/CdkUnifiedLogStack/6d53ef40-efd2-11eb-9a9d-1230a5204572

AWS CDK takes care of building the required infrastructure, deploying the sample application, and collecting logs from different sources to Amazon OpenSearch Service.

The following is some of the key information about the stack:

  • ec2ipaddress – The public IP address of the EC2 instance, deployed with the sample PHP application
  • ecsloadbalancerurl – The URL of the Amazon ECS Load Balancer, deployed with the httpd application
  • eksclusterClusterNameCE21A0DB – The Amazon EKS cluster name, deployed with the NGINX application
  • samplelambdafunction – The sample Lambda function using the Lambda extension to send logs to Kinesis Data Firehose
  • elasticdomainarn – The ARN of the Amazon OpenSearch Service domain

Generate logs

To visualize the logs, you first need to generate some sample logs.

  1. To generate Lambda logs, invoke the function using the following AWS CLI command (run it a few times):
aws lambda invoke \
--function-name "<<samplelambdafunction>>" \
--payload '{"payload": "hello"}' /tmp/invoke-result \
--cli-binary-format raw-in-base64-out \
--log-type Tail

Make sure to replace samplelambdafunction with the actual Lambda function name. The file path needs to be updated based on the underlying operating system.

The function should return "StatusCode": 200, with the following output:

{
    "StatusCode": 200,
    "LogResult": "<<Encoded>>",
    "ExecutedVersion": "$LATEST"
}
  2. Run the following command a couple of times to generate Amazon EC2 logs:
curl http://ec2ipaddress:80

Make sure to replace ec2ipaddress with the public IP address of the EC2 instance.

  3. Run the following command a couple of times to generate Amazon ECS logs:
curl http://ecsloadbalancerurl:80

Make sure to replace ecsloadbalancerurl with the URL of the Application Load Balancer from the stack output.

We deployed the NGINX application with an internal load balancer, so the load balancer hits the health checkpoint of the application, which is sufficient to generate the Amazon EKS access logs.

Visualize the logs

To visualize the logs, complete the following steps:

  1. On the Amazon OpenSearch Service console, choose the hyperlink provided for the OpenSearch Dashboards URL.
  2. Configure access to OpenSearch Dashboards.
  3. In OpenSearch Dashboards, on the Discover menu, start creating a new index pattern for each compute log.

We can see separate indexes for each compute log partitioned by date, as in the following screenshot.

BDB-1742-create-index

The following screenshot shows the process to create index patterns for Amazon EC2 logs.

BDB-1742-ec2

After you create the index patterns, we can start analyzing the logs using the Discover menu under OpenSearch Dashboards in the navigation pane. This tool provides a single searchable and unified interface for all the records across the various compute platforms. We can switch between different logs using the Change index pattern submenu.

BDB-1742-unified

Clean up

Run the following command from the root directory to delete the stack:

cdk destroy

Conclusion

In this post, we showed how to unify and centralize logs across different compute platforms using Kinesis Data Firehose and Amazon OpenSearch Service. This approach allows you to analyze logs quickly and identify the root cause of failures, using a single platform rather than different platforms for different services.

If you have feedback about this post, submit your comments in the comments section.


About the author

Hari Ohm Prasath is a Senior Modernization Architect at AWS, helping customers with their modernization journey to become cloud native. Hari loves to code and actively contributes to open source initiatives. You can find him on Medium, GitHub, and Twitter @hariohmprasath.

Ballu Singh is a Principal Solutions Architect at AWS. He lives in the San Francisco Bay Area and helps customers architect and optimize applications on AWS. In his spare time, he enjoys reading and spending time with his family.

Enforce customized data quality rules in AWS Glue DataBrew

Post Syndicated from Navnit Shukla original https://aws.amazon.com/blogs/big-data/enforce-customized-data-quality-rules-in-aws-glue-databrew/

GIGO (garbage in, garbage out) is a concept common to computer science and mathematics: the quality of the output is determined by the quality of the input. In modern data architecture, you bring data from different data sources, which creates challenges around volume, velocity, and veracity. You might write unit tests for applications, but it’s equally important to ensure the data veracity of these applications, because incoming data quality can make or break your application. Incorrect, missing, or malformed data can have a large impact on production systems. Examples of data quality issues include but are not limited to the following:

  • Missing or incorrect values can cause failures in production systems that require non-null values
  • Changes in the distribution of data can lead to unexpected outputs of machine learning (ML) models
  • Aggregations of incorrect data can lead to wrong business decisions
  • Incorrect data types can have a big impact on financial or scientific institutions

In this post, we introduce data quality rules in AWS Glue DataBrew. DataBrew is a visual data preparation tool that makes it easy to profile and prepare data for analytics and ML. We demonstrate how to use DataBrew to define a list of rules in a new entity called a ruleset. A ruleset is a set of rules that compare different data metrics against expected values.

The post describes the implementation process and provides a step-by-step guide to build data quality checks in DataBrew.

Solution overview

To illustrate our data quality use case, we use a human resources dataset. This dataset contains the following attributes:

Emp ID, Name Prefix, First Name, Middle Initial, Last Name, Gender, E Mail, Father's Name, Mother's Name, Mother's Maiden Name, Date of Birth, Time of Birth, Age in Yrs., Weight in Kgs., Date of Joining, Quarter of Joining, Half of Joining, Year of Joining, Month of Joining, Month Name of Joining, Short Month, Day of Joining, DOW of Joining, Short DOW, Age in Company (Years), Salary, Last % Hike, SSN, Phone No., Place Name, County, City, State, Zip, Region, User Name, Password

For this post, we downloaded data with 5 million records, but feel free to use a smaller dataset to follow along with this post.

The following diagram illustrates the architecture for our solution.

The steps in this solution are as follows:

  1. Create a sample dataset.
  2. Create a ruleset.
  3. Create data quality rules.
  4. Create a profile job.
  5. Inspect the data quality rules validation results.
  6. Clean the dataset.
  7. Create a DataBrew job.
  8. Validate the data quality check with the updated dataset.

Prerequisites

Before you get started, complete the following prerequisites:

  1. Have an AWS account.
  2. Download the sample dataset.
  3. Extract the CSV file.
  4. Create an Amazon Simple Storage Service (Amazon S3) bucket with three folders: input, output, and profile.
  5. Upload the sample data to the input folder of your S3 bucket (for example, s3://<s3 bucket name>/input/). A scripted alternative for steps 4 and 5 is sketched after this list.
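If you prefer to script steps 4 and 5, the following boto3 sketch creates the bucket and the three folder prefixes, then uploads the extracted CSV. The bucket and file names are placeholders.

import boto3

s3 = boto3.client("s3")
bucket = "my-databrew-dq-bucket"  # replace with a globally unique bucket name

# In us-east-1, CreateBucket takes no LocationConstraint; other Regions require one
s3.create_bucket(Bucket=bucket)

# S3 has no real folders; zero-byte keys ending in "/" appear as folders in the console
for prefix in ("input/", "output/", "profile/"):
    s3.put_object(Bucket=bucket, Key=prefix)

# Upload the extracted sample file into the input folder (file name is a placeholder)
s3.upload_file("human-resources-records.csv", bucket, "input/human-resources-records.csv")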

Create a sample dataset

To create your dataset, complete the following steps:

  1. On the DataBrew console, in the navigation pane, choose Datasets.
  2. Choose Connect new dataset.
  3. For Dataset name, enter a name (for example, human-resource-dataset).
  4. Under Data lake/data store, choose Amazon S3 as your source.
  5. For Enter your source from Amazon S3, enter the S3 bucket location where you uploaded your sample files (for example, s3://<s3 bucket name>/input/).
  6. Under Additional configurations, keep the selected file type CSV and CSV delimiter comma (,).
  7. Scroll to the bottom of the page and choose Create dataset.

The dataset is now available on the Datasets page.
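You can also connect the dataset with the AWS SDK. The following boto3 sketch is a programmatic equivalent of these console steps, with placeholder names; verify the parameter shapes against the current DataBrew API reference.

import boto3

databrew = boto3.client("databrew")

# Register the uploaded CSV files in the input folder as a DataBrew dataset
databrew.create_dataset(
    Name="human-resource-dataset",
    Format="CSV",
    FormatOptions={"Csv": {"Delimiter": ","}},
    Input={
        "S3InputDefinition": {
            "Bucket": "my-databrew-dq-bucket",  # placeholder bucket name
            "Key": "input/",
        }
    },
)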

Create a ruleset

We now define data quality rulesets against the dataset created in the previous step.

  1. On the DataBrew console, in the navigation pane, choose DQ Rules.
  2. Choose Create data quality ruleset.
  3. For Ruleset name, enter a name (for example, human-resource-dataquality-ruleset).
  4. Under Associated dataset, choose the dataset you created earlier.

Create data quality rules

You can add multiple data quality rules to a ruleset, and within each rule, you can define multiple checks.

For this post, we create the following data quality rules and data quality checks within the rules:

  • Row count is correct
  • No duplicate rows
  • Employee ID, email address, and SSN are unique
  • Employee ID and phone number are not null
  • Employee ID and employee age in years have no negative values
  • SSN data format is correct (123-45-6789)
  • Phone number string length is correct
  • Region column only has the specified region
  • Employee ID is an integer

Row count is correct

To check the total row count, complete the following steps:

  1. Add a new rule.
  2. For Rule name, enter a name (for example, Check total record count).
  3. For Data quality check scope, choose Individual check for each column.
  4. For Rule success criteria, choose All data quality checks are met (AND).
  5. For Data quality checks, choose Number of rows.
  6. For Condition, choose Is equals.
  7. For Value, enter 5000000.

No duplicate rows

To check the dataset for duplicate rows, complete the following steps:

  1. Choose Add another rule.
  2. For Rule name, enter a name (for example, Check dataset for duplicate rows).
  3. For Data quality check scope, choose Individual check for each column.
  4. For Rule success criteria, choose All data quality checks are met (AND).
  5. Under Check 1, for Data quality check, choose Duplicate rows.
  6. For Condition, choose Is equals.
  7. For Value, enter 0 and choose rows on the drop-down menu.

Employee ID, email address, and SSN are unique

To check that the employee ID, email, and SSN are unique, complete the following steps:

  1. Choose Add another rule.
  2. For Rule name, enter a name (for example, Check dataset for Unique Values).
  3. For Data quality check scope, choose Common checks for selected columns.
  4. For Rule success criteria, choose All data quality checks are met (AND).
  5. For Selected columns, select Selected columns.
  6. Choose the columns Emp ID, e mail, and SSN.
  7. Under Check 1, for Data quality check, choose Unique values.
  8. For Condition, choose Is equals.
  9. For Value, enter 100 and choose %(percent) rows on the drop-down menu.

Employee ID and phone number are not null

To check that employee IDs and phone numbers aren’t null, complete the following steps:

  1. Choose Add another rule.
  2. For Rule name, enter a name (for example, Check Dataset for NOT NULL).
  3. For Data quality check scope, choose Common checks for selected columns.
  4. For Rule success criteria, choose All data quality checks are met (AND).
  5. For Selected columns, select Selected columns.
  6. Choose the columns Emp ID and Phone No.
  7. Under Check 1, for Data quality check, choose Value is not missing.
  8. For Condition, choose Greater than equals.
  9. For Threshold, enter 100 and choose %(percent) rows on the drop-down menu.

Employee ID and age in years have no negative values

To check the employee ID and age for positive values, complete the following steps:

  1. Choose Add another rule.
  2. For Rule name, enter a name (for example, Check emp ID and age for positive values).
  3. For Data quality check scope, choose Individual check for each column.
  4. For Rule success criteria, choose All data quality checks are met (AND).
  5. Under Check 1, for Data quality check, choose Numeric values.
  6. Choose Emp ID on the drop-down menu.
  7. For Condition, choose Greater than equals.
  8. For Value, select Custom value and enter 0.
  9. Choose Add another quality check and repeat the same steps for age in years.

SSN data format is correct

To check the SSN data format, complete the following steps:

  1. Choose Add another rule.
  2. For Rule name, enter a name (for example, Check dataset format).
  3. For Data quality check scope, choose Individual check for each column.
  4. For Rule success criteria, choose All data quality checks are met (AND).
  5. Under Check 1, for Data quality check, choose String values.
  6. Choose SSN on the drop-down menu.
  7. For Condition, choose Matches (RegEx pattern).
  8. For RegEx value, enter ^[0-9]{3}-[0-9]{2}-[0-9]{4}$ (you can sanity-check this pattern locally, as shown after these steps).
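The pattern requires exactly three, two, and four digits separated by hyphens. A short Python snippet like the following shows which sample values would pass before you save the rule:

import re

ssn_pattern = re.compile(r"^[0-9]{3}-[0-9]{2}-[0-9]{4}$")

for value in ("123-45-6789", "123456789", "12-345-6789"):
    print(value, bool(ssn_pattern.match(value)))
# Only 123-45-6789 matches; the other formats would be flagged as rule failures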

Phone number string length is correct

To check the length of the phone number, complete the following steps:

  1. Choose Add another rule.
  2. For Rule name, enter a name (for example, Check Dataset Phone no. for string length).
  3. For Data quality check scope, choose Individual check for each column.
  4. For Rule success criteria, choose All data quality checks are met (AND).
  5. Under Check 1, for Data quality check, choose Value string length.
  6. Choose Phone No on the drop-down menu.
  7. For Condition, choose Greater than equals.
  8. For Value, select Custom value and enter 9.
  9. Under Check 2, for Data quality check, choose Value string length.
  10. Choose Phone No on the drop-down menu.
  11. For Condition, choose Less than equals.
  12. For Value, select Custom value and enter 12.

Region column only has the specified region

To check the Region column, complete the following steps:

  1. Choose Add another rule.
  2. For Rule name, enter a name (for example, Check Region column only for specific region).
  3. For Data quality check scope, choose Individual check for each column.
  4. For Rule success criteria, choose All data quality checks are met (AND).
  5. Under Check 1, for Data quality check, choose Value is exactly.
  6. Choose Region on the drop-down menu.
  7. For Value, select Custom value.
  8. Choose the values Midwest, South, West, and Northeast.

Employee ID is an integer

To check that the employee ID is an integer, complete the following steps:

  1. Choose Add another rule.
  2. For Rule name, enter a name (for example, Validate Emp ID is an Integer).
  3. For Data quality check scope, choose Individual check for each column.
  4. For Rule success criteria, choose All data quality checks are met (AND).
  5. Under Check 1, for Data quality check, choose String values.
  6. Choose Emp ID on the drop-down menu.
  7. For Condition, choose Matches (RegEx pattern).
  8. For RegEx value, enter ^[0-9]+$.
  9. After you create all the rules, choose Create ruleset.

Your ruleset is now listed on the Data quality rulesets page.

Create a profile job

To create a profile job with your new ruleset, complete the following steps:

  1. On the Data quality rulesets page, select the ruleset you just created.
  2. Choose Create profile job with ruleset.
  3. For Job name, keep the prepopulated name or enter a new one.
  4. For Data sample, select Full dataset.

The sample size is important for data quality rule validation, because results can differ depending on whether you validate all the rows or only a limited sample.

  1. Under Job output settings, for S3 location, enter the path to the profile bucket.

If you enter a new bucket name, the folder is created automatically.

  1. Keep the default settings for the remaining optional sections: Data profile configurations, Data quality rules, Advanced job settings, Associated schedules, and Tags.

The next step is to choose or create the AWS Identity and Access Management (IAM) role that grants DataBrew access to read from the input S3 bucket and write to the job output bucket.

  1. For Role name, choose an existing role or choose Create a new IAM role and enter an IAM role suffix.
  2. Choose Create and run job.

For more information about configuring and running DataBrew jobs, see Creating, running, and scheduling AWS Glue DataBrew jobs.
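If you prefer to create and start the same profile job with the AWS SDK, the following boto3 sketch shows one way to do it. The names, ARNs, and role are placeholders, and ValidationConfigurations is the field that, as I understand the DataBrew API, attaches a ruleset to a profile job; verify against the current API reference.

import boto3

databrew = boto3.client("databrew")

# Profile the full dataset and validate it against the ruleset (placeholder names and ARNs)
databrew.create_profile_job(
    Name="human-resource-dq-profile-job",
    DatasetName="human-resource-dataset",
    RoleArn="arn:aws:iam::123456789012:role/AWSGlueDataBrewServiceRole-demo",
    OutputLocation={"Bucket": "my-databrew-dq-bucket", "Key": "profile/"},
    JobSample={"Mode": "FULL_DATASET"},
    ValidationConfigurations=[
        {
            "RulesetArn": "arn:aws:databrew:us-east-1:123456789012:ruleset/human-resource-dataquality-ruleset",
            "ValidationMode": "CHECK_ALL",
        }
    ],
)

databrew.start_job_run(Name="human-resource-dq-profile-job")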

Inspect data quality rules validation results

To inspect the data quality rules, we need to let the profile job complete.

  1. On the Jobs page of the DataBrew console, choose the Profile jobs tab.
  2. Wait until the profile job status changes to Succeeded.
  3. When the job is complete, choose View data profile.

You’re redirected to the Data profile overview tab on the Datasets page.

  1. Choose the Data quality rules tab.

Here you can review the status of your data quality rules. As shown in the following screenshot, eight of the nine data quality rules defined were successful, and one rule failed.

Our failed data quality rule indicates that we found duplicate values for employee ID, SSN, and email.

  1. To confirm that the data has duplicate values, on the Column statistics tab, choose the Emp ID column.
  2. Scroll down to the section Top distinct values.

Similarly, you can check the E Mail and SSN columns to find that those columns also have duplicate values.

Now we have confirmed that our data has duplicate values. The next step is to clean up the dataset and rerun the quality rules validation.

Clean the dataset

To clean the dataset, we first need to create a project.

  1. On the DataBrew console, choose Projects.
  2. Choose Create project.
  3. For Project name, enter a name (for this post, human-resource-project-demo).
  4. For Select a dataset, select My datasets.
  5. Select the human-resource-dataset dataset.
  6. Keep the sampling size at its default.
  7. Under Permissions, for Role name, choose the IAM role that we created previously for our DataBrew profile job.
  8. Choose Create project.

The project takes a few minutes to open. When it’s complete, you can see your data.

Next, we delete the duplicate value from the Emp ID column.

  1. Choose the Emp ID column.
  2. Choose the more options icon (three dots) to view all the transforms available for this column.
  3. Choose Remove duplicate values.
  4. Repeat these steps for the SSN and E Mail columns.

You can now see the three applied steps in the Recipe pane.

Create a DataBrew job

The next step is to create a DataBrew job to run these transforms against the full dataset; an SDK equivalent is sketched after the following steps.

  1. On the project details page, choose Create job.
  2. For Job name, enter a name (for example, human-resource-after-dq-check).
  3. Under Job output settings, for File type, choose CSV as the final storage format.
  4. For S3 location, enter your output S3 bucket location (for example, s3://<s3 bucket name>/output/).
  5. For Compression, choose None.
  6. Under Permissions, for Role name, choose the same IAM role we used previously.
  7. Choose Create and run job.
  8. Wait for job to complete; you can monitor the job on the Jobs page.
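The equivalent SDK calls look roughly like the following boto3 sketch. All names and the role ARN are placeholders, and omitting CompressionFormat leaves the CSV output uncompressed.

import boto3

databrew = boto3.client("databrew")

# Create a recipe job from the project and write CSV output to the output folder
databrew.create_recipe_job(
    Name="human-resource-after-dq-check",
    ProjectName="human-resource-project-demo",
    RoleArn="arn:aws:iam::123456789012:role/AWSGlueDataBrewServiceRole-demo",
    Outputs=[
        {
            "Format": "CSV",
            "Location": {"Bucket": "my-databrew-dq-bucket", "Key": "output/"},
        }
    ],
)

databrew.start_job_run(Name="human-resource-after-dq-check")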

Validate the data quality check with the corrected dataset

To perform the data quality checks with the corrected dataset, complete the following steps:

  1. Follow the steps outlined earlier to create a new dataset, using the corrected data from the previous section.
  2. Choose the Amazon S3 location of the job output.
  3. Choose Create dataset.
  4. Choose DQ Rules and select the ruleset you created earlier.
  5. On the Actions menu, choose Duplicate.
  6. For Ruleset name, enter a name (for example, human-resource-dataquality-ruleset-on-corrected-dataset).
  7. Select the newly created dataset.
  8. Choose Create data quality ruleset.
  9. After the ruleset is created, select it and choose Create profile job with ruleset.
  10. Create a new profile job.
  11. Choose Create and run job.
  12. When the job is complete, repeat the steps from earlier to inspect the data quality rules validation results.

This time, under Data quality rules, all the rules pass except Check total record count, because removing the duplicate rows reduced the total row count below the expected 5 million.

On the Column statistics page, under Top distinct values for the Emp ID column, you can see the distinct values.

You can find similar results for the SSN and E Mail columns.

Clean up

To avoid incurring future charges, we recommend you delete the resources you created during this walkthrough.

Conclusion

As demonstrated in this post, you can use DataBrew to help create data quality rules, which can help you identify any discrepancies in your data. You can also use DataBrew to clean the data and validate it going forward. You can learn more about AWS Glue DataBrew here and about AWS Glue DataBrew pricing here.


About the Authors

Navnit Shukla is an AWS Specialist Solution Architect, Analytics, and is passionate about helping customers uncover insights from their data. He has been building solutions to help organizations make data-driven decisions.

Harsh Vardhan Singh Gaur is an AWS Solutions Architect, specializing in Analytics. He has over 5 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Create a serverless event-driven workflow to ingest and process Microsoft data with AWS Glue and Amazon EventBridge

Post Syndicated from Venkata Sistla original https://aws.amazon.com/blogs/big-data/create-a-serverless-event-driven-workflow-to-ingest-and-process-microsoft-data-with-aws-glue-and-amazon-eventbridge/

Microsoft SharePoint is a document management system for storing files, organizing documents, and sharing and editing documents in collaboration with others. Your organization may want to ingest SharePoint data into your data lake, combine the SharePoint data with other data that’s available in the data lake, and use it for reporting and analytics purposes. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. AWS Glue provides all the capabilities needed for data integration so that you can start analyzing your data and putting it to use in minutes instead of months.

Organizations often manage their data on SharePoint in the form of files and lists, and you can use this data for easier discovery, better auditing, and compliance. SharePoint as a data source is not a typical relational database, and the data is mostly semi-structured, which is why it’s often difficult to join SharePoint data with other relational data sources. This post shows how to ingest and process SharePoint lists and files with AWS Glue and Amazon EventBridge, which enables you to join them with other data that is available in your data lake. We use SharePoint REST APIs with a standard open data protocol (OData) syntax. OData advocates a standard way of implementing REST APIs that allows for SQL-like querying capabilities. OData helps you focus on your business logic while building RESTful APIs without having to worry about the various approaches to define request and response headers, query options, and so on.

AWS Glue event-driven workflows

Unlike a traditional relational database, SharePoint data may or may not change frequently, and it’s difficult to predict the frequency at which your SharePoint server generates new data, which makes it difficult to plan and schedule data processing pipelines efficiently. Running data processing frequently can be expensive, whereas scheduling pipelines to run infrequently can deliver cold data. Similarly, triggering pipelines from an external process can increase complexity, cost, and job startup time.

AWS Glue supports event-driven workflows, a capability that lets developers start AWS Glue workflows based on events delivered by EventBridge. The main reason to choose EventBridge in this architecture is because it allows you to process events, update the target tables, and make information available to consume in near-real time. Because frequency of data change in SharePoint is unpredictable, using EventBridge to capture events as they arrive enables you to run the data processing pipeline only when new data is available.

To get started, you simply create a new AWS Glue trigger of type EVENT and place it as the first trigger in your workflow. You can optionally specify a batching condition. Without event batching, the AWS Glue workflow is triggered every time an EventBridge rule matches, which may result in multiple concurrent workflows running. AWS Glue protects you by setting default limits that restrict the number of concurrent runs of a workflow. You can increase the required limits by opening a support case. Event batching allows you to configure the number of events to buffer or the maximum elapsed time before firing the particular trigger. When the batching condition is met, a workflow run is started. For example, you can trigger your workflow when 100 files are uploaded in Amazon Simple Storage Service (Amazon S3) or 5 minutes after the first upload. We recommend configuring event batching to avoid too many concurrent workflows, and optimize resource usage and cost.
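To make the batching configuration concrete, the following is a boto3 sketch of an EVENT trigger with a batching condition. The workflow name, job name, and batching values are illustrative and not the ones created by this post's CloudFormation template.

import boto3

glue = boto3.client("glue")

# Start the workflow's first job once 50 matching events arrive, or 900 seconds
# after the first event, whichever comes first
glue.create_trigger(
    Name="sharepoint-s3-event-trigger",
    WorkflowName="sharepoint-etl-workflow",
    Type="EVENT",
    EventBatchingCondition={"BatchSize": 50, "BatchWindow": 900},
    Actions=[{"JobName": "sharepoint-raw-to-parquet-job"}],
)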

To illustrate this solution better, consider the following use case for a wine manufacturing and distribution company that operates across multiple countries. They currently host all their transactional system’s data on a data lake in Amazon S3. They also use SharePoint lists to capture feedback and comments on wine quality and composition from their suppliers and other stakeholders. The supply chain team wants to join their transactional data with the wine quality comments in SharePoint data to improve their wine quality and manage their production issues better. They want to capture those comments from the SharePoint server within an hour and publish that data to a wine quality dashboard in Amazon QuickSight. With an event-driven approach to ingest and process their SharePoint data, the supply chain team can consume the data in less than an hour.

Overview of solution

In this post, we walk through a solution to set up an AWS Glue job to ingest SharePoint lists and files into an S3 bucket and an AWS Glue workflow that listens to S3 PutObject data events captured by AWS CloudTrail. This workflow is configured with an event-based trigger to run when an AWS Glue ingest job adds new files into the S3 bucket. The following diagram illustrates the architecture.

To make it simple to deploy, we captured the entire solution in an AWS CloudFormation template that enables you to automatically ingest SharePoint data into Amazon S3. SharePoint uses ClientID and TenantID credentials for authentication and OAuth 2.0 for authorization.

The template helps you perform the following steps:

  1. Create an AWS Glue Python shell job to make the REST API call to the SharePoint server and ingest files or lists into Amazon S3.
  2. Create an AWS Glue workflow with a starting trigger of EVENT type.
  3. Configure CloudTrail to log data events, such as PutObject API calls to CloudTrail.
  4. Create a rule in EventBridge to forward the PutObject API events to AWS Glue when they’re emitted by CloudTrail.
  5. Add an AWS Glue event-driven workflow as a target to the EventBridge rule. The workflow gets triggered when the SharePoint ingest AWS Glue job adds new files to the S3 bucket.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Configure SharePoint server authentication details

Before launching the CloudFormation stack, you need to set up your SharePoint server authentication details, namely, TenantID, Tenant, ClientID, ClientSecret, and the SharePoint URL in AWS Systems Manager Parameter Store of the account you’re deploying in. This makes sure that no authentication details are stored in the code and they’re fetched in real time from Parameter Store when the solution is running.

To create your AWS Systems Manager parameters, complete the following steps:

  1. On the Systems Manager console, under Application Management in the navigation pane, choose Parameter Store.
  2. Choose Create Parameter.
  3. For Name, enter the parameter name /DATALAKE/GlueIngest/SharePoint/tenant.
  4. Leave the type as string.
  5. Enter your SharePoint tenant detail into the value field.
  6. Choose Create parameter.
  7. Repeat these steps to create the following parameters (or create them programmatically, as sketched after this list):
    1. /DataLake/GlueIngest/SharePoint/tenant
    2. /DataLake/GlueIngest/SharePoint/tenant_id
    3. /DataLake/GlueIngest/SharePoint/client_id/list
    4. /DataLake/GlueIngest/SharePoint/client_secret/list
    5. /DataLake/GlueIngest/SharePoint/client_id/file
    6. /DataLake/GlueIngest/SharePoint/client_secret/file
    7. /DataLake/GlueIngest/SharePoint/url/list
    8. /DataLake/GlueIngest/SharePoint/url/file
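Creating eight parameters by hand is repetitive, so you may prefer to script it. The following boto3 sketch follows the walkthrough and stores everything as String parameters; all values are placeholders. Storing the client secrets as SecureString parameters is an optional hardening step not covered by the walkthrough.

import boto3

ssm = boto3.client("ssm")

# SharePoint connection details (all values are placeholders)
parameters = {
    "/DataLake/GlueIngest/SharePoint/tenant": "contoso.onmicrosoft.com",
    "/DataLake/GlueIngest/SharePoint/tenant_id": "00000000-0000-0000-0000-000000000000",
    "/DataLake/GlueIngest/SharePoint/client_id/list": "example-client-id",
    "/DataLake/GlueIngest/SharePoint/client_secret/list": "example-client-secret",
    "/DataLake/GlueIngest/SharePoint/client_id/file": "example-client-id",
    "/DataLake/GlueIngest/SharePoint/client_secret/file": "example-client-secret",
    "/DataLake/GlueIngest/SharePoint/url/list": "https://contoso.sharepoint.com/sites/wine/_api/web/lists",
    "/DataLake/GlueIngest/SharePoint/url/file": "https://contoso.sharepoint.com/sites/wine/_api/web",
}

for name, value in parameters.items():
    ssm.put_parameter(Name=name, Value=value, Type="String", Overwrite=True)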

Deploy the solution with AWS CloudFormation

For a quick start of this solution, you can deploy the provided CloudFormation stack. This creates all the required resources in your account.

The CloudFormation template generates the following resources:

  • S3 bucket – Stores data, CloudTrail logs, job scripts, and any temporary files generated during the AWS Glue extract, transform, and load (ETL) job run.
  • CloudTrail trail with S3 data events enabled – Enables EventBridge to receive PutObject API call data in a specific bucket.
  • AWS Glue Job – A Python shell job that fetches the data from the SharePoint server.
  • AWS Glue workflow – A data processing pipeline composed of a crawler, jobs, and triggers. This workflow converts uploaded data files into Apache Parquet format.
  • AWS Glue database – The AWS Glue Data Catalog database that holds the tables created in this walkthrough.
  • AWS Glue table – The Data Catalog table representing the Parquet files being converted by the workflow.
  • AWS Lambda function – The AWS Lambda function is used as an AWS CloudFormation custom resource to copy job scripts from an AWS Glue-managed GitHub repository and an AWS Big Data blog S3 bucket to your S3 bucket.
  • IAM roles and policies – We use the following AWS Identity and Access Management (IAM) roles:
    • LambdaExecutionRole – Runs the Lambda function that has permission to upload the job scripts to the S3 bucket.
    • GlueServiceRole – Runs the AWS Glue job that has permission to download the script, read data from the source, and write data to the destination after conversion.
    • EventBridgeGlueExecutionRole – Has permissions to invoke the NotifyEvent API for an AWS Glue workflow.
    • IngestGlueRole – Runs the AWS Glue job that has permission to ingest data into the S3 bucket.

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For pS3BucketName, enter the unique name of your new S3 bucket.
  5. Leave pWorkflowName and pDatabaseName as the default.


  1. For pDatasetName, enter the SharePoint list name or file name you want to ingest.
  2. Choose Next.


  1. On the next page, choose Next.
  2. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  3. Choose Create.

It takes a few minutes for the stack creation to complete; you can follow the progress on the Events tab.

You can run the ingest AWS Glue job either on a schedule or on demand. As the job successfully finishes and ingests data into the raw prefix of the S3 bucket, the AWS Glue workflow runs and transforms the ingested raw CSV files into Parquet files and loads them into the transformed prefix.

Review the EventBridge rule

The CloudFormation template created an EventBridge rule to forward S3 PutObject API events to AWS Glue. Let’s review the configuration of the EventBridge rule:

  1. On the EventBridge console, under Events, choose Rules.
  2. Choose the rule s3_file_upload_trigger_rule-<CloudFormation-stack-name>.
  3. Review the information in the Event pattern section.


The event pattern shows that this rule is triggered when an S3 object is uploaded to s3://<bucket_name>/data/SharePoint/tablename_raw/. CloudTrail captures the PutObject API calls made and relays them as events to EventBridge.
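If you were to recreate a similar rule outside CloudFormation, it might look like the following boto3 sketch. The bucket name, prefix, and rule name are placeholders, and the prefix filter uses EventBridge content filtering; the template's actual pattern may differ in detail.

import json
import boto3

events = boto3.client("events")

# Match CloudTrail-delivered PutObject calls on the raw SharePoint prefix
event_pattern = {
    "source": ["aws.s3"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["s3.amazonaws.com"],
        "eventName": ["PutObject"],
        "requestParameters": {
            "bucketName": ["my-datalake-bucket"],
            "key": [{"prefix": "data/SharePoint/tablename_raw/"}],
        },
    },
}

events.put_rule(
    Name="s3_file_upload_trigger_rule-demo",
    EventPattern=json.dumps(event_pattern),
    State="ENABLED",
)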

  1. In the Targets section, you can verify that this EventBridge rule is configured with an AWS Glue workflow as a target.


Run the ingest AWS Glue job and verify the AWS Glue workflow is triggered successfully

To test the workflow, we run the ingest-glue-job-SharePoint-file job using the following steps:

  1. On the AWS Glue console, select the ingest-glue-job-SharePoint-file job.


  1. On the Action menu, choose Run job.


  1. Choose the History tab and wait until the job succeeds.


You can now see the CSV files in the raw prefix of your S3 bucket.


Now the workflow should be triggered.

  1. On the AWS Glue console, validate that your workflow is in the RUNNING state.


  1. Choose the workflow to view the run details.
  2. On the History tab of the workflow, choose the current or most recent workflow run.
  3. Choose View run details.


When the workflow run status changes to Completed, let’s check the converted files in your S3 bucket.

  1. Switch to the Amazon S3 console, and navigate to your bucket.

You can see the Parquet files under s3://<bucket_name>/data/SharePoint/tablename_transformed/.


Congratulations! Your workflow ran successfully based on S3 events triggered by uploading files to your bucket. You can verify everything works as expected by running a query against the generated table using Amazon Athena.

Sample wine dataset

Let’s analyze a sample red wine dataset. The following screenshot shows a SharePoint list that contains various readings that relate to the characteristics of the wine and an associated wine category. This is populated by various wine tasters from multiple countries.


The following screenshot shows a supplier dataset from the data lake with wine categories ordered per supplier.


We process the red wine dataset using this solution and use Athena to query the red wine data and supplier data where wine quality is greater than or equal to 7.


We can visualize the processed dataset using QuickSight.

Clean up

To avoid incurring unnecessary charges, you can use the AWS CloudFormation console to delete the stack that you deployed. This removes all the resources you created when deploying the solution.

Conclusion

Event-driven architectures provide access to near-real-time information and help you make business decisions on fresh data. In this post, we demonstrated how to ingest and process SharePoint data using AWS serverless services like AWS Glue and EventBridge. We saw how to configure a rule in EventBridge to forward events to AWS Glue. You can use this pattern for your analytical use cases, such as joining SharePoint data with other data in your lake to generate insights, or auditing SharePoint data and compliance requirements.


About the Author

Venkata Sistla is a Big Data & Analytics Consultant on the AWS Professional Services team. He specializes in building data processing capabilities and helping customers remove constraints that prevent them from leveraging their data to develop business insights.

How MEDHOST’s cardiac risk prediction successfully leveraged AWS analytic services

Post Syndicated from Pandian Velayutham original https://aws.amazon.com/blogs/big-data/how-medhosts-cardiac-risk-prediction-successfully-leveraged-aws-analytic-services/

MEDHOST has been providing products and services to healthcare facilities of all types and sizes for over 35 years. Today, more than 1,000 healthcare facilities are partnering with MEDHOST and enhancing their patient care and operational excellence with its integrated clinical and financial EHR solutions. MEDHOST also offers a comprehensive Emergency Department Information System with business and reporting tools. Since 2013, MEDHOST’s cloud solutions have been utilizing Amazon Web Services (AWS) infrastructure, data source, and computing power to solve complex healthcare business cases.

MEDHOST can utilize the data available in the cloud to provide value-added solutions for hospitals solving complex problems, like predicting sepsis, cardiac risk, and length of stay (LOS), as well as reducing readmission rates. This requires a solid data lake foundation and an elastic data pipeline to keep up with multi-terabyte data from thousands of hospitals. MEDHOST has invested a significant amount of time evaluating numerous vendors to determine the best solution for its data needs. Ultimately, MEDHOST designed and implemented machine learning/artificial intelligence capabilities by leveraging AWS Data Lab and an end-to-end data lake platform that enables a variety of use cases such as data warehousing for analytics and reporting.

Since you’re reading this post, you may also be interested in the following:

Getting started

MEDHOST’s initial objectives in evaluating vendors were to:

  • Build a low-cost data lake solution to provide cardiac risk prediction for patients based on health records
  • Provide an analytical solution for hospital staff to improve operational efficiency
  • Implement a proof of concept to extend to other machine learning/artificial intelligence solutions

The AWS team proposed AWS Data Lab to architect, develop, and test a solution to meet these objectives. The collaborative relationship between AWS and MEDHOST, AWS’s continuous innovation, excellent support, and technical solution architects helped MEDHOST select AWS over other vendors and products. AWS Data Lab’s well-structured engagement helped MEDHOST define clear, measurable success criteria that drove the implementation of the cardiac risk prediction and analytical solution platform. The MEDHOST team consisted of architects, builders, and subject matter experts (SMEs). By connecting MEDHOST experts directly to AWS technical experts, the MEDHOST team gained a quick understanding of industry best practices and available services, allowing the team to achieve most of the success criteria at the end of a four-day design session. MEDHOST is now in the process of moving this work from its lower to upper environment to make the solution available for its customers.

Solution

For this solution, MEDHOST and AWS built a layered pipeline consisting of ingestion, processing, storage, analytics, machine learning, and reinforcement components. The following diagram illustrates the Proof of Concept (POC) that was implemented during the four-day AWS Data Lab engagement.

Ingestion layer

The ingestion layer is responsible for moving data from hospital production databases to the landing zone of the pipeline.

The hospital data was stored in an Amazon RDS for PostgreSQL instance and moved to the landing zone of the data lake using AWS Database Migration Service (DMS). DMS made migrating databases to the cloud simple and secure. Using its ongoing replication feature, MEDHOST and AWS implemented change data capture (CDC) quickly and efficiently so the MEDHOST team could spend more time focusing on the most interesting parts of the pipeline.

Processing layer

The processing layer was responsible for performing extract, transform, and load (ETL) on the data to curate it for subsequent uses.

MEDHOST used AWS Glue within its data pipeline for crawling its data layers and performing ETL tasks. The hospital data copied from RDS to Amazon S3 was cleaned, curated, enriched, denormalized, and stored in Parquet format to act as the heart of the MEDHOST data lake and a single source of truth to serve any further data needs. During the four-day Data Lab, MEDHOST and AWS targeted two needs: powering MEDHOST’s data warehouse used for analytics and feeding training data to the machine learning prediction model. Data curation is a critical task that requires an SME, and there were multiple challenges along the way. AWS Glue’s serverless nature, along with the SME’s support during the Data Lab, made developing the required transformations cost-efficient and uncomplicated. Scaling and cluster management were handled by the service, which allowed the developers to focus on cleaning data coming from homogenous hospital sources and translating the business logic to code.

Storage layer

The storage layer provided low-cost, secure, and efficient storage infrastructure.

MEDHOST used Amazon S3 as a core component of its data lake. AWS DMS migration tasks saved data to S3 in CSV format. Crawling the data with AWS Glue made this landing zone data queryable and available for further processing. The initial AWS Glue ETL job stored the Parquet-formatted data in the data lake’s curated zone bucket. MEDHOST also used S3 to store the CSV-formatted dataset that will be used to train, test, and validate its machine learning prediction model.

Analytics layer

The analytics layer gave MEDHOST pipeline reporting and dashboarding capabilities.

The data was in parquet format and partitioned in the curation zone bucket populated by the processing layer. This made querying with Amazon Athena or Amazon Redshift Spectrum fast and cost efficient.

From the Amazon Redshift cluster, MEDHOST created external tables that were used as staging tables for the MEDHOST data warehouse and implemented an UPSERT logic to merge new data into its production tables. To showcase the reporting potential that was unlocked by the MEDHOST analytics layer, a connection was made from Amazon QuickSight to the Redshift cluster. Within minutes, MEDHOST was able to create interactive analytics dashboards with filtering and drill-down capabilities, such as a chart that showed the number of confirmed disease cases per US state.

Machine learning layer

The machine learning layer used MEDHOST’s existing data sets to train its cardiac risk prediction model and make it accessible via an endpoint.

Before getting into the Data Lab, the MEDHOST team was not intimately familiar with machine learning. AWS Data Lab architects helped MEDHOST quickly understand concepts of machine learning and select a model appropriate for its use case. MEDHOST selected XGBoost as its model because cardiac risk prediction is a regression problem. MEDHOST’s well-architected data lake enabled it to quickly generate training, testing, and validation data sets using AWS Glue.

Amazon SageMaker abstracted the underlying complexity of setting up infrastructure for machine learning. With a few clicks, MEDHOST started a Jupyter notebook and coded the components leading to fitting and deploying its machine learning prediction model. Finally, MEDHOST created the endpoint for the model and ran REST calls to validate the endpoint and trained model. As a result, MEDHOST achieved the goal of predicting cardiac risk. Additionally, with Amazon QuickSight’s SageMaker integration, AWS made it easy to use SageMaker models directly in visualizations. QuickSight can call the model’s endpoint, send the input data to it, and put the inference results into the existing QuickSight data sets. This capability made it easy to display the results of the models directly in the dashboards. Read more about QuickSight’s SageMaker integration here.
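A validation call against a deployed SageMaker endpoint can be as simple as the following boto3 sketch. The endpoint name, feature order, and values are hypothetical; XGBoost endpoints typically accept CSV input.

import boto3

runtime = boto3.client("sagemaker-runtime")

# One patient record as comma-separated features, in the order the model was trained on
payload = "63,1,145,233,150,2.3"

response = runtime.invoke_endpoint(
    EndpointName="cardiac-risk-xgboost-endpoint",  # placeholder endpoint name
    ContentType="text/csv",
    Body=payload,
)

print(response["Body"].read().decode("utf-8"))  # predicted risk score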

Reinforcement layer

Finally, the reinforcement layer guaranteed that the results of the MEDHOST model were captured and processed to improve performance of the model.

The MEDHOST team went beyond the original goal and created an inference microservice to interact with the endpoint for prediction, enabled abstracting of the machine learning endpoint with the well-defined domain REST endpoint, and added a standard security layer to the MEDHOST application.

When there is a real-time call from the facility, the inference microservice gets inference from the SageMaker endpoint. Records containing input and inference data are fed to the data pipeline again. MEDHOST used Amazon Kinesis Data Streams to push records in real time. However, since retraining the machine learning model does not need to happen in real time, the Amazon Kinesis Data Firehose enabled MEDHOST to micro-batch records and efficiently save them to the landing zone bucket so that the data could be reprocessed.
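The feedback write to Kinesis Data Streams might look like the following boto3 sketch; the stream name and record shape are assumptions for illustration.

import json
import boto3

kinesis = boto3.client("kinesis")

# Capture the model input together with the returned inference for later retraining
record = {"patient_features": [63, 1, 145, 233, 150, 2.3], "predicted_risk": 0.82}

kinesis.put_record(
    StreamName="cardiac-inference-feedback-stream",  # placeholder stream name
    Data=json.dumps(record).encode("utf-8"),
    PartitionKey="cardiac-risk",
)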

Conclusion

Collaborating with AWS Data Lab enabled MEDHOST to:

  • Store single source of truth with low-cost storage solution (data lake)
  • Complete data pipeline for a low-cost data analytics solution
  • Create an almost production-ready code for cardiac risk prediction

The MEDHOST team learned many concepts related to data analytics and machine learning within four days. AWS Data Lab truly helped MEDHOST deliver results in an accelerated manner.


About the Authors

Pandian Velayutham is the Director of Engineering at MEDHOST. His team is responsible for delivering cloud solutions, integration and interoperability, and business analytics solutions. MEDHOST utilizes a modern technology stack to provide innovative solutions to its customers. Pandian Velayutham is a technology evangelist and public cloud technology speaker.

George Komninos is a Data Lab Solutions Architect at AWS. He helps customers convert their ideas to a production-ready data product. Before AWS, he spent 3 years at Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.

Deploy data lake ETL jobs using CDK Pipelines

Post Syndicated from Ravi Itha original https://aws.amazon.com/blogs/devops/deploying-data-lake-etl-jobs-using-cdk-pipelines/

Many organizations are building data lakes on AWS, which provides the most secure, scalable, comprehensive, and cost-effective portfolio of services. Like any application development project, a data lake must answer a fundamental question: “What is the DevOps strategy?” Defining a DevOps strategy for a data lake requires extensive planning and multiple teams. This typically requires multiple development and test cycles before maturing enough to support a data lake in a production environment. If an organization doesn’t have the right people, resources, and processes in place, this can quickly become daunting.

What if your data engineering team uses basic building blocks to encapsulate data lake infrastructure and data processing jobs? This is where CDK Pipelines brings the full benefit of infrastructure as code (IaC). CDK Pipelines is a high-level construct library within the AWS Cloud Development Kit (AWS CDK) that makes it easy to set up a continuous deployment pipeline for your AWS CDK applications. The AWS CDK provides essential automation for your release pipelines so that your development and operations team remain agile and focus on developing and delivering applications on the data lake.

In this post, we discuss a centralized deployment solution utilizing CDK Pipelines for data lakes. This implements a DevOps-driven data lake that delivers benefits such as continuous delivery of data lake infrastructure, data processing, and analytical jobs through a configuration-driven multi-account deployment strategy. Let’s dive in!

Data lakes on AWS

A data lake is a centralized repository where you can store all of your structured and unstructured data at any scale. Store your data as is, without having to first structure it, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning in order to guide better decisions. To further explore data lakes, refer to What is a data lake?

We design a data lake with the following elements:

  • Secure data storage
  • Data cataloging in a central repository
  • Data movement
  • Data analysis

The following figure represents our data lake.

Data Lake on AWS

We use three Amazon Simple Storage Service (Amazon S3) buckets:

  • raw – Stores the input data in its original format
  • conformed – Stores the data that meets the data lake quality requirements
  • purpose-built – Stores the data that is ready for consumption by applications or data lake consumers

The data lake has a producer where we ingest data into the raw bucket at periodic intervals. We utilize the following services: AWS Glue processes and analyzes the data; the AWS Glue Data Catalog persists metadata in a central repository; AWS Lambda and AWS Step Functions schedule and orchestrate AWS Glue extract, transform, and load (ETL) jobs; and Amazon Athena is used for interactive queries and analysis. Finally, we engage various AWS services for logging, monitoring, security, authentication, authorization, alerting, and notification.

A common data lake practice is to have multiple environments such as dev, test, and production. Applying the IaC principle for data lakes brings the benefit of consistent and repeatable runs across multiple environments, self-documenting infrastructure, and greater flexibility with resource management. The AWS CDK offers high-level constructs for use with all of our data lake resources. This simplifies usage and streamlines implementation.

Before exploring the implementation, let’s gain further scope of how we utilize our data lake.

The solution

Our goal is to implement a CI/CD solution that automates the provisioning of data lake infrastructure resources and deploys ETL jobs interactively. We accomplish this by 1) applying the separation of concerns (SoC) design principle to data lake infrastructure and ETL jobs via dedicated source code repositories, 2) using a centralized deployment model built on CDK Pipelines, and 3) enabling ETL pipelines with the AWS CDK from the start.

Data lake infrastructure

Our data lake infrastructure provisioning includes Amazon S3 buckets, S3 bucket policies, AWS Key Management Service (KMS) encryption keys, Amazon Virtual Private Cloud (Amazon VPC), subnets, route tables, security groups, VPC endpoints, and secrets in AWS Secrets Manager. The following diagram illustrates this.

Data Lake Infrastructure
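To give a feel for how this infrastructure is expressed in code, here is a minimal AWS CDK v2 Python sketch of the three data lake buckets. The construct IDs and settings are assumptions for illustration; the starter kit's actual stacks include more resources and configuration.

from aws_cdk import App, RemovalPolicy, Stack
from aws_cdk import aws_s3 as s3
from constructs import Construct


class DataLakeStorageStack(Stack):
    """Raw, conformed, and purpose-built buckets for the data lake."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        for zone in ("raw", "conformed", "purpose-built"):
            s3.Bucket(
                self,
                f"{zone}-bucket",
                encryption=s3.BucketEncryption.KMS_MANAGED,
                block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
                versioned=True,
                removal_policy=RemovalPolicy.RETAIN,
            )


app = App()
DataLakeStorageStack(app, "DataLakeStorage")
app.synth()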

Data lake ETL jobs

For our ETL jobs, we process New York City TLC Trip Record Data. The following figure displays our ETL process, wherein we run two ETL jobs within a Step Functions state machine.

AWS Glue ETL Jobs

Here are a few important details:

  1. A file server uploads files to the S3 raw bucket of the data lake. The file server is a data producer and source for the data lake. We assume that the data is pushed to the raw bucket.
  2. Amazon S3 triggers an event notification to the Lambda function.
  3. The function inserts an item in the Amazon DynamoDB table in order to track the file processing state. The first state written indicates the AWS Step Function start.
  4. The function starts the state machine.
  5. The state machine runs an AWS Glue job (Apache Spark).
  6. The job processes input data from the raw zone to the data lake conformed zone. The job also converts CSV input data to Parquet formatted data.
  7. The job updates the Data Catalog table with the metadata of the conformed Parquet file.
  8. A second AWS Glue job (Apache Spark) processes the input data from the conformed zone to the purpose-built zone of the data lake.
  9. The job fetches ETL transformation rules from the Amazon S3 code bucket and transforms the input data.
  10. The job stores the result in Parquet format in the purpose-built zone.
  11. The job updates the Data Catalog table with the metadata of the purpose-built Parquet file.
  12. The job updates the DynamoDB table and updates the job status to completed.
  13. An Amazon Simple Notification Service (Amazon SNS) notification is sent to subscribers that states the job is complete.
  14. Data engineers or analysts can now analyze data via Athena.

We discuss data formats, AWS Glue jobs, ETL transformation logic, data cataloging, auditing, notification, orchestration, and data analysis in more detail in the AWS CDK Pipelines for Data Lake ETL Deployment GitHub repository, which is introduced in a subsequent section.

Centralized deployment

Now that we have data lake infrastructure and ETL jobs ready, let’s define our deployment model. This model is based on the following design principles:

  • A dedicated AWS account to run CDK pipelines.
  • One or more AWS accounts into which the data lake is deployed.
  • The data lake infrastructure has a dedicated source code repository. Typically, data lake infrastructure is a one-time deployment and rarely evolves. Therefore, a dedicated code repository provides a landing zone for your data lake.
  • Each ETL job has a dedicated source code repository. Each ETL job may have unique AWS service, orchestration, and configuration requirements. Therefore, a dedicated source code repository will help you more flexibly build, deploy, and maintain ETL jobs.

We organize our source code repo into three branches: dev (main), test, and prod. In the deployment account, we manage three separate CDK Pipelines, and each pipeline is sourced from a dedicated branch. Here we choose a branch-based software development method in order to demonstrate the strategy in more complex scenarios where integration testing and validation layers require human intervention; because those steps are manual, a corresponding release or deployment may not immediately follow. This facilitates the propagation of changes through environments without blocking independent development priorities. We accomplish this by isolating resources across environments in the central deployment account, allowing for the independent management of each environment, and avoiding cross-contamination during each pipeline’s self-mutating updates. The following diagram illustrates this method.

Centralized deployment

Note: This centralized deployment strategy can be adopted for trunk-based software development with minimal solution modification.

Deploying data lake ETL jobs

The following figure illustrates how we utilize CDK Pipelines to deploy data lake infrastructure and ETL jobs from a central deployment account. This model follows standard nomenclature from the AWS CDK. Each repository represents a cloud infrastructure code definition. This includes the pipelines construct definition. Pipelines have one or more actions, such as cloning the source code (source action) and synthesizing the stack into an AWS CloudFormation template (synth action). Each pipeline has one or more stages, such as testing and deploying. In an AWS CDK app context, the pipelines construct is a stack like any other stack. Therefore, when the AWS CDK app is deployed, a new pipeline is created in AWS CodePipeline.

This provides incredible flexibility regarding DevOps. In other words, as a developer with an understanding of AWS CDK APIs, you can harness the power and scalability of AWS services such as CodePipeline, AWS CodeBuild, and AWS CloudFormation.

Deploying data lake ETL jobs using CDK Pipelines

Here are a few important details:

  1. The DevOps administrator checks in the code to the repository.
  2. The DevOps administrator (with elevated access) facilitates a one-time manual deployment on a target environment. Elevated access includes administrative privileges on the central deployment account and target AWS environments.
  3. CodePipeline periodically listens to commit events on the source code repositories. This is the self-mutating nature of CodePipeline. It’s configured to work with and can update itself according to the provided definition.
  4. Code changes made to the main repo branch are automatically deployed to the data lake dev environment.
  5. Code changes to the repo test branch are automatically deployed to the test environment.
  6. Code changes to the repo prod branch are automatically deployed to the prod environment.
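The following CDK v2 Python sketch illustrates the shape of such a pipeline stack: a source action, a synth step, and a deploy stage. Repository, branch, and stack names are placeholders, and it assumes a GitHub token stored in Secrets Manager for the source connection; the starter kits are more complete.

from aws_cdk import App, Stack, Stage
from aws_cdk import pipelines
from constructs import Construct


class DataLakeInfraStack(Stack):
    """Stand-in for the data lake infrastructure stacks deployed per environment."""


class DataLakeDeployStage(Stage):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        DataLakeInfraStack(self, "DataLakeInfra")


class DataLakePipelineStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        pipeline = pipelines.CodePipeline(
            self,
            "Pipeline",
            synth=pipelines.ShellStep(
                "Synth",
                # Assumes a GitHub token named "github-token" in Secrets Manager
                input=pipelines.CodePipelineSource.git_hub("my-org/data-lake-infra", "dev"),
                commands=["npm install -g aws-cdk", "pip install -r requirements.txt", "cdk synth"],
            ),
        )

        # One stage per target environment; the starter kits read account and
        # Region values from configuration
        pipeline.add_stage(DataLakeDeployStage(self, "Dev"))


app = App()
DataLakePipelineStack(app, "DataLakePipeline")
app.synth()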

CDK Pipelines starter kits for data lakes

Want to get going quickly with CDK Pipelines for your data lake? Start by cloning our two GitHub repositories. Here is a summary:

AWS CDK Pipelines for Data Lake Infrastructure Deployment

This repository contains the following reusable resources:

  • CDK Application
  • CDK Pipelines stack
  • CDK Pipelines deploy stage
  • Amazon VPC stack
  • Amazon S3 stack

It also contains the following automation scripts:

  • AWS environments configuration
  • Deployment account bootstrapping
  • Target account bootstrapping
  • Account secrets configuration (e.g., GitHub access tokens)

AWS CDK Pipelines for Data Lake ETL Deployment

This repository contains the following reusable resources:

  • CDK Application
  • CDK Pipelines stack
  • CDK Pipelines deploy stage
  • Amazon DynamoDB stack
  • AWS Glue stack
  • AWS Step Functions stack

It also contains the following:

  • AWS Lambda scripts
  • AWS Glue scripts
  • AWS Step Functions State machine script

Advantages

This section summarizes some of the advantages offered by this solution.

Scalable and centralized deployment model

We utilize a scalable and centralized deployment model to deliver end-to-end automation. This allows DevOps and data engineers to use the single responsibility principle while maintaining precise control over the deployment strategy and code quality. The model can readily be expanded to more accounts, and the pipelines are responsive to custom controls within each environment, such as a production approval layer.

Configuration-driven deployment

Configuration in the source code and AWS Secrets Manager allows deployments to utilize targeted values that are declared globally in a single location. This provides consistent management of global configurations and dependencies such as resource names, AWS account IDs, Regions, and VPC CIDR ranges. Similarly, CDK Pipelines exports outputs from CloudFormation stacks for later consumption by other resources.

Repeatable and consistent deployment of new ETL jobs

Continuous integration and continuous delivery (CI/CD) pipelines allow teams to deploy to production more frequently. Code changes can be safely and securely propagated through environments and released for deployment. This allows rapid iteration on data processing jobs, and these jobs can be changed in isolation from pipeline changes, resulting in reliable workflows.

Cleaning up

You may delete the resources provisioned by utilizing the starter kits. You can do this by running the cdk destroy command using AWS CDK Toolkit. For detailed instructions, refer to the Clean up sections in the starter kit README files.

Conclusion

In this post, we showed how to utilize CDK Pipelines to deploy infrastructure and data processing ETL jobs of your data lake in dev, test, and production AWS environments. We provided two GitHub repositories for you to test and realize the full benefits of this solution first hand. We encourage you to fork the repositories, bring your ETL scripts, bootstrap your accounts, configure account parameters, and continuously deliver your data lake ETL jobs.

Let’s stay in touch via the GitHub repositories: AWS CDK Pipelines for Data Lake Infrastructure Deployment and AWS CDK Pipelines for Data Lake ETL Deployment.


About the authors

Ravi Itha

Ravi Itha is a Sr. Data Architect at AWS. He works with customers to design and implement Data Lakes, Analytics, and Microservices on AWS. He is an open-source committer and has published more than a dozen solutions using AWS CDK, AWS Glue, AWS Lambda, AWS Step Functions, Amazon ECS, Amazon MQ, Amazon SQS, Amazon Kinesis Data Streams, and Amazon Kinesis Data Analytics for Apache Flink. His solutions can be found at his GitHub handle. Outside of work, he is passionate about books, cooking, movies, and yoga.

Isaiah Grant

Isaiah Grant is a Cloud Consultant at 2nd Watch. His primary function is to design architectures and build cloud-based applications and services. He leads customer engagements and helps customers with enterprise cloud adoptions. In his free time, he is engaged in local community initiatives and enjoys being outdoors with his family.

Zahid Ali

Zahid Ali is a Data Architect at AWS. He helps customers design, develop, and implement data warehouse and Data Lake solutions on AWS. Outside of work, he enjoys playing tennis, spending time outdoors, and traveling.

Query a Teradata database using Amazon Athena Federated Query and join with data in your Amazon S3 data lake

Post Syndicated from Navnit Shukla original https://aws.amazon.com/blogs/big-data/query-a-teradata-database-using-amazon-athena-federated-query-and-join-with-data-in-your-amazon-s3-data-lake/

If you use data lakes in Amazon Simple Storage Service (Amazon S3) and use Teradata as your transactional data store, you may need to join the data in your data lake with Teradata in the cloud, Teradata running on Amazon Elastic Compute Cloud (Amazon EC2), or with an on-premises Teradata database, for example to build a dashboard or create consolidated reporting.

In these use cases, the Amazon Athena Federated Query feature allows you to seamlessly access the data from your Teradata database without having to move it to your S3 data lake. This removes the overhead of managing such data movement jobs.

In this post, we will walk you through a step-by-step configuration to set up Athena Federated Query using AWS Lambda to access data in a Teradata database running on premises.

For this post, we use the Teradata Athena Federated Query connector developed by Trianz. The runtime includes a Teradata instance on premises. Your Teradata instance can be in the cloud, on Amazon EC2, or on premises. You can deploy the Trianz Teradata Athena Federated Query connector from the AWS Serverless Application Repository.

Let’s start with discussing the solution and then detailing the steps involved.

Solution overview

Data federation is the capability to integrate data in another data store using a single interface (Athena). The following diagram depicts how Athena Federated Query works by using Lambda to integrate with a federated data source.

Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. If you have data in sources other than Amazon S3, you can use Athena Federated Query to query the data in place or build pipelines to extract data from multiple data sources and store them in Amazon S3. With Athena Federated Query, you can run SQL queries across data stored in relational, non-relational, object, and custom data sources.

When a federated query is run, Athena identifies the parts of the query that should be routed to the data source connector and runs them with Lambda. The data source connector makes the connection to the source, runs the query, and returns the results to Athena. If the data doesn’t fit into the Lambda function’s memory, the connector spills the data to Amazon S3, where Athena accesses it later.

Athena uses data source connectors which internally use Lambda to run federated queries. Data source connectors are pre-built and can be deployed from the Athena console or from the Serverless Application Repository. Based on the user submitting the query, connectors can provide or restrict access to specific data elements.

To implement this solution, we complete the following steps:

  1. Create a secret for the Teradata instance using AWS Secrets Manager.
  2. Create an S3 bucket and subfolder for Lambda to use.
  3. Configure Athena federation with the Teradata instance.
  4. Run federated queries with Athena.

Prerequisite

Before you start this walkthrough, make sure your Teradata database is up and running.

Create a secret for the Teradata instance

Our first step is to create a secret for the Teradata instance with a username and password using Secrets Manager.

  1. On the Secrets Manager console, choose Secrets.
  2. Choose Store a new secret.
  3. Select Other types of secrets.
  4. Set the credentials as key-value pairs (username, password) for your Teradata instance.

  5. For Secret name, enter a name for your secret. Use the prefix TeradataAFQ so it’s easy to find.
  6. Leave the remaining fields at their defaults and choose Next.
  7. Complete your secret creation.

Set up your S3 bucket for Lambda

On the Amazon S3 console, create a new S3 bucket and subfolder for Lambda to use. For this post, we create athena-accelerator/teradata.

Configure Athena federation with the Teradata instance

To configure Athena federation with Teradata instance, complete the following steps:

  1. On the AWS Serverless Application Repository console, choose Available applications.
  2. Select Show apps that create custom IAM roles or resource policies.
  3. In the search field, enter TrianzTeradataAthenaJDBC.
  4. Choose the application.

  5. For SecretNamePrefix, enter TeradataAFQ.
  6. For SpillBucket, enter athena-accelerator/teradata.
  7. For JDBCConnectorConfig, use the format teradata://jdbc:teradata://hostname/user=testUser&password=testPassword.
  8. For DisableSpillEncryption, enter false.
  9. For LambdaFunctionName, enter teradataconnector.
  10. For SecurityGroupID, enter the security group ID where the Teradata instance is deployed.

Make sure to apply valid inbound and outbound rules based on your connection.

  11. For SpillPrefix, create a folder under the S3 bucket you created and specify the name (for example, athena-spill).
  12. For Subnetids, enter the subnets where the Teradata instance is running, separated by commas.

Make sure the subnets are in a VPC that has a NAT gateway and an internet gateway attached.

  13. Select the I acknowledge check box.
  14. Choose Deploy.

Make sure that the AWS Identity and Access Management (IAM) roles have permissions to access AWS Serverless Application Repository, AWS CloudFormation, Amazon S3, Amazon CloudWatch, AWS CloudTrail, Secrets Manager, Lambda, and Athena. For more information about Athena IAM access, see Example IAM Permissions Policies to Allow Athena Federated Query.

Run federated queries with Athena

Run your queries using lambda:teradataconnector against tables in the Teradata database. teradataconnector is the name of the Lambda function that we created in the previous section (the LambdaFunctionName parameter).

lambda:teradataconnector references a data source connector Lambda function using the format lambda:MyLambdaFunctionName. For more information, see Writing Federated Queries.

The following screenshot shows the query that joins the dataset between Teradata and the S3 data lake.
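The query itself isn’t reproduced here, but the following Boto3 sketch shows the general shape of such a join submitted through the Athena API. The table names (default.customer in the AWS Glue Data Catalog and testdb.ORDERS in Teradata) and the results bucket are hypothetical placeholders, and depending on when you deploy, your workgroup may need to use Athena engine version 2 for federated queries.

import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Hypothetical join between a data lake table and a Teradata table exposed
# through the lambda:teradataconnector catalog
query = """
SELECT dl.customer_name,
       td.order_id,
       td.order_total
FROM "default"."customer" dl
JOIN "lambda:teradataconnector"."testdb"."ORDERS" td
    ON dl.customer_id = td.customer_id
WHERE td.order_date >= DATE '2021-01-01'
"""

response = athena.start_query_execution(
    QueryString=query,
    ResultConfiguration={"OutputLocation": "s3://your-query-results-bucket/federated/"},
)
print(response["QueryExecutionId"])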

Key performance best practices

If you’re considering Athena Federated Query with Teradata, we recommend the following best practices:

  • Athena Federated query works great for queries with predicate filtering because the predicates are pushed down to the Teradata database. Use filter and limited-range scans in your queries to avoid full table scans.
  • If your SQL query requires returning a large volume of data from the Teradata database to Athena (which could lead to query timeouts or slow performance), you may consider moving data from Teradata to your S3 data lake.
  • The star schema is a commonly used data model in Teradata. In the star schema model, unload your large fact tables into your S3 data lake and leave the dimension tables in Teradata. If large dimension tables are contributing to slow performance or query timeouts, unload those tables to your S3 data lake.
  • When you run federated queries, Athena spins up multiple Lambda functions, which causes a spike in database connections. It’s important to monitor your Teradata workload management configuration to ensure queries aren’t queuing excessively, and to size the database for the additional concurrent connections.

Conclusion

In this post, you learned how to configure and use Athena Federated Query with Teradata. Now you don’t need to wait for all the data in your Teradata data warehouse to be unloaded to Amazon S3 and maintained on a day-to-day basis to run your queries.

You can use the best practices outlined in the post to help minimize the data transferred from Teradata for better performance. When queries are well written for Athena Federated Query, the performance penalties are negligible.

For more information, see the Athena User Guide and Using Amazon Athena Federated Query.


About the Author

Navnit Shukla is an AWS Specialist Solutions Architect in Analytics. He is passionate about helping customers uncover insights from their data. He has been building solutions to help organizations make data-driven decisions.

Query Snowflake using Athena Federated Query and join with data in your Amazon S3 data lake

Post Syndicated from Navnit Shukla original https://aws.amazon.com/blogs/big-data/query-snowflake-using-athena-federated-query-and-join-with-data-in-your-amazon-s3-data-lake/

If you use data lakes in Amazon Simple Storage Service (Amazon S3) and use Snowflake as your data warehouse solution, you may need to join your data in your data lake with Snowflake. For example, you may want to build a dashboard by joining historical data in your Amazon S3 data lake and the latest data in your Snowflake data warehouse or create consolidated reporting.

In such use cases, Amazon Athena Federated Query allows you to seamlessly access the data from Snowflake without building ETL pipelines to copy or unload the data to the S3 data lake or Snowflake. This removes the overhead of creating additional extract, transform, and load (ETL) processes and shortens the development cycle.

In this post, we will walk you through a step-by-step configuration to set up Athena Federated Query using AWS Lambda to access data in a Snowflake data warehouse.

For this post, we are using the Snowflake connector for Amazon Athena developed by Trianz.

Let’s start with discussing the solution and then detailing the steps involved.

Solution overview

Data Federation refers to the capability to query data in another data store using a single interface (Amazon Athena). The following diagram depicts how a single Amazon Athena federated query uses Lambda to query the underlying data source and parallelizes execution across many workers.

Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. If you have data in sources other than Amazon S3, you can use Athena Federated Query to query the data in place or build pipelines to extract data from multiple data sources and store them in Amazon S3. With Athena Federated Query, you can run SQL queries across data stored in relational, non-relational, object, and custom data sources.

When a federated query is run, Athena identifies the parts of the query that should be routed to the data source connector and runs them with Lambda. The data source connector makes the connection to the source, runs the query, and returns the results to Athena. If the data doesn’t fit into the Lambda function’s memory, the connector spills the data to Amazon S3, where Athena accesses it later.

Athena uses data source connectors which internally use Lambda to run federated queries. Data source connectors are pre-built and can be deployed from the Athena console or from the Serverless Application Repository. Based on the user submitting the query, connectors can provide or restrict access to specific data elements.

To implement this solution, we complete the following steps:

  1. Create a secret for the Snowflake instance using AWS Secrets Manager.
  2. Create an S3 bucket and subfolder for Lambda to use.
  3. Configure Athena federation with the Snowflake instance.
  4. Run federated queries with Athena.

Prerequisites

Before getting started, make sure you have a Snowflake data warehouse up and running.

Create a secret for the Snowflake instance

Our first step is to create a secret for the Snowflake instance with a username and password using Secrets Manager.

  1. On the Secrets Manager console, choose Secrets.
  2. Choose Store a new secret.
  3. Select Other types of secrets.
  4. Enter the credentials as key-value pairs (username, password) for your Snowflake instance.
  5. For Secret name, enter a name for your secret. Use the prefix snowflake so it’s easy to find.

  6. Leave the remaining fields at their defaults and choose Next.
  7. Complete your secret creation.

Create an S3 bucket for Lambda

On the Amazon S3 console, create a new S3 bucket and subfolder for Lambda to use. For this post, we use athena-accelerator/snowflake.

Configure Athena federation with the Snowflake instance

To configure Athena data source connector for Snowflake with your Snowflake instance, complete the following steps:

  1. On the AWS Serverless Application Repository console, choose Available applications.
  2. In the search field, enter TrianzSnowflakeAthenaJDBC.

  3. For Application name, enter TrianzSnowflakeAthenaJDBC.
  4. For SecretNamePrefix, enter trianz-snowflake-athena.
  5. For SpillBucket, enter athena-accelerator/snowflake.
  6. For JDBCConnectorConfig, use the format snowflake://jdbc:snowflake://{snowflake_instance_url}/?warehouse={warehousename}&db={databasename}&schema={schemaname}&${secretname}

For example, we enter snowflake://jdbc:snowflake://trianz.snowflakecomputing.com/?warehouse=ATHENA_WH&db=ATHENA_DEV&schema=ATHENA&${trianz-snowflake-athena}

  7. For DisableSpillEncryption, enter false.
  8. For LambdaFunctionName, enter trsnowflake.
  9. For SecurityGroupID, enter the security group ID where the Snowflake instance is deployed.

Make sure to apply valid inbound and outbound rules based on your connection.

  10. For SpillPrefix, create a folder under the S3 bucket you created and specify the name (for example, athena-spill).
  11. For Subnetids, enter the subnets where the Snowflake instance is running, separated by commas.

Make sure the subnets are in a VPC that has a NAT gateway and an internet gateway attached.

  12. Select the I acknowledge check box.
  13. Choose Deploy.

Make sure that the AWS Identity and Access Management (IAM) roles have permissions to access AWS Serverless Application Repository, AWS CloudFormation, Amazon S3, Amazon CloudWatch, AWS CloudTrail, Secrets Manager, Lambda, and Athena. For more information, see Example IAM Permissions Policies to Allow Athena Federated Query.

Run federated queries with Athena

Before running your federated query, be sure that you have selected Athena engine version 2. The current Athena engine version for any workgroup can be found in the Athena console page.

Run your federated queries using lambda:trsnowflake against tables in the Snowflake database. trsnowflake is the name of the Lambda function that we created in the previous section (the LambdaFunctionName parameter).

lambda:trsnowflake references a data source connector Lambda function using the format lambda:MyLambdaFunctionName. For more information, see Writing Federated Queries.

The following screenshot shows an example of a UNION ALL query that combines data in Amazon S3 (a table in the AWS Glue Data Catalog) with a table in Snowflake.
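If you want to retrieve the results of a query like this programmatically rather than from the console, the following is a minimal Boto3 sketch that polls a query for completion and prints the first rows; the query execution ID is a placeholder returned by start_query_execution.

import time

import boto3

athena = boto3.client("athena", region_name="us-east-1")

query_execution_id = "replace-with-your-query-execution-id"  # placeholder

# Poll until the query reaches a terminal state
while True:
    state = athena.get_query_execution(QueryExecutionId=query_execution_id)[
        "QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

if state == "SUCCEEDED":
    results = athena.get_query_results(QueryExecutionId=query_execution_id, MaxResults=10)
    for row in results["ResultSet"]["Rows"]:
        print([col.get("VarCharValue") for col in row["Data"]])
else:
    print(f"Query finished in state {state}")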

Key performance best practices

If you’re considering Athena Federated Query with Snowflake, we recommend the following best practices:

  • Athena Federated query works great for queries with predicate filtering because the predicates are pushed down to the Snowflake database. Use filter and limited-range scans in your queries to avoid full table scans.
  • If your SQL query requires returning a large volume of data from Snowflake to Athena (which could lead to query timeouts or slow performance), you may consider copying data from Snowflake to your S3 data lake.
  • The Snowflake schema, which is an extension of the star schema, is used as a data model in Snowflake. In the Snowflake schema model, unload your large fact tables into your S3 data lake and leave the dimension tables in Snowflake. If large dimension tables are contributing to slow performance or query timeouts, unload those tables to your S3 data lake.
  • When you run federated queries, Athena spins up multiple Lambda functions, which causes a spike in database connections. It’s important to monitor your Snowflake virtual warehouse to ensure queries aren’t queuing excessively; multi-cluster warehouses in Snowflake can help absorb spikes in concurrent connections.

Conclusion

In this post, you learned how to configure and use Athena Federated Query with Snowflake using Lambda. With Athena Federated Query, users can query all of their data to produce analytics and derive business value without building ETL pipelines to move data from data stores such as Snowflake into the data lake.

You can use the best practice considerations outlined in the post to help minimize the data transferred from Snowflake for better performance. When queries are well written for federation, the performance penalties are negligible.

For more information, see the Athena User Guide and Using Amazon Athena Federated Query.


About the Author

Navnit Shukla is an AWS Specialist Solutions Architect in Analytics. He is passionate about helping customers uncover insights from their data. He has been building solutions to help organizations make data-driven decisions.

Effective data lakes using AWS Lake Formation, Part 1: Getting started with governed tables

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-1-effective-data-lakes-using-aws-lake-formation-part-1-getting-started-with-governed-tables/

Thousands of customers are building their data lakes on Amazon Simple Storage Service (Amazon S3). You can use AWS Lake Formation to build your data lakes easily—in a matter of days as opposed to months. However, there are still some difficult challenges to address with your data lakes:

  • Supporting streaming updates and deletes in your data lakes, for example, database replication, and supporting privacy regulations such as GDPR and CCPA
  • Achieving fine-grained secure sharing not only with table- or column-level access control, but with row-level access control
  • Optimizing the layout of various tables and files on Amazon S3 to improve analytics performance

We announced Lake Formation transactions, row-level security, and acceleration for preview at AWS re:Invent 2020. These capabilities are available via new, open, and public update and access APIs for data lakes. These APIs extend the governance capabilities of Lake Formation with row-level security, and provide transaction semantics on data lakes.

In this series of posts, we provide step-by-step instructions for using these new Lake Formation features. In this post, we focus on the first step: setting up governed tables.

Lake Formation transactions, row-level security, and acceleration are currently available for preview in the US East (N. Virginia) AWS Region. To get early access to these capabilities, sign up for the preview. You need to be approved for the preview to gain access to these features.

Governed Table

The Data Catalog supports a new type of metadata table: the governed table. Governed tables are unique to Lake Formation. They are a new Amazon S3 table type that supports atomic, consistent, isolated, and durable (ACID) transactions. Lake Formation transactions simplify ETL script and workflow development, and allow multiple users to concurrently and reliably insert, delete, and modify rows across multiple governed tables. Lake Formation automatically compacts and optimizes storage of governed tables in the background to improve query performance. When you create a table, you can specify whether or not it is governed.

Setting up resources with AWS CloudFormation

In this post, I demonstrate how you can create a new governed table using existing data on Amazon S3. We use the Amazon Customer Reviews Dataset, which is stored in a public S3 bucket as sample data. You don’t need to copy the data to your bucket or worry about Amazon S3 storage costs. You can just set up a governed table pointing to this existing public data to see how it works.

This post includes an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs. If you prefer setting up resources on the AWS Management Console rather than AWS CloudFormation, see the instructions in the appendix at the end of this post.

The CloudFormation template generates the IAM users, IAM role, and IAM policies, the Lake Formation data lake settings, and the Lake Formation database that this walkthrough uses; the appendix at the end of this post describes the same resources and how to set them up manually.

To create your resources, complete the following steps:

  1. Sign in to the CloudFormation console in us-east-1 Region.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For DatalakeAdminUserName and DatalakeAdminUserPassword, enter the IAM user name and password for the data lake admin user.
  5. For DataAnalystUserName and DataAnalystUserPassword, enter the IAM user name and password for the data analyst user.
  6. For DatabaseName, leave as the default.
  7. Choose Next.
  8. On the next page, choose Next.
  9. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  10. Choose Create.

Stack creation can take up to 2 minutes.

Setting up a governed table

Now you can create and configure your first governed table in AWS Lake Formation.

Creating a governed table

To create your governed table, complete the following steps:

  1. Sign in to the Lake Formation console in us-east-1 Region using the DatalakeAdmin1 user.
  2. Choose Tables.
  3. Choose Create table.
  4. For Name, enter amazon_reviews_governed.
  5. For Database, enter lakeformation_tutorial_amazon_reviews.
  6. Select Enable governed data access and management.
  7. Select Enable row based permissions.


  8. For Data is located in, choose Specified path in another account.
  9. Enter the path s3://amazon-reviews-pds/parquet/.
  10. For Classification, choose PARQUET.
  11. Choose Upload Schema.
  12. Enter the following JSON array into the text box:
[
    {
        "Name": "marketplace",
        "Type": "string"
    },
    {
        "Name": "customer_id",
        "Type": "string"
    },
    {
        "Name": "review_id",
        "Type": "string"
    },
    {
        "Name": "product_id",
        "Type": "string"
    },
    {
        "Name": "product_parent",
        "Type": "string"
    },
    {
        "Name": "product_title",
        "Type": "string"
    },
    {
        "Name": "star_rating",
        "Type": "int"
    },
    {
        "Name": "helpful_votes",
        "Type": "int"
    },
    {
        "Name": "total_votes",
        "Type": "int"
    },
    {
        "Name": "vine",
        "Type": "string"
    },
    {
        "Name": "verified_purchase",
        "Type": "string"
    },
    {
        "Name": "review_headline",
        "Type": "string"
    },
    {
        "Name": "review_body",
        "Type": "string"
    },
    {
        "Name": "review_date",
        "Type": "bigint"
    },
    {
        "Name": "year",
        "Type": "int"
    }
]
  13. Choose Upload.
  14. Choose Add column.
  15. For Column name, enter product_category.
  16. For Data type, choose String.
  17. Select Partition Key.
  18. Choose Add.
  19. Choose Submit.

Now you can see that the new governed table has been created.

When you choose the table name, you can see the details of the governed table, and you can also see Governance: Enabled in this view. It means that it’s a Lake Formation governed table. If you have other existing tables, it should show as Governance: Disabled because the tables are not governed tables.

You can also see lakeformation.aso.status: true under Table properties. It means that automatic compaction is enabled for this table. For this post, we use a read-only table and don’t utilize automatic compaction. To disable the automatic compaction, complete the following steps:

  1. Choose Edit table.
  2. Deselect Automatic compaction.
  3. Choose Save.

Currently, no data and no partitions are registered to this governed table. In the next step, we register existing S3 objects to the governed table using Lake Formation manifest APIs.

Even if you locate your data in the table location of the governed table, the data isn’t recognized yet. To make the governed table aware of the data, you need to make a Lake Formation API call, or use an AWS Glue job with Lake Formation transactions.


Configuring Lake Formation permissions

You need to grant Lake Formation permissions for your governed table. Complete the following steps:

Table-level permissions

  1. Sign in to the Lake Formation console in us-east-1 Region using the DatalakeAdmin1 user.
  2. Under Permissions, choose Data permissions.
  3. Under Data permission, choose Grant.
  4. For Database, choose lakeformation_tutorial_amazon_reviews.
  5. For Table, choose amazon_reviews_governed.
  6. For IAM users and roles, choose the role LFRegisterLocationServiceRole-<CloudFormation stack name> and the user DatalakeAdmin1.
  7. Select Table permissions.
  8. Under Table permissions, select Alter, Insert, Drop, Delete, Select, and Describe.
  9. Choose Grant.
  10. Under Data permission, choose Grant.
  11. For Database, choose lakeformation_tutorial_amazon_reviews.
  12. For Table, choose amazon_reviews_governed.
  13. For IAM users and roles, choose the user DataAnalyst1.
  14. Under Table permissions, select Select and Describe.
  15. Choose Grant.

Row-level permissions

  1. Under Permissions, choose Data permissions.
  2. Under Data permission, choose Grant.
  3. For Database, choose lakeformation_tutorial_amazon_reviews.
  4. For Table, choose amazon_reviews_governed.
  5. For IAM users and roles, choose the role LFRegisterLocationServiceRole-<CloudFormation stack name> and the users DatalakeAdmin1 and DataAnalyst1.
  6. Select Row-based permissions.
  7. For Filter name, enter allowAll.
  8. For Choose filter type, select Allow access to all rows.
  9. Choose Grant.

Adding table objects into the governed table

To register S3 objects to a governed table, you need to call the UpdateTableObjects API for the objects. You can call it using the AWS Command Line Interface (AWS CLI) or SDK, and also the AWS Glue ETL library (the API is called implicitly in the library). For this post, we use the AWS CLI to explain the behavior at the API level. If you don’t have the AWS CLI, see Installing, updating, and uninstalling the AWS CLI. You also need to install the service model file provided in the Lake Formation preview program. You need to run the following commands using the DatalakeAdmin1 user’s credentials (or an IAM role or user with sufficient permissions).

First, begin a new transaction with the BeginTransaction API:

$ aws lakeformation-preview begin-transaction
{
    "TransactionId": "7e5d506a757f32252ae3402a10191b13bfd1d7aa1c26a099d4a1911241589b8f"
}

Now you can register any files on the location. For this post, we choose one sample partition product_category=Camera from the amazon-reviews-pds table, and choose one file under this partition. Uri, ETag, and Size are the required information for further steps, so you need to copy them.

$ aws s3 ls s3://amazon-reviews-pds/parquet/product_category=Camera/
2018-04-09 15:37:05   65386769 part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:06   65619234 part-00001-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:06   64564669 part-00002-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:07   65148225 part-00003-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:07   65227429 part-00004-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:07   65269357 part-00005-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:08   65595867 part-00006-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:08   65012056 part-00007-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:09   65137504 part-00008-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:09   64992488 part-00009-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet

$ aws s3api head-object --bucket amazon-reviews-pds --key parquet/product_category=Camera/part-00004-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
{
    "AcceptRanges": "bytes",
    "LastModified": "Mon, 09 Apr 2018 06:37:07 GMT",
    "ContentLength": 65227429,
    "ETag": "\"980669fcf6ccf31d2d686b9cccdd45e3-8\"",
    "ContentType": "binary/octet-stream",
    "Metadata": {}
}

Create a new file named write-operations1.json and enter the following JSON: (replace Uri, ETag, and Size with the values you copied.)

[
    {
        "AddObject": {
            "Uri": "s3://amazon-reviews-pds/parquet/product_category=Camera/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet",
            "ETag": "d4c25c40f33071620fb31cf0346ed2ec-8",
            "Size": 65386769,
            "PartitionValues": [
                "Camera"
            ]
        }
    }
]

Let’s register an existing object on the bucket to the governed table by making an UpdateTableObjects API call using write-operations1.json you created. (replace <transaction-id> with the transaction id you got in begin-transaction command.)

$ aws lakeformation-preview update-table-objects --database-name lakeformation_tutorial_amazon_reviews --table-name amazon_reviews_governed --transaction-id <transaction-id> --write-operations file://./write-operations1.json

Note the current date and time right after making the UpdateTableObjects API call. We use this timestamp for time travel queries later.

$ date -u
Tue Feb  2 12:12:00 UTC 2021

You can verify the change before committing the transaction by making the GetTableObjects API call with the same transaction ID (replace <transaction-id> with the ID you got in the begin-transaction command):

$ aws lakeformation-preview get-table-objects --database-name lakeformation_tutorial_amazon_reviews --table-name amazon_reviews_governed --transaction-id <transaction-id>
{
    "Objects": [
        {
            "PartitionValues": [
                "Camera"
            ],
            "Objects": [
                {
                    "Uri": "s3://amazon-reviews-pds/parquet/product_category=Camera/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet",
                    "ETag": "d4c25c40f33071620fb31cf0346ed2ec-8",
                    "Size": 65386769
                }
            ]
        }
    ]
}

To make this data available for other transactions, you need to call the CommitTransaction API: (replace <transaction-id> with the transaction id you got in begin-transaction command.)

$ aws lakeformation-preview commit-transaction --transaction-id <transaction-id>

After running the preceding command, you can see the partition on the Lake Formation console.

Let’s add one more partition into this table. This time we add one file per partition, and add only two partitions as an example. For actual usage, you need to add all the files under all the partitions that you need (see the scripted sketch after these steps).

Add the partition with the following commands:

  1. Call the BeginTransaction API to start another Lake Formation transaction:
    $ aws lakeformation-preview begin-transaction
    {
         "TransactionId": "d70c60e859e832b312668723cf48c1b84ef9109c5dbf6e9dbe8834c481c0ec81"
    }

  2. List Amazon S3 objects located on amazon-reviews-pds bucket to choose another sample file:
    $ aws s3 ls s3://amazon-reviews-pds/parquet/product_category=Books/
    2018-04-09 15:35:58 1094842361 part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:35:59 1093295804 part-00001-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:00 1095643518 part-00002-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:00 1095218865 part-00003-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:00 1094787237 part-00004-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:33 1094302491 part-00005-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:35 1094565655 part-00006-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:35 1095288096 part-00007-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:35 1092058864 part-00008-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:35 1093613569 part-00009-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet

  3. Call the HeadObject API against one sample file in order to copy ETag and Size
    $ aws s3api head-object --bucket amazon-reviews-pds --key parquet/product_category=Books/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    {
         "AcceptRanges": "bytes",
         "LastModified": "Mon, 09 Apr 2018 06:35:58 GMT",
         "ContentLength": 1094842361,
         "ETag": "\"9805c2c9a0459ccf337e01dc727f8efc-131\"",
         "ContentType": "binary/octet-stream",
         "Metadata": {}
    }

  4. Create a new file named write-operations2.json and enter the following JSON: (Replace Uri, ETag, and Size with the values you copied.)
    [
        {
                "AddObject": {
                "Uri": "s3://amazon-reviews-pds/parquet/product_category=Books/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet",
                "ETag": "9805c2c9a0459ccf337e01dc727f8efc-131",
                "Size": 1094842361,
                "PartitionValues": [
                    "Books"
               ]
           }
        }
    ]

  5. Call the UpdateTableObjects API using write-operations2.json: (replace <transaction-id> with the transaction id you got in begin-transaction command.)
    $ aws lakeformation-preview update-table-objects --database-name lakeformation_tutorial_amazon_reviews --table-name amazon_reviews_governed --transaction-id <transaction-id> --write-operations file://./write-operations2.json

  6. Call the CommitTransaction API (replace <transaction-id> with the transaction ID you got in the begin-transaction command):

    $ aws lakeformation-preview commit-transaction --transaction-id <transaction-id>


Now the two partitions are visible on the Lake Formation console.
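As noted earlier, for actual usage you need to register every file under every partition. Rather than copying Uri, ETag, and Size by hand, you can generate the write-operations file from an S3 listing. The following is a minimal Boto3 sketch under that assumption; the output file name is a placeholder, and you still register the objects with update-table-objects and commit-transaction as shown above.

import json

import boto3

s3 = boto3.client("s3")

bucket = "amazon-reviews-pds"
prefix = "parquet/product_category=Books/"

write_operations = []
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get("Contents", []):
        # The partition value is the part of the key after "product_category="
        partition_value = obj["Key"].split("product_category=")[1].split("/")[0]
        write_operations.append({
            "AddObject": {
                "Uri": f"s3://{bucket}/{obj['Key']}",
                "ETag": obj["ETag"].strip('"'),
                "Size": obj["Size"],
                "PartitionValues": [partition_value],
            }
        })

# Write the operations to a file for use with:
#   aws lakeformation-preview update-table-objects ... --write-operations file://./write-operations-books.json
with open("write-operations-books.json", "w") as f:
    json.dump(write_operations, f, indent=4)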

Querying the governed table using Amazon Athena

Now your governed table is ready! Let’s start querying the governed table using Amazon Athena. Sign in to the Athena console in the us-east-1 Region using the DataAnalyst1 user.

If it’s your first time running queries on Athena, you need to configure a query result location. For more information, see Specifying a Query Result Location.

To utilize Lake Formation preview features, you need to create a special workgroup named AmazonAthenaLakeFormationPreview, and join the workgroup. For more information, see Managing Workgroups.
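If you prefer to create this workgroup programmatically, the following is a minimal Boto3 sketch; the query results location is a placeholder. After creating the workgroup, switch to it on the Athena console (or pass it as the WorkGroup parameter when you submit queries through the API).

import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Create the preview workgroup required for Lake Formation governed table queries
athena.create_work_group(
    Name="AmazonAthenaLakeFormationPreview",
    Description="Workgroup for Lake Formation governed table preview queries",
    Configuration={
        "ResultConfiguration": {
            "OutputLocation": "s3://your-query-results-bucket/lakeformation-preview/"  # placeholder
        }
    },
)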

Running a simple query

First, let’s preview 10 records stored in the governed table:

SELECT * 
FROM lakeformation.lakeformation_tutorial_amazon_reviews.amazon_reviews_governed
LIMIT 10

The following screenshot shows the query results.


Running an analytic query

Next, let’s run an analytic query with aggregation for simulating real-world use cases:

SELECT product_category, count(*) as TotalReviews, avg(star_rating) as AverageRating
FROM lakeformation.lakeformation_tutorial_amazon_reviews.amazon_reviews_governed 
GROUP BY product_category

The following screenshot shows the results. This query returned the total number of reviews and average rating per product category.


Running an analytic query with time travel

Each governed table maintains a versioned manifest of the Amazon S3 objects that it comprises. You can use previous versions of the manifest for time travel queries. Your queries against governed tables in Athena can include a timestamp to indicate that you want to discover the state of the data at a particular date and time.

To submit a time travel query in Athena, add a WHERE clause that sets the column __asOfDate to the epoch time (long integer) representation of the required date and time. Let’s run the time travel query (replace <epoch-milliseconds> with the epoch millisecond representation of the timestamp you noted right after the first UpdateTableObjects call; to retrieve the epoch milliseconds, see the tips after the screenshot in this section):

SELECT product_category, count(*) as TotalReviews, avg(star_rating) as AverageRating
FROM lakeformation.lakeformation_tutorial_amazon_reviews.amazon_reviews_governed
WHERE __asOfDate = <epoch-milliseconds>
GROUP BY product_category

The following screenshot shows the query results. The result only includes the record for product_category=Camera. This is because the file under product_category=Books was added after this timestamp (1612267920000 ms = 2021/02/02 12:12:00 UTC), which is the value specified in the time travel column __asOfDate.


To retrieve the epoch time in milliseconds for a given timestamp, you can run the following commands.

The following command is for Linux (GNU date command):

$ echo $(($(date -u -d '2021/02/02 12:12:00' +%s%N)/1000000)) 
1612267920000

The following command is for OSX (BSD date command):

$ echo $(($(date -u -j -f "%Y/%m/%d %T" "2021/02/02 12:12:00" +'%s * 1000 + %-N / 1000000')))
1612267920000
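If you prefer Python, the following snippet computes the same epoch milliseconds and works the same way on any platform.

from datetime import datetime, timezone

# 2021/02/02 12:12:00 UTC expressed as epoch milliseconds for the __asOfDate filter
as_of = datetime(2021, 2, 2, 12, 12, 0, tzinfo=timezone.utc)
print(int(as_of.timestamp() * 1000))  # 1612267920000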

Cleaning up

Now to the final step, cleaning up the resources.

  1. Delete the CloudFormation stack. The governed table you created is automatically deleted with the stack.
  2. Delete the Athena workgroup AmazonAthenaLakeFormationPreview.

Conclusion

In this blog post, we explained how to create a Lake Formation governed table with existing data in an AWS public dataset. In addition, we explained how to query against governed tables and how to run time travel queries for governed tables. With Lake Formation governed tables, you can achieve transactions, row-level security, and query acceleration. In Part 2 of this series, we show you how to create a governed table for streaming data sources and demonstrate how Lake Formation transactions work.

Lake Formation transactions, row-level security, and acceleration are currently available for preview in the US East (N. Virginia) AWS Region. To get early access to these capabilities, please sign up for the preview.


Appendix: Setting up resources via the console

When following the steps in this section, use the Region us-east-1 because as of this writing, this Lake Formation preview feature is available only in us-east-1.

Configuring IAM roles and IAM users

First, you need to set up two IAM roles, one is for AWS Glue ETL jobs, another is for the Lake Formation data lake location.

IAM policies

To create your policies, complete the following steps:

  1. On the IAM console, create a new Policy for Amazon S3.
  2. Save the policy as S3DataLakePolicy as follows:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::amazon-reviews-pds/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::amazon-reviews-pds"
                ]
            }
        ]
    }

  3. Create a new IAM policy named LFLocationPolicy with the following statements:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "LFPreview1",
                "Effect": "Allow",
                "Action": "execute-api:Invoke",
                "Resource": "arn:aws:execute-api:*:*:*/*/POST/reportStatus"
            },
            {
                "Sid": "LFPreview2",
                "Effect": "Allow",
                "Action": [
                    "lakeformation:BeginTransaction",
                    "lakeformation:CommitTransaction",
                    "lakeformation:AbortTransaction",
                    "lakeformation:GetTableObjects",
                    "lakeformation:UpdateTableObjects"
                ],
                "Resource": "*"
            }
        ]
    }

    
    

  4. Create a new IAM policy named LFQueryPolicy with the following statements:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "LFPreview1",
                "Effect": "Allow",
                "Action": "execute-api:Invoke",
                "Resource": "arn:aws:execute-api:*:*:*/*/POST/reportStatus"
            },
            {
                "Sid": "LFPreview2",
                "Effect": "Allow",
                "Action": [
                    "lakeformation:BeginTransaction",
                    "lakeformation:CommitTransaction",
                    "lakeformation:AbortTransaction",
                    "lakeformation:ExtendTransaction",
                    "lakeformation:PlanQuery",
                    "lakeformation:GetTableObjects",
                    "lakeformation:GetQueryState",
                    "lakeformation:GetWorkUnits",
                    "lakeformation:Execute"
                ],
                "Resource": "*"
            }
        ]
    }

IAM role for AWS Lake Formation

To create your IAM role for the Lake Formation data lake location, complete the following steps:

  1. Create a new IAM role called LFRegisterLocationServiceRole with the following Lake Formation trust relationship:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": [
              "lakeformation.amazonaws.com"
            ]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
    

  2. Attach the customer managed policies S3DataLakePolicy and LFLocationPolicy that you created in the previous step.

This role is used to register locations with Lake Formation, which in turn performs credential vending for Athena at query time.

IAM users

To create your users, complete the following steps:

  1. Create an IAM user named DatalakeAdmin.
  2. Attach the following AWS managed policies:
    1. AWSLakeFormationDataAdmin
    2. AmazonAthenaFullAccess
    3. IAMReadOnlyAccess
  3. Attach the customer managed policy LFQueryPolicy.
  4. Create an IAM user named DataAnalyst that can use Athena to query data.
  5. Attach the AWS managed policy AmazonAthenaFullAccess.
  6. Attach the customer managed policy LFQueryPolicy.

Configuring Lake Formation

If you’re new to Lake Formation, follow the steps below to get started with AWS Lake Formation.

  1. On the Lake Formation console, under Permissions, choose Admins and database creators.
  2. In the Data lake administrators section, choose Grant.
  3. For IAM users and roles, choose your IAM user DatalakeAdmin.
  4. Choose Save.
  5. In the Database creators section, choose Grant.
  6. For IAM users and roles, choose the LFRegisterLocationServiceRole.
  7. Select Create Database.
  8. Choose Grant.
  9. Under Register and ingest, choose Data lake locations.
  10. Choose Register location.
  11. For Amazon S3 path, enter your Amazon S3 path to the bucket where your data is stored. This needs to be the same bucket you listed in LFLocationPolicy. Lake Formation uses this role to vend temporary Amazon S3 credentials to query services that need read/write access to the bucket and all prefixes under it.
  12. For IAM role, choose the LFRegisterLocationServiceRole.
  13. Choose Register location.
  14. Under Data catalog, choose Settings.
  15. Make sure that both check boxes for Use only IAM access control for new databases and Use only IAM access control for new tables in new databases are deselected.
  16. Under Data catalog, choose Databases.
  17. Choose Create database.
  18. Select Database.
  19. For Name, enter lakeformation_tutorial_amazon_reviews.
  20. Choose Create database.

About the Author

Noritaka Sekiyama is a Senior Big Data Architect at AWS Glue & Lake Formation. His passion is for implementing software artifacts for building data lakes more effectively and easily. During his spare time, he loves to spend time with his family, especially hunting bugs—not software bugs, but bugs like butterflies, pill bugs, snails, and grasshoppers.

Run usage analytics on Amazon QuickSight using AWS CloudTrail

Post Syndicated from Sunil Salunkhe original https://aws.amazon.com/blogs/big-data/run-usage-analytics-on-amazon-quicksight-using-aws-cloudtrail/

Amazon QuickSight is a cloud-native BI service that allows end users to create and publish dashboards in minutes, without provisioning any servers or requiring complex licensing. You can view these dashboards on the QuickSight product console or embed them into applications and websites. After you deploy a dashboard, it’s important to assess how it and other assets are being adopted, accessed, and used across various departments or customers.

In this post, we use a QuickSight dashboard to present the following insights:

  • Most viewed and accessed dashboards
  • Most updated dashboards and analyses
  • Most popular datasets
  • Active users vs. idle users
  • Idle authors
  • Unused datasets (wasted SPICE capacity)

You can use these insights to reduce costs and create operational efficiencies in a deployment. The following diagram illustrates this architecture.


Solution components

This solution uses the following AWS services and resources:

  • AWS CloudTrail trail (CloudTrailMultiAccount): Captures all API calls for all AWS services across all AWS Regions for this account. You can use AWS Organizations to consolidate trails across multiple AWS accounts.
  • AWS Glue crawlers (QSCloudTrailLogsCrawler, QSProcessedDataCrawler): Ensure that all CloudTrail data is crawled periodically and that partitions are updated in the AWS Glue Data Catalog.
  • AWS Glue ETL job (QuickSightCloudTrailProcessing): Reads catalogued data from the crawler, processes and transforms it, and stores it in an S3 output bucket.
  • AWS Lambda function (ExtractQSMetadata_func): Extracts event data using the AWS SDK for Python (Boto3). The event data is enriched with QuickSight metadata objects like users, analyses, datasets, and dashboards.
  • Amazon Simple Storage Service (Amazon S3) buckets (CloudTrailLogsBucket, QuickSight-BIonBI-processed): One bucket stores CloudTrail data; the other stores processed data.
  • Amazon QuickSight analysis (Quicksight_BI_On_BO_Analysis): Visualizes the processed data.

Solution walkthrough

AWS CloudTrail is a service that enables governance, compliance, operational auditing, and risk auditing of your AWS account. You can use CloudTrail to log, continuously monitor, and retain account activity related to actions across your AWS infrastructure. You can define a trail to collect API actions across all AWS Regions. Although we have enabled a trail for all Regions in our solution, the dashboard shows the data for a single Region only.

After you enable CloudTrail, it starts capturing all API actions and then, at 15-minute intervals, delivers logs in JSON format to a configured Amazon Simple Storage Service (Amazon S3) bucket. Before the logs are made available to our ad hoc query engine, Amazon Athena, they must be parsed, transformed, and processed by the AWS Glue crawler and ETL job.


This is handled by the AWS Glue crawler and AWS Glue ETL job. The AWS Glue crawler crawls through the data every day and populates new partitions in the Data Catalog. The data is later made available as a table on the Athena console for processing by the AWS Glue ETL job. The ETL job QuickSightCloudtrail_GlueJob.txt filters the logs and processes only those events where the event source is QuickSight (eventSource = 'quicksight.amazonaws.com').


The following screenshot shows the sample JSON for the QuickSight API calls.


The job processes those events and creates a Parquet file (quicksightlogs) with the following fields and data types:

  • eventtime: Datetime
  • eventname: String
  • awsregion: String
  • accountid: String
  • username: String
  • analysisname: String
  • date: Date

The processed data is stored in an S3 folder at s3://<BucketName>/processedlogs/. For performance optimization during querying and connecting this data to QuickSight for visualization, these logs are partitioned by the date field. For this reason, we recommend that you configure the AWS Glue crawler to detect the new data and partitions and update the Data Catalog for subsequent analysis. We have configured the crawler to run once a day.

We need to enrich this log data with metadata from QuickSight, such as a list of analyses, users, and datasets. This metadata can be extracted using describe_analysis, describe_user, and describe_data_set in the AWS SDK for Python (Boto3).

We provide an AWS Lambda function that is ideal for this extraction. We configured it to be triggered once a day through Amazon EventBridge. The extracted metadata is stored in the S3 folder at s3://<BucketName>/metadata/.
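The exact Lambda code isn’t shown in this post; the following is a minimal sketch of the idea using Boto3 list APIs (the provided function may instead call the describe_* APIs per asset). The account ID and output bucket come from environment variables and are placeholders, and pagination is omitted for brevity.

import json
import os

import boto3

quicksight = boto3.client("quicksight")
s3 = boto3.client("s3")

ACCOUNT_ID = os.environ["ACCOUNT_ID"]    # placeholder, e.g. "123456789012"
BUCKET = os.environ["OUTPUT_BUCKET"]     # placeholder S3 bucket for the metadata prefix


def lambda_handler(event, context):
    # Pull the QuickSight objects used to enrich the CloudTrail events
    metadata = {
        "users": quicksight.list_users(AwsAccountId=ACCOUNT_ID, Namespace="default")["UserList"],
        "analysis": quicksight.list_analyses(AwsAccountId=ACCOUNT_ID)["AnalysisSummaryList"],
        "datasets": quicksight.list_data_sets(AwsAccountId=ACCOUNT_ID)["DataSetSummaries"],
    }
    # Store each object list under the metadata/ prefix for Athena to pick up
    for name, items in metadata.items():
        s3.put_object(
            Bucket=BUCKET,
            Key=f"metadata/{name}.json",
            Body=json.dumps(items, default=str),
        )
    return {"objects_written": list(metadata.keys())}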

Now that we have processed logs and metadata for enrichment, we need to prepare the data visualization in QuickSight. Athena allows us to build views that can be imported into QuickSight as datasets.

We build the following views based on the tables populated by the Lambda function and the ETL job:

CREATE VIEW vw_quicksight_bionbi 
AS 
  SELECT Date_parse(eventtime, '%Y-%m-%dT%H:%i:%SZ') AS "Event Time", 
         eventname  AS "Event Name", 
         awsregion  AS "AWS Region", 
         accountid  AS "Account ID", 
         username   AS "User Name", 
         analysisname AS "Analysis Name", 
         dashboardname AS "Dashboard Name", 
         Date_parse(date, '%Y%m%d') AS "Event Date" 
  FROM   "quicksightbionbi"."quicksightoutput_aggregatedoutput" 

CREATE VIEW vw_users 
AS 
  SELECT usr.username "User Name", 
         usr.role     AS "Role", 
         usr.active   AS "Active" 
  FROM   (quicksightbionbi.users 
          CROSS JOIN Unnest("users") t (usr)) 

CREATE VIEW vw_analysis 
AS 
  SELECT aly.analysisname "Analysis Name", 
         aly.analysisid   AS "Analysis ID" 
  FROM   (quicksightbionbi.analysis 
          CROSS JOIN Unnest("analysis") t (aly)) 

CREATE VIEW vw_analysisdatasets 
AS 
  SELECT alyds.analysesname "Analysis Name", 
         alyds.analysisid   AS "Analysis ID", 
         alyds.datasetid    AS "Dataset ID", 
         alyds.datasetname  AS "Dataset Name" 
  FROM   (quicksightbionbi.analysisdatasets 
          CROSS JOIN Unnest("analysisdatasets") t (alyds)) 

CREATE VIEW vw_datasets 
AS 
  SELECT ds.datasetname AS "Dataset Name", 
         ds.importmode  AS "Import Mode" 
  FROM   (quicksightbionbi.datasets 
          CROSS JOIN Unnest("datasets") t (ds))

QuickSight visualization

Follow these steps to connect the prepared data with QuickSight and start building the BI visualization.

  1. Sign in to the AWS Management Console and open the QuickSight console.

You can set up QuickSight access for end users through SSO providers such as AWS Single Sign-On (AWS SSO), Okta, Ping, and Azure AD so they don’t need to open the console.


  1. On the QuickSight console, choose Datasets.
  2. Choose New dataset to create a dataset for our analysis.


  1. For Create a Data Set, choose Athena.

In the previous steps, we prepared all our data in the form of Athena views.

  1. Configure permission for QuickSight to access AWS services, including Athena and its S3 buckets. For information, see Accessing Data Sources.


  1. For Data source name, enter QuickSightBIonBI.
  2. Choose Create data source.


  1. On Choose your table, for Database, choose quicksightbionbi.
  2. For Tables, select vw_quicksight_bionbi.
  3. Choose Select.


  1. For Finish data set creation, there are two options to choose from:
    1. Import to SPICE for quicker analytics – Built from the ground up for the cloud, SPICE uses a combination of columnar storage, in-memory technologies enabled through the latest hardware innovations, and machine code generation to run interactive queries on large datasets and get rapid responses. We use this option for this post.
    2. Directly query your data – You can connect to the data source in real time, but if the data query is expected to bring bulky results, this option might slow down the dashboard refresh.
  2. Choose Visualize to complete the data source creation process.


Now you can build your visualization sheets. QuickSight refreshes the data source first. You can also schedule a periodic refresh of your data source.


The following screenshot shows some examples of visualizations we built from the data source.


 

This dashboard presents us with two main areas for cost optimization:

  • Usage analysis – We can see how analyses and dashboards are being consumed by users. This area highlights the opportunity for cost saving by looking at datasets that have not been used for the last 90 days in any of the analysis but are still holding a major chunk of SPICE capacity.
  • Account governance – Because author subscriptions are charged on a fixed fee basis, it’s important to monitor if they are actively used. The dashboard helps us identify idle authors for the last 60 days.

Based on the information in the dashboard, we could do the following to save costs: remove datasets that haven’t been used in any analysis for the last 90 days to reclaim SPICE capacity, and review the subscriptions of authors who have been idle for the last 60 days.

Conclusion

In this post, we showed how you can use CloudTrail logs to review the use of QuickSight objects, including analyses, dashboards, datasets, and users. You can use the information available in the dashboards to save money on storage and subscriptions, understand the maturity of QuickSight adoption, and more.


About the Author

Sunil Salunkhe is a Senior Solutions Architect working with Strategic Accounts on their vision to leverage the cloud to drive aggressive growth strategies. He practices customer obsession by solving their complex challenges in all aspects of the cloud journey, including scale, security, and reliability. While not working, he enjoys playing cricket and cycling with his wife and son.

Retaining data streams up to one year with Amazon Kinesis Data Streams

Post Syndicated from Nihar Sheth original https://aws.amazon.com/blogs/big-data/retaining-data-streams-up-to-one-year-with-amazon-kinesis-data-streams/

Streaming data is used extensively for use cases like sharing data between applications, streaming ETL (extract, transform, and load), real-time analytics, processing data from internet of things (IoT) devices, application monitoring, fraud detection, live leaderboards, and more. Typically, data streams are stored for short durations of time before being loaded into a permanent data store like a data lake or analytics service.

Additional use cases are becoming more prevalent that may require you retain data in streams for longer periods of time. For example, compliance programs like HIPAA and FedRAMP may require you to store raw data for more than a few days or weeks, or you may want to backtest machine learning (ML) algorithms with historical data that may be several months old.

A challenge arises when you want to process historical data and newly arriving data streams. This requires complex logic to access your data lake and your data stream store, or two sets of code—one to process data from your data lake and one to process your new data streams.

Amazon Kinesis Data Streams solves this challenge by storing your data streams up to 1 year with long-term retention. You can use the same Kinesis Data Streams code base to process both historical and newly arriving data streams, and continue to use features like enhanced fan-out to read large data volumes at very high throughput.

In this post, we describe how long-term retention enables new use cases by bridging real-time and historical data processing. We also demonstrate how you can reduce the time to retrieve 30 days of data from a data stream by an order of magnitude using Kinesis Data Streams enhanced fan-out.

Simple setup, no resource provisioning

Kinesis Data Streams durably stores all data stream records in a shard, an append-only log ordered by arrival time. The time period from when a record is added to when it’s no longer accessible is called the retention period. A Kinesis data stream stores records for 24 hours by default, up to 365 days (8,760 hours). Applications can start reading data at any point in the retention period in the exact order in which the data stream is stored. Shards enable these applications to process data in parallel and at low-latency.

You can select a preset retention period or define a custom retention period in days or hours using the Kinesis Data Streams console, as in the following screenshot.


The default retention period is 24 hours and covers scenarios where intermittent lags in processing need to catch up with the real-time data. You can extend retention up to 7 days to reprocess slightly aged data to resolve potential downstream data losses. You can also use long-term retention to store data for more than 7 days and up to 365 days to reprocess historical data for use cases like algorithm backtesting, data store backfills, and auditing. For more information, see Changing the Data Retention Period.

Similarly, you can use the following AWS Command Line Interface (AWS CLI) command to set the retention period in hours (the following code sets it to 9 days, or 216 hours):

aws kinesis increase-stream-retention-period \
    --stream-name samplestream \
    --retention-period-hours 216

Read new and historical data, no code changes necessary

All the data captured in the stream is stored in a durable, encrypted, and secure manner for the specified retention period up to a maximum of 1 year. You can store any amount of data, retrieve it by specifying a start position, and read sequentially using the familiar GetRecords and SubscribeToShard APIs. The start position can be the sequence number of a data record in a shard or a timestamp. This enables you to use the same code to process older data. You can set up multiple consuming applications to start processing data at different points in the data stream.
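For example, the following Python (boto3) sketch reads a stream starting at a timestamp within the retention period; the stream name and the process function are placeholders for illustration, and a production consumer would typically use the KCL or enhanced fan-out instead of polling GetRecords directly.

import boto3
from datetime import datetime, timedelta

kinesis = boto3.client("kinesis")

stream_name = "samplestream"  # placeholder stream name
# Start reading 30 days in the past; this must fall within the retention period.
start_time = datetime.utcnow() - timedelta(days=30)

for shard in kinesis.list_shards(StreamName=stream_name)["Shards"]:
    iterator = kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard["ShardId"],
        ShardIteratorType="AT_TIMESTAMP",
        Timestamp=start_time,
    )["ShardIterator"]

    response = kinesis.get_records(ShardIterator=iterator, Limit=1000)
    for record in response["Records"]:
        process(record["Data"])  # process() is a placeholder for your own logic
    # A real consumer would keep looping with response["NextShardIterator"].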

Speed up data reads using enhanced fan-out consumers

Kinesis Data Streams provides two types of models to consume data: shared throughput consumer and enhanced fan-out (EFO) consumer. In the shared throughput consumer model, all the consuming applications share 2 MB/s per shard read throughput and a 5 transactions per second (TPS) quota. In the enhanced fan-out model, each consumer gets a dedicated read throughput of 2 MB/s per shard. Because it uses an HTTP/2 data retrieval API, there is no longer a limit of 5 TPS. You can attach up to 20 EFO consumers to a single stream and read data at a total rate of 40 MB/s per shard. Because each consumer gets dedicated read throughput, processing one doesn't impact another. So you can attach new consumers to process old data without worrying about the performance of the existing consumer processing real-time data. For example, you can retrain an ML model in an ad hoc fashion without impacting real-time workflows.

You can add and remove EFO consumers at any time and avoid paying for over-provisioned resources. For example, when backtesting, you can register EFO consumers before the test and remove them after completion. You’re only charged for resources used during the test. Also, you can use EFO consumers to accelerate the speed of processing. Each consuming application can process different parts of streams across the retention period to process all the data in parallel, thereby dramatically reducing the total processing time.
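The following boto3 sketch shows this register-and-remove pattern; the stream ARN and consumer name are placeholders for your environment.

import boto3

kinesis = boto3.client("kinesis")

# Placeholder ARN for the stream you want to backtest against.
stream_arn = "arn:aws:kinesis:us-east-1:123456789012:stream/samplestream"

# Register an EFO consumer just before the backtest starts.
consumer = kinesis.register_stream_consumer(
    StreamARN=stream_arn,
    ConsumerName="backtest-consumer-01",
)["Consumer"]
print(consumer["ConsumerARN"], consumer["ConsumerStatus"])

# Deregister it when the test completes so you stop paying for the consumer.
kinesis.deregister_stream_consumer(
    StreamARN=stream_arn,
    ConsumerName="backtest-consumer-01",
)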

Clickstream pipeline use case

Let’s look at a clickstream use case to see how this works for an existing streaming pipeline like the one in the following diagram.


This pipeline takes clickstream data and creates an alert every time a user leaves your ecommerce site without purchasing the items in their cart. A simple pipeline like this is a great way to start with stream processing, but soon you may want to implement a recommendation system based on user activity on your website and mobile app. To do this, you need to gather historical data in your existing data stream and send it to Amazon Simple Storage Service (Amazon S3) so it can be used for training a recommendation ML model. This scenario illustrates a key benefit of enabling long-term retention: it gives you the flexibility to “go back in time” and replay the existing data in your stream to generate new analytics that you may not have considered when you initially set up the streaming pipeline.

Let’s say you enabled 30 days of retention on your Kinesis data stream. After you train your ML model, you can set up a new streaming pipeline that generates recommendations by calling an inference endpoint hosted on Amazon SageMaker based on the trained ML model. The following diagram illustrates the final state of this architecture.


You can efficiently and quickly consume the existing data in the stream and write it to Amazon S3 so it can be used for training your ML model. The following diagram illustrates the architecture of this intermediate pipeline to generate training data.


You may wonder, why read from Kinesis Data Streams and write to Amazon S3? Why not write to Amazon S3 directly without enabling long-term retention? First, ingesting into Kinesis Data Streams with long-term retention enabled gives you the flexibility to generate additional streaming analytics as time passes. Second, this gives you the flexibility to filter and transform the data being read from Kinesis Data Streams before generating analytics or writing to Amazon S3. Lastly, you can use this approach to render analytics onto other systems besides Amazon S3, such as Amazon Elasticsearch Service (Amazon ES) using the Elasticsearch sink for Apache Flink.

Keep in mind that we only use this pipeline to bootstrap our second, long-lived pipeline that does recommendations, but this is an important step and we need a way to do this efficiently. Although there are multiple options for consuming data from Kinesis Data Streams, Amazon Kinesis Data Analytics for Apache Flink provides an elegant way to attach multiple EFO consumers in the same consuming application.

You can find more information at the official Apache Flink website, and about Kinesis Data Analytics for Apache Flink in the Kinesis Data Analytics developer guide. Apache Flink has a number of connectors, like the recently released FlinkKinesisConsumer, which supports enhanced fan-out for consuming from Kinesis Data Streams, or the Streaming File Sink to write to Amazon S3 from your Apache Flink application.

Accelerating data consumption

For the sake of simplicity, let's use just one shard in our data stream, ingest data at the maximum rate of 1 MB/s, and specify a retention period of 30 days. To bootstrap our new analytics, reading the full amount of data over 30 days with one EFO consumer at 2 MB/s could potentially take up to 15 days to load this data into Amazon S3. However, you can accelerate this to 20 times faster using 20 EFO consumers at the same time, each reading from different points in the stream at 2 MB/s. The following diagram illustrates the architecture of multiple EFO consumers reading from multiple time slices.


This gives us a total of 40 MB/s in consumption capacity as opposed to 2 MB/s per shard with just one EFO consumer, reducing the overall time by 95%. In most use cases, this combination of Kinesis Data Analytics and EFO allows you to process 30 days of data in hours, instead of days.
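A simple way to plan this kind of parallel read is to split the retention window into equal time slices, one per consumer. The following Python sketch illustrates the idea for the 20-consumer, 30-day example above; the printed boundaries are what each consumer would use as its start (AT_TIMESTAMP) and stop positions.

from datetime import datetime, timedelta

num_consumers = 20
retention = timedelta(days=30)
end = datetime.utcnow()
start = end - retention
slice_length = retention / num_consumers

# Build (start, end) boundaries for each consumer's time slice.
slices = [
    (start + i * slice_length, start + (i + 1) * slice_length)
    for i in range(num_consumers)
]

for i, (slice_start, slice_end) in enumerate(slices):
    print(f"consumer-{i:02d}: {slice_start:%Y-%m-%d %H:%M} -> {slice_end:%Y-%m-%d %H:%M}")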

A point of clarification regarding our approach: When all 20 consumers are finished reading past their respective endpoints in the stream, we stop the Apache Flink application. You can do this by raising an exception when all 20 consumers finish reading their respective time slices—effectively stopping the application. The following diagram illustrates the time savings we get from using 20 EFO consumers.


For more information about implementing this approach, see the GitHub repo.

Pricing

An additional cost is associated with long-term retention (from 7–365 days) and EFO consumers. For more information, see Amazon Kinesis Data Streams pricing. Because you can register EFO consumers on demand, you pay only for the limited time you used all 20 consumers to load data, resulting in faster loads. It’s important to point out that you pay roughly the same amount to consume a fixed volume of data from the stream with 20 EFO consumers as you do with 1 EFO consumer because of the shorter duration required when using 20 consumers. 

Summary

In this post, we discussed long-term retention use cases of Kinesis Data Streams, how to increase the retention of a data stream, and related feature enhancements with Kinesis Data Streams APIs and the Kinesis Client Library (KCL). We took a deep dive into the Apache Flink-based enhanced fan-out consumer approach to replay long-term data quickly. We shared open-source code based on this approach so you can easily implement your use cases using Kinesis Data Streams long-term retention.

You should use long-term retention if you’re planning to develop ML systems, generate customer behavior insights, or have compliance requirements for retaining raw data for more than 7 days. We would love to hear about your use cases with the long-term retention feature. Please submit your feedback to [email protected].


About the Authors

Nihar Sheth is a Senior Product Manager on the Amazon Kinesis Data Streams team at Amazon Web Services. He is passionate about developing intuitive product experiences that solve complex customer problems and enable customers to achieve their business goals. Outside of work, he is focusing on hiking 200 miles of beautiful PNW trails with his son in 2021.

Karthi Thyagarajan is a Solutions Architect on the Amazon Kinesis Team focusing on all things streaming, and he enjoys helping customers tackle distributed systems challenges.

Sai Maddali is a Sr. Product Manager – Tech at Amazon Web Services where he works on Amazon Kinesis Data Streams. He is passionate about understanding customer needs, and using technology to deliver services that empower customers to build innovative applications. Besides work, he enjoys traveling, cooking, and running.

Larry Heathcote is a Senior Product Marketing Manager at Amazon Web Services for data streaming and analytics. Larry is passionate about seeing the results of data-driven insights on business outcomes. He enjoys walking his Samoyed Sasha in the mornings so she can look for squirrels to bark at.

Integrating Datadog data with AWS using Amazon AppFlow for intelligent monitoring

Post Syndicated from Gopalakrishnan Ramaswamy original https://aws.amazon.com/blogs/big-data/integrating-datadog-data-with-aws-using-amazon-appflow-for-intelligent-monitoring/

Infrastructure and operation teams are often challenged with getting a full view into their IT environments to do monitoring and troubleshooting. New monitoring technologies are needed to provide an integrated view of all components of an IT infrastructure and application system.

Datadog provides intelligent application and service monitoring by bringing together data from servers, databases, containers, and third-party services in the form of a software as a service (SaaS) offering. It provides operations and development professionals the ability to measure application and infrastructure performance, visualize metrics with the help of a unified dashboard, and create alerts and notifications.

Amazon AppFlow is a fully managed service that provides integration capabilities by enabling you to transfer data between SaaS applications like Datadog, Salesforce, Marketo, and Slack and AWS services like Amazon Simple Storage Service (Amazon S3) and Amazon Redshift. It provides capabilities to transform, filter, and validate data to generate enriched and usable data in a few easy steps.

In this post, I walk you through the process of extracting log data from Datadog, using Amazon AppFlow and storing it in Amazon S3, and querying it with Amazon Athena.

Solution overview

The following diagram shows the flow of our solution.


The Datadog Agent is lightweight software that can be installed on many different platforms, either directly or as a containerized version. It collects events and metrics from hosts and sends them to Datadog. Amazon AppFlow extracts the log data from Datadog and stores it in Amazon S3, which is then queried using Athena.

To implement the solution, you complete the following steps:

  1. Install and configure the Datadog Agent.
  2. Create a new Datadog application key.
  3. Create an Amazon AppFlow connection for Datadog.
  4. Create a flow in Amazon AppFlow.
  5. Run the flow and query the data.

Prerequisites

The walkthrough requires the following:

  • An AWS account
  • A Datadog account

Installing and configuring the Datadog Agent

The Datadog Agent is lightweight software installed on your hosts. With additional setup, the Agent can report live processes, logs, and traces. The Agent needs an API key, which is used to associate the Agent’s data with your organization. Complete the following steps to install and configure the Datadog Agent:

  1. Create a Datadog account if you haven’t already.
  2. Log in to your account.
  3. Under Integrations, choose APIs.
  4. Copy the API key.
  5. Download the Datadog Agent software for the selected platform.
  6. Install the Agent on the hosts using the API key you copied.

Collecting logs is disabled by default in Datadog Agent. To enable Agent log collection and configure a custom log collection, perform the following steps on your host:

  1. Update the Datadog Agent’s main configuration file (datadog.yaml) with the following code:
    logs_enabled: true

On Windows, this file is in C:\ProgramData\Datadog.

  2. Create custom log collection by customizing the conf.yaml file.

For example, on Windows this file would be in the path C:\ProgramData\Datadog\conf.d\win32_event_log.d. The following code is a sample entry in the conf.yaml file that enables collection of Windows security events:

logs:
  - type: windows_event
    channel_path: Security
    source: Security
    service: windowsOS
    sourcecategory: windowsevent

Getting the Datadog application key

The application keys in conjunction with your organization’s API key give you full access to Datadog’s programmatic API. Application keys are associated with the user account that created them. The application key is used to log all requests made to the API. Get your application key with the following steps:

  1. Log in to your Datadog account.
  2. Under Integrations, choose APIs.
  3. Expand Application Keys.
  4. For Application key name, enter a name.
  5. Choose Create Application key.

Creating an Amazon AppFlow connection for Datadog

A connection defines the source or destination to use in a flow. To create a new connection for Datadog, complete the following steps:

  1. On the Amazon AppFlow console, in the navigation pane, choose Connections. 
  2. For Connectors, choose Datadog.
  3. Choose Create Connection.
  4. For API key and Application Key, enter the keys procured from the previous steps.
  5. For Connection Name, enter a name; for example, myappflowconnection.
  6. Choose Connect.


Creating a flow in Amazon AppFlow

After you create the data connection, you can create a flow that uses the connection and defines the destination, data mapping, transformation, and filters.

Creating an S3 bucket

Create an S3 bucket as your Amazon AppFlow transfer destination.

  1. On the Amazon S3 console, choose Create bucket.
  2. Enter a name for your bucket; for example, mydatadoglogbucket.
  3. Ensure that Block all public access is selected.
  4. Enable bucket versioning and encryption (optional).
  5. Choose Create bucket.
  6. Enable Amazon S3 server access logging (optional).

Configuring the flow source

After you create the Datadog agent and the S3 bucket, complete the following steps to create a flow:

  1. On the Amazon AppFlow console, in the navigation pane, choose Flows.
  2. Choose Create flow.
  3. For Flow name, enter a name for your flow; for example mydatadogflow.
  4. For Source name, choose Datadog.
  5. For Choose Datadog connection, choose the connection created earlier.
  6. For Choose Datadog object, choose Logs.


Choosing a destination

In the Destination details section, provide the following information:

  1. For Destination name, choose Amazon S3.
  2. For Bucket details, choose the name of the S3 bucket created earlier.

This step creates a folder with the flow name you specified within the bucket to store the logs.

Additional settings

You can provide additional settings for data format (JSON, CSV, Parquet), data transfer preference, filename preference, flow trigger and transfer mode. Leave all settings as default:

  • For Data format preference, choose JSON format.
  • For Data transfer preference, choose No aggregation.
  • For Filename preference, choose No timestamp.
  • For Folder structure preference, choose No timestamped folder.

Adding a flow trigger

Flows can be run on a schedule, based on an event or on demand. For this post, we choose Run on demand.

Mapping data fields

You can map fields manually or by using a CSV file. This determines how data is transferred from source to destination. You can apply transformations like concatenation, masking, and truncation to the mappings.

  1. In the Map data fields section, for Mapping method, choose Manually map fields.
  2. For Source field name, choose Map all fields directly.
  3. Choose Next.

Validation

You can add validation to perform certain actions based on conditions on field values.

  1. In the Validations section, for Field name choose Content.
  2. For Condition, choose Values are missing or null.
  3. For Action, choose Ignore record.

Filters

Filters specify which records to transfer. You can add multiple filters with criteria. For the Datadog data source, it's mandatory to specify filters for Date_Range and Query. The formats for specifying the filter query for metrics and logs are different.

  1. In the Add filters section, for Field name, choose Date_Range.
  2. For Condition, choose is between.
  3. For Criterion 1 and Criterion 2, enter start and end dates for log collection.
  4. Choose Add filter.
  5. For your second filter, for Field name, choose Query.
  6. For Condition, enter host:<yourhostname> AND service:(windowsOS OR LinuxOS).
  7. Choose Save.


The service names specified in the filter should have Datadog logs enabled (refer to the earlier step when you installed and configured the Datadog Agent).

The following are some examples of the filter Query for metrics:

  • load.1{*} by {host}
  • avg:system.cpu.idle{*}
  • avg:system.cpu.system{*}
  • avg:system.cpu.user{*}
  • avg:system.cpu.guest{*}
  • avg:system.cpu.user{host:yourhostname}

The following are some examples of the filter Query for logs:

  • service:servicename
  • host:myhostname
  • host:hostname1 AND service:(servicename1 OR servicename2) 

Running the flow and querying the data

If a flow is based on a trigger, you can activate or deactivate it. If it’s on demand, it must be run each time data needs to be transferred. When you run the flow, the logs or metrics are pulled into files residing in Amazon S3. The data is in the form of a nested JSON in this example. Use AWS Glue and Athena to create a schema and query the log data.
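If you prefer to run the flow programmatically instead of from the console, a minimal boto3 sketch looks like the following; the flow name matches the example created earlier in this post.

import boto3

appflow = boto3.client("appflow")

# Trigger an on-demand run of the flow created earlier.
response = appflow.start_flow(flowName="mydatadogflow")
print(response["flowStatus"], response.get("executionId"))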

Querying data with Athena

When the Datadog data is in AWS, there are a host of possibilities to store, process, integrate with other data sources, and perform advanced analytics. One such method is to use Athena to query the data directly from Amazon S3.

  1. On the AWS Glue console, in the navigation pane, choose Databases.
  2. Choose Add database.
  3. For Database name, enter a name such as mydatadoglogdb.
  4. Choose Create.
  5. In the navigation pane, choose Crawlers.
  6. Choose Add Crawler.
  7. For Crawler name, enter a name, such as mylogcrawler.
  8. Choose Next.
  9. For Crawler source type, select Data stores.
  10. Choose Next.
  11. In the Add a data store section, choose S3 for the data store.
  12. Enter the path to the S3 folder that has the log files; for example s3://mydatadoglogbucket/logfolder/.
  13. In the Choose an IAM role section, select Create an IAM role and provide a name.
  14. For Frequency select Run on demand.
  15. In the Configure the crawler’s output section, for Database, select the database created previously.
  16. Choose Next.
  17. Review and choose Finish.
  18. When the crawler’s status changes to Active, select it and choose Run Crawler.

When the crawler finishes running, it creates the tables and populates them with data based on the schema it infers from the JSON log files.

  1. On the Athena console, choose Settings.
  2. Select an S3 bucket and folder where Athena results are stored.
  3. In the Athena query window, enter the following query:
    select * 
    from mydatadoglogdb.samplelogfile
    where content.attributes.level = 'Information'
    

  4. Choose Run Query.

This sample query gets all the log entries where the level is Information. We're traversing a nested JSON object in the Athena query, simply with dot notation.
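You can also submit the same query programmatically with boto3, which is useful if you want to automate the analysis; in the following sketch, the results bucket is a placeholder.

import boto3

athena = boto3.client("athena")

query = """
SELECT *
FROM mydatadoglogdb.samplelogfile
WHERE content.attributes.level = 'Information'
"""

# Start the query; results are written to the specified S3 output location.
execution = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "mydatadoglogdb"},
    ResultConfiguration={"OutputLocation": "s3://my-athena-results-bucket/"},  # placeholder bucket
)
print(execution["QueryExecutionId"])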

Summary

In this post, I demonstrated how we can bring Datadog data into AWS. Doing so opens a host of opportunities to use the tools available in AWS to drive advanced analytics and monitoring while integrating with data from other sources.

With Amazon AppFlow, you can integrate applications in a few minutes, transfer data at massive scale, and enrich the data as it flows, using mapping, merging, masking, filtering, and validation. For more information about integrating SaaS applications and AWS, see Amazon AppFlow.


About the Author

Gopalakrishnan Ramaswamy is a Solutions Architect at AWS based out of India with an extensive background in database, analytics, and machine learning. He helps customers of all sizes solve complex challenges by providing solutions using AWS products and services. Outside of work, he likes the outdoors, physical activities, and spending time with friends and family.

Building an administrative console in Amazon QuickSight to analyze usage metrics

Post Syndicated from Ying Wang original https://aws.amazon.com/blogs/big-data/building-an-administrative-console-in-amazon-quicksight-to-analyze-usage-metrics/

Given the scalability of Amazon QuickSight to hundreds and thousands of users, a common use case is to monitor QuickSight group and user activities, analyze the utilization of dashboards, and identify usage patterns of an individual user and dashboard. With timely access to interactive usage metrics, business intelligence (BI) administrators and data team leads can efficiently plan for stakeholder engagement and dashboard improvements. For example, you can remove inactive authors to reduce license cost, as well as analyze dashboard popularity to understand user acceptance and stickiness.

This post demonstrates how to build an administrative console dashboard and serverless data pipeline. We combine QuickSight APIs with AWS CloudTrail logs to create the datasets to collect comprehensive information of user behavior and QuickSight asset usage patterns.

This post provides a detailed workflow that covers the data pipeline, sample Python code, and a sample dashboard of this administrative console. With the guidance of this post, you can configure this administrative console in your own environment.

Let’s look at Forwood Safety, an innovative, values-driven company with a laser focus on fatality prevention. An early adopter of QuickSight, they have collaborated with AWS to deploy this solution to collect BI application usage insights.

“Our engineers love this admin console solution,” says Faye Crompton, Leader of Analytics and Benchmarking at Forwood. “It helps us to understand how users analyze critical control learnings by helping us to quickly identify the most frequently visited dashboards in Forwood’s self-service analytics and reporting tool, FAST.”

Solution overview

The following diagram illustrates the workflow of the solution.


The workflow involves the following steps:

  1. The AWS Lambda function Data_Prepare is scheduled to run hourly. This function calls QuickSight APIs to get QuickSight namespace, group, user, and assets access permissions information and saves the results to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. CloudTrail logs are stored in S3 bucket.
  3. Based on the files in Amazon S3 that contain the user-group information and the QuickSight assets access permissions, as well as the view dashboard and user login events in the CloudTrail logs, three Amazon Athena tables and several views are created. Optionally, the BI engineer can combine these two tables with employee information tables to display human resource information of the users.
  4. Two QuickSight datasets fetch the data in the Athena tables created in Step 3 through SPICE mode. Then, based on these datasets, a QuickSight dashboard is created.

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • Access to the following AWS services:
    • Amazon QuickSight
    • Amazon Athena
    • AWS Lambda
    • Amazon S3
  • Basic knowledge of Python
  • Optionally, Security Assertion Markup Language 2.0 (SAML 2.0) or OpenID Connect (OIDC) single sign-on (SSO) configured for QuickSight access

Creating resources

Create your resources by launching the following AWS CloudFormation stack:

After the stack creation is successful, you have one Amazon CloudWatch Events rule, one Lambda function, one S3 bucket, and the corresponding AWS Identity and Access Management (IAM) policies.

To create the resources in a Region other than us-east-1, download the Lambda function.

Creating Athena tables

The Data_Prepare Lambda function is scheduled to run hourly with the CloudWatch Events rule admin-console-every-hour. This function calls the QuickSight APIs list_namespaces, list_users, list_user_groups, list_dashboards, list_datasets, list_datasources, list_analyses, list_themes, describe_data_set_permissions, describe_dashboard_permissions, describe_data_source_permissions, describe_analysis_permissions, and describe_theme_permissions to get QuickSight users and assets access permissions information. Finally, this function creates two files, group_membership.csv and object_access.csv, and saves these files to an S3 bucket.
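The following is a simplified sketch of the kind of work this function does for one of its outputs: it lists QuickSight groups and their members and writes group_membership.csv to the S3 bucket. The account ID, namespace, and bucket name are placeholders, and the actual Lambda function covers more APIs, pagination, and error handling.

import boto3
import csv
import io

account_id = "111122223333"              # placeholder AWS account ID
bucket = "admin-console111122223333"     # placeholder bucket name

quicksight = boto3.client("quicksight")
s3 = boto3.resource("s3")

# Collect (namespace, group, user) rows from the QuickSight APIs.
rows = []
groups = quicksight.list_groups(AwsAccountId=account_id, Namespace="default")["GroupList"]
for group in groups:
    members = quicksight.list_group_memberships(
        GroupName=group["GroupName"], AwsAccountId=account_id, Namespace="default"
    )["GroupMemberList"]
    for member in members:
        rows.append(["default", group["GroupName"], member["MemberName"]])

# Write the rows as CSV to the location the Athena table points at.
buffer = io.StringIO()
csv.writer(buffer).writerows(rows)
s3.Object(bucket, "monitoring/quicksight/group_membership/group_membership.csv").put(
    Body=buffer.getvalue()
)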

Run the following SQL queries to create the two Athena tables (group_membership and object_access):

CREATE EXTERNAL TABLE `group_membership`(
`namespace` string,   
`group` string, 
`user` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://admin-console<aws_account_id>/monitoring/quicksight/group_membership/'
TBLPROPERTIES (
  'areColumnsQuoted'='false', 
  'classification'='csv', 
  'columnsOrdered'='true', 
  'compressionType'='none', 
  'delimiter'=',',
  'typeOfData'='file');
CREATE EXTERNAL TABLE `object_access`(
`aws_region` string,   
`object_type` string, 
`object_name` string,
`object_id` string,
`principal_type` string,
`principal_name` string,
`namespace` string,
`permissions` string
)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://admin-console<aws_account_id>/monitoring/quicksight/object_access/'
TBLPROPERTIES (
  'areColumnsQuoted'='false', 
  'classification'='csv', 
  'columnsOrdered'='true', 
  'compressionType'='none', 
  'delimiter'=',',
  'typeOfData'='file');

The following screenshot is sample data of the group_membership table.


The following screenshot is sample data of the object_access table.


For instructions on building an Athena table with CloudTrail events, see Amazon QuickSight Now Supports Audit Logging with AWS CloudTrail. For this post, we create the table cloudtrail_logs in the default database.

Creating views in Athena

Now we have the tables ready in Athena and can run SQL queries against them to generate some views to analyze the usage metrics of dashboards and users.

Create a view of a user’s role status with the following code:

CREATE OR REPLACE VIEW users AS
(select "namespace",
 "group",
 "user",
(case 
when "group" in ('quicksight-fed-bi-developer', 'quicksight-fed-bi-admin') 
then 'Author' 
else 'Reader' 
end) 
as author_status
from "group_membership" );

Create a view of GetDashboard events that happened in the last 3 months with the following code:

CREATE OR REPLACE VIEW getdashboard AS 
(SELECT 
"useridentity"."type",
"split_part"("useridentity"."sessioncontext"."sessionissuer"."arn", '/', 2) AS "assumed_role",
COALESCE("useridentity"."username", "concat"("split_part"("useridentity"."arn", '/', 2), '/', "split_part"("useridentity"."arn", '/', 3))) AS "user_name",
awsregion,
"split_part"("split_part"("serviceeventdetails", 'dashboardName":', 2), ',', 1) AS dashboard_name,
"split_part"("split_part"("split_part"("split_part"("serviceeventdetails", 'dashboardId":', 2), ',', 1), 'dashboard/', 2), '"}', 1) AS dashboardId,
date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ') AS event_time,
max(date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ')) AS latest_event_time
FROM cloudtrail_logs
WHERE 
eventsource = 'quicksight.amazonaws.com' 
AND
eventname = 'GetDashboard' 
AND
DATE_TRUNC('day', date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ')) > cast(current_date - interval '3' month AS date)
GROUP BY 1,2,3,4,5,6,7)

In the preceding query, the conditions defined in the where clause only fetch the records of GetDashboard events of QuickSight.

How can we design queries to fetch records of other events? We can review the CloudTrail logs to look for the information. For example, let’s look at the sample GetDashboard CloudTrail event:

{
    "userIdentity": {
        "type": "AssumedRole",
        "principalId": "<principal_id>:<user_name>",
        "arn": "arn:aws:sts::<aws_account_id>:assumed-role/<IAM_role_name>/<user_name>",
        "accountId": "<aws_account_id>",
        "sessionContext": {
            "sessionIssuer": {
                "type": "Role",
                "principalId": "<principal_id>",
                …
            }
        }
    },
    "eventTime": "2021-01-13T16:55:36Z",
    "eventSource": "quicksight.amazonaws.com",
    "eventName": "GetDashboard",
    "awsRegion": "us-east-1",
    "eventID": "a599c8be-003f-46b7-a40f-2319efb6b87a",
    "readOnly": true,
    "eventType": "AwsServiceEvent",
    "serviceEventDetails": {
        "eventRequestDetails": {
            "dashboardId": "arn:aws:quicksight:us-east-1:<aws_account_id>:dashboard/<dashboard_id>"
        },
        "eventResponseDetails": {
            "dashboardDetails": {
                "dashboardName": "Admin Console",
                "dashboardId": "arn:aws:quicksight:us-east-1:<aws_account_id>:dashboard/<dashboard_id>",
                "analysisIdList": [
                    "arn:aws:quicksight:us-east-1:<aws_account_id>:analysis/<analysis_id>"
                ]
            }
        }
    }
}

With eventSource=“quicksight.amazonaws.com” and eventName=“GetDashboard”, we can get all the view QuickSight dashboard events.

Similarly, we can define the condition as eventname = ‘AssumeRoleWithSAML‘ to fetch the user login events. (This solution assumes that the users log in to their QuickSight account with identity federation through SAML.) For more information about querying CloudTrail logs to monitor other interesting user behaviors, see Using administrative dashboards for a centralized view of Amazon QuickSight objects.

Furthermore, we can join with employee information tables to get a QuickSight user’s human resources information.

Finally, we can generate a view called admin_console with QuickSight group and user information, assets information, CloudTrail logs, and, optionally, employee information. The following screenshot shows an example preview.


Creating datasets

With the Athena views ready, we can build some QuickSight datasets. We can load the view called admin_console to build a SPICE dataset called admin_console and schedule this dataset to be refreshed hourly. Optionally, you can create a similar dataset called admin_console_login_events with the Athena table based on eventname = 'AssumeRoleWithSAML' to analyze QuickSight user login events. According to the usage metrics requirements in your organization, you can create other datasets to serve the different requests.
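If you prefer to trigger the SPICE refresh programmatically, for example from the same hourly schedule that runs the Data_Prepare function, a minimal boto3 sketch looks like the following; the account ID and dataset ID are placeholders.

import boto3
import uuid

quicksight = boto3.client("quicksight")

# Kick off an ad hoc SPICE ingestion for the admin_console dataset.
response = quicksight.create_ingestion(
    AwsAccountId="111122223333",                 # placeholder account ID
    DataSetId="<admin_console_dataset_id>",      # placeholder dataset ID
    IngestionId=str(uuid.uuid4()),               # unique ID for this refresh
)
print(response["IngestionStatus"])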

Building dashboards

Now we can build a QuickSight dashboard as the administrative console to analyze usage metrics. The following steps are based on the dataset admin_console. The schema of the optional dataset admin_console_login_events is the same as admin_console. You can apply the same logic to create the calculated fields to analyze user login activities.

  1. Create parameters.

For example, we can create a parameter called InActivityMonths, as in the following screenshot. Similarly, we can create other parameters such as InActivityDays, Start Date, and End Date.

  2. Create controls based on the parameters.


  3. Create calculated fields.

For instance, we can create a calculated field to detect the active or inactive status of QuickSight authors. If the time span between the latest view dashboard activity and now is larger or equal to the number defined in the Inactivity Months control, the author status is Inactive. The following screenshot shows the relevant code.


According to the end user's requirements, we can define several calculated fields to perform the analysis.

  4. Create visuals.

For example, we create an insight to display the top three dashboards viewed by readers and a visual to display the authors of these dashboards.


  5. We can add a URL action to define some extra features to email inactive authors or check details of users.


The following sample code defines the action to email inactive authors:

mailto:<<email>>?subject=Alert to inactive author! &body=Hi, <<username>>, any author without activity for more than a month will be deleted. Please log in to your QuickSight account to continue accessing and building analyses and dashboards!

The following screenshots show an example dashboard that you can make using our data.

The following is the administrative console landing page. We provide the overview, terminology explanation, and thumbnails of the other two tabs on this page.


The following screenshots show the User Analysis tab.


The following screenshots show the Dashboards Analysis tab.


You can interactively play with the sample dashboard in the following Interactive Dashboard Demo.

You can reference the public template of the preceding dashboard in the create-template, create-analysis, and create-dashboard API calls to create this dashboard and analysis in your account. The public template of this dashboard with the template ARN is 'TemplateArn': 'arn:aws:quicksight:us-east-1:889399602426:template/admin-console'.
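As a rough sketch of that API flow, the following boto3 call creates the dashboard from the public template; the account ID, dashboard ID, dataset ARN, and the dataset placeholder name are assumptions that you would replace with values from your own environment and from describe-template.

import boto3

quicksight = boto3.client("quicksight")

response = quicksight.create_dashboard(
    AwsAccountId="111122223333",   # placeholder account ID
    DashboardId="admin-console",
    Name="Admin Console",
    SourceEntity={
        "SourceTemplate": {
            "Arn": "arn:aws:quicksight:us-east-1:889399602426:template/admin-console",
            "DataSetReferences": [
                {
                    # The placeholder name must match the one defined in the template.
                    "DataSetPlaceholder": "admin_console",
                    "DataSetArn": "arn:aws:quicksight:us-east-1:111122223333:dataset/<dataset_id>",
                }
            ],
        }
    },
)
print(response["DashboardId"], response["CreationStatus"])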

Additional usage metrics

Additionally, we can perform some complicated analysis to collect advanced usage metrics. For example, Forwood Safety raised a unique request to analyze the readers who log in but don't view any dashboards (see the following code). This helps their clients identify and prevent any waste of reader session fees. Leadership teams value the ability to minimize uneconomical user activity.

CREATE OR REPLACE VIEW "loginwithoutviewdashboard" AS
with login as
(SELECT COALESCE("useridentity"."username", "split_part"("useridentity"."arn", '/', 3)) AS "user_name", awsregion,
date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ') AS event_time
FROM cloudtrail_logs
WHERE
eventname = 'AssumeRoleWithSAML'
GROUP BY  1,2,3),
dashboard as
(SELECT COALESCE("useridentity"."username", "split_part"("useridentity"."arn", '/', 3)) AS "user_name", awsregion,
date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ') AS event_time
FROM cloudtrail_logs
WHERE
eventsource = 'quicksight.amazonaws.com'
AND
eventname = 'GetDashboard'
GROUP BY  1,2,3),
users as 
(select "namespace",
"group",
"user",
(case
when "group" in ('quicksight-fed-bi-developer', 'quicksight-fed-bi-admin')
then 'Author'
else 'Reader'
end)
as author_status
from "group_membership" )
select l.* 
from login as l 
join dashboard as d 
join users as u 
on l.user_name=d.user_name 
and 
l.awsregion=d.awsregion 
and 
l.user_name=u.user_name
where d.event_time>(l.event_time + interval '30' minute ) 
and 
d.event_time<l.event_time 
and 
u.author_status='Reader'

Cleaning up

To avoid incurring future charges, delete the resources you created with the CloudFormation template.

Conclusion

This post discussed how BI administrators can use QuickSight, CloudTrail, and other AWS services to create a centralized view to analyze QuickSight usage metrics. We also presented a serverless data pipeline to support the administrative console dashboard.

You can request a demo of this administrative console to try for yourself.


About the Authors

Ying Wang is a Data Visualization Engineer with the Data & Analytics Global Specialty Practice in AWS Professional Services.

Jill Florant manages Customer Success for the Amazon QuickSight Service team.

Create a custom data connector to Slack’s Member Analytics API in Amazon QuickSight with Amazon Athena Federated Query

Post Syndicated from Pablo Redondo Sanchez original https://aws.amazon.com/blogs/big-data/create-a-custom-data-connector-to-slacks-member-analytics-api-in-amazon-quicksight-with-amazon-athena-federated-query/

Amazon QuickSight recently added support for Amazon Athena Federated Query, which allows you to query data in place from various data sources. With this capability, QuickSight can extend support to query additional data sources like Amazon CloudWatch Logs, Amazon DynamoDB, and Amazon DocumentDB (with Mongo DB compatibility) via their existing Amazon Athena data source. You can also use the Athena Query Federation SDK to write custom connectors and query any source accessible with a Java API, whether it is relational, non-relational, object, or a custom data endpoint.

A common analytics use case is to access data from a REST API endpoint and blend it with information from other sources. In this post, I walk you through the process of setting up a custom federated query connector in Athena to query data from a REST API endpoint and build a QuickSight dashboard that blends data from the REST API endpoint with other data sources.

To illustrate this use case, we work with Slack, the makers of a leading channel-based messaging platform, to test their Member Analytics API, which can help our mock company, Example Corp, understand Slack adoption and member engagement across different teams.

How the Slack Member Analytics API works

The following diagram illustrates the Slack Member Analytics API.


The Slack Member Analytics API is a REST API endpoint available for Slack Enterprise Grid customers. Authorized users and services can access the member usage stats dataset via the admin.analytics.getFile endpoint of the Slack Web API. The data consists of a newline-delimited JSON file with daily Slack activity stats at the member level. A record looks like the following code:

{ 
    "enterprise_id":"AAAAAAA",
    "date":"2020-11-10",
    "user_id":"U01ERHY4589",
    "email_address":"[email protected]",
    "is_guest":false,
    "is_billable_seat":false,
    "is_active":true,
    "is_active_ios":false,
    "is_active_android":false,
    "is_active_desktop":true,
    "reactions_added_count":3,
    "messages_posted_count":10, 
    "channel_messages_posted_count":0,
    "files_added_count":0
}

To request data, you must provide a date argument in the format of YYYY-MM-DD, a type argument with the value member, and an OAuth bearer token as the header. The response is a compressed (.gzip) JSON file with data for the requested date. See the following code of a sample request:

curl -X GET -H "Authorization: Bearer xoxp-..." "https://slack.com/api/admin.analytics.getFile?date=2020-09-01&type=member" > data.gzip
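If you prefer to explore the endpoint from Python, the following sketch makes the same request and decodes the response; the token value is a placeholder and must carry the admin.analytics:read scope.

import gzip
import json
import requests

token = "xoxp-..."  # placeholder bearer token

response = requests.get(
    "https://slack.com/api/admin.analytics.getFile",
    headers={"Authorization": f"Bearer {token}"},
    params={"date": "2020-09-01", "type": "member"},
)
response.raise_for_status()

# The body is a gzip-compressed, newline-delimited JSON file.
records = [
    json.loads(line)
    for line in gzip.decompress(response.content).decode("utf-8").splitlines()
    if line
]
print(len(records), "member records for 2020-09-01")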

Building the solution for Example Corp

For our use case, Example Corp has recently purchased Slack for 1,000 users, and as the Collaboration team onboards new teams to Slack, they want to measure Slack adoption and engagement within each new team. If they see low adoption or engagement within a group at the company, they can work with that group to understand why they aren't using Slack and provide education and support, as needed.

Example Corp wants to provide analysts access to the Slack member usage stats to run ad hoc queries in place (directly from the source) without maintaining a new extract, transform, and load (ETL) pipeline. They use the QuickSight cross data source join feature to blend their Slack usage stats with their HR dataset.

To achieve this, Example Corp implements the following steps:

  1. Authorize the custom federated query connector with Slack to access the Member Analytics API.
  2. Develop and deploy a custom federated query connector in the Example Corp AWS account.
  3. Create a dataset in the Example Corp QuickSight environment that reads Slack member usage data for the last 30 days and blends it with an HR dataset.
  4. Create a QuickSight dashboard that shows usage trends of provisioned vs. active users.

Example Corp program managers can now monitor Slack engagement using their QuickSight dashboard (see the following screenshot).


The following diagram illustrates the overall architecture of the solution.


The following sections describe the components in detail and provide sample code to implement the solution in your environment.

Authorizing the custom federated query connector to access the Slack Analytics API

Data REST API endpoints typically have an authentication mechanism such as standard HTTP authentication or a bearer token. In the case of the Slack Web API, a bearer token is required on every request. The Slack Member Analytics API uses an OAuth protocol to authorize applications’ read access to data from an organization’s Slack environment.

To perform the OAuth handshake, Example Corp deploys a custom web application on Amazon Elastic Compute Cloud (Amazon EC2) and registers it as a new Slack application. When it’s deployed, Example Corp Slack admins can access the web application UI to authenticate with Slack and authorize read access to the custom federated query connector. After successful authentication, the custom web application stores the bearer token as a secret in AWS Secrets Manager. Only the custom application server and the federated query connector have access to this secret.

The following is an architecture diagram and brief description of the OAuth authorization workflow between Slack.com and the custom web application. As a prerequisite, you need to register your custom application with Slack.


  1. The Slack admin accesses the custom application UI from their browser and chooses Add to Slack to begin the authorization process.


  2. The custom application redirects the admin to Slack.com to authenticate and authorize the client with an admin.analytics:read access for Example Corp Slack Enterprise Grid.


  1. Slack.com redirects the admin back to the custom application UI, passing a temporary authorization code in the request.
  2. On the backend, the custom application retrieves Slack client secrets from a Secrets Manager secret. The Slack client secrets are obtained during the Slack application registration.
  3. The custom application server makes a request for a bearer token to the Slack API, passing both the temporary authorization code and the Slack client secrets.
  4. If both the temporary authorization code and the client secrets are valid, then the Slack API returns a bearer token to the custom application server.
  5. The custom application saves the bearer token in the Secrets Manager secret.
  6. Finally, the application sends a confirmation of successful authorization to the admin.

Slack admins can revoke access to the application from the organization’s console at any time.

You can find the source code and detailed instructions to deploy this sample OAuth web application in the GitHub repo. When the authorization workflow is complete, you can pause or stop the resources running the web application. Going forward, the federated query connector accesses the token from Secrets Manager.
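As an illustration of that last step, the following boto3 sketch reads the token back from Secrets Manager at query time; the secret name and the access_token key are assumptions made for this example.

import boto3
import json

secrets = boto3.client("secretsmanager")

# Read the secret written by the OAuth web application.
secret_value = secrets.get_secret_value(SecretId="slack-member-analytics-token")  # placeholder name
bearer_token = json.loads(secret_value["SecretString"])["access_token"]           # assumed JSON key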

Deploying the custom federated query connector

When the OAuth workflow is complete, we can deploy the custom federated query connector in the Example Corp AWS environment. For Example Corp, we develop a custom AWS Lambda function using the Athena Query Federation Java SDK and a Java HTTP client to connect with the Slack Member Analytics REST API. Finally, we register it as a new data source within Athena.

The following is a diagram of how the custom connector workflow operates.


The workflow includes the following steps:

  1. Users submit a query to Athena using the following query: select * from <catalog_name>.slackanalytics.member_analytics where date='2020-11-10', where <catalog_name> is the name specified when creating the Athena data source.
  2. Athena compiles the query and runs the Lambda function to retrieve the Slack authorization token from Secrets Manager and determine the number of partitions based on the query predicates (where clause).
  3. The Slack Member Analytics Connector partitions the data by date and runs a Lambda function for each partition (date) specified in the query. For example, if the predicate is WHERE date IN ('2020-11-10', '2020-11-12'), Athena runs two instances of the Lambda function. When no dates are specified in the where clause, the connector gets data for the last 30 days.
  4. Each instance of the Lambda function makes a request to the Slack Member API to retrieve data for each day.
  5. Finally, Athena performs any aggregation and computation specified in the query and returns the results to the client.

You can deploy this sample Slack Member Analytics Lambda function in your AWS environment via AWS CloudFormation with the following template. If you want to modify and build the connector from scratch, you can find the source code and instructions in the GitHub repo.

After the Lambda function has been deployed, create a new data source in Athena. For step-by-step instructions, see Deploying a Connector and Connecting to a Data Source.

  1. On the Athena console, in the query editor, choose Connect data source.


  2. Select All other data sources.
  3. Point your catalog to your new Lambda function.


You should be able to browse your new catalog within Athena from the Athena console and query the Slack Member Analytics API using SQL.


Creating a dataset that reads Slack member usage data and blends it with an HR dataset

As a prerequisite to query the Slack Member Analytics API from QuickSight, we must provide the proper permission for QuickSight to access the federated query data source in Athena. We do this directly from the QuickSight admin UI following these steps:

  1. As an admin, on the Admin menu, choose Manage QuickSight.
  2. Under Security & Permissions, choose QuickSight access to AWS services.
  3. Choose Add or Remove services.
  4. Select Athena.
  5. Choose Next when prompted to set the Amazon Simple Storage Service (Amazon S3) bucket and Lambda function permissions.

QuickSight browses the Athena catalogs and displays any Lambda functions associated with your account. If you don’t see a Lambda function, it means you haven’t mapped a data source within Athena.

  1. Select the function.
  2. Choose Finish.


When the Example Corp QuickSight environment has the proper permissions, analysts can query the Slack Analytics Member API using their existing Athena data source. For instructions on creating your own dataset, see Creating a Dataset Using Amazon Athena Data.

The custom connector appears as a new Catalog, Database, and Tables option.

  1. In QuickSight, on the Datasets page, choose New dataset.


  2. Choose Athena as your data source.
  3. Choose Create dataset.


  4. Choose your table or, for this use case, choose Use custom SQL.


For this analysis, we write a custom SQL that gets member activity for the last 30 days:

SELECT date,
       is_active,
       email_address,
       messages_posted_count
FROM   slackanalytics_catalog.slackanalytics.member_analytics
WHERE  date >= date_format(date_trunc('month',current_date),'%Y-%m-%d')

With the QuickSight cross data source join feature, analysts can enrich the Slack member stats with their HR info. For this use case, we imported a local HR_dataset.csv file containing the list of subscribed users with their respective Example Corp department, and joined them via the employee_email field.


The result is a dataset with Slack activity by employee and department. We’ve also updated the date field from a String type to a Date type using the QuickSight Data Prep page to take advantage of additional visualization features with Date type fields.


Creating a QuickSight dashboard that shows usage trends of provisioned vs. active users

Example Corp Analysts want to visualize the trend of provisioned users vs. active users and understand Slack adoption by department. To support these visualizations, we created the following calculated fields within our QuickSight analysis:

  • active: distinct_countIf(employee, {is_active}='true')
  • provisioned: distinct_count(employee)

You can also create these calculated fields when you create your dataset. This way, you can reuse them in other QuickSight analyses. 

We use QuickSight narrative insights, a line chart, a bar chart, and a pivot table with conditional formatting to create the following analysis.


From this analysis, Example Corp can see that the adoption trend is positive; however, there is an adoption gap within the Marketing team. The program managers can engage the Marketing department leads and focus their training resources to improve their adoption.


This dashboard can now be published to stakeholders within the organization as needed—either within the QuickSight app or embedded within existing enterprise applications. 

Conclusion

With the recent integration of QuickSight and Athena Federated Query, organizations can access additional data sources beyond those already supported by QuickSight. Analysts can leverage QuickSight capabilities to analyze and build dashboards that blend data from a variety of data sources, and with the Athena Query Federation SDK, you can build custom connectors to access relational, non-relational, object, and custom data endpoints using standard SQL.

To get started, try the lab Athena Deploying Custom Connector.


About the Author

Pablo Redondo Sanchez is a Senior Solutions Architect at Amazon Web Services. He is a data enthusiast and works with customers to help them achieve better insights and faster outcomes from their data analytics workflows. In his spare time, Pablo enjoys woodworking and spending time outdoors with his family in Northern California.

Getting started with Trace Analytics in Amazon Elasticsearch Service

Post Syndicated from Jeff Wright original https://aws.amazon.com/blogs/big-data/getting-started-with-trace-analytics-in-amazon-elasticsearch-service/

Trace Analytics is now available for Amazon Elasticsearch Service (Amazon ES) domains running versions 7.9 or later. Developers and IT Ops teams can use this feature to troubleshoot performance and availability issues in their distributed applications. It provides end-to-end insights that are not possible with traditional methods of collecting logs and metrics from each component and service individually.

This feature provides a mechanism to ingest OpenTelemetry-standard trace data to be visualized and explored in Kibana. Trace Analytics introduces two new components that fit into the OpenTelemetry and Amazon ES ecosystems:

  • Data Prepper: A server-side application that collects telemetry data and transforms it for Amazon ES.
  • Trace Analytics Kibana plugin: A plugin that provides at-a-glance visibility into your application performance and the ability to drill down on individual traces. The plugin relies on trace data collected and transformed by Data Prepper.

Here is a component overview:


Applications are instrumented with OpenTelemetry instrumentation, which emit trace data to OpenTelemetry Collectors. Collectors can be run as agents on Amazon EC2, as sidecars for Amazon ECS, or as sidecars or DaemonSets for Amazon EKS. They are configured to export traces to Data Prepper, which transforms the data and writes it to Amazon ES. The Trace Analytics Kibana plugin can then be used to visualize and detect problems in your distributed applications.

OpenTelemetry is a Cloud Native Computing Foundation (CNCF) project that aims to define an open standard for the collection of telemetry data. Using an OpenTelemetry Collector in your service environment allows you to ingest trace data from other projects like Jaeger, Zipkin, and more. As of version 0.7.1, Data Prepper is an alpha release. It is a monolithic, vertically scaling component. Work on the next version is underway. It will support more features, including horizontal scaling.

In this blog post, we cover:

  • Launching Data Prepper to send trace data to your Amazon ES domain.
  • Configuring an OpenTelemetry Collector to send trace data to Data Prepper.
  • Exploring the Kibana Trace Analytics plugin using a sample application.

Prerequisites

To get started, you need:

  • An Amazon ES domain running version 7.9 or later.
    • An IAM role for EC2 that has been added to the domain’s access policy. For information, see Create an IAM role in the Amazon EC2 User Guide for Linux Instances.
  • This CloudFormation template, which you use in the walkthrough (be sure to download it now)
  • An SSH key pair to be deployed to a new EC2 instance.

Deploy to EC2 with CloudFormation

Use the CloudFormation template to deploy Data Prepper to EC2.

  1. Open the AWS CloudFormation console, and choose Create stack.
  2. In Specify template, choose Upload a template file, and then upload the CloudFormation template.
  3. All fields on the Specify stack details page are required. Although you can use the defaults for most fields, enter your values for the following:
    • AmazonEsEndpoint
    • AmazonEsRegion
    • AmazonEsSubnetId (if your Amazon ES domain is in a VPC)
    • IAMRole
    • KeyName

The InstanceType parameter allows you to specify the size of the EC2 instance that will be created. For recommendations on instance sizing by workload, see Right Sizing: Provisioning Instances to Match Workloads, and the Scaling and Tuning guide of the Data Prepper repository.

It should take about three minutes to provision the stack. Data Prepper starts during the CloudFormation deployment. To view output logs, use SSH to connect to the EC2 host and then inspect the /var/log/data-prepper.out file.

Configure OpenTelemetry Collector

Now that Data Prepper is running on an EC2 instance, you can send trace data to it by running an OpenTelemetry Collector in your service environment. For information about installation, see Getting Started in the OpenTelemetry documentation. Make sure that the Collector is configured with an exporter that points to the address of the Data Prepper host. The following otel-collector-config.yaml example receives data from various sources and exports it to Data Prepper.

receivers:
  jaeger:             # receive spans from Jaeger clients and agents over gRPC
    protocols:
      grpc:
  otlp:               # receive OTLP (OpenTelemetry protocol) spans over gRPC
    protocols:
      grpc:
  zipkin:             # receive spans in Zipkin format on the default HTTP endpoint

exporters:
  otlp/data-prepper:  # forward all received traces to Data Prepper
    endpoint: <data-prepper-address>:21890
    insecure: true    # plaintext connection; consider TLS outside of testing

service:
  pipelines:
    traces:
      receivers: [jaeger, otlp, zipkin]
      exporters: [otlp/data-prepper]

Be sure to allow traffic to port 21890 on the EC2 instance. You can do this by adding an inbound rule to the instance’s security group.

Explore the Trace Analytics Kibana plugin by using a sample application

If you don’t have an OpenTelemetry Collector running and would like to send sample data to your Data Prepper instance to try out the trace analytics dashboard, you can quickly set up an instance of the Jaeger Hot R.O.D. application on the EC2 instance with Docker Compose. Our setup script creates three containers on the EC2 instance:

  • Jaeger Hot R.O.D.: The example application to generate trace data.
  • Jaeger Agent: A network daemon that batches trace spans and sends them to the Collector.
  • OpenTelemetry Collector: A vendor-agnostic executable capable of receiving, processing, and exporting telemetry data.

Although your application, the OpenTelemetry Collectors, and Data Prepper instances typically wouldn’t reside on the same host in a real production environment, for simplicity and cost, we use one EC2 instance.

To start the sample application

  1. Use SSH to connect to the EC2 instance using the private key specified in the CloudFormation stack.
    1. When connecting, add a tunnel to port 8080 (the Hot R.O.D. container accepts connections from localhost only). You can do this by adding -L 8080:localhost:8080 to your SSH command.
  2. Download the setup script by running:
    wget https://raw.githubusercontent.com/opendistro-for-elasticsearch/data-prepper/master/examples/aws/jaeger-hotrod-on-ec2/setup-jaeger-hotrod.sh

  3. Run the script with sh setup-jaeger-hotrod.sh.
  4. Visit http://localhost:8080/ to access the Hot R.O.D. dashboard and start sending trace data!

Figure 2: Hot R.O.D. Rides on Demand

  1. After generating sample data with the Hot R.O.D. application, navigate to your Kibana dashboard and from the left navigation pane, choose Trace Analytics. The Dashboard view groups traces together by HTTP method and path so that you can see the average latency, error rate, and trends associated with an operation.

Figure 3: Dashboard page

  2. For a more focused view, choose Traces to drill down into a specific trace.

Figure 4: Traces page

  3. Choose Services to view all services in the application and an interactive map that shows how the various services connect to each other.

Figure 5: Services page

Conclusion

Trace Analytics adds to the existing log analytics capabilities of Amazon ES, enabling developers to isolate sources of performance problems and diagnose root causes in their distributed applications. We encourage you to start sending your trace data to Amazon ES so you can benefit from Trace Analytics today.


About the Authors

Jeff Wright is a Software Development Engineer at Amazon Web Services where he works on the Search Services team. His interests are designing and building robust, scalable distributed applications. Jeff is a contributor to Open Distro for Elasticsearch.

Kowshik Nagarajaan is a Software Development Engineer at Amazon Web Services where he works on the Search Services team. His interests are building and automating distributed analytics applications. Kowshik is a contributor to Open Distro for Elasticsearch.

Anush Krishnamurthy is an Engineering Manager working on the Search Services team at Amazon Web Services.

How the Yahoo! JAPAN Smart Devices Team is improving voice user interfaces with Amazon QuickSight business intelligence

Post Syndicated from Kazuhide Fujita original https://aws.amazon.com/blogs/big-data/how-the-yahoo-japan-smart-devices-team-is-improving-voice-user-interfaces-with-amazon-quicksight-business-intelligence/

This is a guest blog post by Kazuhide Fujita, Product Manager at Yahoo! JAPAN.

Yahoo! JAPAN is a large internet search and media company, with Yahoo! JAPAN’s web portal being one of the most commonly used websites in Japan. Our smart devices team is responsible for building and improving Yahoo! JAPAN apps for voice user interfaces (VUI) such as Amazon Alexa and Google Assistant. We see VUI as a market that will grow exponentially in the future, and we want to be ready to lead the consumer experience with such devices. In this post, I discuss how we’re using Amazon QuickSight business intelligence (BI) to help our product teams improve these services.

Enhanced access to insights at lower cost

To continuously improve our services, we use data to understand how consumers are interacting with the software and to identify growth trends. However, the data we get directly from smart device makers is limited. So, we built our own log system to capture more granular data, such as the types of commands customers are using, the time of day they use the application, and how frequently they use it.

Early on, we used Amazon Elasticsearch Service (Amazon ES) and Kibana to analyze data. Although this solution was very capable, it came at a higher price point than we were targeting. Another option was to export data directly to Microsoft Excel for ad hoc analysis. However, this was very time consuming and limited us to working with extracts of historical data rather than the latest information.

We decided to look for a solution that would suit the full spectrum of our needs while being cost-effective for our specific use case. While we were searching, our data team made the decision to standardize on a data lake architecture using Amazon Simple Storage Service (Amazon S3) and Amazon Athena. This approach provided a high level of flexibility and scalability. To visualize our data, it made sense to use QuickSight, the serverless BI solution with pay-per-session pricing on AWS.

Unifying data to understand customers better

This system has proven to be a good fit for our needs. The data lake allows us to accumulate different types of data from monitoring many KPIs and VUI products. For example, we might want to know the number of active users over a given period, and then drill down into how active those users were in the 2 weeks from when they registered. The data lake makes this possible. It’s easy to maintain even though the data is very diverse. For aggregating and performing calculations on the data, we use Athena because it provides optimal performance for complex queries thanks to the distributed computing model.
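
As an illustration of the kind of aggregation Athena handles for us, the following is a minimal sketch of a query that measures how active users were during their first two weeks after registering. The table and column names (vui_logs.usage_logs, vui_logs.users, and so on) are hypothetical and only stand in for our actual schema.

-- Hypothetical tables: vui_logs.users (user_id, registered_at) and
-- vui_logs.usage_logs (user_id, event_time). Counts daily active users
-- during each user's first 14 days after registration.
SELECT date_trunc('day', l.event_time) AS usage_day,
       count(DISTINCT l.user_id)       AS active_users
FROM   vui_logs.usage_logs l
JOIN   vui_logs.users u
  ON   u.user_id = l.user_id
WHERE  l.event_time BETWEEN u.registered_at
                        AND u.registered_at + INTERVAL '14' DAY
GROUP  BY 1
ORDER  BY 1;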

For ad hoc analysis, dashboards, and reporting, QuickSight connects seamlessly to our data lake. QuickSight makes it easy to view trends in customer behavior such as the time of usage, method of interaction, typical settings, and so on. The following screenshot shows a sample dashboard in QuickSight.


For example, the default wake word for Alexa-powered devices is to say the name of the voice assistant: “Hey, Alexa.” However, Japanese customers may prefer to say “ohayō,” which means “good morning” in Japanese. Which setting customers prefer could be an important trend for us to know when we configure our offerings. With QuickSight, it’s easy to compare trends for this type of behavior across other user characteristics.
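
A rough sketch of the kind of query behind such a comparison might group devices by wake-word setting and region before the result is visualized in QuickSight; the vui_logs.device_settings table and its columns are hypothetical.

-- Hypothetical table: vui_logs.device_settings (device_id, region, wake_word).
SELECT region,
       wake_word,
       count(DISTINCT device_id) AS devices
FROM   vui_logs.device_settings
GROUP  BY region, wake_word
ORDER  BY region, devices DESC;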

This is only one small example of the kinds of insights we glean by using QuickSight. Another use case is regarding initiatives to increase product usage through marketing or incentives. We can track the outcome of these programs using QuickSight by tracking whether they result in an uptick in usage relative to the communications we send out.

The freedom to focus on what matters to the business

One of the big advantages of using QuickSight and other AWS services is that we don’t have to worry about maintaining on-premises systems for our data lake and analytics. It’s easy to manage and we can focus on gaining insights and improving our products—not running data center infrastructure. Building our end-to-end data-to-insights pipeline on AWS ensures that we can easily apply security and governance policies to all our data.

Overall, QuickSight provides us with the flexibility to analyze all kinds of data quickly, so we can aim to be the market leader in the VUI marketplace. We’re excited to see what the future holds for this powerful tool—and to apply the knowledge we gain to improving our services.


About the Author

Kazuhide Fujita is the Skill Team Product Manager, Smart Device Division, at Yahoo Japan Corporation.

Implementing multi-tenant patterns in Amazon Redshift using data sharing

Post Syndicated from Rajesh Francis original https://aws.amazon.com/blogs/big-data/implementing-multi-tenant-patterns-in-amazon-redshift-using-data-sharing/

Software service providers offer subscription-based analytics capabilities in the cloud with Analytics as a Service (AaaS), and increasingly customers are turning to AaaS for business insights. A multi-tenant storage strategy allows the service providers to build a cost-effective architecture to meet increasing demand.

Multi-tenancy means a single instance of software and its supporting infrastructure is shared to serve multiple customers. For example, a software service provider could generate data that is housed in a single data warehouse cluster, but accessed securely by multiple customers. This storage strategy offers an opportunity to centralize management of data, simplify ETL processes, and optimize costs. However, service providers have to constantly balance between cost and providing a better user experience for their customers.

With the new data sharing feature, you can use Amazon Redshift to scale and meet both objectives of managing costs by simplifying storage and ETL pipelines while still providing consistent performance to customers.  You can ingest data into a cluster designated as a producer cluster, and share this live data with one or more consumer clusters. Clusters accessing this shared data are isolated from each other, therefore performance of a producer cluster isn’t impacted by workloads on consumer clusters. This enables consuming clusters to get consistent performance based on individual compute capacity.

In this post, we focus on various AaaS patterns, and discuss how you can use data sharing in a multi-tenant architecture to scale for virtually unlimited users. We discuss detailed steps to use data sharing with different storage strategies.

Multi-tenant storage patterns

Multi-tenant storage patterns help simplify the architecture and long-term maintenance of the analytics platform. In a multi-tenant strategy, data is stored centrally in a single cluster for all tenants, enabling simplification of the ETL ingestion pipeline and data management. In the previously published whitepaper SaaS Storage Strategies, various models of storage and benefits are covered for a single cluster scenario.

The three strategies you can choose from are:

  • Pool model – Data is stored in a single database schema for all tenants, and a new column (tenant_id) is used to scope and control access to individual tenant data. Access to the multi-tenant data is controlled using views built on the tables.
  • Bridge model – Storage and access to data for each tenant is controlled at individual schema level in the same database.
  • Silo model – Storage and access control to data for each tenant is maintained in separate databases.

The following diagram illustrates the architecture of these multi-tenant storage strategies.


In the following sections, we discuss how these multi-tenant strategies can be implemented using the Amazon Redshift data sharing feature with a multi-cluster architecture.

Scaling your multi-tenant architecture using data sharing

AaaS providers implementing multi-tenant architectures were previously limited to resources of a single cluster to meet the compute and concurrency requirements of users across all the tenants. As the number of tenants increased, you could either turn on concurrency scaling or create additional clusters. However, the addition of new clusters means additional ingestion pipelines and increased operational overhead.

With data sharing in Amazon Redshift, you can easily and securely share data across clusters. Data ingested into the producer cluster is shared with one or more consumer clusters, which allows total separation of ETL and BI workloads. Several consumer clusters can read data from the managed storage of a producer cluster. This enables instant, granular, and high-performance access without data copies and movement. Workloads accessing shared data are isolated from each other and the producer. You can distribute workloads across multiple clusters while simplifying and consolidating the ETL ingestion pipeline into one main producer cluster, providing optimal price for performance.

Consumer clusters can in turn be producers for the data sets they own. Customers can optimize costs even further by collocating multiple tenants on the same consumer cluster. For instance, you can group low-volume tier 3 tenants into a single consumer cluster to provide a lower-cost offering, while high-volume tier 1 tenants get their own isolated compute clusters. Consumer clusters can be created in the same account as the producer or in a different AWS account. This allows separate billing for the consumer clusters: you can charge back to the business group that uses the consumer cluster, or even allow your customers to use their own Amazon Redshift cluster in their account so they pay for usage of the consumer cluster. The following diagram shows the difference in ETL and consumer access patterns in a multi-tenant architecture using data sharing versus a single cluster approach without data sharing.


Multi-tenant architecture with data sharing compared to single cluster approach

Creating a multi-tenant architecture for an AaaS solution

For this post, we use a simple data model with a fact and a dimension table to demonstrate how to leverage data sharing to design a scalable multi-tenant AaaS solution. We cover detailed steps involved for each storage strategy using this data model. The tables are as follows:

  • Customer – dimension table containing customer details
  • Sales – fact table containing sales transactions

We use two Amazon Redshift ra3.4xlarge clusters, with two nodes each, and designate one cluster as the producer and the other as the consumer.

The high-level steps involved in enabling data sharing across clusters are as follows:

  1. Create a data share in the producer cluster and assign database objects to the data share.
  2. From the producer cluster, grant usage on the data share to consumer clusters, identified by namespace or AWS account.
  3. From the consumer cluster, create an external database using the data share from the producer.
  4. Query the tables in the data share through the external shared database in the consumer cluster. Grant access to other users to access this shared database and objects.
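
As a preview, the following condensed sketch strings the four steps together with placeholder object names; the sections that follow show the full version for each storage model.

-- 1. Producer: create the data share and add objects (placeholder names).
CREATE DATASHARE myshare;
ALTER DATASHARE myshare ADD SCHEMA myschema;
ALTER DATASHARE myshare ADD ALL TABLES IN SCHEMA myschema;

-- 2. Producer: grant usage to the consumer cluster namespace.
GRANT USAGE ON DATASHARE myshare TO NAMESPACE '<consumercluster_namespace>';

-- 3. Consumer: create a database from the data share.
CREATE DATABASE myshare_db FROM DATASHARE myshare
OF NAMESPACE '<producercluster_namespace>';

-- 4. Consumer: query the shared objects and grant access to local users.
SELECT * FROM myshare_db.myschema.mytable;
GRANT USAGE ON DATABASE myshare_db TO GROUP analyst_group;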

Creating producer and consumer Amazon Redshift clusters

Let us start by creating two Amazon Redshift ra3.4xlarge clusters with two nodes each, one for the producer and the other for the consumer.

  1. On the Amazon Redshift console, create two clusters of the RA3 instance type, and name them ds-producer and ds-consumer-c1, respectively.
  2. Next, log in to Amazon Redshift using the query editor. You can also use a SQL client tool like DBeaver, SQL Workbench, or Aginity Workbench. For configuration information, see Connecting to an Amazon Redshift cluster using SQL client tools.

Get the cluster namespace of the producer and consumer clusters from the console. We will use the namespaces to create the tenant table and to create and access the data shares. You can also get the cluster namespaces by logging into each of the clusters and executing the SELECT CURRENT_NAMESPACE statement in the query editor.

Be sure to replace the corresponding namespaces in the code sections wherever producercluster_namespace, consumercluster1_namespace, and consumercluster2_namespace are referenced.

The following screenshot shows the namespace on the Amazon Redshift console.

Now that we have the clusters created, we can go through the detailed steps for the three models: first the pool model, followed by the bridge model, and finally the silo model.

Pool model

The pool model represents an all-in, multi-tenant model where all tenants share the same storage constructs and provides the most benefit in simplifying the AaaS solution.

With this model, data storage is centralized in one cluster database, and data is stored for all tenants in the same set of data models. To scope and control access to tenant data, we introduce a column (tenant_id) that serves as a unique identifier for each tenant.

Security management to prevent cross-tenant access is one of the main aspects to address with the pool model. We can implement row-level security and provide secure access to the data by creating database views and set application-level policies by creating groups with specific access and assigning users to the groups. The following diagram illustrates the pool model architecture.


To create a multi-tenant solution using the pool model, you create data shares for the pool model in the producer cluster, and share data with the consumer cluster. We provide more detail on these steps in the following sections.

Creating data shares for the pool model in the producer cluster

To create data shares for the pool model in the producer cluster, complete the following steps:

  1. Log in to the producer cluster as an admin user and run the following script.

Note that we have a tenant table to store unique identifiers for each tenant or consumer (tenant).

We add a column (tenant_id) to the sales and customer tables to uniquely identify tenant data. This tenant_id references the tenant_id in the tenant table to uniquely identify the tenant and consumer records. See the following code:

/**********************************************/
/*       datasharing datasetup pool model     */
/**********************************************/

-- Create Schema
create schema sales;

CREATE TABLE IF NOT EXISTS sales.tenant ( 
t_tenantid int8 not null,
t_name varchar(50) not null,
t_namespace varchar(50),
t_account varchar(16)
)
DISTSTYLE AUTO
SORTKEY AUTO;

-- Create tables for multi-tenant sales schema
drop table if exists sales.customer;
CREATE TABLE IF NOT EXISTS sales.customer(
  c_tenantid int8 not null,
  c_custid int8 not null,
  c_name varchar(25) not null,
  c_region varchar(40) not null,
  Primary Key(c_tenantid, c_custid)
) 
DISTSTYLE AUTO
SORTKEY AUTO;

CREATE TABLE IF NOT EXISTS sales.sales (
  s_tenantid int8 not null,
  s_orderid int8 not null,
  s_custid int8 not null,
  s_totalprice numeric(12,2) not null,
  s_orderdate date not null,
  Primary Key(s_tenantid, s_orderid)
) 
DISTSTYLE AUTO
SORTKEY AUTO;
  2. Set up the tenant table with the details for each consumer cluster, and ingest data into the customer dimension and sales fact tables. Using the COPY command is the recommended way to ingest data into Amazon Redshift, but for illustration purposes, we use INSERT statements:
    -- Ingest data 
    insert into sales.tenant values
    (0, 'primary', '<producercluster_namespace>',''),
    (1, 'tenant1', '<consumercluster1_namespace>',''),
    (2, 'tenant2', '<consumercluster2_namespace>','');
    
    insert into sales.customer values
    (1, 1, 'Customer 1', 'NorthEast'),
    (1, 2, 'Customer 2', 'SouthEast'),
    (2, 1, 'Customer 3', 'NorthWest'),
    (2, 2, 'Customer 4', 'SouthEast');
    
    truncate table sales.sales;
    insert into sales.sales values
    (1, 1, 1, 2434.33, '2020-11-21'),
    (1, 2, 2, 54.90, '2020-5-5'),
    (1, 3, 2, 9678.99, '2020-3-8'),
    (2, 1, 2, 452.24, '2020-1-23'),
    (2, 2, 1, 76523.10, '2020-11-3'),
    (2, 3, 1, 6745.20, '2020-10-01');
    
    select count(*) from sales.tenant;
    select count(*) from sales.customer;
    select count(*) from sales.sales;

Securing data on the producer cluster by restricting access

In the pool model, no external user has direct access to underlying tables. All access is restricted using views.

  1. Create a view for each of the fact and dimension tables to include a condition to filter records from the consumer tenant’s namespace. In our example, we create v_customersales to combine sales fact and customer dimension tables with a restrictive filter for tenant.namespace = current_namespace. See the following code:
    /**********************************************/
    /* We will create late binding views          */
    /* but materialized views could also be used  */
    /**********************************************/
    
    create or replace view sales.v_customer as
    select * 
    from sales.customer c, sales.tenant t
    where c.c_tenantid = t.t_tenantid 
    and t.t_namespace  = current_namespace;
    
    create or replace view sales.v_sales as
    select * 
    from sales.sales s, sales.tenant t
    where s.s_tenantid = t.t_tenantid 
    and t.t_namespace  = current_namespace;
    
    create or replace view sales.v_customersales as 
    select c_tenantid, c_name, c_region, 
    	date_part(w, to_date(s_orderdate,'YYYY-MM-DD')) as "week", 
    	date_part(mon, to_date(s_orderdate,'YYYY-MM-DD')) as "month", 
    	date_part(dow, to_date(s_orderdate,'YYYY-MM-DD')) as "dow",
    	date_part(yr, to_date(s_orderdate,'YYYY-MM-DD')) as "year",
    	date_part(d, to_date(s_orderdate,'YYYY-MM-DD')) as "dom", 
    	t.t_namespace
    from sales.tenant t, sales.customer c, sales.sales s
    where t.t_tenantid = c.c_tenantid 
    and c.c_tenantid = s.s_tenantid 
    and c.c_custid = s.s_custid 
    and t.t_namespace = current_namespace 
    WITH NO SCHEMA BINDING;
    
    select * from sales.v_customersales;
          
    

Now that we have database objects created in the producer cluster, we can share the data with the consumer clusters.

Sharing data with the consumer cluster

To share data with the consumer cluster, complete the following steps:

  1. Create a data share for the sales data:
    /***************************************************/
    /* Create Datashare and add objects to the share    */
    /****************************************************/
    CREATE DATASHARE salesshare;
    

  2. Enter the following code to alter the data share, add the sales schema to be shared with the consumer clusters, and add all tables in the sales schema to be shared with the consumer cluster:
    /************************************************************/
    /* Add objects at desired granularities: schemas, tables,   */
    /* views include materialized, and SQL UDFs                 */
    /************************************************************/
    ALTER DATASHARE salesshare ADD SCHEMA sales;  -- New addition to create SCHEMA first
    
    /*For pool model, we share only the views and not tables */
    ALTER DATASHARE SalesShare ADD TABLE sales.v_customer;
    ALTER DATASHARE SalesShare ADD TABLE sales.v_customersales;
    

For the pool model, we share only the views with the consumer cluster and not the tables. The ALTER statement ADD TABLE is used to add both views and tables.

  3. Grant usage on the sales data share to the namespace of the BI consumer cluster. You can get the namespace of the BI cluster from the console or using the SELECT CURRENT_NAMESPACE statement in the BI cluster. See the following code:
    /********************************************************************/
    /* Grant access to consumer clusters                                */
    /* login to Consumer BI Cluster and get the Namespace from          */
    /* the Redshift console or using SELECT CURRENT_NAMESPACE           */
    /********************************************************************/
    SELECT CURRENT_NAMESPACE;
    
    --Namespace refers to the namespace GUID of the consumer cluster in the account
    GRANT USAGE ON DATASHARE salesshare TO NAMESPACE '<consumercluster1_namespace>';
    --Account numbers are 12 digit long
    GRANT USAGE ON DATASHARE salesshare TO ACCOUNT 'Consumer_AWSAccount';
    

  4. View data shares that are shared from the producer cluster:
    SELECT * FROM SVV_DATASHARES;

The following screenshot shows the output.


You can also see the data shares and their detailed objects and consumers using the following commands:

SHOW DATASHARES;
DESC DATASHARE salesshare;
select * from SVV_DATASHARE_OBJECTS;
select * from SVV_DATASHARE_CONSUMERS;

Viewing and querying data shares for the pool model from the consumer cluster

To view and query data shares from the consumer cluster, complete the following steps:

  1. Log in to the consumer cluster as an admin user and view the data share objects:
    /**********************************************************/
    /* Login to Consumer cluster as awsuser:                  */
    /* View datashares and create local database for querying */
    /**********************************************************/
    select * from SVV_DATASHARE_OBJECTS;

The following screenshot shows the results.


  2. Create a new database from the data share of the producer cluster:
    /**********************************************************/
    /* Create a local database and schema reference           */
    /**********************************************************/
    CREATE DATABASE sales_db FROM DATASHARE salesshare
    OF NAMESPACE '<producercluster_namespace>';

  3. Optionally, you can create an external schema in the consumer cluster pointing to the schema in the database of the producer cluster.

Creating a local external schema in the consumer cluster allows schema-level access controls within the consumer cluster, and uses two-part notation when referencing shared data objects (localschema.table vs. external_db.producerschema.table). See the following code:

/*********************************************************/
/* Create External Schema - Optional                     */
/* reason for schema: give specific access to schema     */
/* using shared alias get access to a secondary database */
/*********************************************************/
CREATE EXTERNAL SCHEMA sales_schema 
FROM REDSHIFT DATABASE 'sales_db' SCHEMA 'sales';
  4. Now you can query the shared data from the producer cluster by using three-part notation (database.schema.table):
    select * from sales_db.sales.customer;
    select * from sales_db.sales.v_customersales;

  5. From the tenant1 consumer cluster, you can view the databases that are accessible to tenant1:
    select * from SVV_REDSHIFT_DATABASES;
    

The following screenshot shows the results.


Creating local consumer users and controlling access

You can control access to users in your consumer cluster by creating users and groups, and assigning access to the data share objects.

  1. Log in as an admin user on consumer cluster 1 and enter the following code to create tenant1_group, grant usage on the local database sales_db and schema sales_schema to the group, and assign the user tenant1_user to the tenant1_group:
    /********************************************************/
    /* Consumers can create own users and assign privileges */
    /* Create tenant1_group and assign privileges to read   */
    /* sales_db and the sales_schema                        */
    /* Create tenant1_user in tenant1_group                 */
    /********************************************************/
    create group tenant1_group;
    create user tenant1_user password 'Redshift#123!' in group tenant1_group;
    
    GRANT USAGE ON DATABASE sales_db TO GROUP tenant1_group;
    GRANT USAGE ON SCHEMA sales_schema TO GROUP tenant1_group;
    

  2. Now, log in to consumer cluster 1 as tenant1_user and select data from the views v_customer and v_customersales:
    /*******************************************************/
    /* select from view returns only sales records related */
    /* to Consumer A namespace                             */
    /*******************************************************/
    select * from sales_db.sales.v_customer;
    select * from sales_db.sales.v_customersales;
    

You should see only the data relevant to tenant 1 and not the data that is associated with tenant 2.

Create materialized views to optimize performance

Consumer clusters can have their own database objects, which are local to the consumer. You can also create materialized views on the data share objects and control when to refresh the dataset for your consumers. This provides another level of isolation from the producer cluster, and ensures the consumer clusters query their local dataset.

  1. Log in as an admin user on consumer cluster 1 and enter the following code to create a materialized view for customersales. This creates a local view that can be periodically refreshed from the consumer cluster.

/*******************************************************/
/* Create materialized view in consumer cluster        */
/*******************************************************/
create schema if not exists tenant1_sales;  -- local schema on the consumer cluster to hold the materialized view
create MATERIALIZED view tenant1_sales.mv_customersales as 
select c_tenantid, c_name, c_region, 
	date_part(w, to_date(s_orderdate,'YYYY-MM-DD')) as "week", 
	date_part(mon, to_date(s_orderdate,'YYYY-MM-DD')) as "month", 
	date_part(dow, to_date(s_orderdate,'YYYY-MM-DD')) as "dow",
	date_part(yr, to_date(s_orderdate,'YYYY-MM-DD')) as "year",
	date_part(d, to_date(s_orderdate,'YYYY-MM-DD')) as "dom", 
	t.t_namespace
from sales_db.tenant t, sales_db.customer c, sales_db.sales s
where t.t_tenantid = c.c_tenantid 
and c.c_tenantid = s.s_tenantid 
and c.c_custid = s.s_custid 
and t.t_namespace = current_namespace;

select * from tenant1_sales.mv_customersales limit 100;

REFRESH MATERIALIZED VIEW tenant1_sales.mv_customersales;

With the preceding steps, we demonstrated how you can control access to tenant data in the same datastore using views, and how data shares help efficiently share data between producer and consumer clusters with transaction consistency. We also saw how a local materialized view can be created to further isolate your BI workloads for your customers and provide a consistent, performant user experience. In the next section, we discuss the bridge model.

Bridge model

In the bridge model, data for each tenant is stored in its own schema in a database and contains a similar set of tables. Data shares are created for each schema and shared with the corresponding consumer. This is an appealing balance between the silo and pool models, providing both data isolation and ETL consolidation. With Amazon Redshift, you can create up to 9,900 schemas. For more information, see Quotas and limits in Amazon Redshift.

With data sharing, separate consumer clusters can be provisioned to use the same managed storage from producer cluster. Consumer clusters have all the capabilities of a producer cluster, and can in turn be producer clusters for data objects they own. Consumers can’t share data that is already shared with them. Without data sharing, queries from all customers are directed to a single cluster. The following diagram illustrates the bridge model.


To create a multi-tenant architecture using the bridge model, complete the steps in the following sections.

Creating database schemas and tables for the bridge model in the producer cluster

As we did in the pool model, the first step is to create the database schema and tables. We log in to the producer cluster as an admin user and create separate schemas for each tenant. For our post, we create two schemas, tenant1 and tenant2, to store data for two tenants.

  1. Log in to the producer cluster as the admin user.
  2. Use the following script to create the two schemas, tenant1 and tenant2, and create tables for the customer dimension and sales facts under each schema:
    /****************************************/
    /* Bridge -  Data Model */
    /****************************************/
    -- Create schemas tenant1 and tenant2
    create schema tenant1;
    create schema tenant2;
    
    -- Create tables for tenant1
    CREATE TABLE IF NOT EXISTS tenant1.customer (
      c_custid int8 not null ,
      c_name varchar(25) not null,
      c_region varchar(40) not null,
      Primary Key(c_custid)
    ) diststyle ALL sortkey(c_custid);
    
    CREATE TABLE IF NOT EXISTS tenant1.sales (
      s_orderid int8 not null,
      s_custid int8 not null,
      s_totalprice numeric(12,2) not null,
      s_orderdate date not null,
      Primary Key(s_orderid)
    ) distkey(s_orderid) sortkey(s_orderdate, s_orderid) ;
    
      
    
    -- Create tables for tenant2
    CREATE TABLE IF NOT EXISTS tenant2.customer (
      c_custid int8 not null ,
      c_name varchar(25) not null,
      c_region varchar(40) not null,
      Primary Key(c_custid)
    ) diststyle ALL sortkey(c_custid);
    
    CREATE TABLE IF NOT EXISTS tenant2.sales (
      s_orderid int8 not null,
      s_custid int8 not null,
      s_totalprice numeric(12,2) not null,
      s_orderdate date not null,
      Primary Key(s_orderid)
    ) distkey(s_orderid) sortkey(s_orderdate, s_orderid) ;
    

  3. Ingest data into the customer dimension and sales fact tables. Using the COPY command is the recommended way to ingest data into Amazon Redshift, but for illustration purposes, we use the INSERT statement:
    -- ingest data for tenant1
    -- ingest customer data
    insert into tenant1.customer values
    (1, 'Customer 1', 'NorthEast'),
    (2, 'Customer 2', 'SouthEast');
    
    -- ingest sales data
    insert into tenant1.sales values
    (1, 1, 2434.33, '2020-11-21'),
    (2, 2, 54.90, '2020-5-5'),
    (3, 2, 9678.99, '2020-3-8');
    
    select count(*) from tenant1.customer;
    select count(*) from tenant1.sales;
    
    
    -- ingest data for tenant2
    -- ingest customer data
    insert into tenant2.customer values
    (1, 'Customer 3', 'NorthWest'),
    (2, 'Customer 4', 'SouthEast');
    
    -- ingest sales data
    truncate table tenant2.sales;
    insert into tenant2.sales values
    (1, 2, 452.24, '2020-1-23'),
    (2, 1, 76523.10, '2020-11-3'),
    (3, 1, 6745.20, '2020-10-01');
    
    
    select count(*) from tenant2.customer;
    select count(*) from tenant2.sales;
    

Creating data shares and granting usage to the consumer cluster

In the following code, we create two data shares, tenant1share and tenant2share, to share the database objects under the two schemas to the respective consumer clusters.

  1. Create the two data shares, tenant1share and tenant2share:
    /******************************************************************/
    /*   Create Datashare and add database objects to the datashare   */
    /******************************************************************/
    CREATE DATASHARE tenant1share;
    CREATE DATASHARE tenant2share;

  2. Alter the data shares and add the schema for each tenant to be shared with the respective consumer cluster:
    ALTER DATASHARE tenant1share ADD SCHEMA tenant1;
    ALTER DATASHARE tenant2share ADD SCHEMA tenant2;

  3. Alter the data shares and add all tables in the schemas to be shared with the respective consumer clusters:
    ALTER DATASHARE tenant1share ADD ALL TABLES IN SCHEMA tenant1;
    ALTER DATASHARE tenant2share ADD ALL TABLES IN SCHEMA tenant2;

Getting the namespace of the first consumer cluster

  1. Log in to the consumer cluster and get the namespace from the console or by running the select current_namespace command:
    /* Grant access to tenant1 schema for Tenant1 BI Cluster */
    /* login to tenant1 BI Cluster and get the Namespace 
     * or get the Namespace from the Redshift console */
    SELECT CURRENT_NAMESPACE;

  2. Grant usage on the data share for the first tenant to the namespace of the BI cluster. You can get the namespace of the BI cluster from the console or using the SELECT CURRENT_NAMESPACE statement in the BI cluster:
    -- Grant usage on the datashare to the first consumer cluster
    -- Namespace refers to the namespace GUID of the consumer cluster 
    GRANT USAGE ON DATASHARE tenant1share TO NAMESPACE '<consumercluster1_namespace>';
    --Account numbers are 12 digit long (optional)
    --GRANT USAGE ON DATASHARE tenant1share TO ACCOUNT '<Consumer_AWSAccount>';

Getting the namespace of the second consumer cluster

  1. Log in to the second consumer cluster and get the namespace from the console or by running the select current_namespace command:
    /* Grant access to tenant2 schema for Tenant2 BI Cluster */
    /* login to tenant2 BI Cluster and get the Namespace     */
    SELECT CURRENT_NAMESPACE;
    

  2. Grant usage on the data share for the second tenant to the namespace of the second consumer cluster you just got from the previous step:
    -- Grant usage on the datashare to the second consumer cluster
    GRANT USAGE ON DATASHARE tenant2share TO NAMESPACE '<consumercluster2_namespace>';
    --Account numbers are 12 digit long (optional)
    --GRANT USAGE ON DATASHARE tenant2share TO ACCOUNT '<Consumer_AWSAccount>';

  3. To view data shares from the producer cluster, enter the following code:
    /******************************************************************/
    /*   View Datashares created, Datashare objects and consumers     */
    /******************************************************************/
    select * from SVV_DATASHARES;
    select * from SVV_DATASHARE_OBJECTS;
    select * from SVV_DATASHARE_CONSUMERS;

The following screenshot shows the commands in the query editor.


The following screenshot shows the query results.


Accessing data using the consumer cluster from the data share

To access data using the consumer cluster, complete the following steps:

  1. Log in to the first consumer cluster ds-consumer-c1 as an admin user.
  2. View the data shares available to the cluster and their objects using the system views:
    /********************************************************/
    /* Consumer cluster as admin user:                      */
    /* list the shares available and review their contents */
    /********************************************************/
    -- You can view datashare objects associated with the cluster
    -- using either of the two commands
    SHOW DATASHARES;
    select * from SVV_DATASHARES;

The following screenshot shows the query results.


--View objects shared in inbound share for consumer
select * from SVV_DATASHARE_OBJECTS;

The following screenshot shows the query results.


--View namespace or clusters granted usage to a datashare
select * from svv_datashare_consumers;
  3. Create a local database in the first consumer cluster, and an external schema to be able to provide controlled access to the specific schema to the consumer clusters:
    /*******************************************************/
    /* Create a local database and schema reference        */
    /* to the share objects                                */
    /*******************************************************/
    CREATE DATABASE tenant1_db FROM DATASHARE tenant1share
    OF NAMESPACE '<producercluster_namespace>';

  4. Query the database tables using three-part notation (database.schema.table):
    select * from tenant1_db.tenant1.customer;
    select * from tenant1_db.tenant1.sales;

  5. Optionally, you can create an external schema.

There are two reasons to create an external schema: either to enable two-part notation access to the tables from the consumer cluster, or to provide restricted access to the specific schemas for selected users, when multiple schemas are shared from the producer cluster. See the following code for our external schema:

/* Create External Schema */
CREATE EXTERNAL SCHEMA tenant1_schema FROM REDSHIFT DATABASE 'tenant1_db' SCHEMA 'tenant1';
  6. If you created the external schema, you can use two-part notation to query the tables:
    select * from tenant1_schema.customer;
    select * from tenant1_schema.sales;

  7. You can view the shared databases by querying the SVV_REDSHIFT_DATABASES table:
    select * from SVV_REDSHIFT_DATABASES;

The following screenshot shows the query results.


Creating consumer users for managing access

Still logged in as an admin user to the consumer cluster, you can create other users who have access to the database objects.

  1. Create users and groups, and assign users and object privileges to the groups with the following code:
    /*******************************************************/
    /* Consumer can create own users and assign privileges */
    /* Create tenant1_user and assign privileges to        */
    /* read datashare from tenant1 schema                  */
    /*******************************************************/
    create group tenant1_group;
    create user tenant1_user password 'Redshift#123!' in group tenant1_group;
    
    GRANT USAGE ON DATABASE tenant1_db TO tenant1_user;
    GRANT USAGE ON SCHEMA tenant1_schema TO GROUP tenant1_group;
    

Now tenant1_user can log in and query the shared tables from the tenant1 schema.

  2. Log in to the consumer cluster as tenant1_user and query the tables:
    /************************************************************/
    /* Log in to the consumer cluster as tenant1_user and      */
    /* query the shared tables.                                 */
    /************************************************************/
    
    select * from tenant1_db.tenant1.customer;
    select * from tenant1_db.tenant1.sales;
    
    
    /************************************************************/
    /* If you have created an external schema,                  */
    /* you can use the two-part notation to query tables.       */
    /************************************************************/
    
    select * from tenant1_schema.customer;
    select * from tenant1_schema.sales;
    

Revoking access to a data share (optional)

  1. At any point, if you want to revoke access to the data share, you can use the REVOKE USAGE command:
    /*************************************************************/
    /* To revoke access at any time use the REVOKE USAGE command */
    /*************************************************************/
    --Namespace refers to the namespace GUID of the consumer cluster in the account
    REVOKE USAGE ON DATASHARE salesshare FROM NAMESPACE '<consumercluster1_namespace>';
    --Account numbers are 12 digit long
    REVOKE USAGE ON DATASHARE Salesshare FROM ACCOUNT '<Consumer_AWSAccount>';
    

Silo model

The third option is to store data for each tenant in separate databases within a cluster. If you need your data isolated from other tenants, you can use the silo model, in which each database may have distinct data models, monitoring, management, and security footprints.

Amazon Redshift supports cross-database queries, which allow you to simplify data organization. You can store common or granular datasets used across all tenants in a centralized database, and use the cross-database query capability to join relevant data for each tenant.
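
The following is a minimal sketch of what such a cross-database query could look like, assuming the tenant1_silodb database and tenant1_siloschema schema created later in this section, plus a hypothetical centralized reference database named common_db that holds a shared date dimension.

-- Join tenant-specific sales in tenant1_silodb with a shared date dimension
-- stored centrally in common_db (hypothetical database and table).
SELECT c.c_region,
       d.fiscal_quarter,
       sum(s.s_totalprice) AS total_sales
FROM   tenant1_silodb.tenant1_siloschema.sales    s
JOIN   tenant1_silodb.tenant1_siloschema.customer c
  ON   c.c_custid = s.s_custid
JOIN   common_db.public.dim_date d
  ON   d.calendar_date = s.s_orderdate
GROUP  BY c.c_region, d.fiscal_quarter;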

The steps to create a data share in the silo model are similar to the bridge model; however, unlike the bridge model (where a data share is created for each schema), the silo model creates a data share for each database. The following diagram illustrates the architecture of the silo model.


Creating data shares for the silo model in the producer cluster

To create data shares for the silo model in the producer cluster, complete the following steps:

  1. Log in to the producer cluster as an admin user and create separate databases for each tenant:
    /*****************************************************/
    /** Silo Model – Create databases for the 2 tenants **/
    /*****************************************************/
    create database tenant1_silodb;
    
    create database tenant2_silodb;
    

  2. Log in again to the producer cluster with the database name and user ID for the database that you want to share (tenant1_silodb) and create the schema and tables:
    /***********************************************************/
    /* login to tenant1_silodb and create the schema and tables*/
    /***********************************************************/
    create schema tenant1_siloschema;
    
    -- Create tables for tenant1
    CREATE TABLE IF NOT EXISTS tenant1_silodb.tenant1_siloschema.customer (
      c_custid int8 not null ,
      c_name varchar(25) not null,
      c_region varchar(40) not null,
      Primary Key(c_custid)
    ) diststyle ALL sortkey(c_custid);
    
    CREATE TABLE IF NOT EXISTS tenant1_silodb.tenant1_siloschema.sales (
      s_orderid int8 not null,
      s_custid int8 not null,
      s_totalprice numeric(12,2) not null,
      s_orderdate date not null,
      Primary Key(s_orderid)
    ) distkey(s_orderid) sortkey(s_orderdate, s_orderid) ;
    
    insert into tenant1_siloschema.customer values
    (1, 'Customer 1', 'NorthEast'),
    (2, 'Customer 2', 'SouthEast');
    
    truncate table tenant1_siloschema.sales;
    insert into tenant1_siloschema.sales values
    (1, 1, 2434.33, '2020-11-21'),
    (2, 2, 54.90, '2020-5-5'),
    (3, 2, 9678.99, '2020-3-8');
    

  3. Create a data share with a name for the first tenant (for example, tenant1_silodbshare):
    /******************************************************************/
    /*   Create datashare and add database objects to the datashare   */
    /******************************************************************/
    CREATE DATASHARE tenant1_silodbshare;
    

  4. Run ALTER DATASHARE commands to add the schema to be shared with the consumer cluster, and add all tables in the schema to be shared with the consumer cluster:
    ALTER DATASHARE tenant1_silodbshare ADD SCHEMA tenant1_siloschema;
    ALTER DATASHARE tenant1_silodbshare ADD ALL TABLES IN SCHEMA tenant1_siloschema;

  5. Grant usage on the data share for the first tenant to the namespace of the BI cluster. You can get the namespace of the BI cluster from the console or using the SELECT CURRENT_NAMESPACE statement in the BI cluster:
    --Namespace refers to the namespace GUID of the consumer cluster in the account
    GRANT USAGE ON DATASHARE tenant1_silodbshare TO NAMESPACE '<consumercluster1_namespace>';
    --Account numbers are 12 digit long (optional)
    --GRANT USAGE ON DATASHARE tenant1_silodbshare TO ACCOUNT '<AWS-Account>';
    

Viewing and querying data shares for the silo model from the consumer cluster

To view and query your data shares, complete the following steps:

  1. Log in to the consumer cluster as an admin user.
  2. Create a new database from the data share of the producer cluster:
    CREATE DATABASE tenant1_silodb FROM DATASHARE tenant1_silodbshare
    OF NAMESPACE '<producercluster_namespace>';

Now you can start querying the shared data from the producer cluster by using three-part notation (database.schema.table). If you created an external schema, you can also use two-part notation to query the tables.

  3. Query the data with the following code:
    select * from tenant1_silodb.tenant1.customer;
    select * from tenant1_silodb.tenant1.sales;
    

  4. Optionally, you can create an external schema pointing to the schema in the database of the producer cluster. This allows you to query shared tables using two-part notation. See the following code:
    CREATE EXTERNAL SCHEMA tenant1_siloschema FROM REDSHIFT DATABASE 'tenant1_silodb' SCHEMA 'tenant1';
    
    --With this Schema, you can access using two-part notation to select from data share tables
    select * from tenant1_siloschema.customer;
    select * from tenant1_siloschema.sales;
    

  5. You can repeat the same steps for tenant2 to share the tenant2 database with the tenant2 consumer cluster. You can also control access to users in your consumer cluster by creating users and groups, and assigning access to the data share objects.

System views to view data shares

We have introduced new system tables and views to easily identify the data shares and related objects. You can use three different groups of system views to view the data share objects:

  • Views starting with SVV_DATASHARES – contain details of data shares and the objects in a data share:
    • SVV_DATASHARES – View a list of data shares created on the cluster and data shares shared with the cluster
    • SVV_DATASHARE_OBJECTS – View a list of objects in all data shares created on the cluster or shared with the cluster
    • SVV_DATASHARE_CONSUMERS – View a list of consumers for data shares created on the cluster
  • Views starting with SVV_REDSHIFT – contain details on both local and remote Redshift databases:
    • SVV_REDSHIFT_DATABASES – List of all databases that a user has access to
    • SVV_REDSHIFT_SCHEMAS – List of all schemas that a user has access to
    • SVV_REDSHIFT_TABLES – List of all tables that a user has access to
    • SVV_REDSHIFT_COLUMNS – List of all columns that a user has access to
    • SVV_REDSHIFT_FUNCTIONS – List of all functions that a user has access to
  • Views starting with SVV_ALL – contain local and remote databases, external schemas (including Spectrum and federated query), and external schema references to shared data. If you create external schemas in the consumer cluster, you need to use the SVV_ALL views to look at the objects:
    • SVV_ALL_SCHEMAS – Union of the list of all schemas from the SVV_REDSHIFT_SCHEMAS view and a consolidated list of all external tables and schemas that a user has access to
    • SVV_ALL_TABLES – List of all tables that a user has access to
    • SVV_ALL_COLUMNS – List of all columns that a user has access to
    • SVV_ALL_FUNCTIONS – List of all functions that a user has access to
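
For example, after creating the sales_db database from the data share in the pool model walkthrough, you could list the shared tables visible to the current user with a query like the following (the database name comes from the earlier examples).

-- List shared tables visible to the current user in the database
-- created from the data share.
SELECT database_name, schema_name, table_name, table_type
FROM   svv_redshift_tables
WHERE  database_name = 'sales_db';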

Considerations for choosing a storage strategy

You can adopt a storage strategy or choose a hybrid approach based on business, technical, and operational requirements. Before deciding on a strategy, consider the quotas and limits for various objects in Amazon Redshift, and the number of databases per cluster or number of schemas per database to check if it meets your requirements. The following table summarizes these considerations.

Consideration                     Pool                                          Bridge                        Silo
Separation of tenant data         Views                                         Schema                        Database
ETL pipeline complexity           Low                                           Low                           Medium
Limits                            100,000 tables (RA3 4xlarge, 16xlarge)        9,900 schemas per database    60 databases per cluster
Chargeback to consumer accounts   Yes                                           Yes                           Yes
Scalability                       High                                          High                          High

Conclusion

In this post, we discussed how you can use the new data sharing feature of Amazon Redshift to implement an AaaS solution with a multi-tenant architecture while meeting SLAs for consumers using separate Amazon Redshift clusters. We demonstrated three types of models providing various levels of isolation for the tenant data. We compared and contrasted the models and provided guidance on when to choose an implementation model. We encourage you to try the data sharing feature to build your AaaS or software as a service (SaaS) solutions.


About the Authors

Rajesh Francis is a Sr. Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and works with customers to build scalable Analytic solutions.

Neeraja Rentachintala is a Principal Product Manager with Amazon Redshift. Neeraja is a seasoned Product Management and GTM leader, bringing over 20 years of experience in product vision, strategy and leadership roles in data products and platforms. Neeraja delivered products in analytics, databases, data Integration, application integration, AI/Machine Learning, large scale distributed systems across On-Premise and Cloud, serving Fortune 500 companies as part of ventures including MapR (acquired by HPE), Microsoft SQL Server, Oracle, Informatica and Expedia.com.

Jeetesh Srivastva is a Sr. Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and works with customers to implement scalable solutions leveraging Redshift and other AWS Analytics services. He has worked to deliver on-premises and cloud-based analytic solutions for customers in the banking and finance and hospitality industry verticals.

Querying a Vertica data source in Amazon Athena using the Athena Federated Query SDK

Post Syndicated from Kelly Ragan original https://aws.amazon.com/blogs/big-data/querying-a-vertica-data-source-in-amazon-athena-using-the-athena-federated-query-sdk/

The ability to query data and perform ad hoc analysis across multiple platforms and data stores with a single tool brings immense value to the big data analytical arena. As organizations build out data lakes with increasing volumes of data, there is a growing need to combine that data with large amounts of data in other data stores. As the variety of data increases, it becomes paramount to have a query tool to bridge two or more data stores with a single query.

Even though data lakes became popular for analytic workloads recently, it’s not uncommon to have data warehouses in addition to data lakes for various reporting and business intelligence (BI) use cases. It becomes imperative to be able to seamlessly query the data stored in the data warehouse and the data lake. To address this issue, Amazon Athena has released a feature called Athena Federated Query. Athena is an interactive query service provided by AWS that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Vertica is a columnar MPP database platform that can be deployed in the cloud or on premises, and supports exabyte scale data warehouses. With Athena Federated Query and the Vertica connector, you can now run analytical queries over a data warehouse on Vertica and a data lake in Amazon S3.

Athena Federated Query includes pre-built connectors to a variety of AWS services and databases, as well as an SDK to build custom connectors to other databases and data stores. With this feature, federated queries can pull data from a data lake in an S3 bucket and from an external data source, and then combine it into a single result set in Athena. These connectors are an extension of the Athena query engine, which translates content between Athena and the external data source. Pre-built connectors exist for Amazon CloudWatch Logs, Amazon DynamoDB, Amazon DocumentDB (with MongoDB compatibility), and Amazon Relational Database Service (Amazon RDS), as well as a JDBC connector for Amazon Redshift, MySQL, and PostgreSQL. For other types of relational databases, you can use the Athena Federated Query SDK to create a custom connector.

In this post, we demonstrate how to deploy the custom connector between Athena and a Vertica database built using the Athena Federated Query SDK. After deploying the custom connector, we demonstrate issuing federated queries and moving data from Vertica to a data lake using CREATE TABLE AS (CTAS) with a federated query.
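
To give a sense of what such a federated query looks like, the following is a hedged sketch that joins a Vertica table with a data lake table in the AWS Glue Data Catalog. It assumes the Vertica connector has been registered in Athena as a data source named vertica; all table and column names are illustrative.

-- Hypothetical federated query: combine a Vertica fact table with an
-- S3-backed table registered in the default Glue catalog (AwsDataCatalog).
SELECT v.customer_id,
       sum(v.total_price)           AS warehouse_revenue,
       count(DISTINCT w.session_id) AS web_sessions
FROM   "vertica"."public"."sales" v
JOIN   "AwsDataCatalog"."weblogs"."sessions" w
  ON   w.customer_id = v.customer_id
WHERE  v.order_date >= DATE '2020-01-01'
GROUP  BY v.customer_id;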

AWS services used in the solution

The Athena Federated Query SDK is an open-source framework for building custom connectors, and comes with a connector publish tool that deploys the connector executables as an application to the AWS Serverless Application Repository. Athena Federated Query uses an AWS Lambda function that is created from the application deployed to the AWS Serverless Application Repository.

A custom connector is composed of a Lambda function that implements three components:

  • MetadataHandler – An interface that exposes metadata information of schemas, tables, and columns from the underlying data store to Athena
  • RecordHandler – An interface that provides hooks to read data from the external source and share it with the Athena query engine in Apache Arrow columnar format
  • CompositeHandler – A wrapper that combines the MetadataHandler and RecordHandler so they can run together in a single Lambda function

The Lambda function connects to the external data store using an appropriate connection protocol (JDBC, in the case of Vertica) and sends the parsed SQL statement. The RecordHandler processes the result set produced by the external data store and passes the rows to Athena for final processing. Depending on the Lambda concurrency settings, Athena calls multiple Lambda functions to read the result set in parallel. A spill bucket is used to handle result sets that are too large for a single Lambda function invocation to process in memory.

The JDBC connection established by the Lambda function to the external database is used to send the parsed SQL statement and retrieve the result set rows from the external database. This approach works well in terms of bandwidth for smaller databases and result sets. However, you might have Vertica deployments with petabyte- or exabyte-scale data warehouses, where typical queries return result sets of tens of gigabytes or more. Because of the bandwidth limitations of a JDBC connection, the solution presented in this post modifies the Athena Federated Query SDK to implement a different route for transmitting large result sets from Vertica to Athena for final processing.

The alternate solution uses the Vertica EXPORT command as a wrapper around the parsed SQL statement. You can use the EXPORT command to write the result set of a SQL statement directly to an S3 bucket, taking advantage of Vertica's highly parallelized, partitioned writes to Amazon S3. This solution modifies the SDK to allow Athena to read the result set in the S3 bucket, determine the number of partitions, and call subsequent Lambda functions to parallelize the read of the result set. This provides an efficient way to move a multi-gigabyte result set from Vertica to Athena, with parallelized writes from Vertica to Amazon S3 and parallelized reads from Amazon S3 to Athena. When connecting to the Vertica database, the SDK uses AWS Secrets Manager to retrieve a user ID and password for a service account on the Vertica database.

Solution architecture

The following diagram shows the solution architecture for the Vertica custom connector when deployed to AWS.


The connector components are as follows:

  1. A user issues a federated SQL query in Athena against a table in Vertica.
  2. Athena parses the query and calls a Lambda function.
  3. The Lambda function makes a call to Secrets Manager to get the user ID and password for connecting to Vertica.
  4. The connector sends an EXPORT statement wrapper with the embedded SQL statement to Vertica through the JDBC connection. For example, see the following code:
    EXPORT TO PARQUET (directory = 's3://<bucket_name>/<folder_name>',
        Compression='Snappy', fileSizeMB=64) OVER() AS
    SELECT
        ORDER_ID,
        ITEM,
        CUSTOMER_ID,
        ORDERED_DATE
    FROM SCHEMA1.ORDERS
    WHERE CUSTOMER_ID = 2;
    

  5. Vertica processes the SQL query and writes the result set to the S3 bucket specified in the EXPORT command. Vertica parallelizes the write to the S3 bucket, splitting the result set into as many files as needed based on the fileSizeMB parameter.
  6. Athena calls a Lambda function to scan the S3 bucket in order to determine the number of files to read for the result set.
  7. Athena invokes multiple Lambda functions depending on the number of partitions using Amazon S3 Select. This allows Athena to parallelize the read of the S3 files.
  8. Athena combines the result set returned from Vertica with data scanned from the data lake, and returns the combined result set to the user.

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • Amazon EC2 IAM role permissions – The AWS Identity and Access Management (IAM) role of the Amazon Elastic Compute Cloud (Amazon EC2) machines hosting the Vertica database must be given write permissions to the VerticaExport S3 bucket, which is created when deploying the connector.
  • Secrets Manager – The Vertica connection credentials are stored in Secrets Manager. The secret name is prefixed with Vertica- and the secret value is the connection credentials.
  • Lambda IAM role permissions – When the Lambda function is deployed to the AWS Serverless Application Repository, it creates a custom IAM role for the function to run. The custom role has the following IAM permissions in order to successfully perform the read and write functions associated with the MetadataHandler and RecordHandler:
    • AWSLambdaBasicExecutionRole
    • AWSLambdaVPCAccessExecutionRole
    • For Secrets Manager, GetSecretValue for secrets with a prefix given in SecretNameOrPrefix
    • For Amazon S3, list, read, and write permissions for SpillBucket and ExportBucket, and list permissions for all S3 buckets
    • For Athena, GetQueryExecution

Demonstration tables

To demonstrate the Athena Vertica connector capabilities, we use the following components:

  • A Vertica database running in our AWS environment.
  • A Vertica table called orders containing details of customer orders.
  • An Athena table called customer, which has an S3 bucket as a data source. This table contains information regarding customers.

The following screenshot shows the details of the customer table in Amazon S3.


The following screenshot shows the details of the orders table in Vertica.

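As a rough guide, the following is a minimal sketch of DDL for these two tables, with column names inferred from the queries used later in this post; the column names, data types, and locations shown here are illustrative assumptions and may differ in your environment:

-- Hypothetical Athena external table over the customer data in Amazon S3
CREATE EXTERNAL TABLE default.customer (
  CUSTOMER_ID   bigint,
  CUSTOMER_NAME string
)
STORED AS PARQUET
LOCATION 's3://<bucket_name>/customer/';

-- Hypothetical Vertica table holding the customer order details
CREATE TABLE SCHEMA1.ORDERS (
  ORDER_ID     int,
  CUSTOMER_ID  int,
  PRODUCT_ID   int,
  PRODUCT_NAME varchar(100),
  ORDER_DATE   timestamp
);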

Setting up the Athena Vertica connector project

To set up your connector project, complete the following steps:

  1. Create an S3 bucket in your AWS account. This is the bucket where the result set from Vertica is exported.
  2. Create another S3 bucket in your AWS account. This is the bucket where the code for the connector is stored and retrieved.
  3. Grant the IAM role of the EC2 machines hosting the Vertica database read and write permissions to the S3 result set bucket, allowing Vertica to export data to the bucket.
  4. Clone the GitHub repo in your local folder.
  5. Open the project in your preferred IDE.
  6. From the athena-query-federation directory, run mvn clean install.
  7. From the athena-vertica directory, run mvn clean install.
  8. From the athena-vertica directory, run ../tools/publish.sh <s3_code_bucket_name> athena-vertica [region] to publish the connector to your private AWS Serverless Application Repository.
  9. Upon successful completion of the script, the connector’s serverless application is published to the AWS Serverless Application Repository.

Deploying the connector

To deploy your connector, complete the following steps:

  1. On the AWS Serverless Application Repository console, choose Published Applications.
  2. On the Private Applications tab, select Show apps that create custom IAM roles or resource policies to see the deployed application.
  3. Choose the VerticaAthenaConnector serverless app.
  4. For AthenaCatalogName, enter the name of the connector Lambda function to use when querying the Vertica tables (for example, avc).
  5. For SecretNameOrPrefix, enter the prefix used to store the Vertica credentials in Secrets Manager (the default is Vertica-).
  6. For SpillBucket, enter the S3 bucket name where data is spilled if the result set data volume crosses a certain limit (for example, test-spill-bucket).
  7. For VerticaExportBucket, enter the S3 bucket where the result set from Vertica is exported (for example, test-export-bucket).
  8. For VpcId, enter your VPC ID.


  9. For SpillPrefix, enter athena-spill.
  10. For SubnetIds, enter your subnet IDs.
  11. For VerticaConnectionString, enter the connection string of the Vertica database in the following format:
    jdbc:vertica://<host_name>:<port>/<database>?user=${vertica-username}&password=${vertica-password}
    

    Here, vertica-username and vertica-password are the secret names of the Vertica user credentials stored in AWS Secrets Manager.

  12. Select I acknowledge that this app creates custom IAM roles.


  13. Choose Deploy.

Upon successful deployment, a Lambda function with the name given for AthenaCatalogName is deployed in your AWS environment. We use this function to issue federated queries to Vertica. The connector is now deployed and ready to use.

Using the connector

On the Athena console, you can query Vertica tables as shown in the following code. The value for <lambda_function> corresponds to the function you created in the previous section.

SELECT  
ORDER_ID,   
CUSTOMER_ID,
PRODUCT_ID,
PRODUCT_NAME,
ORDER_DATE
FROM "lambda:<lambda_function>".SCHEMA1.ORDERS  
WHERE CUSTOMER_ID <= 3461
ORDER BY ORDER_ID DESC

In this example, we named the function avc. The following screenshot shows our query results.


This demonstrates that the newly deployed connector read the user-requested columns from the Vertica source table, wrapped an EXPORT statement around the SQL statement, and ran it in Vertica. The results of the query were exported to the specified S3 bucket (test-export-bucket) in Parquet format. The connector then invoked multiple Lambda functions to read the data from the S3 bucket using Amazon S3 Select and displayed it on the Athena console. Note that the connector currently exports Vertica timestamp and timestamptz data types as a varchar data type, so we need to use the date_parse(string, format) function to convert the timestamp columns into the correct data type.
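
For example, a minimal sketch of such a conversion, assuming a hypothetical ORDER_TS column that the connector returned as a varchar in the format shown (the column name and format string are illustrative only):

SELECT
ORDER_ID,
date_parse(ORDER_TS, '%Y-%m-%d %H:%i:%s') AS ORDER_TS
FROM "lambda:<lambda_function>".SCHEMA1.ORDERS
WHERE CUSTOMER_ID <= 3461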

We can also create an Athena table from the result set of the Vertica query by using CTAS, as in the following query:

CREATE TABLE default.vertica_customers_table AS (
SELECT  
ORDER_ID,
CUSTOMER_ID,
PRODUCT_ID,
PRODUCT_NAME,
ORDER_DATE
FROM "lambda:<lambda_function>".SCHEMA1.ORDERS  
WHERE CUSTOMER_ID <= 3461
);

We can then use the newly created table to query the data as shown in the following screenshot.

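For example, a simple query against the new table might look like the following sketch (the filter value is illustrative only):

SELECT
ORDER_ID,
PRODUCT_NAME,
ORDER_DATE
FROM default.vertica_customers_table
WHERE lower(PRODUCT_NAME) LIKE 'pencil'
ORDER BY ORDER_ID DESC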

We can also query and join the customer data in Amazon S3 with the orders data in Vertica using the following sample query:

WITH
customer_data AS (
  SELECT
    CUSTOMER_NAME,
    CUSTOMER_ID
  FROM default.customer
),
orders_data AS (
  SELECT
    ORDER_ID,
    PRODUCT_NAME,
    CUSTOMER_ID
  FROM "lambda:<lambda_function>".schema1.orders
)
SELECT a.CUSTOMER_ID, b.ORDER_ID, b.PRODUCT_NAME
FROM customer_data a
INNER JOIN orders_data b
ON a.CUSTOMER_ID = b.CUSTOMER_ID
WHERE lower(b.PRODUCT_NAME) LIKE 'pencil'
ORDER BY b.ORDER_ID DESC

This query joins the orders data in Vertica with the customer data in the S3 bucket on the customer_id column and displays the results on the Athena console.


This demonstrates the ease of performing analytics across multiple platforms and data stores.

Conclusion

In this post, we introduced the Athena Vertica connector and its solution architecture, and demonstrated how to deploy the connector using the Athena Federated Query SDK. We saw how to run SQL queries against the Vertica data source, and learned that we can use the connector to perform extract, transform, and load operations on the data in the Vertica tables and Amazon S3, enabling faster and better analytics across multiple platforms and data sources.

For more information about Athena Federated Query, see the GitHub repo.

Special Acknowledgement

Special acknowledgement goes to the Intuit Data Engineering staff Denise McInerney – Data Architect, Sanjay Rane – Group Engineering Manager – Data, and Kannan Nagarajan – Database Architect. They helped design, review, and support the development of the custom connector and architecture.


About the Authors

Kelly Ragan is a Senior Data Architect, Strategic Accounts Team, AWS Professional Services. He helps customers solve big data problems and wrestle with large-scale data warehouses. In his spare time, he enjoys snow skiing, bicycling, and camping in the Pacific Northwest.


Rohit Masur is an Associate Big Data Consultant, Data and Analytics Team, AWS Professional Services. He helps customers architect and implement solutions on AWS to get business value out of data. In his spare time, he enjoys reading books, going on long walks, and exploring new hiking trails in the Bay Area.