Tag Archives: Analytics

New for Amazon Redshift – General Availability of Streaming Ingestion for Kinesis Data Streams and Managed Streaming for Apache Kafka

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-for-amazon-redshift-general-availability-of-streaming-ingestion-for-kinesis-data-streams-and-managed-streaming-for-apache-kafka/

Ten years ago, just a few months after I joined AWS, Amazon Redshift was launched. Over the years, many features have been added to improve performance and make it easier to use. Amazon Redshift now allows you to analyze structured and semi-structured data across data warehouses, operational databases, and data lakes. More recently, Amazon Redshift Serverless became generally available to make it easier to run and scale analytics without having to manage your data warehouse infrastructure.

To process data as quickly as possible from real-time applications, customers are adopting streaming engines like Amazon Kinesis and Amazon Managed Streaming for Apache Kafka. Previously, to load streaming data into your Amazon Redshift database, you’d have to configure a process to stage data in Amazon Simple Storage Service (Amazon S3) before loading. Doing so would introduce a latency of one minute or more, depending on the volume of data.

Today, I am happy to share the general availability of Amazon Redshift Streaming Ingestion. With this new capability, Amazon Redshift can natively ingest hundreds of megabytes of data per second from Amazon Kinesis Data Streams and Amazon MSK into an Amazon Redshift materialized view and query it in seconds.

Architecture diagram.

Streaming ingestion benefits from the ability to optimize query performance with materialized views and allows the use of Amazon Redshift more efficiently for operational analytics and as the data source for real-time dashboards. Another interesting use case for streaming ingestion is analyzing real-time data from gamers to optimize their gaming experience. This new integration also makes it easier to implement analytics for IoT devices, clickstream analysis, application monitoring, fraud detection, and live leaderboards.

Let’s see how this works in practice.

Configuring Amazon Redshift Streaming Ingestion
Apart from managing permissions, Amazon Redshift streaming ingestion can be configured entirely with SQL within Amazon Redshift. This is especially useful for business users who lack access to the AWS Management Console or the expertise to configure integrations between AWS services.

You can set up streaming ingestion in three steps:

  1. Create or update an AWS Identity and Access Management (IAM) role to allow access to the streaming platform you use (Kinesis Data Streams or Amazon MSK). Note that the IAM role should have a trust policy that allows Amazon Redshift to assume the role.
  2. Create an external schema to connect to the streaming service.
  3. Create a materialized view that references the streaming object (Kinesis data stream or Kafka topic) in the external schemas.

After that, you can query the materialized view to use the data from the stream in your analytics workloads. Streaming ingestion works with Amazon Redshift provisioned clusters and with the new serverless option. To maximize simplicity, I am going to use Amazon Redshift Serverless in this walkthrough.

To prepare my environment, I need a Kinesis data stream. In the Kinesis console, I choose Data streams in the navigation pane and then Create data stream. For the Data stream name, I use my-input-stream and then leave all other options set to their default value. After a few seconds, the Kinesis data stream is ready. Note that by default I am using on-demand capacity mode. In a development or test environment, you can choose provisioned capacity mode with one shard to optimize costs.

Now, I create an IAM role to give Amazon Redshift access to the my-input-stream Kinesis data streams. In the IAM console, I create a role with this policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:123412341234:stream/my-input-stream"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}

To allow Amazon Redshift to assume the role, I use the following trust policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "redshift.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

In the Amazon Redshift console, I choose Redshift serverless from the navigation pane and create a new workgroup and namespace, similar to what I did in this blog post. When I create the namespace, in the Permissions section, I choose Associate IAM roles from the dropdown menu. Then, I select the role I just created. Note that the role is visible in this selection only if the trust policy allows Amazon Redshift to assume it. After that, I complete the creation of the namespace using the default options. After a few minutes, the serverless database is ready for use.

In the Amazon Redshift console, I choose Query editor v2 in the navigation pane. I connect to the new serverless database by choosing it from the list of resources. Now, I can use SQL to configure streaming ingestion. First, I create an external schema that maps to the streaming service. Because I am going to use simulated IoT data as an example, I call the external schema sensors.

CREATE EXTERNAL SCHEMA sensors
FROM KINESIS
IAM_ROLE 'arn:aws:iam::123412341234:role/redshift-streaming-ingestion';

To access the data in the stream, I create a materialized view that selects data from the stream. In general, materialized views contain a precomputed result set based on the result of a query. In this case, the query is reading from the stream, and Amazon Redshift is the consumer of the stream.

Because streaming data is going to be ingested as JSON data, I have two options:

  1. Leave all the JSON data in a single column and use Amazon Redshift capabilities to query semi-structured data.
  2. Extract JSON properties into their own separate columns.

Let’s see the pros and cons of both options.

The approximate_arrival_timestamp, partition_key, shard_id, and sequence_number columns in the SELECT statement are provided by Kinesis Data Streams. The record from the stream is in the kinesis_data column. The refresh_time column is provided by Amazon Redshift.

To leave the JSON data in a single column of the sensor_data materialized view, I use the JSON_PARSE function:

CREATE MATERIALIZED VIEW sensor_data AUTO REFRESH YES AS
    SELECT approximate_arrival_timestamp,
           partition_key,
           shard_id,
           sequence_number,
           refresh_time,
           JSON_PARSE(kinesis_data, 'utf-8') as payload    
      FROM sensors."my-input-stream";
CREATE MATERIALIZED VIEW sensor_data AUTO REFRESH YES AS
SELECT approximate_arrival_timestamp,
partition_key,
shard_id,
sequence_number,
refresh_time,
JSON_PARSE(kinesis_data) as payload 
FROM sensors."my-input-stream";

Because I used the AUTO REFRESH YES parameter, the content of the materialized view is automatically refreshed when there is new data in the stream.

To extract the JSON properties into separate columns of the sensor_data_extract materialized view, I use the JSON_EXTRACT_PATH_TEXT function:

CREATE MATERIALIZED VIEW sensor_data_extract AUTO REFRESH YES AS
    SELECT approximate_arrival_timestamp,
           partition_key,
           shard_id,
           sequence_number,
           refresh_time,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'sensor_id')::VARCHAR(8) as sensor_id,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'current_temperature')::DECIMAL(10,2) as current_temperature,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'status')::VARCHAR(8) as status,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'event_time')::CHARACTER(26) as event_time
      FROM sensors."my-input-stream";

Loading Data into the Kinesis Data Stream
To put data in the my-input-stream Kinesis Data Stream, I use the following random_data_generator.py Python script simulating data from IoT sensors:

import datetime
import json
import random
import boto3

STREAM_NAME = "my-input-stream"


def get_random_data():
    current_temperature = round(10 + random.random() * 170, 2)
    if current_temperature > 160:
        status = "ERROR"
    elif current_temperature > 140 or random.randrange(1, 100) > 80:
        status = random.choice(["WARNING","ERROR"])
    else:
        status = "OK"
    return {
        'sensor_id': random.randrange(1, 100),
        'current_temperature': current_temperature,
        'status': status,
        'event_time': datetime.datetime.now().isoformat()
    }


def send_data(stream_name, kinesis_client):
    while True:
        data = get_random_data()
        partition_key = str(data["sensor_id"])
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key)


if __name__ == '__main__':
    kinesis_client = boto3.client('kinesis')
    send_data(STREAM_NAME, kinesis_client)

I start the script and see the records that are being put in the stream. They use a JSON syntax and contain random data.

$ python3 random_data_generator.py
{'sensor_id': 66, 'current_temperature': 69.67, 'status': 'OK', 'event_time': '2022-11-20T18:31:30.693395'}
{'sensor_id': 45, 'current_temperature': 122.57, 'status': 'OK', 'event_time': '2022-11-20T18:31:31.486649'}
{'sensor_id': 15, 'current_temperature': 101.64, 'status': 'OK', 'event_time': '2022-11-20T18:31:31.671593'}
...

Querying Streaming Data from Amazon Redshift
To compare the two materialized views, I select the first ten rows from each of them:

  • In the sensor_data materialized view, the JSON data in the stream is in the payload column. I can use Amazon Redshift JSON functions to access data stored in JSON format.Console screenshot.
  • In the sensor_data_extract materialized view, the JSON data in the stream has been extracted into different columns: sensor_id, current_temperature, status, and event_time.Console screenshot.

Now I can use the data in these views in my analytics workloads together with the data in my data warehouse, my operational databases, and my data lake. I can use the data in these views together with Redshift ML to train a machine learning model or use predictive analytics. Because materialized views support incremental updates, the data in these views can be efficiently used as a data source for dashboards, for example, using Amazon Redshift as a data source for Amazon Managed Grafana.

Availability and Pricing
Amazon Redshift streaming ingestion for Kinesis Data Streams and Managed Streaming for Apache Kafka is generally available today in all commercial AWS Regions.

There are no additional costs for using Amazon Redshift streaming ingestion. For more information, see Amazon Redshift pricing.

It’s never been easier to use low-latency streaming data in your data warehouse and in your data lake. Let us know what you build with this new capability!

Danilo

Preview: Amazon Security Lake – A Purpose-Built Customer-Owned Data Lake Service

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/preview-amazon-security-lake-a-purpose-built-customer-owned-data-lake-service/

To identify potential security threats and vulnerabilities, customers should enable logging across their various resources and centralize these logs for easy access and use within analytics tools. Some of these data sources include logs from on-premises infrastructure, firewalls, and endpoint security solutions, and when utilizing the cloud, services such as Amazon Route 53, AWS CloudTrail, and Amazon Virtual Private Cloud (Amazon VPC).

The Amazon Simple Storage Service (Amazon S3) and AWS Lake Formation simplify the creation and management of a data lake on AWS. But, some customers’ security teams still struggle to define and implement security domain–specific aspects, such as data normalization, which requires them to analyze each log source’s structure and fields, define schemas and mappings, and pull in data enrichment such as threat intelligence.

Today we are announcing the preview release of Amazon Security Lake, a purpose-built service that automatically centralizes an organization’s security data from cloud and on-premises sources into a purpose-built data lake stored in your account. Amazon Security Lake automates the central management of security data, normalizing from integrated AWS services and third-party services and managing the lifecycle of data with customizable retention and also automates storage tiering.

Here are the key features of Amazon Security Lake:

  • Variety of supported log and event sources – During the preview, Amazon Security Lake automatically collects logs for AWS CloudTrail, Amazon VPC, Amazon Route 53, Amazon S3, and AWS Lambda, as well as security findings via AWS Security Hub for AWS Config, AWS Firewall Manager, Amazon GuardDuty, AWS Health Dashboard, AWS IAM Access Analyzer, Amazon Inspector, Amazon Macie, and AWS Systems Manager Patch Manager. Additionally, over 50 sources of third-party security findings can be sent to Amazon Security Lake. Security Partners are also directly sending data in a standard schema called the Open Cybersecurity Schema Framework (OCSF) format to Amazon Security Lake, such as Cisco Security, CrowdStrike, Palo Alto Networks, and more.
  • Data transformation and normalization – Security Lake automatically partitions and converts incoming log data to a storage and query-efficient Apache Parquet and OCSF format, making the data broadly and immediately usable for security analytics without the need for post-processing. Security Lake supports integrations with analytics partners such as IBM, Splunk, Sumo Logic, and more to address a variety of security use cases such as threat detection, investigation, and incident response.
  • Customizable data access levels – You can configure the level of subscribers consuming data stored in the Security Lake, such as specific data sources for data access to all new objects or directly querying data stored. You can also specify a rollup Region that the Security Lake is available in and multiple AWS accounts across your AWS Organizations. This can help you comply with data residency compliance requirements.

By reducing the operational overhead of security data management, you can make it easier to gather more security signals from across your organization and analyze that data to improve the protection of your data, applications, and workloads.

Configure Your Security Lake for Collection Data
To get started with Amazon Security Lake, choose Get started in the AWS console. You can enable log and event sources for all Regions and all accounts.

You can select log and event sources such as CloudTrail logs, VPC flow logs, and Route53 resolver logs into your data lake. Select Regions will contribute their data to your data lake with the Amazon S3-managed encryption that Amazon S3 will create and manage all encryption keys, as well as the specific AWS accounts in your organizations.

Next, you can select rollup and contributing Regions. All aggregated data from contributing Regions reside in the rollup Region. You can create multiple rollup Regions, which can help you comply with data residency compliance requirements. Optionally, you can define the Amazon S3 storage classes and the retention period you want the data to transition from the standard Amazon S3 storage classes used in Security Lake.

After initial configuration, choose Sources in the left pane of the console if you can add or remove log sources in your Regions or account.

You can also collect data from custom sources, such as Bind DNS logs, endpoint telemetry logs, on-premise Netflow logs, and so on. Before adding a custom source, you need to create AWS IAM role to grant permissions for AWS Glue.

To create a custom data source, choose Create custom source in the left menu of Custom sources.

It requires you to enter the IAM role Amazon Resource Names (ARNs) to write data to Security Lake and invoke AWS Glue on your behalf. Then, you can provide details about your custom source.

For efficient data processing and querying, objects from your custom sources should be partitioned by AWS Region, AWS account, year, month, day, and hour with a Parquet-formatted object.

Consume Your Data from Security Lake
Now you can create a subscriber, a service that consumes logs and events from Security Lake. To add or see your subscribers, choose Subscribers in the left pane of the console.

The Security Lake supports two types of subscriber data access methods:

  • Data access (Amazon S3) – Subscribers are notified of new objects for a source as the data is written to your Security Lake S3 bucket. You can choose to notify subscribers of new objects with an Amazon Simple Queue Service (Amazon SQS) queue or through messaging to an HTTPS endpoint provided by the subscriber. This type is useful to ingest selected data in your analytics application—good for use cases that require frequent access to data.
  • Query access (Lake Formation) – Subscribers can consume data by directly querying AWS Lake Formation tables in your S3 bucket through services like Amazon Athena. This type is useful to provide on-demand query access to data without the need to pre-ingest anything and for use cases that require infrequent access or on large volume sources too expensive to ingest upfront or retain in analytics tools.

When you add a subscriber, you can choose Amazon S3 to create data access for the subscriber. If you select the default method of notification, you can receive the following object notification message in either an HTTPS endpoint or Amazon SQS.

{
  "source": "aws.s3",
  "time": "2021-11-12T00:00:00Z",
  "region": "ca-central-1",
  "resources": [
    "arn:aws:s3:::example-bucket"
  ],
  "detail": {
    "bucket": {
      "name": "example-bucket"
    },
    "object": {
      "key": "example-key",
      "size": 5,
      "etag": "b57f9512698f4b09e608f4f2a65852e5"
    },
    "request-id": "N4N7GDK58NMKJ12R",
    "requester": "123456789012"
  }
}

Subscribers with query access can directly query data that is stored in Security Lake by using services like Amazon Athena and other services that can read from AWS Lake Formation. The following are sample queries of CloudTrail data.

SELECT 
      time, 
      api.service.name, 
      api.operation, 
      api.response.error, 
      api.response.message, 
      src_endpoint.ip 
    FROM ${athena_db}.${athena_table}
    WHERE eventHour BETWEEN '${query_start_time}' and '${query_end_time}' 
      AND api.response.error in (
        'Client.UnauthorizedOperation',
        'Client.InvalidPermission.NotFound',
        'Client.OperationNotPermitted',
        'AccessDenied')
    ORDER BY time desc
    LIMIT 25

Subscribers only have access to source data in the AWS Region that you’ve selected when you create the subscriber. To give a subscriber access to data from multiple Regions, you can set the Region where you create your subscriber as a rollup Region.

Third-Party Integrations
For supported third-party integrations, there are a number of sources as well as subscribing services integrated with Amazon Security Lake.

Amazon Security Lake supports third-party sources providing OCSF security data, including Barracuda Networks, Cisco, Cribl, CrowdStrike, CyberArk, Lacework, Laminar, Netscout, Netskope, Okta, Orca, Palo Alto Networks, Ping Identity, SecurityScorecard, Tanium, The Falco Project, Trend Micro, Vectra AI, VMware, Wiz, and Zscaler.

You can also use third-party security, automation, and analytics tools supporting Security Lake, including Datadog, IBM, Rapid7, Securonix, SentinelOne, Splunk, Sumo Logic, and Trellix. There are also service partners such as Accenture, Atos, Deloitte, DXC, Kyndryl, PWC, Rackspace, and Wipro that can work with you and Amazon Security Lake.

Join the Preview
The preview release of Amazon Security Lake is now available in the US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Sydney), Asia Pacific (Tokyo), Europe (Frankfurt), and Europe (Ireland) Regions.

To learn more, see the Amazon Security Lake page and Amazon Security Lake User Guide. We want to hear more feedback during the preview. Please send feedback in AWS re:Post and through your usual AWS support contacts.

Channy

New analytical questions available in Amazon QuickSight Q: “Why” and “Forecast”

Post Syndicated from Shannon Kalisky original https://aws.amazon.com/blogs/big-data/new-analytical-questions-available-in-amazon-quicksight-q-why-and-forecast/

Amazon QuickSight Q uses machine learning (ML) to enable any user to ask questions about business data in natural language and receive accurate answers with relevant visualizations in seconds. Today, Amazon QuickSight announces support for two new question types that simplify and scale complex analytical tasks using natural language: “forecast” and “why.”

In this post, we explore each of these new question types with examples of how to use them.

Prerequisites

The features explored in this post are part of QuickSight Q. If you’re an existing QuickSight user, be sure that the Q add-on is enabled. For steps on how to do this, see Getting Started with Amazon QuickSight Q.

Forecasting questions

Customers often ask how they can forecast future business performance. This is a useful tool to understand if things are proceeding well, or if some action may be needed to get back on track. Forecasting uses historic data to project metrics into the future.

Creating forecasts is often the job of analysts or data scientists. However, the new forecasting question type in Q enables non-analyst users to predict future trajectories for up to three measures simultaneously. Rather than learning formulas or parameter settings, you can get a forecast by entering forecast into the language bar, followed by up to three metrics that you want to see predictions for. This natural language approach is an easy and intuitive way for managers and others who depend on data to get a sense of what’s likely to happen if things don’t change.

Although the experience of creating a forecast in Q is simple, under the hood is a proven and robust forecasting algorithm called Random Cut Forest (RCF). For more information, see How RCF is applied to generate forecasts.

How to ask a forecasting question

To ask a forecasting question, start the question with the word forecast or the phrase Show me a forecast. The minimum information needed to create a forecast is one of these two question starters, plus the measure you want to forecast. For example, Forecast sales is enough to generate a forecast, as shown in the following screenshot.

Forecasting in Q also supports filters. Filters are applied by adding information to the question. The following example shows using a filter in a forecast statement.

Q allows you to forecast up to three numeric measures in a single question. The following example shows a forecast of sales, profit, and quantity.

If the data you have is dense, it can cause the forecast to be crowded into the right side of the visual. Adjusting the time granularity to a coarser step, such as going from weekly granularity to monthly, will help make the visual easier to read. To do this, simply specify the desired time granularity in the question. The following example shows a different view of the previous example grouped by month instead of week.

Note that at this release forecasting in Q doesn’t support dimensional group-by functionality. Dimensional group-bys split the forecast by the different values in a categorical field, for example: Show me a forecast of sales by region.

Why questions

“Why” is one of the most fundamental questions people ask. For many organizations, understanding why is the key to delighting customers, driving innovation, and outmaneuvering the competition. However, manually analyzing a body of data to discover contributing changes is difficult, time-consuming, and requires special analytics skill.

The new why question type enables business users to instantly get insights previously only accessible to trained analysts. Business users need to understand what contributed to changes in their data, so they can make decisions about what action to take. Why questions are easy to ask and natural to think of, so business users can quickly pinpoint insights they need to know.

When you ask a why question in Q, you trigger an on-the-fly contribution analysis that will automatically identify the key drivers of change for the measure you asked about and quantify which value from each driver contributed the most to that change. This gives you an idea of the relative influence each value had to the measure.

How to ask a why question

A why question needs three things:

  • To start with the word “why.”
  • A numeric measure, such as sales, enrollment numbers, profit, price, and so on.
  • A date or time span, such as last quarter, January 2022, or last month. Note that at this release the time span should be complete, but asking about ongoing spans such as “this week” or “this year” or specifying the current month will not yet work.

Why questions often start from seeing something that sparks our curiosity. For example, if I were an administrator reviewing student enrollment and I saw the following visual, I would naturally wonder “Why did enrollment drop in 2021?”

Now we can ask just that, as shown in the following screenshot.

The why answer identifies up to four key drivers (shown in the blue ovals on the left side of the answer), which get unpacked into contribution narratives (center of the answer) that describe the specific value from the key driver that played the biggest role. On the right side of the answer is a quick-view KPI that summarizes the change in the key driver value. Note that you may need to mouse over and scroll in the Q answer pane to see all the drivers.

Refining why questions

In the why answer displayed in the previous example, enrollment dropped more in the fall semester, which is why it appears as a top contributor to the drop in enrollment. To drill into the factors that influenced the drop in fall enrollment, you can ask more precise questions. In this case, adding in the fall to the end of the question focuses the analysis on just the fall semester.

Focusing on fall brings more specific metrics, and reveals gender is an additional key driver specific to that semester.

You can explore additional drivers by choosing the driver and changing to a different field. This can be a helpful way understand the impact of another variable or to avoid redundancy if the data structure led Q to recommend two very similar or overlapping dimensions.

In the following example, we can change State to Student Classification to explore if the drop in enrollment disproportionately impacted any particular student group, such as freshman or graduate students.

In the following result, enrollment from juniors (third-year students) was much lower than it was in 2020, and represents a large portion of the drop in enrollment.

Conclusion

With why and forecasting questions, business users can dig deeper to understand the contributing factors of metric changes or model potential growth. These new question types are available at no additional cost for all Q customers.

The examples used in post utilize the sample QuickSight topics that come included with your QuickSight subscription. For forecasting, we used the Software Sales sample topic, and for why questions, we used the Student Enrollment sample topic. To try the questions on your own, activate the applicable sample topic.


About the author

Shannon Kalisky is a Senior Product Manager – Technical that covers natural language question patterns and model robustness for Amazon QuickSight Q.

New – Amazon Redshift Integration with Apache Spark

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/new-amazon-redshift-integration-with-apache-spark/

Apache Spark is an open-source, distributed processing system commonly used for big data workloads. Spark application developers working in Amazon EMR, Amazon SageMaker, and AWS Glue often use third-party Apache Spark connectors that allow them to read and write the data with Amazon Redshift. These third-party connectors are not regularly maintained, supported, or tested with various versions of Spark for production.

Today we are announcing the general availability of Amazon Redshift integration for Apache Spark, which makes it easy to build and run Spark applications on Amazon Redshift and Redshift Serverless, enabling customers to open up the data warehouse for a broader set of AWS analytics and machine learning (ML) solutions.

With Amazon Redshift integration for Apache Spark, you can get started in seconds and effortlessly build Apache Spark applications in a variety of languages, such as Java, Scala, and Python.

Your applications can read from and write to your Amazon Redshift data warehouse without compromising on the performance of the applications or transactional consistency of the data, as well as performance improvements with pushdown optimizations.

Amazon Redshift integration for Apache Spark builds on an existing open source connector project and enhances it for performance and security, helping customers gain up to 10x faster application performance. We thank the original contributors on the project who collaborated with us to make this happen. As we make further enhancements we will continue to contribute back into the open source project.

Getting Started with Spark Connector for Amazon Redshift
To get started, you can go to AWS analytics and ML services, use data frame or Spark SQL code in a Spark job or Notebook to connect to the Amazon Redshift data warehouse, and start running queries in seconds.

In this launch, Amazon EMR 6.9, EMR Serverless, and AWS Glue 4.0 come with the pre-packaged connector and JDBC driver, and you can just start writing code. EMR 6.9 provides a sample notebook, and EMR Serverless provides a sample Spark Job too.

First, you should set AWS Identity and Access Management (AWS IAM) authentication between Redshift and Spark, between Amazon Simple Storage Service (Amazon S3) and Spark, and between Redshift and Amazon S3. The following diagram describes the authentication between Amazon S3, Redshift, the Spark driver, and Spark executors.

For more information, see Identity and access management in Amazon Redshift in the AWS documentation.

Amazon EMR
If you already have an Amazon Redshift data warehouse and the data available, you can create the database user and provide the right level of grants to the database user. To use this with Amazon EMR, you need to upgrade to the latest version of the Amazon EMR 6.9 that has the packaged spark-redshift connector. Select the emr-6.9.0 release when you create an EMR cluster on Amazon EC2.

You can use EMR Serverless to create your Spark application using the emr-6.9.0 release to run your workload.

EMR Studio also provides an example Jupyter Notebook configured to connect to an Amazon Redshift Serverless endpoint leveraging sample data that you can use to get started quickly.

Here is a Scalar example to build your applications both with Spark Dataframe and Spark SQL. Use IAM-based credentials for connecting to Redshift and use IAM role for unloading and loading data from S3.

// Create the JDBC connection URL and define the Redshift context
val jdbcURL = "jdbc:redshift:iam://<RedshiftEndpoint>:<Port>/<Database>?DbUser=<RsUser>"
val rsOptions = Map (
  "url" -> jdbcURL,
  "tempdir" -> tempS3Dir, 
  "aws_iam_role" -> roleARN,
  )
// Reference the sales table from Redshift 
val sales_df = spark
  .read 
  .format("io.github.spark_redshift_community.spark.redshift") 
  .options(rsOptions) 
  .option("dbtable", "sales") 
  .load() 
sales_df.createOrReplaceTempView("sales") 
// Reference the date table from Redshift using Data Frame 
sales_df.join(date_df, sales_df("dateid") === date_df("dateid"))
  .where(col("caldate") === "2008-01-05")
  .groupBy().sum("qtysold")
  .select(col("sum(qtysold)"))
  .show() 

If Amazon Redshift and Amazon EMR are in different VPCs, you have to configure VPC peering or enable cross-VPC access. Assuming both Amazon Redshift and Amazon EMR are in the same virtual private cloud (VPC), you can create a Spark job or Notebook and connect to the Amazon Redshift data warehouse and write Spark code to use the Amazon Redshift connector.

To learn more, see Use Spark on Amazon Redshift with a connector in the AWS documentation.

AWS Glue
When you use AWS Glue 4.0, the spark-redshift connector is available both as a source and target. In Glue Studio, you can use a visual ETL job to read or write to a Redshift data warehouse simply by selecting a Redshift connection to use within a built-in Redshift source or target node.

The Redshift connection contains Redshift connection details along with the credentials needed to access Redshift with the proper permissions.

To get started, choose Jobs in the left menu of the Glue Studio console. Using either of the Visual modes, you can easily add and edit a source or target node and define a range of transformations on the data without writing any code.

Choose Create and you can easily add and edit a source, target node, and the transform node in the job diagram. At this time, you will choose Amazon Redshift as Source and Target.

Once completed, the Glue job can be executed on Glue for the Apache Spark engine, which will automatically use the latest spark-redshift connector.

The following Python script shows an example job to read and write to Redshift with dynamicframe using the spark-redshift connector.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("================ DynamicFrame Read ===============")
url = "jdbc:redshift://<RedshiftEndpoint>:<Port>/dev"
read_options = {
    "url": url,
    "dbtable": dbtable,
    "redshiftTmpDir": redshiftTmpDir,
    "tempdir": redshiftTmpDir,
    "aws_iam_role": aws_iam_role,
    "autopushdown": "true",
    "include_column_list": "false"
}

redshift_read = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options=read_options
) 

print("================ DynamicFrame Write ===============")

write_options = {
    "url": url,
    "dbtable": dbtable,
    "user": "awsuser",
    "password": "Password1",
    "redshiftTmpDir": redshiftTmpDir,
    "tempdir": redshiftTmpDir,
    "aws_iam_role": aws_iam_role,
    "autopushdown": "true",
    "DbUser": "awsuser"
}

print("================ dyf write result: check redshift table ===============")
redshift_write = glueContext.write_dynamic_frame.from_options(
    frame=redshift_read,
    connection_type="redshift",
    connection_options=write_options
)

When you set up your job detail, you can only use the Glue 4.0 – Supports spark 3.3 Python 3 version for this integration.

To learn more, see Creating ETL jobs with AWS Glue Studio and Using connectors and connections with AWS Glue Studio in the AWS documentation.

Gaining the Best Performance
In the Amazon Redshift integration for Apache Spark, the Spark connector automatically applies predicate and query pushdown to optimize for performance. You can gain performance improvement by using the default Parquet format for the connector used for unloading with this integration.

As the following sample code shows, the Spark connector will turn the supported function into a SQL query and run the query in Amazon Redshift.

import sqlContext.implicits._val
sample= sqlContext.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url",jdbcURL )
.option("tempdir", tempS3Dir)
.option("unload_s3_format", "PARQUET")
.option("dbtable", "event")
.load()

// Create temporary views for data frames created earlier so they can be accessed via Spark SQL
sales_df.createOrReplaceTempView("sales")
date_df.createOrReplaceTempView("date")
// Show the total sales on a given date using Spark SQL API
spark.sql(
"""SELECT sum(qtysold)
| FROM sales, date
| WHERE sales.dateid = date.dateid
| AND caldate = '2008-01-05'""".stripMargin).show()

Amazon Redshift integration for Apache Spark adds pushdown capabilities for operations such as sort, aggregate, limit, join, and scalar functions so that only the relevant data is moved from the Redshift data warehouse to the consuming Spark application, thereby improving performance.

Available Now
The Amazon Redshift integration for Apache Spark is now available in all Regions that support Amazon EMR 6.9, AWS Glue 4.0, and Amazon Redshift. You can start using the feature directly from EMR 6.9 and Glue Studio 4.0 with the new Spark 3.3.0 version.

Give it a try, and please send us feedback either in the AWS re:Post for Amazon Redshift or through your usual AWS support contacts.

Channy

Preview: Amazon OpenSearch Serverless – Run Search and Analytics Workloads without Managing Clusters

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/preview-amazon-opensearch-serverless-run-search-and-analytics-workloads-without-managing-clusters/

Most AWS analytics services have compelling serverless offerings that make it even easier for customers to analyze vast amounts of data without having to configure, scale, or manage the underlying infrastructure.

Along with other serverless analytics, such as Amazon QuickSight for business intelligence and AWS Glue for data integration, we have introduced Amazon EMR Serverless, Amazon MSK Serverless, and Amazon Redshift Serverless this year.

Today, we announce the preview release of a new serverless option for Amazon OpenSearch Service that makes it easy for customers to run large-scale search and analytics workloads without managing clusters. It automatically provisions and scales the underlying resources to deliver fast data ingestion and query responses for even the most demanding and unpredictable workloads, eliminating the need to configure and optimize clusters.

With Amazon OpenSearch Serverless, you do not need to account for factors that are hard to know in advance, such as the frequency and complexity of queries or the volume of data expected to be analyzed. Instead of managing infrastructure, you can focus on using OpenSearch for exploring and deriving insights from your data. You can also get started using familiar APIs to load and query data and use OpenSearch Dashboards for interactive data analysis and visualization.

Configure Your OpenSearch Serverless Collection
To get started with Amazon OpenSearch Serverless, you create a Collection via the AWS Management Console, AWS Command-Line Interface (AWS CLI), or AWS API.

Before the launch of OpenSearch Serverless, you created a managed cluster, specifying instance types, counts, and storage options, and then managed the lifecycle and shard strategy for indices within that cluster. With OpenSearch Serverless, you create a Collection, which manages a group of indices that work together to support a specific workload. You no longer need to specify the hardware or manage the indices directly.

To create an OpenSearch Serverless collection and secure data, set up Encryption policies to assign AWS KMS keys to one or more collections and attach Network policies to collections to control the access from specified VPCs and public IP addresses.

To create an encryption policy, choose Encryption policies in the left navigation pane and Create encryption policy. Encryption at rest secures the indices within your collection. For each collection, AWS KMS generates a unique, symmetric encryption key. Encryption policies are the optimal way to manage AWS KMS keys across multiple collections. You can define the target collection name or a prefix that automatically applies the encryption settings from this policy to the collection.

In order for users to access a collection, choose Network policies in the left navigation pane and Create network policy. Network policies determine whether your collection is accessible over the internet from public networks or whether it must be accessed through OpenSearch Serverless–managed VPC endpoints.

You can define multiple rules for each collection, either the Public or VPC, as a recommended option for the Access Type. If you select a public option, you can access the collection from OpenSearch Dashboards.

Also, you can configure access for OpenSearch Dashboards and the OpenSearch endpoint. For the Resource type, enable both Access to OpenSearch endpoints and Access to OpenSearch Dashboards. In both input boxes, select the Collection Name property and your collection name or prefix.

Finally, to create an OpenSearch Serverless collection, choose Create collection in the home page or choose Collections in the left navigation pane and choose Create collection.

Input your collection name, description, and collection type, either Time series or Search by your data type.

  • Time series – The log analytics segment that focuses on analyzing large volumes of semistructured, machine-generated data in real time for operational, security, user behavior, and business insights.
  • Search – Full-text search that powers applications in your internal networks (content management systems, legal documents) and internet-facing applications such as e-commerce website search and content search.

When you choose Create, a collection typically takes less than a minute to initialize.

Upload and Search Data in Your Collection
Before uploading and searching data in your collection, configure the IAM policy to access the actual data within a collection. Choose Data access policies in the left navigation pane and Create data access policy.

You can apply multiple policies simultaneously to the same resource. Each policy contains a set of rules. Each rule has a resource (collection or index), permissions for the resource, and a list of principals (IAM users, role ARNs, or SAML identities).

Here is a sample policy that provides a single user the minimum permissions required to create an index in your collection, index some data, and search for it. Replace the principal ARN with the ARN of the account that you’ll use to sign in to OpenSearch Dashboards.

[
  {
    "Rules": [
      {
        "ResourceType": "index",
        "Resource": [
          "index/books/*"
        ],
        "Permission": [
          "aoss:CreateIndex",
          "aoss:ReadDocument",
          "aoss:UpdateIndex",
          "aoss:DeleteIndex",
          "aoss:WriteDocument"
        ]
      }
    ],
    "Principal": [
      "arn:aws:iam::123456789012:user/admin"
    ]
  }
]

Now, you can upload data to an OpenSearch Serverless collection using Postman or curl. You can also use Dev Tools within the OpenSearch Dashboards console. Choose OpenSearch Dashboards on the detail page of your collection.

Sign in to OpenSearch Dashboards using the AWS access and secret keys for the principal that you specified in your data access policy. Within OpenSearch Dashboards, open the left navigation menu and choose Dev Tools.

To create a single index called books-index, run PUT books-index, and index your first single document into books-index.

You can also query search data in Dev Tools.

GET books_index/_search
{
    "query": {
    "simple_query_string": {
    "query": "Jeff",
    "fields": ["author"]
    } 
  }
}

In the case of time-series data, you can ingest data with all of the streaming ingestion options, such as native OpenSearch streaming APIs, Amazon Kinesis Data Firehose, AWS Glue, and a wide range of open-source streaming ingestion pipelines like Logstash, FluentBit, Fluentd, and Data Prepper.

In addition, you can snapshot your data from a managed cluster on OpenSearch Service and restore it to your collection, making it easy to migrate your workloads. Once your data is in your collection, you can then query it using your favorite OpenSearch client and interactively analyze and visualize your data using OpenSearch Dashboards.

Things to Know
Here are a couple of things to keep in mind about additional features and considerations when you choose Amazon OpenSearch Serverless:

  • SAML Authentications – You can use your existing identity provider to offer single sign-on (SSO) for the OpenSearch Dashboards endpoints of OpenSearch Serverless SAML authentication lets you use third-party identity providers to sign in to OpenSearch Dashboards to index and search data. OpenSearch Serverless supports providers that use the SAML 2.0 standard, such as Okta, Keycloak, Active Directory Federation Services, and Auth0.
  • Private VPC Endpoints – You can use AWS PrivateLink to create a private connection between your VPC and OpenSearch Serverless. You can access your collections as if they were in your VPC without the use of an internet gateway, NAT device, VPN connection, or AWS Direct Connect connection. To create an interface endpoint, choose VPC endpoints in the left navigation pane of OpenSearch Service.
  • Managed Clusters – You may prefer to use an option of Amazon OpenSearch Service’s managed clusters in scenarios where you need tight control over cluster configuration or specific customizations. For example, your workloads may need custom plugins that run best on accelerated computing instances and need more control on configuration such as data sharding strategy. You can choose either provisioned instances or serverless according to the requirements of your workload.

Join the Preview
The preview release of Amazon OpenSearch Serverless is now available in the US East (N. Virginia, Ohio), US West (Oregon), EU (Ireland), Asia Pacific (Tokyo). With OpenSearch Serverless, there are no upfront costs, and you pay only for the data that is ingest and the queries you run. For pricing details, see the OpenSearch Service pricing page. To learn more, visit the Amazon OpenSearch Service User Guide.

We want to hear more feedback during the preview. Please send feedback to AWS re:Post for Amazon OpenSearch Service or through your usual AWS support contacts.

Channy

New — Create and Share Operational Reports at Scale with Amazon QuickSight Paginated Reports

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/new-create-and-share-operational-reports-at-scale-with-amazon-quicksight-paginated-reports/

There are various ways to report on data insights, and paginated reports is one of them. Paginated reports are essential documents that contain critical business information for end-users. For decades, paginated reports have been the standard business reporting format. The following are examples of paginated reports. On the left shows the report for income statement and on the right is the yearly summary corporate statement:

Examples of paginated reports

As the example shows, paginated reports contain various highly formatted insights and are designed to be printable, in landscape or portrait orientation, so they can be consumed easily by readers. It’s called paginated because it often spans tens of hundreds of pages of data.

Although it may appear to be a simple task, generating paginated reports is heavily dependent on legacy data warehouses and legacy business intelligence tools, especially because modern business intelligence tools do not offer this capability. As a result, organizations typically have to maintain multiple business intelligence systems to have separate solutions for building critical operational reports and summarized dashboards. Each solution presents its set of challenges with data governance, security, and access management. This caused a disjointed experience both authors and end users. Legacy BI systems also run on-premises infrastructure, which is expensive to maintain and upgrade.

Introducing Amazon QuickSight Paginated Reports
Today, I’m pleased to announce Amazon QuickSight Paginated Reports. This feature allows customers to create and share highly formatted, personalized reports containing business-critical data to hundreds of thousands of end-users without any infrastructure setup or maintenance, up-front licensing, or long-term commitments.

Here’s a quick look on how Amazon QuickSight Paginated Reports works:

Quick look on Amazon QuickSight Paginated Reports

With Amazon QuickSight Paginated Reports, customers can now create and share paginated reports to their users from the same familiar QuickSight interface that they use to create and consume interactive dashboards. They can use one single BI service to create and deliver interactive analytics in dashboards, format reports with paginated reports, or embed analytics in apps while also allowing end users to ask questions of the underlying data using machine learning (ML) powered natural language query with QuickSight Q. From ML powered interactive dashboard to generating and distributing operational reports, these benefits impact different stakeholder groups in an organization

For Readers – Amazon QuickSight Paginated Reports makes it easy for readers to consume reports in a familiar and scheduled fashion, in highly formatted models in .pdf or .csv formats. Readers can access these reports via email, Amazon QuickSight web and mobile interfaces, mobile applications, or embedded portals.

For Authors – This feature gives report authors the flexibility to create highly formatted reports with images, texts, charts, tables, and exact page sizes. They can create reports from the same data models as dashboards, reusing data models built up, using access permissions (RLS/CLS) setup, and publishing in the same dashboards where their users look for data. These dashboards are also available via API, allowing migration between accounts or programmatic creation and migration of these assets as needed.

The Amazon QuickSight Paginated Reports makes it easy to build reports without the need for separate training or investment in a dedicated application. With an easy-to-use web-based authoring interface, this feature allows report authors to create complex data models in the form of operational reports for hundreds of thousands of report readers and enables data-driven decision-making.

For IT Leaders – This feature also provides IT leaders with benefits such as fully managed reporting capabilities consolidated within Amazon QuickSight. This reduces the time and resources required to set up and maintain reporting solutions, helping IT leaders to start looking at the cloud for their BI needs and transitioning legacy reporting to the cloud to save time and resources.

Amazon QuickSight Paginated Reports also leverages existing QuickSight capabilities, such as user management, data preparation, advanced scheduling and audit logging. By inheriting the capabilities from QuickSight, it removes the need to manage any infrastructure or provisioning setup to deliver reports to hundreds of thousands of users.

Get Started with Amazon QuickSight Paginated Reports
Let’s see how to get started with Amazon QuickSight Paginated Reports. I will focus more on how authors can create, publish and deliver reports to readers.

For Authors: Creating a Report
First, I open the QuickSight console. Then, in the navigation section, I select the dataset that I will use for reporting purposes. 

Selecting dataset

After I check and confirm the dataset, I select Use in Analysis.

Using dataset in analysis

On the next page, I have the option to select the sheet type, Interactive sheet, or Paginated report. I select Paginated report, and here I can configure the report for Paper size and either Portrait or Landscape orientation.

Select Paginated report

Now I’m starting my report creation. The sheet area I can use is adjusted to the paper size option I defined in the previous step. In this reporting sheet, QuickSight provides me with Header and Footer areas.

Header and footer area

First, I want to add the title of this report in the header section. I select the Header area, and in the menu section, I select Add text.

Adding text

Now, I can start entering the title of the report. I name this report “Attendance Statistics” and customize the header using the company logo. I can also use the text toolbar to format the text and add page numbers. For any changes I’ve made, I can also see the preview directly on this page.

Using text toolbar

I can also add other visuals in any section by selecting Add visual.

Adding visual

From here, I can start building reports with the available visuals, just like I normally do on the Amazon QuickSight dashboard. For example, if I need to add a summary to the pie chart, I can add another text box and drag and drop to set the layout and resize the visuals as needed.

Arranging layout

If I need to add another section, from the menu, I select Add section, and I can add other visuals or insights into this new section. As for visual tabular data, the visual will be generated across pages.

Table will automatically expand across pages

For Author: Publish and Schedule Report
Once the analysis is completed, I need to publish this analysis as a dashboard by selecting Share and then Publish dashboard. Then I can choose to create a new dashboard by selecting Publish new dashboard or Replace an existing dashboard. I can also select the sheet(s) I want to publish.

Publishing dashboard

At this stage, I’m ready to set a schedule to deliver my reports to readers. To do that, I need to open the dashboard and define a schedule by selecting Add schedule.

Select Add Schedule

In this menu, I can specify the schedule name and also the content format. In the Content section, I can choose either PDF or CSV format. For PDF format, I can select the sheet I want to use. For CSV format, I can select multiple visuals.

Schedule configuration

As for the delivery report schedule, I can define the schedule as Daily, Weekly, Monthly, or one-time delivery with Do not repeat. I can also specify the date and time of delivery, including the time zone.

Schedule timing configuration

Then, I specify the configuration of the email message. In the final section, I can also specify how readers access this report, by using Download link or File attachment. Once I’m done setting up the schedule, I can Save it or send this report according to the schedule by selecting Save and run now.

 

Save or save and run now

For Readers: Receiving and Accessing Reports
Here is an example email from the schedule that QuickSight has sent to me as a reader. I can download this report from the email attachment or from the dashboard. 

Example mail with paginated report

I can also use the provided link in the email to view recent snapshots. The Recent Snapshots feature allows me to review previously generated reports.Recent snapshots feature

Things to Know
Programmatic API Access – In addition to using the Amazon QuickSight console, customers can also use the AWS API and SDK to interact programmatically with Amazon QuickSight Paginated Reports.

AWS Partners – To make it easier for customers to migrate their legacy BI solutions to Amazon QuickSight, customers can work with AWS partners, Ironside Consulting and Data Terrain. Ironside and Data Terrain offerings are available in the AWS Marketplace, with more details at Amazon QuickSight Partners page.

Availability and Pricing – Amazon QuickSight Paginated Reports is available as an add-on to the existing Amazon QuickSight Enterprise or Enterprise enabled with Q in all supported AWS Regions.

Visit the Amazon QuickSight Paginated Reports page to learn more details on how to use this feature, learn how to get started, and understand the pricing.

Happy building!
Donnie

Simplify data loading on the Amazon Redshift console with Informatica Data Loader

Post Syndicated from Deepak Rameswarapu original https://aws.amazon.com/blogs/big-data/simplify-data-loading-on-the-amazon-redshift-console-with-informatica-data-loader/

Amazon Redshift is the fastest, most widely used, fully managed, petabyte-scale cloud data warehouse. Tens of thousands of customers use Amazon Redshift to process exabytes of data every day to power their analytics workloads. Data engineers, data analysts, and data scientists want to use this data to power analytics workloads such as business intelligence (BI), predictive analytics, machine learning (ML), and real-time streaming analytics.

Informatica Intelligent Data Management Cloud™ (IDMC) is an AI-powered, metadata-driven, persona-based, cloud-native platform to empower data professionals with a comprehensive and cohesive cloud data management capabilities to discover, catalog, ingest, cleanse, integrate, govern, secure, prepare, and master data. Informatica Data Loader for Amazon Redshift, available on the AWS Management Console, is a zero-cost, serverless IDMC service that enables frictionless data loading to Amazon Redshift.

Customers need to bring data quickly and at scale from various data stores, including on-premises and legacy systems, third-party applications, and AWS services such as Amazon Relational Database Service (Amazon RDS), Amazon DynamoDB, and more. You also need a simple, easy, and cloud-native solution to quickly onboard new data sources or to analyze recent data for actionable insights. Now, with Informatica Data Loader for Amazon Redshift, you can securely connect and load data to Amazon Redshift at scale via a simple and guided interface. You can access Informatica Data Loader directly from the Amazon Redshift console.

This post provides step-by-step instructions to load data into Amazon Redshift using Informatica Data Loader.

Solution overview

You can access Informatica Data Loader directly from the navigation pane on the Amazon Redshift console. The process follows a similar workflow that Amazon Redshift users already use to access the Amazon Redshift query editor to author and organize SQL queries, or create datashares to share live data in read-only mode across clusters.

For this post, we use a Salesforce developer account as the data source. For instructions in importing a sample dataset, see Import Sample Account Data. You can use over 30 pre-built connectors supported by Informatica services to connect to the data source of your choice.

We use Informatica Data Loader to select and load a subset of Salesforce objects to Amazon Redshift in three simple steps:

  1. Connect to the data source.
  2. Connect to the target data source.
  3. Schedule or run the data load.

In addition to object-level filtering, the service also supports full and incremental loads, change data capture (CDC), column-based and row-based filtering, and schema drifts. After the data is loaded, you can run query and generate visualizations using Amazon Redshift Query Editor v2.0.

Prerequisites

Complete the following prerequisites:

  1. Create an Amazon Redshift cluster or workgroup. For more information, refer to Creating a cluster in a VPC or Amazon Redshift Serverless.
  2. Ensure that the cluster can be accessed from Informatica Data Loader. For a private cluster, add an ingress rule to the security group attached to your cluster to allow traffic from Informatica Data Loader. Allow-list the IP address for the cluster to be accessed from Informatica Data Loader. For more information about adding rules to an Amazon Elastic Compute Cloud (Amazon EC2) security group, see Authorize inbound traffic for your Linux instances.
  3. Create an Amazon Simple Storage Service (Amazon S3) bucket in the same Region as the Amazon Redshift cluster. The Informatica Data Loader will stage the data into this bucket before uploading the data to the cluster. Refer to Creating a bucket for more details. Make a note of the access key ID and secret access key for the user with permission to write to the staging S3 bucket.
  4. If you don’t have a Salesforce account, you can sign up for a free developer account.

Now that you have completed the prerequisites, let’s get started.

Launch Informatica Data Loader from the Amazon Redshift console

To launch Informatica Data Loader, complete the following steps:

  1. On the Amazon Redshift console, under AWS Partner Integration in navigation pane, choose Informatica Data Loader.
  2. In the pop-up window Create Informatica integration, choose Complete Informatica integration.
    If you’re accessing the free Informatica Data Loader for first time, you’re directed to the Informatica Data Loader for Amazon Redshift to sign up at no cost. You only need your email address to sign up.
  3. After you sign up, you can sign in to your Informatica account.

Connect to a data source

To connect to a data source, complete the following steps:

  1. On the Informatica Data Loader console, choose New in the navigation pane.
  2. Choose New Connection.
  3. Choose Salesforce as your source connection.
  4. Choose Continue.
  5. Under General Properties, enter a name for your connection and an optional description.
  6. Under Salesforce Connection Properties¸ enter the credentials for your Salesforce account and security token.These options may vary depending on the source type, connection type, and authentication method. For guidance, you can use the embedded connection configuration help video.
  7. Make a note of the connection name Salesforce_Source_Connection.
  8. Choose Test to verify the connection.
  9. Choose Add to save your connection details, and continue setting up the data loader.Now that you have connected to the Salesforce data source, you load the sample account information to Amazon Redshift. For this post, we load the Account object containing information on customer type and billing state or province, among other fields.
  10. Ensure that Salesforce_Source_Connection you just created is selected as Connection.
  11. To filter the Account object in Salesforce, select Include some under Define Object.
  12. Choose the plus sign to select the source object Account.
  13. In the pop-up window Select Source Object, search for account and choose Search.
  14. Select Account and choose OK.
  15. For this post, the rest of the following settings are left to their default value:
    1. Exclude fields – Exclude source fields from the source data.
    2. Define Filter – Filter rows from source data based on one or more specified filters.
    3. Define Primary Keys – Configuration to specify or detect the primary key column in the data source.
    4. Define Watermark Fields – Configuration to specify or detect the watermark column in the data source.

Connect to the target data source

To connect to the target data source (Amazon Redshift), complete the following steps:

  1. On the Informatica Data Loader, choose Connect Target.
  2. Choose New Connection.
  3. For Connection, choose Redshift (Amazon Redshift v2).
  4. Provide a connection name and optional description.
  5. Under Amazon Redshift Connection Section, enter your access key ID, secret access key, and the JDBC URL or your provisioned cluster or serverless workgroup.
  6. Choose Test to verify connectivity.
  7. After the connection is successful, choose Add.
  8. Optionally, for Target Name Prefix, enter the prefix to which the object name should be appended.
  9. For Path, enter the schema name public in Amazon Redshift where you want to load the data.
  10. For Load to existing tables, select No, create new tables every time.
  11. Choose Advanced Options to enter the name of the staging S3 bucket.
  12. Choose OK.

You have now successfully connected to a target Amazon Redshift cluster.

Schedule or run a data load

You can run your data load by choosing Run or expand the Schedule section to schedule it.

You can also monitor job status on the My Jobs page.

When your job status changes to Success, you can return to the Amazon Redshift console and open Query Editor V2.

In Amazon Redshift Query Editor v2.0, you can verify the loaded data by running the following query:

select * from public.”Account”;

Now we can do some more analysis. Let’s look at customer account by industry:

select 
    case when industry is NULL then 'Other'
    else industry end as Industry,
    case
        when type is NULL then 'Customer-Other'
        when type = '1' then 'Customer-Other'
        when type = '2' then 'Customer-Other'
        else type 
    end as CustomerType,
    count(*) as AggCount
from "dev"."public"."Account"
group by industry, type
order by aggcount desc

Also, we can use the charting capability of Query Editor V2 for visualization.

Simply choose the chart type and the value and label you want to chart.

Conclusion

The post demonstrates the integrated Amazon Redshift console experience of loading data with Informatica Data Loader and querying the data with Amazon Redshift Query Editor. With Informatica Data Loader, Amazon Redshift customers can quickly onboard new data sources in three simple steps and just-in-time bring data at scale to drive data-driven decisions.

You can sign up for Informatica Data Loader for Amazon Redshift and start loading data to Amazon Redshift.


About the authors

Deepak Rameswarapu is a Director of Product Management at Informatica. He is product leader with a strategic focus on new features and product launches, strategic product road map, AI/ML, cloud data integration, and data engineering and integration leadership. He brings 20 years of experience building best-of-breed products and solutions to address end-to-end data management challenges.

Rajeev Srinivasan is a Director of Technical Alliance, Ecosystem at Informatica. He leads the strategic technical partnership with AWS to bring needed and innovative solutions and capabilities into the hands of the customers. Along with customer obsession, he has a passion for data and cloud technologies, and riding his Harley.

Michael Yitayew is a Product Manager for Amazon Redshift based out of New York. He works with customers and engineering teams to build new features that enable data engineers and data analysts to more easily load data, manage data warehouse resources, and query their data. He has supported AWS customers for over 3 years in both product marketing and product management roles.

Phil Bates is a Senior Analytics Specialist Solutions Architect at AWS. He has more than 25 years of experience implementing large-scale data warehouse solutions. He is passionate about helping customers through their cloud journey and using the power of ML within their data warehouse.

Weifan Liang is a Senior Partner Solutions Architect at AWS. He works closely with AWS top strategic data analytics software partners to drive product integration, build optimized architecture, develop long-term strategy, and provide thought leadership. Innovating together with partners, Weifan strives to help customers accelerate business outcomes with cloud-powered digital transformation.

Create advanced insights using level-aware calculations in Amazon QuickSight

Post Syndicated from Karthik Tharmarajan original https://aws.amazon.com/blogs/big-data/create-advanced-insights-using-level-aware-calculations-in-amazon-quicksight/

Calculation at the right granularity always needs to be handled carefully when performing data analytics. Especially when data is generated through joining across multiple tables, the denormalization of datasets can add a lot of complications to make accurate calculations challenging. Amazon QuickSight recently launched a new functionality called level-aware calculations (LAC), which enables you to specify any level of granularity at which you want the aggregate functions (at what level to group by) or window functions (in what window to partition by) to be conducted. This brings flexibility and simplification for you to build advanced calculations and powerful analyses. Without LAC, you have to prepare pre-aggregated tables in your original data source, or run queries in the data prep phase to enable those calculations.

In this post, we demonstrate how to create advanced insights using LAC in QuickSight. Before we start walking through the functions, let’s first introduce the important concept of order of evaluation in QuickSight and then talk about LAC in more detail.

QuickSight order of evaluation

Every time you open or update an analysis, QuickSight evaluates everything that is configured in the analysis in a specific sequence, and translates the configuration into a query that a database engine can run. This is the order of evaluation. The level of complication depends on how many elements are embedded in a visual, multiplied by the number of visuals plus the interactives between the visuals. But if we abstract the concept, the order of evaluation logic can be demonstrated by the following chart.

In the first DEFAULT route, QuickSight evaluates simple calculations at row level for the raw dataset, then it applies the WHERE filters. After that, based on what dimensions are added to the visual, QuickSight evaluates the aggregation for the selected measures at visual dimensions and applies HAVING filters. Table calculations such as running total or percent of total then get evaluated after the visual is formed, with the subtotal and total calculated last.

Sometimes, you may want the analytical steps to be conducted with a different sequence. For example, you may want to do an aggregation before the data being filtered, or do an aggregation first for some specific dimensions and then aggregate again for the visual dimensions. Based on those different needs, QuickSight offers three variations of order of evaluation (as shown in the preceding chart). Specifically, you can use the key words of PRE_FILTER to add a calculation step before the WHERE filter, use PRE_AGG to add a calculation step before the visual aggregation, or use a whole suite of level-aware calculation-aggregation functions to define an aggregation at independent dimensions, and then aggregate them at the visual dimension (a nested aggregation).

Most of the time, your visuals will include more than one calculated field. You’ll want to be careful to define each of them and understand how they’re interacting with the visuals and with different filters. Applying a filter before or after a window function can generate totally different results and business meanings.

With all the background introduced, now let’s talk about the new LAC functions and their capabilities, and demonstrate several typical use cases.

Level-aware calculations (LAC)

There are two groups of LAC functions:

  • Level-aware calculations-aggregate functions (LAC-A) – These are our newly launched functions. By adding one argument into an existing aggregate function (for example, sum(), max(), or count()), you can define any group-by dimensions you desire for the aggregation. A typical syntax for LAC-A is sum(measure,[group_field_A]). With LAC-A, you can add an aggregation step before the visual aggregation. The added layer can be fixed, which is independent of the visual dimension. It also can be dynamically interacting with the visual dimensions. We give some detailed examples later in this post. For a list of supported aggregation functions, refer to Level-aware calculation – aggregate (LAC-A) functions.
  • Level-aware calculation-window functions (LAC-W) – These are the existing functions. They used to be called level-aware aggregations (LAA). We recently changed its name to better reflect the function nature, due to underlying difference between window functions and aggregate functions. LAC-W is a group of window functions (such as sumover(), maxover(), and denseRank()) where using a third parameter, you can choose to run the calculation at the PRE_FILTER or PRE-AGG stage. A typical syntax for LAC-W is sumOver(measure,[partition_field_A],pre_agg). For a list of supported window functions, refer to Level-aware calculation – window (LAC-W) functions.

The following high-level diagram shows different branches of LAC. In this post, we mostly focus on the new LAC-A functions.

With the new LAC-A functions, you can run two layers of aggregation calculations. This offers the following benefits:

  • Run aggregation calculations that are independent of the group-by fields in the visual calculation
  • Run aggregation calculations for the dimensions that are NOT in the visual
  • Remove duplication of raw data before running calculations
  • Run aggregation calculations with nested group-by fields dynamically adapting to visual group-by fields

Let’s explore how we can achieve those benefits by demonstrating a few use cases.

Use case #1: Identify orders where actual ordered quantity for a product is higher than the average quantity

In this case, our visual is at the order level; however, we want to compute the average quantity of a product and use that to display the difference at each individual row/order level. With the LAC-A function, we can easily create an aggregation that is independent of the level in the visual.

We first compute the average quantity sold at product level using the expression of avg(Quantity,[Product]). To do so, change the visual-level aggregation to Average. In this case, visual-level aggregation doesn’t matter because we have product as a column and the LAC-A are at the same level. In the result table, the average quantity value for product is repeated across all orders because this is computed at the product level.

Now that we have computed the average quantity at the product level, we can extend this to compute the difference between actual quantity ordered and the average quantity of product using the expression sum(Quantity) - avg(avg(Quantity,[Product])). This computed difference can then be used to conditionally format the view to highlight orders that have quantity higher than the average quantity of a product.

As seen in this example, although the visual was at the order level, we easily created an aggregation like average quantity of product, which is independent of the level in the visual, and used that to display the difference at each individual row/order level.

Use case #2: Identify the average of total country sales by region

Here we want to aggregate the sales for each country and then compute the average of country-level sales at region level using the same dataset.

With the LAC-A function, we can easily create an aggregation at a dimension level that is NOT in the visual. In this example, although country is not included in the visual, the LAC-A function first aggregates the sales at the country level and then the visual-level calculation generates the average number for each region. Within QuickSight, we can implement this in two ways.

Option 1: Nest the LAC-A with visual-level aggregate functions

Create a calculated column to compute sales at country level by using the expression of sum(Sales,[Country] ), then add the calculation to the visual and change the aggregation to Average, as shown in the following screenshot. Note that if we don’t use LAC-A to specify the level, the average sales are calculated at the lowest granular level (the base level of the dataset) for each region. That’s why the numbers are significantly smaller for the sales column.

Option 2: Use LAC-A combined with other aggregate functions and nest them in the calculated column

Create a calculated column to compute sales at country level by using the expression sum(Sales,[Country]) and then nest that with additional aggregation, in this case Average, by using the expression avg(sum(Sales,[Country])).

Use case #3: Calculate total and average for a denormalized dataset with duplications

LAC-A calculations are designed to effectively handle duplicates in data while performing computation. It allows you to perform computations like average without the need for explicit handling of duplicates in data.

Consider a dataset that has employee and project details along with each employee’s salary. An employee can be associated with multiple projects, as shown in the following example.

Now let’s calculate total employee salary, average employee salary, and minimum and maximum salary using this sample dataset.

To compute total and average, we have to consider each employee’s salary just once even though an employee can be part of multiple projects and the salary for the employee can get duplicated across these projects. We can easily achieve this using LAC-A to compute the maximum of the salary at the employee level and then use that to compute the total and average.

Create a calculated column called Total Salary using the expression sum(max(Salary,[{Employee Name}])) and create another calculated column called Average Salary using avg(max(Salary,[{Employee Name}])). We can easily calculate Min Salary using the expression min(Salary) and Max Salary using max(Salary).

If we try to solve this without using LAC-A, we have to explicitly handle salary duplication in our calculation, and we have to go through multiple steps to get to the final result. Refer to the QuickSight Community blog for a similar use case.

Dynamic group keys for LAC-A

In addition to defining a static level of aggregation as seen in the preceding examples, you can also dynamically add or remove dimensions from the visual level group-by fields. The following are example syntax for a dynamic level:

  • Add dimensions to the visual dimensions with sum(cost, [$VisualDimensions, LevelA, LevelB])
  • Remove dimensions from the visual dimensions with sum(cost, [$VisualDimensions, !LevelC, !LevelD])

This capability brings a lot of flexibility and scalability for you to make the LAC even more powerful. Firstly, you can define one LAC calculated field and reuse it across multiple visuals with similar business intents. Additionally, if you’re building the visuals and keep adding or deleting the visual fields, you don’t need to edit the LAC-A calculated field each time, and the LAC-A will automatically adjust to the visual dimensions and give the right output.

Use case #4: Identify average sales of customers within each region or country

To compute this, we can use the dynamic expression Sum(Sales, [$VisualDimensions, Customers]). This is because each customer has purchases in more than one region. We need to calculate the customer average sales only within each region. We can reuse the same expression in a different visual with country as the visual dimension if we want to calculate average sales of customers within each country.

Use Average as the visual-level metric with Group by as Region to get the region-level average. In this case, “$visualDimensions” is adapted to Region. So the expression is equivalent to Sum(Sales, [Region, Customers]) in this visual.

If you have a similar visual with the Country dimension, then the dynamic expression is equivalent to Sum(Sales, [Country, Customers]). Reusing the same expression in different visuals saves us a lot of time, especially when we want to build similar visuals with slightly different business context.

Use case #5: Identify the sales percentage of each subregion compared to region level

In this example, we want to calculate the percentage of sales for each subregion, comparing with the total region sales. You can use the fixed dimension as we mentioned before, but imagine a situation when you want to include more dimensions into the visual such as product, year, or supplier. Using the dynamic group key by removing only {subregion} from the visual dimensions makes the exploration process much easier and quicker.

First, create an expression to calculate the sum of sales without the subregion level using sum(Sales, [$visualDimensions, !subregion]), then calculate the percentage using sum(Sales) / sum(sum(Sales, [$visualDimensions, !subregion])).

Conclusion

In this post, we introduced new QuickSight LAC-A functions, which enable powerful and advanced aggregations with user-defined dimensions. We introduced the QuickSight order of evaluation, and walked through three use cases for LAC-A static keys and two use cases of LAC-A dynamic keys. Level-aware calculations are now generally available in all supported QuickSight Regions.

We look forward to your feedback and stories on how you apply these calculations for your business needs.


About the Authors

Karthik Tharmarajan is a Senior Specialist Solutions Architect for Amazon QuickSight. Karthik has over 15 years of experience implementing enterprise business intelligence (BI) solutions and specializes in integration of BI solutions with business applications and enable data-driven decisions.

Emily Zhu is a Senior Product Manager-Tech at Amazon QuickSight, AWS’s cloud-native, fully managed SaaS BI service. She leads the development of the QuickSight analytics and query experience. Before joining AWS, she worked in the Amazon Prime Air drone delivery program and the Boeing company as senior strategist for several years. Emily is passionate about the potential of cloud-based BI solutions and looks forward to helping customers advance in their data-driven strategy making.

Feng Han is a Software Development Manager in AWS QuickSight Query Platform team. He focuses on query generation platform and advanced function calculation, and is leading the team to next generation of calculation engine.

Scale AWS SDK for pandas workloads with AWS Glue for Ray

Post Syndicated from Abdel Jaidi original https://aws.amazon.com/blogs/big-data/scale-aws-sdk-for-pandas-workloads-with-aws-glue-for-ray/

AWS SDK for pandas is an open-source library that extends the popular Python pandas library, enabling you to connect to AWS data and analytics services using pandas data frames. We’ve seen customers use the library in combination with pandas for both data engineering and AI workloads. Although pandas data frames are simple to use, they have a limitation on the size of data that can be processed. Because pandas is single-threaded, jobs are bounded by the available resources. If the data you need to process is small, this won’t be a problem, and pandas makes analysis and manipulation simple, as well as interactions with many other tools that support machine learning (ML) and visualization. However, as your data size scales, you may run into problems. This can be especially frustrating if you’ve created a promising prototype that can’t be moved to production. In our work with customers, we’ve seen many projects, both in data science and data engineering, that are stuck while they wait for someone to rewrite using a big data framework such as Apache Spark.

We are excited to announce that AWS SDK for pandas now supports Ray and Modin, enabling you to scale your pandas workflows from a single machine to a multi-node environment, with no code changes. The simplest way to do this is to use AWS Glue with Ray, the new serverless option to run distributed Python code announced at AWS re:Invent 2022. AWS SDK for pandas also supports self-managed Ray on Amazon Elastic Compute Cloud (Amazon EC2).

In this post, we show you how you can use pandas to connect to AWS data and analytics services and manipulate data at scale by running on an AWS Glue with Ray job.

Overview of solution

Ray is a unified framework that enables you to scale AI and Python applications. The goal of the project is to take any Python code that’s written on a laptop and scale the workload on a cluster. This innovative framework opens the door to big data processing to a new audience. Previously, the only way to process large datasets on a cluster was to use tools such as Apache Hadoop, Apache Spark, or Apache Flink. These frameworks require additional skills because they provide their own programming model and often require languages such as Scala or Java to fully take advantage of the advanced capabilities. With Ray, you can just use Python to parallelize your code with few modifications.

Although Ray opens the door to big data processing, it’s not enough on its own to distribute pandas-specific methods. That task falls to Modin, a drop-in replacement of pandas, optimized to run in a distributed environment, such as Ray. Modin has the same API as pandas, so you can keep your code the same, but it parallelizes workloads to improve performance.

With today’s announcement, AWS SDK for pandas customers can use both Ray and Modin for their workloads. You have the option of loading data into Modin data frames, instead of regular pandas data frames. By configuring the library to use Ray and Modin, your existing data processing scripts can distribute operations end-to-end, with no code changes. AWS SDK for pandas takes care of parallelizing the read and write operations for your files across the compute cluster.

To use this feature, you can install the release candidate version of awswrangler with the ray and modin extras:

pip install "awswrangler[modin,ray]==3.0.0rc2"

Once installed, you can use the library in your code by importing it with the following statement:

import awswrangler as wr

When you run this code, the SDK for pandas looks for an environmental variable called WR_ADDRESS. If it finds it, it uses this value to send the commands to a remote cluster. If it doesn’t find it, it starts a local Ray runtime on your machine.

The following diagram shows what is happening when you run code that uses AWS SDK for pandas to read data from Amazon Simple Storage Service (Amazon S3) into a Modin data frame, perform a filtering operation, and write the data back to Amazon S3, using a multi-node cluster.

In the first phase, each node reads one or more input files and stores them in memory as blocks. During this phase, the head node builds a mapping reference that tracks the location of each block on the worker nodes. In the second phase, a filter operation is submitted to each node, creating a subset of the data. Finally, each worker node writes its blocks to Amazon S3.

It’s important to note that certain data frame operations (for example groupby or join) may result in the data being shuffled across nodes. Shuffling will also happen if you do partitioned or bucketed writes. This tends to slow down the job because data needs to move between nodes.

If you want to create your own Ray cluster on Amazon EC2, refer to the tutorial Distributing Calls on Ray Remote Cluster. The rest of this post shows you how to run AWS SDK for pandas and Modin on an AWS Glue with Ray job.

Use AWS Glue with Ray

Because AWS Glue with Ray is a fully managed environment, it’s a simple way to run jobs. Both AWS SDK for pandas and Modin are pre-loaded, you don’t need to worry about cluster management or installing the right set of dependencies, and the job auto scales with your workload. To get started, complete the following steps:

  1. Choose Launch Stack to provision an AWS CloudFormation stack in your AWS account:
    launch cloudformation stack
    Note that while in preview, AWS Glue with Ray is available in a limited set of AWS Regions.The stack takes about 3 minutes to complete. You can verify that everything was successfully deployed by checking that the CloudFormation stack shows the status CREATE_COMPLETE.
  2. Navigate to AWS Glue Studio to find an AWS Glue job named GlueRayJob with the following script.
  3. Choose Run to start the job and navigate to the Runs tab to monitor progress.

Here, we break down the script and show you what happens at each stage when we run this code on AWS Glue with Ray. First, we import the library:

import awswrangler as wr

At import, AWS SDK for pandas detects if the runtime supports Ray, and automatically initializes a Ray cluster with the default parameters. In this case, because we’re running on AWS Glue with Ray, AWS SDK for pandas automatically uses the Ray cluster with no extra configuration needed. Advanced users can override this process, however, by starting the Ray runtime before the import command.

Next, we read Amazon product data in Parquet format from Amazon S3 and load it into a distributed Modin data frame:

# Read Parquet data (1.2 Gb Parquet compressed)
df = wr.s3.read_parquet(
    path=f"s3://amazon-reviews-pds/parquet/product_category={category.title()}/",
)

Simple data transformations on the data frame are applied next. Modin data frames implement the same interface as pandas data frames, allowing you to perform familiar pandas operations at scale. First, we drop the customer_id column, then we filter for a subset of the reviews that received five-star ratings:

# Drop the customer_id column
df.drop("customer_id", axis=1, inplace=True)

# Filter reviews with 5-star rating
df5 = df[df["star_rating"] == 5]

The data is written back to Amazon S3 in Parquet format, partitioned by year and marketplace. The dataset=True argument ensures that an associated Hive table is also created in the AWS Glue metadata catalog:

# Write partitioned five-star reviews to S3 in Parquet format
wr.s3.to_parquet(
    df5,
    path=f"s3://{bucket_name}/{category}/",
    partition_cols=["year", "marketplace"],
    dataset=True, 
    database=glue_database,
    table=glue_table, 
)

Finally, a query is run in Amazon Athena, and the S3 objects resulting from this operation are read in parallel into a Modin data frame:

# Read the data back to a Modin df via Athena
df5_athena = wr.athena.read_sql_query(
    f"SELECT * FROM {glue_table}",
    database=glue_database,
    ctas_approach=False, 
    unload_approach=True, 
    workgroup=workgroup_name,
    s3_output=f"s3://{bucket_name}/unload/{category}/",
)

The Amazon CloudWatch logs of the job provide insights into the performance achieved from reading blocks in parallel in a multi-node Ray cluster.

For simplicity, this example showcased Amazon S3 and Athena APIs only, but AWS SDK for pandas supports other services, including Amazon Timestream and Amazon Redshift. For a full list of the APIs that support distribution, refer to Supported APIs.

Clean up AWS resources

To prevent unwanted charges to your AWS account, you can delete the AWS resources that you used for this example:

  1. On the Amazon S3 console, empty data from both buckets with prefix glue-ray-.
  2. On the AWS CloudFormation console, delete the SDKPandasOnGlueRay stack.

The resources created as part of the stack are automatically deleted with it.

Conclusion

In this post, we demonstrated how you can run your workloads at scale using AWS SDK for pandas. When used in combination with AWS Glue with Ray, this gives you access to a fully managed environment to distribute your Python scripts. We hope this solution can help with migrating your existing pandas jobs to achieve higher performance and speedups across multiple data stores on AWS.

For more examples, check out the tutorials in the AWS SDK for pandas documentation.


About the Authors

Abdel Jaidi is a Senior Cloud Engineer for AWS Professional Services. He works on open-source projects focused on AWS Data & Analytics services. In his spare time, he enjoys playing tennis and hiking.

Anton Kukushkin is a Data Engineer for AWS Professional Services based in London, United Kingdom. He works with AWS customers, helping them build and scale their data and analytics.

Leon Luttenberger is a Data Engineer for AWS Professional Services based in Austin, Texas. He works on AWS open-source solutions that help our customers analyze their data at scale.

Lucas Hanson is Senior Cloud Engineer for AWS Professional Services. He focuses on helping customers with infrastructure management and DevOps processes for data management solutions. Outside of work, he enjoys music production and practicing yoga.

Introducing AWS Glue for Ray: Scaling your data integration workloads using Python

Post Syndicated from Zach Mitchell original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-for-ray-scaling-your-data-integration-workloads-using-python/

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development. Today, AWS Glue processes customer jobs using either Apache Spark’s distributed processing engine for large workloads or Python’s single-node processing engine for smaller workloads. Customers like Python for its ease of use and rich collection of built-in data-processing libraries but might find it difficult for customers to scale Python beyond a single compute node. This limitation makes it difficult for customers to process large datasets. Customers want a solution that allows them to continue using familiar Python tools and AWS Glue jobs on data sets of all sizes, even those that can’t fit on a single instance.

We are happy to announce the release of a new AWS Glue job type: Ray. Ray is an open-source unified compute framework that makes it simple to scale AI and Python workloads. Ray started as an open-source project at RISELab in UC Berkeley. If your application is written in Python, you can scale it with Ray in a distributed cluster in a multi-node environment.  Ray is Python native and you can combine it with the AWS SDK for pandas to prepare, integrate and transform your data for running your data analytics and ML workloads in combination.

This post provides an introduction to AWS Glue for Ray and shows you how to start using Ray to distribute your Python workloads.

What is AWS Glue for Ray?

Customers like the serverless experience and fast start time offered by AWS Glue. With the introduction of Ray, we have ensured that you get the same experience. We have also ensured that you can use the AWS Glue job and AWS Glue interactive session primitives to access the Ray engine. AWS Glue jobs are fire-and-forget systems where customer submit their Ray code to the AWS Glue jobs API and AWS Glue automatically provisions the required compute resources and runs the job. AWS Glue interactive session APIs allow interactive exploration of the data for the purpose of job development. Regardless of the option used, you are only billed for the duration of the compute used. With AWS Glue for Ray, we are also introducing a new Graviton2 based worker (Z.2x) which offers 8 virtual CPUs and 64 GB of RAM.

AWS Glue for Ray consists of two major components:

  1. Ray Core – The distributed computing framework
  2. Ray Dataset – The distributed data framework based on Apache Arrow

When running a Ray job, AWS Glue provisions the Ray cluster for you and runs these distributed Python jobs on a serverless auto-scaling infrastructure.  The cluster in AWS Glue for Ray will consists of exactly one head node and one or more worker nodes.  

The head node is identical to the other worker nodes with the exception that it runs singleton processes for cluster management and the Ray driver process.  The driver is a special worker process in the head node that runs the top-level application in Python that starts the Ray job.  The worker node has processes that are responsible for submitting and running tasks.

The following figure provides a simple introduction to the Ray architecture.  The architecture illustrates how Ray is able to schedule jobs through processes called Raylets.  The Raylet manages the shared resources on each node and is shared between the concurrently running jobs.  For more information on how Ray works, see Ray.io.

The following figure shows the components of the worker node and the shared-memory object store:

There is a Global Control Store in the head node that can treat each separate machine as nodes, similar to how Apache Spark treats workers as nodes.  The following figure shows the components of the head node and the Global Control Store managing the cluster-level metadata.

AWS Glue for Ray comes included with Ray Core, Ray DatasetModin (distributed pandas) and the AWS SDK for pandas (on Modin) for seamless distributed integration into other AWS services.  Ray Core is the foundation of Ray and the basic framework for distributing Python functions and classes. Ray Dataset is a distributed data framework based on Apache Arrow and is most closely analogous to a dataframe in Apache Spark. Modin is a library designed to distribute pandas applications across a Ray cluster without any modification and is compatible with data in Ray Datasets. The included AWS SDK for pandas (formerly AWS Data Wrangler) is an abstraction layer on top of Modin to allow for the creation of pandas dataframes from (and writing to) many AWS sources such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon DynamoDB, Amazon OpenSearch Service, and others.

You can also install your own ARM compatible Python libraries via pip, either through Ray’s environmental configuration in @ray.remote or via --additional-python-modules.

To learn more about Ray, please visit the GitHub repo.

Why use AWS Glue for Ray?

Many of us start our data journey on AWS with Python, looking to prepare data for ML and data science, and move data at scale with AWS APIs and Boto3. Ray allows you to bring those familiar skills, paradigms, frameworks and libraries to AWS Glue and make them scale to handle massive datasets with minimal code changes. You can use the same data processing tools you currently have (such as Python libraries for data cleansing, computation, and ML) on datasets of all sizes. AWS Glue for Ray enables the distributed run of your Python scripts over multi-node clusters.

AWS Glue for Ray is designed for the following:

  • Task parallel applications (for example, when you want to apply multiple transforms in parallel)
  • Speeding up your Python workload as well as using Python native libraries.
  • Running the same workload across hundreds of data sources.
  • ML ingestion and parallel batch inference on data

Solution overview

For this post, you will use the Parquet Amazon Customer Reviews Dataset stored in the public S3 bucket. The objective is to perform transformations using the Ray dataset and then write it back to Amazon S3 in the Parquet file format.

Configure Amazon S3

The first step is to create an Amazon S3 bucket to store the transformed Parquet dataset as the end result.

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. For Bucket name, enter a name for your Amazon S3 bucket.
  4. Choose Create.

Set up a Jupyter notebook with an AWS Glue interactive session

For our development environment, we use a Jupyter notebook to run the code.

You’re required to install the AWS Glue interactive sessions locally or run interactive sessions with an AWS Glue Studio notebook. Using AWS Glue Interactive sessions will help you follow and run the series of demonstration steps.

Refer to Getting started with AWS Glue interactive sessions for instructions to spin up a notebook on an AWS Glue interactive session.

Run your code using Ray in a Jupyter notebook

This section walks you through several notebook paragraphs on how to use AWS Glue for Ray. In this exercise, we look at the customer reviews from the Amazon Customer Review Parquet dataset, perform some Ray transformations, and write the results to Amazon S3 in a Parquet format.

  1. On Jupyter console, under New, choose Glue Python.
  2. Signify you want to use Ray as the engine by using the %glue_ray magic.
  3. Import the Ray library along with additional Python libraries:
    %glue_ray
    
    import ray
    import pandas
    import pyarrow
    from ray import data
    import time
    from ray.data import ActorPoolStrategy

  4. Initialize a Ray Cluster with AWS Glue.
    ray.init('auto')

  5. Next, we read a single partition from the dataset, which is Parquet file format:
    start = time.time()
    ds = ray.data.read_parquet("s3://amazon-reviews-pds/parquet/product_category=Wireless/")
    end = time.time()
    print(f"Reading the data to dataframe: {end - start} seconds")

  6. Parquet files store the number of rows per file in the metadata, so we can get the total number of records in ds without performing a full data read:
    ds.count()

  7. Next , we can check the schema of this dataset. We don’t have to read the actual data to get the schema; we can read it from the metadata:
    ds.schema()

  8. We can check the total size in bytes for the full Ray dataset:
    #calculate the size in bytes of the full dataset,  Note that for Parquet files, this size-in-bytes will be pulled from the Parquet
    #  metadata (not triggering a data read).
    ds.size_bytes()

  9. We can see a sample record from the Ray dataset:
    #Show sample records from the underlying Parquet dataset  
    start = time.time()
    ds.show(1)
    end = time.time()
    print(f"Time taken to show the data from dataframe : {end - start} seconds")

Applying dataset transformations with Ray

There are primarily two types of transformations that can be applied to Ray datasets:

  • One-to-One transformations – Each input block will contributes to only one output block, such as add_column(), map_batches() and drop_column() , and so on.
  • All-to-All transformations – Input blocks can contribute to multiple output blocks such as sort() and groupby(), and so on.

In the next series of steps we will apply some of these transformations on our resultant Ray datasets from the previous section.

  1. We can add a new column and check the schema to verify the newly added column, followed by retrieving a sample record. This transformation is only available for the datasets that can be converted to pandas format.
    # Add the given new column to the dataset and show the sample record after adding a new column
    
    start = time.time()
    ds = ds.add_column( "helpful_votes_ratio", lambda df: df["helpful_votes"] / df["total_votes"])
    end = time.time()
    print(f"Time taken to Add a new columns : {end - start} seconds")
    ds.show(1)

  2. Let’s drop a few columns we don’t need using a drop_columns transformation and then check the schema to verify if those columns are dropped from the Ray dataset:
    # Dropping few columns from the underlying Dataset 
    start = time.time()
    ds = ds.drop_columns(["review_body", "vine", "product_parent", "verified_purchase", "review_headline"])
    end = time.time()
    print(f"Time taken to drop a few columns : {end - start} seconds")
    ds.schema()


    Ray datasets have built-in transformations such as sorting the dataset by the specified key column or key function.

  3. Next, we apply the sort transformation using one of the columns present in the dataset (total_votes):
    #Sort the dataset by total votes
    start = time.time()
    ds =ds.sort("total_votes")
    end = time.time()
    print(f"Time taken for sort operation  : {end - start} seconds")
    ds.show(3)

  4. Next, we will create a Python UDF function that allows you to write customized business logic in transformations. In our UDF we have written a logic to find out the products that are rated low (i.e. total votes less than 100).We create a UDF as a function on pandas DataFrame batches. For the supported input batch formats, see the UDF Input Batch Format. We also demonstrate using map_batches() which applies the given function to the batches of records of this dataset. Map_batches() uses the default compute strategy (tasks), which helps distribute the data processing to multiple Ray workers, which are used to run tasks. For more information on a map_batches() transformation, please see the following documentation.
    # UDF as a function on pandas DataFrame - To Find products with total_votes < 100 
    def low_rated_products(df: pandas.DataFrame) -> pandas.DataFrame:
        return df[(df["total_votes"] < 100)]
        
    #Calculate the number of products which are rated low in terms of low votes i.e. less than 100
    # This technique is called Batch inference processing with Ray tasks (the default compute strategy).
    ds = ds.map_batches(low_rated_products)
    
    #See sample records for the products which are rated low in terms of low votes i.e. less than 100
    ds.show(1)

    #Count total number of products which are rated low 
    ds.count()

  5. If you have complex transformations that require more resources for data processing, we recommend utilizing Ray actors using additional configurations with applicable transformations. We have demonstrated with map_batches() below:
    # Batch inference processing with Ray actors. Autoscale the actors between 2 and 4.
    
    class LowRatedProducts:
        def __init__(self):
            self._model = low_rated_products
    
        def __call__(self, batch: pandas.DataFrame) -> pandas.DataFrame:
            return self._model(batch)
    
    start = time.time()
    predicted = ds.map_batches(
        LowRatedProducts, compute=ActorPoolStrategy(2, 4), batch_size=4)
    end = time.time()
    

  6. Next, before writing the final resultant Ray dataset we will apply map_batches() transformations to filter out the customer reviews data where the total votes for a given product is greater than 0 and the reviews belongs to the “US” marketplace only. Using map_batches() for the filter operation is better in terms of performance in comparison to filter() transformation.
    # Filter our records with total_votes == 0
    ds = ds.map_batches(lambda df: df[df["total_votes"] > 0])
    
    # Filter and select records with marketplace equals US only
    ds = ds.map_batches(lambda df: df[df["marketplace"] == 'US'])
    
    ds.count()

  7. Finally, we write the resultant data to the S3 bucket you created in a Parquet file format. You can use different dataset APIs available, such as write_csv() or write_json() for different file formats.  Additionally, you can convert the resultant dataset to another DataFrame type such as Mars, Modin or pandas.
    ds.write_parquet("s3://<your-own-s3-bucket>/manta/Output/Raydemo/")

Clean up

To avoid incurring future charges, delete the Amazon S3 bucket and Jupyter notebook.

  1. On the Amazon S3 console, choose Buckets.
  2. Choose the bucket you created.
  3. Choose Empty and enter your bucket name.
  4. Choose Confirm.
  5. Choose Delete and enter your bucket name.
  6. Choose Delete bucket.
  7. On the AWS Glue console, choose Interactive Sessions
  8. Choose the interactive session you created.
  9. Choose Delete to remove the interactive session.

Conclusion

In this post, we demonstrated how you can use AWS Glue for Ray to run your Python code in a distributed environment.  You can now run your data and ML applications in a multi-node environment.

Refer to the Ray documentation for additional information and use cases.


About the authors

Zach Mitchell is a Sr. Big Data Architect. He works within the product team to enhance understanding between product engineers and their customers while guiding customers through their journey to develop data lakes and other data solutions on AWS analytics services.

Ishan Gaur works as Sr. Big Data Cloud Engineer ( ETL ) specialized in AWS Glue. He’s passionate about helping customers build out scalable distributed ETL workloads and implement scalable data processing and analytics pipelines on AWS. When not at work, Ishan likes to cook, travel with his family, or listen to music.

Derek Liu is a Solutions Architect on the Enterprise team based out of Vancouver, BC.  He is part of the AWS Analytics field community and enjoys helping customers solve big data challenges through AWS analytic services.

Kinshuk Pahare is a Principal Product Manager on AWS Glue.

New – Amazon Redshift Support in AWS Backup

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-amazon-redshift-support-in-aws-backup/

With Amazon Redshift, you can analyze data in the cloud at any scale. Amazon Redshift offers native data protection capabilities to protect your data using automatic and manual snapshots. This works great by itself, but when you’re using other AWS services, you have to configure more than one tool to manage your data protection policies.

To make this easier, I am happy to share that we added support for Amazon Redshift in AWS Backup. AWS Backup allows you to define a central backup policy to manage data protection of your applications and can now also protect your Amazon Redshift clusters. In this way, you have a consistent experience when managing data protection across all supported services. If you have a multi-account setup, the centralized policies in AWS Backup let you define your data protection policies across all your accounts within your AWS Organizations. To help you meet your regulatory compliance needs, AWS Backup now includes Amazon Redshift in its auditor-ready reports. You also have the option to use AWS Backup Vault Lock to have immutable backups and prevent malicious or inadvertent changes.

Let’s see how this works in practice.

Using AWS Backup with Amazon Redshift
The first step is to turn on the Redshift resource type for AWS Backup. In the AWS Backup console, I choose Settings in the navigation pane and then, in the Service opt-in section, Configure resources. There, I toggle the Redshift resource type on and choose Confirm.

Console screenshot.

Now, I can create or update a backup plan to include the backup of all, or some, of my Redshift clusters. In the backup plan, I can define how often these backups should be taken and for how long they should be kept. For example, I can have daily backups with one week of retention, weekly backups with one month of retention, and monthly backups with one year of retention.

I can also create on-demand backups. Let’s see this with more details. I choose Protected resources in the navigation pane and then Create on-demand backup.

I select Redshift in the Resource type dropdown. In the Cluster identifier, I select one of my clusters. For this workload, I need two weeks of retention. Then, I choose Create on-demand backup.

Console screenshot.

My data warehouse is not huge, so after a few minutes, the backup job has completed.

Console screenshot.

I now see my Redshift cluster in the list of the resources protected by AWS Backup.

Console screenshot.

In the Protected resources list, I choose the Redshift cluster to see the list of the available recovery points.

Console screenshot.

When I choose one of the recovery points, I have the option to restore the full data warehouse or just a table into a new Redshift cluster.

Console screenshot.

I now have the possibility to edit the cluster and database configuration, including security and networking settings. I just update the cluster identifier, otherwise the restore would fail because it must be unique. Then, I choose Restore backup to start the restore job.

After some time, the restore job has completed, and I see the old and the new clusters in the Amazon Redshift console. Using AWS Backup gives me a simple centralized way to manage data protection for Redshift clusters as well as many other resources in my AWS accounts.

Console screenshot.

Availability and Pricing
Amazon Redshift support in AWS Backup is available today in the AWS Regions where both AWS Backup and Amazon Redshift are offered, with the exception of the Regions based in China. You can use this capability via the AWS Management Console, AWS Command Line Interface (CLI), and AWS SDKs.

There is no additional cost for using AWS Backup compared to the native snapshot capability of Amazon Redshift. Your overall costs depend on the amount of storage and retention you need. For more information, see AWS Backup pricing.

Danilo

New for Amazon Transcribe – Real-Time Analytics During Live Calls

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-for-amazon-transcribe-real-time-analytics-during-live-calls/

The experience customers have when interacting with a contact center can have a profound impact on them. For this reason, we launched Amazon Transcribe Call Analytics last year to help you analyze customer call recordings and get insights into issues and trends related to customer satisfaction and agent performance.

To assist agents in resolving live calls faster, we are introducing today real-time call analytics in Amazon Transcribe Call Analytics. Real-time call analytics provides APIs for developers to accurately transcribe live calls and at the same time identify customer experience issues and sentiment in real time. Transcribe Call Analytics uses state-of-the-art machine learning capabilities to automatically assess thousands of in-progress calls and detect customer experience issues, such as repeated requests to speak to a manager or cancel a subscription.

With a few clicks, supervisors and analysts can create categories in the AWS console to identify customer experience issues using criteria such as specific terms such as “not happy,” “poor quality,” and “cancel my subscription.” Transcribe Call Analytics analyzes in-progress calls in real time to detect when a category is met. Developers can use those signals, along with sentiment trends from the API, to build a proactive system that alerts supervisors about emerging issues or assists agents with relevant information to solve customer issues.

Transcribe Call Analytics also provides a real-time transcript of the live conversation that supervisors can use to quickly get up to speed on the customer interaction and assess the appropriate action. The in-call transcript also eliminates the need for customers to repeat themselves if the call is transferred to another agent. Agents can focus all their attention on the customer during the call instead of taking notes for entry in a CRM system because Transcribe Call Analytics includes an automated call summarization capability, which identifies the issue, outcome, and action item associated with a call.

Transcribe Call Analytics is a foundational API for AWS Contact Center Intelligence solutions such as post-call analytics and the updated real-time call analytics with agent assist solution using the new real-time capabilities.

Let’s see how this works in practice.

Exploring Real-Time Call Analytics in the Console
To see how this works visually, I use the Amazon Transcribe console. First, I create a category to be notified if some terms are used in the call that would require an escalation. I choose Category Management from the navigation pane and then Create category.

I enter Escalation as the name for the category. I select REAL_TIME in the Category type dropdown. Then, I choose Create from scratch.

Console screenshot.

I only need one rule for this category. In the Rule type dropdown, I select Transcript content match. In the next three options, I choose to trigger the rule when any of the words are mentioned during the entire call, and the speaker is either the customer or the agent. Now, I can enter the words or phrases to look for in the transcript. In this case, I enter cancel, canceled, cancelled, manager, and supervisor. In your case, you might be more specific depending on your business. For example, if subscriptions are your business, you can look for the phrase cancel my subscription.

Console screenshot.

Now that the category has been created, I use one of the sample calls in the console to test it. I choose Real-Time Analytics in the navigation pane. By choosing Configure advanced settings, I can configure the personally identifiable information (PII) identification and redaction settings. For example, I can choose to identify personal data such as email addresses or redact financial data like bank account numbers.

With no additional charge, I can enable Post-call Analytics so that, at the end of the call, I receive the output of the transcription job in an Amazon Simple Storage Service (Amazon S3) bucket. This output is in a similar format to what I’d receive if I were analyzing a call recording with Transcribe Call Analytics. In this way, I can use the post-call analytics output derived from the audio stream in any process I already have in place for output of analytics generated from call recordings, for example, to update dashboards or generate periodic reports.

With Insurance complaints in Step 1: Specify input audio selected, I choose Start streaming. In the Transcription output section of the console, I receive in real-time the transcription of the call. The words of the customer and agent appear as they are pronounced. Each sentence is flagged with its recognized sentiment (positive, neutral, or negative). The Escalation category that I just configured is found in two sentences, first, when the customer mentions that their insurance has been canceled, and then when the agent mentions their manager. Also, part of a sentence is underlined because an issue has been detected.

Console screenshot.

Using the Download dropdown, I download the full JSON transcript. If I am only interested in the transcription, I can download the text transcript. The JSON transcript contains an array where each item is similar to what I’d get in real time when using the real-time call analytics API.

Using the Live Call Analytics With Agent Assist (LCA) Solution
You can use the open-source real-time call analytics with agent assist solution for your contact center or as an inspiration of what Amazon Transcribe enables for developers. Let’s look at a couple of screenshots to understand how it works.

Here there is a list of on-going calls with the overall sentiment, the sentiment trend (is it improving or not?), and the categories found in real-time during the call that can be used for specific activities.

Screenshot from the real-time call analytics with agent assist solution.

When selecting a call from the list, you have access to more in-depth information, including the call transcript and the issues found during the on-going call. This allows to take action quickly to help resolve the call.

Screenshot from the real-time call analytics with agent assist solution.

Availability and Pricing
Amazon Transcribe Call Analytics with real-time capabilities is available today in US (N. Virginia, Oregon), Canada (Central), Europe (Frankfurt, London), and Asia Pacific (Seoul, Sydney, Tokyo) and supports US English, British English, Australian English, US Spanish, Canadian French, French, German, Italian, and Brazilian Portuguese.

With Amazon Transcribe Call Analytics, you pay as you go and are billed monthly based on tiered pricing. For more information, see Amazon Transcribe pricing.

As part of the AWS Free Tier, you can get started with Amazon Transcribe Call Analytics for free, including the new real-time call analytics API. You can analyze up to 60 minutes of call audio monthly for free for the first 12 months. For more information, see the AWS Free Tier page.

If you’re at re:Invent, you can learn more about this new capability in session AIM307 – JPMorganChase real-time agent assist for contact center productivity. I will update this post when the recording of the session is publicly available.

Start analyzing contact center conversations in real-time to improve your customers’ experience.

Danilo

Graph for fraud detection

Post Syndicated from Grab Tech original https://engineering.grab.com/graph-for-fraud-detection

Grab has grown rapidly in the past few years. It has expanded its business from ride hailing to food and grocery delivery, financial services, and more. Fraud detection is challenging in Grab, because new fraud patterns always arise whenever we introduce a new business product. We cannot afford to develop a new model whenever a new fraud pattern appears as it is time consuming and introduces a cold start problem, that is no protection at the early stage. We need a general fraud detection framework to better protect Grab from various unknown fraud risks.

Our key observation is that although Grab has many different business verticals, the entities within those businesses are connected to each other (Figure 1. Left), for example, two passengers may be connected by a Wi-Fi router or phone device, a merchant may be connected to a passenger by a food order, and so on. A graph provides an elegant way to capture the spatial correlation among different entities in the Grab ecosystem. A common fraud shows clear patterns on a graph, for example, a fraud syndicate tends to share physical devices, and collusion happens between a merchant and an isolated set of passengers (Figure 1. Right).

Figure 1. Left: The graph captures different correlations in the Grab ecosystem.
Right: The graph shows that common fraud has clear patterns.

We believe graphs can help us discover subtle traces and complicated fraud patterns more effectively. Graph-based solutions will be a sustainable foundation for us to fight against known and unknown fraud risks.

Why graph?

The most common fraud detection methods include the rule engine and the decision tree-based models, for example, boosted tree, random forest, and so on. Rules are a set of simple logical expressions designed by human experts to target a particular fraud problem. They are good for simple fraud detection, but they usually do not work well in complicated fraud or unknown fraud cases.

Fraud detection methods

Utilises correlations
(Higher is better)
Detects unknown fraud
(Higher is better)
Requires feature engineering
(Lower is better)
Depends on labels
(Lower is better)
Rule engine Low N/A N/A Low
Decision tree Low Low High High
Graph model High High Low Low

Table 1. Graph vs. common fraud detection methods.

Decision tree-based models have been dominating fraud detection and Kaggle competitions for structured or tabular data in the past few years. With that said, the performance of a tree-based model is highly dependent on the quality of labels and feature engineering, which is often hard to obtain in real life. In addition, it usually does not work well in unknown fraud which has not been seen in the labels.

On the other hand, a graph-based model requires little amount of feature engineering and it is applicable to unknown fraud detection with less dependence on labels, because it utilises the structural correlations on the graph.

In particular, fraudsters tend to show strong correlations on a graph, because they have to share physical properties such as personal identities, phone devices, Wi-Fi routers, delivery addresses, and so on, to reduce cost and maximise revenue as shown in Figure 2 (left). An example of such strong correlations is shown in Figure 2 (right), where the entities on the graph are densely connected, and the known fraudsters are highlighted in red. Those strong correlations on the graph are the key reasons that make the graph based approach a sustainable foundation for various fraud detection tasks.

Figure 2. Fraudsters tend to share physical properties to reduce cost (left), and they are densely connected as shown on a graph (right).

Semi-supervised graph learning

Unlike traditional decision tree-based models, the graph-based machine learning model can utilise the graph’s correlations and achieve great performance even with few labels. The semi-supervised Graph Convolutional Network model has been extremely popular in recent years 1. It has proven its success in many fraud detection tasks across industries, for example, e-commerce fraud, financial fraud, internet traffic fraud, etc.
We apply the Relational Graph Convolutional Network (RGCN) 2 for fraud detection in Grab’s ecosystem. Figure 3 shows the overall architecture of RGCN. It takes a graph as input, and the graph passes through several graph convolutional layers to get node embeddings. The final layer outputs a fraud probability for each node. At each graph convolutional layer, the information is propagated along the neighbourhood nodes within the graph, that is nodes that are close on the graph are similar to each other.

Fig 3. A semi-supervised Relational Graph Convolutional Network model.

We train the RGCN model on a graph with millions of nodes and edges, where only a few percentages of the nodes on the graph have labels. The semi-supervised graph model has little dependency on the labels, which makes it a robust model for tackling various types of unknown fraud.

Figure 4 shows the overall performance of the RGCN model. On the left is the Receiver Operating Characteristic (ROC) curve on the label dataset, in particular, the Area Under the Receiver Operating Characteristic (AUROC) value is close to 1, which means the RGCN model can fit the label data quite well. The right column shows the low dimensional projections of the node embeddings on the label dataset. It is clear that the embeddings of the genuine passenger are well separated from the embeddings of the fraud passenger. The model can distinguish between a fraud and a genuine passenger quite well.

Fig 4. Left: ROC curve of the RGCN model on the label dataset.
Right: Low dimensional projections of the graph node embeddings.

Finally, we would like to share a few tips that will make the RGCN model work well in practice.

  • Use less than three convolutional layers: The node feature will be over-smoothed if there are many convolutional layers, that is all the nodes on the graph look similar.
  • Node features are important: Domain knowledge of the node can be formulated as node features for the graph model, and rich node features are likely to boost the model performance.

Graph explainability

Unlike other deep network models, graph neural network models usually come with great explainability, that is why a user is classified as fraudulent. For example, fraudulent accounts are likely to share hardware devices and form dense clusters on the graph, and those fraud clusters can be easily spotted on a graph visualiser 3.

Figure 5 shows an example where graph visualisation helps to explain the model prediction scores. The genuine passenger with a low RGCN score does not share devices with other passengers, while the fraudulent passenger with a high RGCN score shares devices with many other passengers, that is, dense clusters.

Figure 5. Upper left: A genuine passenger with a low RGCN score has no device sharing with other passengers. Bottom right: A fraudulent user with a high RGCN score shares devices with many other passengers.

Closing thoughts

Graphs provide a sustainable foundation for combating many different types of fraud risks. Fraudsters are evolving very fast these days, and the best traditional rules or models can do is to chase after those fraudsters given that a fraud pattern has already been discovered. This is suboptimal as the damage has already been done on the platform. With the help of graph models, we can potentially detect those fraudsters before any fraudulent activity has been conducted, thus reducing the fraud cost.

The graph structural information can significantly boost the model performance without much dependence on labels, which is often hard to get and might have a large bias in fraud detection tasks. We have shown that with only a small percentage of labelled nodes on the graph, our model can already achieve great performance.

With that said, there are also many challenges to making a graph model work well in practice. We are working towards solving the following challenges we are facing.

  • Feature initialisation: Sometimes, it is hard to initialise the node feature, for example, a device node does not carry many semantic meanings. We have explored self-supervised pre-training 4 to help the feature initialisation, and the preliminary results are promising.
  • Real-time model prediction: Realtime graph model prediction is challenging because real-time graph updating is a heavy operation in most cases. One possible solution is to do batch real-time prediction to reduce the overhead.
  • Noisy connections: Some connections on the graph are inherently noisy on the graph, for example, two users sharing the same IP address does not necessarily mean they are physically connected. The IP might come from a mobile network. One possible solution is to use the attention mechanism in the graph convolutional kernel and control the message passing based on the type of connection and node profiles.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

References

  1. T. Kipf and M. Welling, “Semi-supervised classification with graph convolutional networks,” in ICLR, 2017 

  2. Schlichtkrull, Michael, et al. “Modeling relational data with graph convolutional networks.” European semantic web conference. Springer, Cham, 2018. 

  3. Fujiao Liu, Shuqi Wang, et al.. “Graph Networks – 10X investigation with Graph Visualisations”. Grab Tech Blog. 

  4. Wang, Chen, et al.. “Deep Fraud Detection on Non-attributed Graph.” IEEE Big Data conference, PSBD, 2021. 

Implement row-level access control in a multi-tenant environment with Amazon Redshift

Post Syndicated from Siva Bangaru original https://aws.amazon.com/blogs/big-data/implement-row-level-access-control-in-a-multi-tenant-environment-with-amazon-redshift/

This is a guest post co-written with Siva Bangaru and Leon Liu from ADP.

ADP helps organizations of all types and sizes by providing human capital management (HCM) solutions that unite HR, payroll, talent, time, tax, and benefits administration. ADP is a leader in business outsourcing services, analytics, and compliance expertise. ADP’s unmatched experience, deep insights, and cutting-edge technology have transformed human resources from a back-office administrative function to a strategic business advantage.

People Analytics powered by ADP DataCloud is an application that provides analytics and enhanced insights to ADP’s clients. It delivers a guided analytics experience that make it easy for you to create, use, and distribute tailored analytics for your organization. ADP People Analytics’s streamlined, configurable dashboards can help you identify potential issues in key areas, like overtime, turnover, compensation, and much more.

ADP provides this analytics experience to thousands of clients today. Securing customers’ data is a top priority for ADP. The company requires the highest security standards when implementing a multi-tenant analytics platform on Amazon Redshift.

ADP DataCloud integrates with Amazon Redshift row-level security (RLS) to implement granular data entitlements and enforce the access restrictions on their tables in Amazon Redshift.

In this post, we discuss how the ADP DataCloud team implemented Amazon Redshift RLS on the foundation of role-based access control (RBAC) to simplify managing privileges required in a multi-tenant environment, and also enabled and enforced access to granular data entitlements in business terms.

The ADP DataCloud team had the following key requirements and challenges:

  • Support a multi-tenant application to enforce a logical separation of each tenant’s data rows
  • Support dynamic provisioning of new tenants
  • Minimal impact on performance

Row-level security in Amazon Redshift

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. One of the challenges with security is that enterprises want to provide fine-grained access control at the row level for sensitive data. This can be done by creating views or using different databases and schemas for different users. However, this approach isn’t scalable and becomes complex to maintain over time, especially when supporting multi-tenant environments.

In early 2022, Amazon Redshift released row-level security, which is built on the foundation of role-based access control. RLS allows you to control which users or roles can access specific records of data within tables, based on security policies that are defined at the database object level. This new RLS capability in Amazon Redshift enables you to dynamically filter existing rows of data in a table along with session context variable setting capabilities to dynamically assign the appropriate tenant configuration. This is in addition to column-level access control, where you can grant users permissions to a subset of columns. Now you can combine column-level access control with RLS policies to further restrict access to particular rows of visible columns. Refer to Achieve fine-grained data security with row-level access control in Amazon Redshift for additional details.

Solution overview

As part of ADP’s key requirements to support a multi-tenant data store wherein a single table holds data of multiple tenants, enforcement of security policies to ensure no cross-tenant data access is of paramount importance. One obvious way to ensure this is by creating database users for each tenant and implementing RLS policies to filter a single tenant’s data as per the logged-in user. But this can be tedious and become cumbersome to maintain as the number of tenants grow by the thousands.

This post presents another way to handle this use case by combining session context variables and RLS policies on tables to filter a single tenant’s data, thereby easing the burden of creating and maintaining thousands of database users. In fact, a single database user is all that is needed to connect and query different tenant’s data in different sessions from a multi-tenant table by setting different values to a session context variable in each session, as shown in the following diagram.

Let’s start by covering the high-level implementation steps. Consider there is a database user in Amazon Redshift app_user (which is neither a super user, nor has the sys:secadmin role granted, nor has the IGNORE RLS system privilege granted via another role). The user app_user owns a schema with the same name and all objects in it. The following is a typical multi-tenant table employee in the app_user schema with some sample records shown in the table:

CREATE TABLE app_user.employee (
    tenant_id varchar(50) not null,
    id varchar(50) not null,
    name varchar(200),
    email varchar(200),
    ssn char(9),
    constraint employee_pkey primary key (tenant_id,id)	
);
TENANT_ID ID NAME EMAIL SSN
T0001 E4646 Andy . XXXXXXXXX
T0001 E4689 Bob . XXXXXXXXX
T0001 E4691 Christina . XXXXXXXXX
T0002 E4733 Peter . XXXXXXXXX
T0002 E4788 Quan . XXXXXXXXX
T0002 E4701 Rose . XXXXXXXXX
T0003 E5699 Diana . XXXXXXXXX
T0003 E5608 Emily . XXXXXXXXX
T0003 E5645 Florence . XXXXXXXXX

To implement that, the following steps are required:

  1. Create a RLS policy on a column using a predicate that is set using a session context variable.
  2. Enable RLS at the table level and attach the RLS policy on the table.
  3. Create a stored procedure that sets the session context variable used in the RLS policy predicate.
  4. Connect and call the stored procedure to set the session context variable and query the table.

Now RLS can be enabled on this table in such a way that whenever app_user queries the employee table, the user will either see no rows or retrieve only rows specific to single tenant despite being the owner of the table.

An administrator, such as app_admin, either a super user or a user that has the sys:secadmin role, can enforce this as follows:

  1. Create a RLS policy that attaches a tenant_id predicate using a session context variable:
    create rls policy tenant_policy
    with (tenant_id varchar(50))
    using (tenant_id = current_setting('app_context.tenant_id'));

  2. Enable RLS and attach the policy on the employee table:
    alter table app_user.employee row level security on;
    
    attach rls policy tenant_policy on app_user.employee to public;

  3. Create a stored procedure to set the tenant_id in a session variable and grant access to app_user:
    create or replace procedure app_admin.set_app_context
    (p_tenant_id in varchar)
    	language plpgsql
    as $$ 	
    declare
    		v_tenant_id  varchar(50);
    begin
        reset all;   
    	v_tenant_id := set_config('app_context.tenant_id',p_tenant_id,false);	
    end;
     $$
    ;
    
    grant execute on app_admin. set_app_context(varchar) to app_user;

  4. Connect to app_user and call the stored procedure to set the session context variable:
    call app_admin.set_app_context('T0001');

When this setup is complete, whenever tenants are connecting to ADP Analytics dashboards, it connects as app_user and runs stored procedures by passing tenant_id, which sets the session context variable using the tenant ID. In this case, when requests come to connect and query the employee table, the user will experience the following scenarios:

  • No data is retrieved if current_setting('app_context.tenant_id') is not set or is null
  • Data is retrieved if current_setting('app_context.tenant_id') is set by calling the app_admin.set_app_context(varchar) procedure to a value that exists in the employee table (for example, app_admin.set_app_context(‘T0001’))

No data is retrieved if current_setting('app_context.tenant_id') is set to a value that doesn’t exist in the employee table (for example, app_admin.set_app_context(‘T9999’))

Validate RLS by examining query plans

Now let’s review the preceding scenarios by running an explain plan and observing how RLS works for the test setup. If a query contains a table that is subject to RLS policies, EXPLAIN displays a special RLS SecureScan node. Amazon Redshift also logs the same node type to the STL_EXPLAIN system table. EXPLAIN doesn’t reveal the RLS predicate that applies to the employee table. To view an explain plan with RLS predicate details, the EXPLAIN RLS system privilege is granted to app_user via a role.

In this first scenario, tenant_id wasn’t set by the stored procedure and was passed as a null value, therefore below select statement returns no rows .

=> select count(1),tenant_id from employee group by 2;

count | tenant_id

-------+-------------

(0 rows)

Explain plan output shows the filter as NULL:

=> explain select count(1),tenant_id from employee group by 2;

QUERY PLAN

--------------------------------------------------------------------------------

XN HashAggregate (cost=0.10..0.11 rows=4 width=20)

-> XN RLS SecureScan employee (cost=0.00..0.08 rows=4 width=20)

-> XN Result (cost=0.00..0.04 rows=4 width=20)

One-Time Filter: NULL::boolean

-> XN Seq Scan on employee (cost=0.00..0.04 rows=4 width=20)

(5 rows)

In the second scenario, tenant_id was set by the stored procedure and passed as a value of T0001, therefore returning only corresponding rows as shown in the explain plan output:

Call stored procedure to set the session context variable as ‘T0001’ and then run the select :

=> call app_admin.set_app_context('T0001');

=> select count(1),tenant_id from employee group by 2;
 count |   tenant_id
-------+------------------
     3 | T0001
(1 row)

Explain plan output shows the filter on tenant_id as ‘T0001’

=> explain select count(1),tenant_id from employee group by 2;
                                QUERY PLAN
--------------------------------------------------------------------------
 XN HashAggregate  (cost=0.07..0.07 rows=1 width=20)
   ->  XN RLS SecureScan employee  (cost=0.00..0.06 rows=1 width=20)
         ->  XN Seq Scan on employee  (cost=0.00..0.05 rows=1 width=20)
               Filter: ((tenant_id)::text = 'T0001'::text)
(4 rows)

In the third scenario, a non-existing tenant_id was set by the stored procedure, therefore returning no rows:

=> call app_admin.set_app_context('T9999');

=> select count(1),tenant_id from employee group by 2;
 count | tenant_id
-------+-------------
(0 rows)


=> explain select count(1),tenant_id from employee group by 2;
                                QUERY PLAN
--------------------------------------------------------------------------
 XN HashAggregate  (cost=0.07..0.07 rows=1 width=20)
   ->  XN RLS SecureScan employee  (cost=0.00..0.06 rows=1 width=20)
         ->  XN Seq Scan on employee  (cost=0.00..0.05 rows=1 width=20)
               Filter: ((tenant_id)::text = 'T9999'::text)
(4 rows)

Another key point is that you can apply the same policy to multiple tables as long as they have the column (tenant_id varchar(50)) defined with the same data type, because RLS polices are strongly typed in Amazon Redshift. Similarly, you can combine multiple RLS policies defined using different session context variables or other relevant column predicates and attach them to a single table.

Also, this RLS implementation doesn’t need any changes when a new tenant’s data is added to the table, because it can be queried by simply setting the new tenant’s identifier in the session context variable that is used to define the filter predicate inside the RLS policy. A tenant to its corresponding identifier mapping is typically done during an application’s tenant onboarding process and is generally maintained in a separate metastore, which is also referred to during each tenant’s login to get the tenant’s identifier. With that, thousands of tenants could be provisioned without needing to change any policy in Amazon Redshift. In our testing, we found no performance impact by tenants after RLS was implemented.

Conclusion

In this post, we demonstrated how the ADP DataCloud team implemented row-level security in a multi-tenant environment for thousands of customers using Amazon Redshift RLS and session context variables. For more information about RLS best practices, refer to Amazon Redshift security overview.

Try out RLS for your future Amazon Redshift implementations, and feel free to leave a comment about your use cases and experience.


About the authors

Siva Bangaru is a Database Architect at ADP. He has more than 13 years of experience with technical expertise on design, development, administration, and performance tuning of database solutions for a variety of OLAP and OLTP use cases on multiple database engines like Oracle, Amazon Aurora PostgreSQL, and Amazon Redshift.

Leon Liu is a Chief Architect at ADP. He has over 20 years of experience with enterprise application framework, architecture, data warehouses, and big data real-time processing.

Neha Daudani is a Solutions Architect at AWS. She has 15 years of experience in the data and analytics space. She has enabled clients on various projects on enterprise data warehouses, data governance, data visualization, master data management, data modeling, and data migration for clients to use business intelligence and analytics in business growth and operational efficiency.

Rohit Bansal is an Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and works with customers to build next-generation analytics solutions using other AWS Analytics services.

Build your Apache Hudi data lake on AWS using Amazon EMR – Part 1

Post Syndicated from Suthan Phillips original https://aws.amazon.com/blogs/big-data/part-1-build-your-apache-hudi-data-lake-on-aws-using-amazon-emr/

Apache Hudi is an open-source transactional data lake framework that greatly simplifies incremental data processing and data pipeline development. It does this by bringing core warehouse and database functionality directly to a data lake on Amazon Simple Storage Service (Amazon S3) or Apache HDFS. Hudi provides table management, instantaneous views, efficient upserts/deletes, advanced indexes, streaming ingestion services, data and file layout optimizations (through clustering and compaction), and concurrency control, all while keeping your data in open-source file formats such as Apache Parquet and Apache Avro. Furthermore, Apache Hudi is integrated with open-source big data analytics frameworks, such as Apache Spark, Apache Hive, Apache Flink, Presto, and Trino.

In this post, we cover best practices when building Hudi data lakes on AWS using Amazon EMR. This post assumes that you have the understanding of Hudi data layout, file layout, and table and query types. The configuration and features can change with new Hudi versions; the concept of this post applies to Hudi versions of 0.11.0 (Amazon EMR release 6.7), 0.11.1 (Amazon EMR release 6.8) and 0.12.1 (Amazon EMR release 6.9).

Specify the table type: Copy on Write Vs. Merge on Read

When we write data into Hudi, we have the option to specify the table type: Copy on Write (CoW) or Merge on Read (MoR). This decision has to be made at the initial setup, and the table type can’t be changed after the table has been created. These two table types offer different trade-offs between ingest and query performance, and the data files are stored differently based on the chosen table type. If you don’t specify it, the default storage type CoW is used.

The following table summarizes the feature comparison of the two storage types.

CoW MoR
Data is stored in base files (columnar Parquet format). Data is stored as a combination of base files (columnar Parquet format) and log files with incremental changes (row-based Avro format).
COMMIT: Each new write creates a new version of the base files, which contain merged records from older base files and newer incoming records. Each write adds a commit action to the timeline, and each write atomically adds a commit action to the timeline, guaranteeing a write (and all its changes) entirely succeed or get entirely rolled back. DELTA_COMMIT: Each new write creates incremental log files for updates, which are associated with the base Parquet files. For inserts, it creates a new version of the base file similar to CoW. Each write adds a delta commit action to the timeline.
Write
In case of updates, write latency is higher than MoR due to the merge cost because it needs to rewrite the entire affected Parquet files with the merged updates. Additionally, writing in the columnar Parquet format (for CoW updates) is more latent in comparison to the row-based Avro format (for MoR updates). No merge cost for updates during write time, and the write operation is faster because it just appends the data changes to the new log file corresponding to the base file each time.
Compaction isn’t needed because all data is directly written to Parquet files. Compaction is required to merge the base and log files to create a new version of the base file.
Higher write amplification because new versions of base files are created for every write. Write cost will be O(number of files in storage modified by the write). Lower write amplification because updates go to log files. Write cost will be O(1) for update-only datasets and can get higher when there are new inserts.
Read
CoW table supports snapshot query and incremental queries.

MoR offers two ways to query the same underlying storage: ReadOptimized tables and Near-Realtime tables (snapshot queries).

ReadOptimized tables support read-optimized queries, and Near-Realtime tables support snapshot queries and incremental queries.

Read-optimized queries aren’t applicable for CoW because data is already merged to base files while writing. Read-optimized queries show the latest compacted data, which doesn’t include the freshest updates in the not yet compacted log files.
Snapshot queries have no merge cost during read. Snapshot queries merge data while reading if not compacted and therefore can be slower than CoW while querying the latest data.

CoW is the default storage type and is preferred for simple read-heavy use cases. Use cases with the following characteristics are recommended for CoW:

  • Tables with a lower ingestion rate and use cases without real-time ingestion
  • Use cases requiring the freshest data with minimal read latency because merging cost is taken care of at the write phase
  • Append-only workloads where existing data is immutable

MoR is recommended for tables with write-heavy and update-heavy use cases. Use cases with the following characteristics are recommended for MoR:

  • Faster ingestion requirements and real-time ingestion use cases.
  • Varying or bursty write patterns (for example, ingesting bulk random deletes in an upstream database) due to the zero-merge cost for updates during write time
  • Streaming use cases
  • Mix of downstream consumers, where some are looking for fresher data by paying some additional read cost, and others need faster reads with some trade-off in data freshness

For streaming use cases demanding strict ingestion performance with MoR tables, we suggest running the table services (for example, compaction and cleaning) asynchronously, which is discussed in the upcoming Part 3 of this series.

For more details on table types and use cases, refer to How do I choose a storage type for my workload?

Select the record key, key generator, preCombine field, and record payload

This section discusses the basic configurations for the record key, key generator, preCombine field, and record payload.

Record key

Every record in Hudi is uniquely identified by a Hoodie key (similar to primary keys in databases), which is usually a pair of record key and partition path. With Hoodie keys, you can enable efficient updates and deletes on records, as well as avoid duplicate records. Hudi partitions have multiple file groups, and each file group is identified by a file ID. Hudi maps Hoodie keys to file IDs, using an indexing mechanism.

A record key that you select from your data can be unique within a partition or across partitions. If the selected record key is unique within a partition, it can be uniquely identified in the Hudi dataset using the combination of the record key and partition path. You can also combine multiple fields from your dataset into a compound record key. Record keys cannot be null.

Key generator

Key generators are different implementations to generate record keys and partition paths based on the values specified for these fields in the Hudi configuration. The right key generator has to be configured depending on the type of key (simple or composite key) and the column data type used in the record key and partition path columns (for example, TimestampBasedKeyGenerator is used for timestamp data type partition path). Hudi provides several key generators out of the box, which you can specify in your job using the following configuration.

Configuration Parameter Description Value
hoodie.datasource.write.keygenerator.class Key generator class, which generates the record key and partition path Default value is SimpleKeyGenerator

The following table describes the different types of key generators in Hudi.

Key Generators Use-case
SimpleKeyGenerator Use this key generator if your record key refers to a single column by name and similarly your partition path also refers to a single column by name.
ComplexKeyGenerator Use this key generator when record key and partition paths comprise multiple columns. Columns are expected to be comma-separated in the config value (for example, "hoodie.datasource.write.recordkey.field" : “col1,col4”).
GlobalDeleteKeyGenerator

Use this key generator when you can’t determine the partition of incoming records to be deleted and need to delete only based on record key. This key generator ignores the partition path while generating keys to uniquely identify Hudi records.

When using this key generator, set the config hoodie.[bloom|simple|hbase].index.update.partition.path to false in order to avoid redundant data written to the storage.

NonPartitionedKeyGenerator Use this key generator for non-partitioned datasets because it returns an empty partition for all records.
TimestampBasedKeyGenerator Use this key generator for a timestamp data type partition path. With this key generator, the partition path column values are interpreted as timestamps. The record key is the same as before, which is a single column converted to string. If using TimestampBasedKeyGenerator, a few more configs need to be set.
CustomKeyGenerator Use this key generator to take advantage of the benefits of SimpleKeyGenerator, ComplexKeyGenerator, and TimestampBasedKeyGenerator all at the same time. With this you can configure record key and partition paths as a single field or a combination of fields. This is helpful if you want to generate nested partitions with each partition key of different types (for example, field_3:simple,field_5:timestamp). For more information, refer to CustomKeyGenerator.

The key generator class can be automatically inferred by Hudi if the specified record key and partition path require a SimpleKeyGenerator or ComplexKeyGenerator, depending on whether there are single or multiple record key or partition path columns. For all other cases, you need to specify the key generator.

The following flow chart explains how to select the right key generator for your use case.

PreCombine field

This is a mandatory field that Hudi uses to deduplicate the records within the same batch before writing them. When two records have the same record key, they go through the preCombine process, and the record with the largest value for the preCombine key is picked by default. This behavior can be customized through custom implementation of the Hudi payload class, which we describe in the next section.

The following table summarizes the configurations related to preCombine.

Configuration Parameter Description Value
hoodie.datasource.write.precombine.field The field used in preCombining before the actual write. It helps select the latest record whenever there are multiple updates to the same record in a single incoming data batch.

The default value is ts. You can configure it to any column in your dataset that you want Hudi to use to deduplicate the records whenever there are multiple records with the same record key in the same batch. Currently, you can only pick one field as the preCombine field.

Select a column with the timestamp data type or any column that can determine which record holds the latest version, like a monotonically increasing number.

hoodie.combine.before.upsert During upsert, this configuration controls whether deduplication should be done for the incoming batch before ingesting into Hudi. This is applicable only for upsert operations. The default value is true. We recommend keeping it at the default to avoid duplicates.
hoodie.combine.before.delete Same as the preceding config, but applicable only for delete operations. The default value is true. We recommend keeping it at the default to avoid duplicates.
hoodie.combine.before.insert When inserted records share the same key, the configuration controls whether they should be first combined (deduplicated) before writing to storage. The default value is false. We recommend setting it to true if the incoming inserts or bulk inserts can have duplicates.

Record payload

Record payload defines how to merge new incoming records against old stored records for upserts.

The default OverwriteWithLatestAvroPayload payload class always overwrites the stored record with the latest incoming record. This works fine for batch jobs and most use cases. But let’s say you have a streaming job and want to prevent the late-arriving data from overwriting the latest record in storage. You need to use a different payload class implementation (DefaultHoodieRecordPayload) to determine the latest record in storage based on an ordering field, which you provide.

For example, in the following example, Commit 1 has HoodieKey 1, Val 1, preCombine10, and in-flight Commit 2 has HoodieKey 1, Val 2, preCombine 5.

If using the default OverwriteWithLatestAvroPayload, the Val 2 version of the record will be the final version of the record in storage (Amazon S3) because it’s the latest version of the record.

If using DefaultHoodieRecordPayload, it will honor Val 1 because the Val 2’s record version has a lower preCombine value (preCombine 5) compared to Val 1’s record version, while merging multiple versions of the record.

You can select a payload class while writing to the Hudi table using the configuration hoodie.datasource.write.payload.class.

Some useful in-built payload class implementations are described in the following table.

Payload Class Description
OverwriteWithLatestAvroPayload (org.apache.hudi.common.model.OverwriteWithLatestAvroPayload) Chooses the latest incoming record to overwrite any previous version of the records. Default payload class.
DefaultHoodieRecordPayload (org.apache.hudi.common.model.DefaultHoodieRecordPayload) Uses hoodie.payload.ordering.field to determine the final record version while writing to storage.
EmptyHoodieRecordPayload (org.apache.hudi.common.model.EmptyHoodieRecordPayload) Use this as payload class to delete all the records in the dataset.
AWSDmsAvroPayload (org.apache.hudi.common.model.AWSDmsAvroPayload) Use this as payload class if AWS DMS is used as source. It provides support for seamlessly applying changes captured via AWS DMS. This payload implementation performs insert, delete, and update operations on the Hudi table based on the operation type for the CDC record obtained from AWS DMS.

Partitioning

Partitioning is the physical organization of files within a table. They act as virtual columns and can impact the max parallelism we can use on writing.

Extremely fine-grained partitioning (for example, over 20,000 partitions) can create excessive overhead for the Spark engine managing all the small tasks, and can degrade query performance by reducing file sizes. Also, an overly coarse-grained partition strategy, without clustering and data skipping, can negatively impact both read and upsert performance with the need to scan more files in each partition.

Right partitioning helps improve read performance by reducing the amount of data scanned per query. It also improves upsert performance by limiting the number of files scanned to find the file group in which a specific record exists during ingest. A column frequently used in query filters would be a good candidate for partitioning.

For large-scale use cases with evolving query patterns, we suggest coarse-grained partitioning (such as date), while using fine-grained data layout optimization techniques (clustering) within each partition. This opens the possibility of data layout evolution.

By default, Hudi creates the partition folders with just the partition values. We recommend using Hive style partitioning, in which the name of the partition columns is prefixed to the partition values in the path (for example, year=2022/month=07 as opposed to 2022/07). This enables better integration with Hive metastores, such as using msck repair to fix partition paths.

To support Apache Hive style partitions in Hudi, we have to enable it in the config hoodie.datasource.write.hive_style_partitioning.

The following table summarizes the key configurations related to Hudi partitioning.

Configuration Parameter Description Value
hoodie.datasource.write.partitionpath.field Partition path field. This is a required configuration that you need to pass while writing the Hudi dataset. There is no default value set for this. Set it to the column that you have determined for partitioning the data. We recommend that it doesn’t cause extremely fine-grained partitions.
hoodie.datasource.write.hive_style_partitioning Determines whether to use Hive style partitioning. If set to true, the names of partition folders follow <partition_column_name>=<partition_value> format. Default value is false. Set it to true to use Hive style partitioning.
hoodie.datasource.write.partitionpath.urlencode Indicates if we should URL encode the partition path value before creating the folder structure. Default value is false. Set it to true if you want to URL encode the partition path value. For example, if you’re using the data format “yyyy-MM-dd HH:mm:ss“, the URL encode needs to be set to true because it will result in an invalid path due to :.

Note that if the data isn’t partitioned, you need to specifically use NonPartitionedKeyGenerator for the record key, which is explained in the previous section. Additionally, Hudi doesn’t allow partition columns to be changed or evolved.

Choose the right index

After we select the storage type in Hudi and determine the record key and partition path, we need to choose the right index for upsert performance. Apache Hudi employs an index to locate the file group that an update/delete belongs to. This enables efficient upsert and delete operations and enforces uniqueness based on the record keys.

Global index vs. non-global index

When picking the right indexing strategy, the first decision is whether to use a global (table level) or non-global (partition level) index. The main difference between global vs. non-global indexes is the scope of key uniqueness constraints. Global indexes enforce uniqueness of the keys across all partitions of a table. The non-global index implementations enforce this constraint only within a specific partition. Global indexes offer stronger uniqueness guarantees, but they come with a higher update/delete cost, for example global deletes with just the record key need to scan the entire dataset. HBase indexes are an exception here, but come with an operational overhead.

For large-scale global index use cases, use an HBase index or record-level index (available in Hudi 0.13) because for all other global indexes, the update/delete cost grows with the size of the table, O(size of the table).

When using a global index, be aware of the configuration hoodie[bloom|simple|hbase].index.update.partition.path, which is already set to true by default. For existing records getting upserted to a new partition, enabling this configuration will help delete the old record in the old partition and insert it in the new partition.

Hudi index options

After picking the scope of the index, the next step is to decide which indexing option best fits your workload. The following table explains the indexing options available in Hudi as of 0.11.0.

Indexing Option How It Works Characteristic Scope
Simple Index Performs a join of the incoming upsert/delete records against keys extracted from the involved partition in case of non-global datasets and the entire dataset in case of global or non-partitioned datasets. Easiest to configure. Suitable for basic use cases like small tables with evenly spread updates. Even for larger tables where updates are very random to all partitions, a simple index is the right choice because it directly joins with interested fields from every data file without any initial pruning, as compared to Bloom, which in the case of random upserts adds additional overhead and doesn’t give enough pruning benefits because the Bloom filters could indicate true positive for most of the files and end up comparing ranges and filters against all these files. Global/Non-global
Bloom Index (default index in EMR Hudi) Employs Bloom filters built out of the record keys, optionally also pruning candidate files using record key ranges. Bloom filter is stored in the data file footer while writing the data.

More efficient filter compared to simple index for use cases like late-arriving updates to fact tables and deduplication in event tables with ordered record keys such as timestamp. Hudi implements a dynamic Bloom filter mechanism to reduce false positives provided by Bloom filters.

In general, the probability of false positives increases with the number of records in a given file. Check the Hudi FAQ for Bloom filter configuration best practices.

Global/Non-global
Bucket Index It distributes records to buckets using a hash function based on the record keys or subset of it. It uses the same hash function to determine which file group to match with incoming records. New indexing option since hudi 0.11.0. Simple to configure. It has better upsert throughput performance compared to the Bloom filter. As of Hudi 0.11.1, only fixed bucket number is supported. This will no longer be an issue with the upcoming consistent hashing bucket index feature, which can dynamically change bucket numbers. Non-global
HBase Index The index mapping is managed in an external HBase table. Best lookup time, especially for large numbers of partitions and files. It comes with additional operational overhead because you need to manage an external HBase table. Global

Use cases suitable for simple index

Simple indexes are most suitable for workloads with evenly spread updates over partitions and files on small tables, and also for larger tables with dimension kind of workloads because updates are random to all partitions. A common example is a CDC pipeline for a dimension table. In this case, updates end up touching a large number of files and partitions. Therefore, a join with no other pruning is most efficient.

Use cases suitable for Bloom index

Bloom indexes are suitable for most production workloads with uneven update distribution across partitions. For workloads with most updates to recent data like fact tables, Bloom filter rightly fits the bill. It can be clickstream data collected from an ecommerce site, bank transactions in a FinTech application, or CDC logs for a fact table.

When using a Bloom index, be aware of the following configurations:

  • hoodie.bloom.index.use.metadata – By default, it is set to false. When this flag is on, the Hudi writer gets the index metadata information from the metadata table and doesn’t need to open Parquet file footers to get the Bloom filters and stats. You prune out the files by just using the metadata table and therefore have improved performance for larger tables.
  • hoodie.bloom.index.prune.by.rangesEnable or disable range pruning based on use case. By default, it’s already set to true. When this flag is on, range information from files is used to speed up index lookups. This is helpful if the selected record key is monotonously increasing. You can set any record key to be monotonically increasing by adding a timestamp prefix. If the record key is completely random and has no natural ordering (such as UUIDs), it’s better to turn this off, because range pruning will only add extra overhead to the index lookup.

Use cases suitable for bucket index

Bucket indexes are suitable for upsert use cases on huge datasets with a large number of file groups within partitions, relatively even data distribution across partitions, and can achieve relatively even data distribution on the bucket hash field column. It can have better upsert performance in these cases due to no index lookup involved as file groups are located based on a hashing mechanism, which is very fast. This is totally different from both simple and Bloom indexes, where an explicit index lookup step is involved during write. The buckets here has one-one mapping with the hudi file group and since the total number of buckets (defined by hoodie.bucket.index.num.buckets(default – 4)) is fixed here, it can potentially lead to skewed data (data distributed unevenly across buckets) and scalability (buckets can grow over time) issues over time. These issues will be addressed in the upcoming consistent hashing bucket index, which is going to be a special type of bucket index.

Use cases suitable for HBase index

HBase indexes are suitable for use cases where ingestion performance can’t be met using the other index types. These are mostly use cases with global indexes and large numbers of files and partitions. HBase indexes provide the best lookup time but come with large operational overheads if you’re already using HBase for other workloads.

For more information on choosing the right index and indexing strategies for common use cases, refer to Employing the right indexes for fast updates, deletes in Apache Hudi. As you have already seen, Hudi index performance depends heavily on the actual workload. We encourage you to evaluate different indexes for your workload and choose the one which is best suited for your use case.

Migration guidance

With Apache Hudi growing in popularity, one of the fundamental challenges is to efficiently migrate existing datasets to Apache Hudi. Apache Hudi maintains record-level metadata to perform core operations such as upserts and incremental pulls. To take advantage of Hudi’s upsert and incremental processing support, you need to add Hudi record-level metadata to your original dataset.

Using bulk_insert

The recommended way for data migration to Hudi is to perform a full rewrite using bulk_insert. There is no look-up for existing records in bulk_insert and writer optimizations like small file handling. Performing a one-time full rewrite is a good opportunity to write your data in Hudi format with all the metadata and indexes generated and also potentially control file size and sort data by record keys.

You can set the sort mode in a bulk_insert operation using the configuration hoodie.bulkinsert.sort.mode. bulk_insert offers the following sort modes to configure.

Sort Modes Description
NONE No sorting is done to the records. You can get the fastest performance (comparable to writing parquet files with spark) for initial load with this mode.
GLOBAL_SORT Use this to sort records globally across Spark partitions. It is less performant in initial load than other modes as it repartitions data by partition path and sorts it by record key within each partition. This helps in controlling the number of files generated in the target thereby controlling the target file size. Also, the generated target files will not have overlapping min-max values for record keys which will further help speed up index look-ups during upserts/deletes by pruning out files based on record key ranges in bloom index.
PARTITION_SORT Use this to sort records within Spark partitions. It is more performant for initial load than Global_Sort and if your Spark partitions in the data frame are already fairly mapped to the Hudi partitions (dataframe is already repartitioned by partition column), using this mode would be preferred as you can obtain records sorted by record key within each partition.

We recommend to use Global_Sort mode if you can handle the one-time cost. The default sort mode is changed from Global_Sort to None from EMR 6.9 (Hudi 0.12.1). During bulk_insert with Global_Sort, two configurations control the sizes of target files generated by Hudi.

Configuration Parameter Description Value
hoodie.bulkinsert.shuffle.parallelism The number of files generated from the bulk insert is determined by this configuration. The higher the parallelism, the more Spark tasks processing the data. Default value is 200. To control file size and achieve maximum performance (more parallelism), we recommend setting this to a value such that the files generated are equal to the hoodie.parquet.max.file.size. If you make parallelism really high, the max file size can’t be honored because the Spark tasks are working on smaller amounts of data.
hoodie.parquet.max.file.size Target size for Parquet files produced by Hudi write phases. Default value is 120 MB. If the Spark partitions generated with hoodie.bulkinsert.shuffle.parallelism are larger than this size, it splits it and generates multiple files to not exceed the max file size.

Let’s say we have a 100 GB Parquet source dataset and we’re bulk inserting with Global_Sort into a partitioned Hudi table with 10 evenly distributed Hudi partitions. We want to have the preferred target file size of 120 MB (default value for hoodie.parquet.max.file.size). The Hudi bulk insert shuffle parallelism should be calculated as follows:

  • The total data size in MB is 100 * 1024 = 102400 MB
  • hoodie.bulkinsert.shuffle.parallelism should be set to 102400/120 = ~854

Please note that in reality even with Global_Sort, each spark partition can be mapped to more than one hudi partition and this calculation should only be used as a rough estimate and can potentially end up with more files than the parallelism specified.

Using bootstrapping

For customers operating at scale on hundreds of terabytes or petabytes of data, migrating your datasets to start using Apache Hudi can be time-consuming. Apache Hudi provides a feature called bootstrap to help with this challenge.

The bootstrap operation contains two modes: METADATA_ONLY and FULL_RECORD.

FULL_RECORD is the same as full rewrite, where the original data is copied and rewritten with the metadata as Hudi files.

The METADATA_ONLY mode is the key to accelerating the migration progress. The conceptual idea is to decouple the record-level metadata from the actual data by writing only the metadata columns in the Hudi files generated while the data isn’t copied over and stays in its original location. This significantly reduces the amount of data written, thereby improving the time to migrate and get started with Hudi. However, this comes at the expense of read performance, which involves the overhead merging Hudi files and original data files to get the complete record. Therefore, you may not want to use it for frequently queried partitions.

You can pick and choose these modes at partition level. One common strategy is to tier your data. Use FULL_RECORD mode for a small set of hot partitions, which are accessed frequently, and METADATA_ONLY for a larger set of cold partitions.

Consider the following:

Catalog sync

Hudi supports syncing Hudi table partitions and columns to a catalog. On AWS, you can either use the AWS Glue Data Catalog or Hive metastore as the metadata store for your Hudi tables. To register and synchronize the metadata with your regular write pipeline, you need to either enable hive sync or run the hive_sync_tool or AwsGlueCatalogSyncTool command line utility.

We recommend enabling the hive sync feature with your regular write pipeline to make sure the catalog is up to date. If you don’t expect a new partition to be added or the schema changed as part of each batch, then we recommend enabling hoodie.datasource.meta_sync.condition.sync as well so that it allows Hudi to determine if hive sync is necessary for the job.

If you have frequent ingestion jobs and need to maximize ingestion performance, you can disable hive sync and run the hive_sync_tool asynchronously.

If you have the timestamp data type in your Hudi data, we recommend setting hoodie.datasource.hive_sync.support_timestamp to true to convert the int64 (timestamp_micros) to the hive type timestamp. Otherwise, you will see the values in bigint while querying data.

The following table summarizes the configurations related to hive_sync.

Configuration Parameter Description Value
hoodie.datasource.hive_sync.enable To register or sync the table to a Hive metastore or the AWS Glue Data Catalog. Default value is false. We recommend setting the value to true to make sure the catalog is up to date, and it needs to be enabled in every single write to avoid an out-of-sync metastore.
hoodie.datasource.hive_sync.mode This configuration sets the mode for HiveSynctool to connect to the Hive metastore server. For more information, refer to Sync modes. Valid values are hms, jdbc, and hiveql. If the mode isn’t specified, it defaults to jdbc. Hms and jdbc both talk to the underlying thrift server, but jdbc needs a separate jdbc driver. We recommend setting it to ‘hms’, which uses the Hive metastore client to sync Hudi tables using thrift APIs directly. This helps when using the AWS Glue Data Catalog because you don’t need to install Hive as an application on the EMR cluster (because it doesn’t need the server).
hoodie.datasource.hive_sync.database Name of the destination database that we should sync the Hudi table to. Default value is default. Set this to the database name of your catalog.
hoodie.datasource.hive_sync.table Name of the destination table that we should sync the Hudi table to. In Amazon EMR, the value is inferred from the Hudi table name. You can set this config if you need a different table name.
hoodie.datasource.hive_sync.support_timestamp To convert logical type TIMESTAMP_MICROS as hive type timestamp. Default value is false. Set it to true to convert to hive type timestamp.
hoodie.datasource.meta_sync.condition.sync If true, only sync on conditions like schema change or partition change. Default value is false.

Writing and reading Hudi datasets, and its integration with other AWS services

There are different ways you can write the data to Hudi using Amazon EMR, as explained in the following table.

Hudi Write Options Description
Spark DataSource

You can use this option to do upsert, insert, or bulk insert for the write operation.

Refer to Work with a Hudi dataset for an example of how to write data using DataSourceWrite.

Spark SQL You can easily write data to Hudi with SQL statements. It eliminates the need to write Scala or PySpark code and adopt a low-code paradigm.
Flink SQL, Flink DataStream API If you’re using Flink for real-time streaming ingestion, you can use the high-level Flink SQL or Flink DataStream API to write the data to Hudi.
DeltaStreamer DeltaStreamer is a self-managed tool that supports standard data sources like Apache Kafka, Amazon S3 events, DFS, AWS DMS, JDBC, and SQL sources, built-in checkpoint management, schema validations, as well as lightweight transformations. It can also operate in a continuous mode, in which a single self-contained Spark job can pull data from source, write it out to Hudi tables, and asynchronously perform cleaning, clustering, compactions, and catalog syncing, relying on Spark’s job pools for resource management. It’s easy to use and we recommend using it for all the streaming and ingestion use cases where a low-code approach is preferred. For more information, refer to Streaming Ingestion.
Spark structured streaming For use cases that require complex data transformations of the source data frame written in Spark DataFrame APIs or advanced SQL, we recommend the structured streaming sink. The streaming source can be used to obtain change feeds out of Hudi tables for streaming or incremental processing use cases.
Kafka Connect Sink If you standardize on the Apache Kafka Connect framework for your ingestion needs, you can also use the Hudi Connect Sink.

Refer to the following support matrix for query support on specific query engines. The following table explains the different options to read the Hudi dataset using Amazon EMR.

Hudi Read options Description
Spark DataSource You can read Hudi datasets directly from Amazon S3 using this option. The tables don’t need to be registered with Hive metastore or the AWS Glue Data Catalog for this option. You can use this option if your use case doesn’t require a metadata catalog. Refer to Work with a Hudi dataset for example of how to read data using DataSourceReadOptions.
Spark SQL You can query Hudi tables with DML/DDL statements. The tables need to be registered with Hive metastore or the AWS Glue Data Catalog for this option.
Flink SQL After the Flink Hudi tables have been registered to the Flink catalog, they can be queried using the Flink SQL.
PrestoDB/Trino The tables need to be registered with Hive metastore or the AWS Glue Data Catalog for this option. This engine is preferred for interactive queries. There is a new Trino connector in upcoming Hudi 0.13, and we recommend reading datasets through this connector when using Trino for performance benefits.
Hive The tables need to be registered with Hive metastore or the AWS Glue Data Catalog for this option.

Apache Hudi is well integrated with AWS services, and these integrations work when AWS Glue Data Catalog is used, with the exception of Athena, where you can also use a data source connector to an external Hive metastore. The following table summarizes the service integrations.

AWS Service Description
Amazon Athena

You can use Athena for a serverless option to query a Hudi dataset on Amazon S3. Currently, it supports snapshot queries and read-optimized queries, but not incremental queries.

For more details, refer to Using Athena to query Apache Hudi datasets.

Amazon Redshift Spectrum

You can use Amazon Redshift Spectrum to run analytic queries against tables in your Amazon S3 data lake with Hudi format.

Currently, it supports only CoW tables. For more details, refer to Creating external tables for data managed in Apache Hudi.

AWS Lake Formation AWS Lake Formation is used to secure data lakes and define fine-grained access control on the database and table level. Hudi is not currently supported with Amazon EMR Lake Formation integration.
AWS DMS You can use AWS DMS to ingest data from upstream relational databases to your S3 data lakes into an Hudi dataset. For more details, refer to Apply record level changes from relational databases to Amazon S3 data lake using Apache Hudi on Amazon EMR and AWS Database Migration Service.

Conclusion

This post covered best practices for configuring Apache Hudi data lakes using Amazon EMR. We discussed the key configurations in migrating your existing dataset to Hudi and shared guidance on how to determine the right options for different use cases when setting up Hudi tables.

The upcoming Part 2 of this series focuses on optimizations that can be done on this setup, along with monitoring using Amazon CloudWatch.


About the Authors

Suthan Phillips is a Big Data Architect for Amazon EMR at AWS. He works with customers to provide best practice and technical guidance and helps them achieve highly scalable, reliable and secure solutions for complex applications on Amazon EMR. In his spare time, he enjoys hiking and exploring the Pacific Northwest.

Dylan Qu is an AWS solutions architect responsible for providing architectural guidance across the full AWS stack with a focus on Data Analytics, AI/ML and DevOps.

Automate your Amazon QuickSight deployment with the new API-based account creation and deletion

Post Syndicated from Srikanth Baheti original https://aws.amazon.com/blogs/big-data/automate-your-amazon-quicksight-deployment-with-the-new-api-based-account-creation-and-deletion/

Amazon QuickSight is a fully managed, cloud-native business intelligence (BI) service that makes it easy to connect to your data, create interactive dashboards, and share these with tens of thousands of users, either within the QuickSight interface, or embedded in software as a service (SaaS) applications or web portals.

We’re excited to announce the availability of QuickSight APIs that enable administrators and developers to programmatically create and delete accounts with QuickSight Enterprise and Enterprise + Q editions. This allows developers and administrators to automate the deployment and teardown of QuickSight accounts in their organization at scale.

Feature overview

To create and delete QuickSight accounts programmatically, you can use the following APIs:

You need to have the correct AWS Identity and Access Management (IAM) permissions to sign up for QuickSight. For more information, see IAM policy examples for Amazon QuickSight. If your IAM policy includes both the Subscribe and CreateAccountSubscription actions, make sure that both actions are set to Allow. If either action is set to Deny, the Deny action prevails and your API call fails.

Create a QuickSight account

Use the CreateAccountSubscription API to create a QuickSight account:

POST /account/AwsAccountId HTTP/1.1 
Content-type: application/json 
{ 
    "AccountName": "string", 
    "ActiveDirectoryName": "string", 
    "AdminGroup": [ "string" ], 
    "AuthenticationMethod": "string", 
    "AuthorGroup": [ "string" ], 
    "ContactNumber": "string", 
    "DirectoryId": "string", 
    "Edition": "string", 
    "EmailAddress": "string", 
    "FirstName": "string", 
    "LastName": "string", 
    "NotificationEmail": "string", 
    "ReaderGroup": [ "string" ], 
    "Realm": "string" 
}

You can choose to subscribe to Enterprise or Enterprise + Q edition of QuickSight. You can choose the user authentication method: IAM only, IAM and QuickSight, or Active Directory. If you choose Active Directory, you need to provide an Active Directory name and an admin group associated with your Active Directory. Provide the notification email address that you want QuickSight to send notifications to regarding your QuickSight account or QuickSight subscription.

The following code shows the response for CreateAccountSubscription:

HTTP/1.1 Status
Content-type: application/json
{
   "RequestId": "string",
   "SignupResponse": { 
      "accountName": "string",
      "directoryType": "string",
      "IAMUser": boolean,
      "userLoginName": "string"
   }
}

Describe a QuickSight account

Use the DescribeAccountSubscription API to receive a description of a QuickSight account’s subscription:

GET /account/AwsAccountId HTTP/1.1

A successful API call returns an AccountInfo object that includes an account’s name, subscription status, authentication type, edition, and notification email address:

HTTP/1.1 Status 
Content-type: application/json 
{ 
    "AccountInfo": { 
        "AccountName": "string", 
        "AccountSubscriptionStatus": "string", 
        "AuthenticationType": "string", 
        "Edition": "string", 
        "NotificationEmail": "string" 
    },
    "RequestId": "string" 
}

Update and delete a QuickSight account

To avoid accidental deletion, a termination protection flag has been introduced. When a new QuickSight account is provisioned through the CreateAccountSubscription API or user interface, the termination protection flag is set to True by default. You can use the existing DescribeAccountSettings API to inquire the current value of the termination protection flag:

GET /account/AwsAccountId HTTP/1.1

The following code shows the response for DescribeAccountSettings:

HTTP/1.1 Status 
Content-type: application/json
{ 
    "AccountSettings": { 
        "AccountName": "string", 
        "Edition": "string", 
        "DefaultNamespace": "string", 
        "PublicSharingEnabled": "boolean", 
        "NotificationEmail": "string",
        "TerminationProtectionEnabled": "boolean"
    },
    "RequestId": "string" 
}

Use the UpdateAccountSettings API to disable the termination protection flag before proceeding with deleting the account. Optionally, this API can be used to update notification email:

PUT /accounts/{AwsAccountId}/settings HTTP/1.1 
Content-type: application/json 
{ 
    "AwsAccountId": "string", 
    "DefaultNamespace": "string", //Currently only supports default namespace: default 
    "TerminationProtectionEnabled": "boolean",
     "NotificationEmail": "string"
}

After the termination protection has been set to False, use the DeleteAccountSubscription API to delete the QuickSight account:

DELETE /account/{AwsAccountId} HTTP/1.1 
Content-type: application/json

Sample use case

Let’s consider a fictional company, AnyCompany, which owns healthcare facilities across the country. The central IT team of AnyCompany is responsible for setting up and maintaining the IT infrastructure and services for all the facilities in each state. Because AnyCompany is scaling their business, they want to automate the onboarding and maintenance of the IT infrastructure as much as possible. QuickSight is one of the services used by AnyCompany sites, and central IT needs to be able to set up and tear down QuickSight accounts automatically.

The following code shows their sample CreateAccountSubscription API call for setting up a QuickSight Enterprise account:

aws quicksight create-account-subscription --edition ENTERPRISE 
--authentication-method IAM_AND_QUICKSIGHT --aws-account-id XXXXXXXXXXXX 
--account-name quicksight-enterprise-reporting --notification-email XXXXXXXXXX 
--region us-west-2

The following screenshot shows the response.

They use the DescribeAccountSubscription API to monitor the status of new QuickSight account’s subscription:

aws quicksight describe-account-subscription --aws-account-id XXXXXXXXXXX

The following screenshot shows the response.

In case of a site closing event, the following code turns off the account protection using the UpdateAccountSettings API call:

 aws quicksight update-account-settings --aws-account-id XXXXXXXXXXXX 
 --default-namespace default --no-termination-protection-enabled 
 --region us-west-2

The following screenshot shows the response.

The following code deletes the QuickSight account using the DeleteAccountSubscription API call:

aws quicksight delete-account-subscription --aws-account-id XXXXXXXXXXXX 
--region us-west-2

The following screenshot shows the response.

Conclusion

Provisioning and deleting QuickSight accounts enables IT teams to automate their deployments of QuickSight instances across their organization. You can scale and keep up with the rapidly growing business, and minimize human error such as choosing the wrong authentication method. For more information, refer to Amazon QuickSight and What’s New in the Amazon QuickSight User Guide.

Try out QuickSight account provisioning and deletion APIs to automate your QuickSight deployments, and share your feedback and questions in the comments.


About the authors

Srikanth Baheti is a Specialized World Wide Sr. Solution Architect for Amazon QuickSight. He started his career as a consultant and worked for multiple private and government organizations. Later he worked for PerkinElmer Health and Sciences & eResearch Technology Inc, where he was responsible for designing and developing high traffic web applications, highly scalable and maintainable data pipelines for reporting platforms using AWS services and Serverless computing.

Raji Sivasubramaniam is a Sr. Solutions Architect at AWS, focusing on Analytics. Raji is specialized in architecting end-to-end Enterprise Data Management, Business Intelligence and Analytics solutions for Fortune 500 and Fortune 100 companies across the globe. She has in-depth experience in integrated healthcare data and analytics with wide variety of healthcare datasets including managed market, physician targeting and patient analytics.

Mayank Agarwal is a product manager for Amazon QuickSight, AWS’ cloud-native, fully managed BI service. He focuses on account administration, governance and developer experience. He started his career as an embedded software engineer developing handheld devices. Prior to QuickSight he was leading engineering teams at Credence ID, developing custom mobile embedded device and web solutions using AWS services that make biometric enrollment and identification fast, intuitive, and cost-effective for Government sector, healthcare and transaction security applications.

How GoDaddy built a data mesh to decentralize data ownership using AWS Lake Formation

Post Syndicated from Ankit Jhalaria original https://aws.amazon.com/blogs/big-data/how-godaddy-built-a-data-mesh-to-decentralize-data-ownership-using-aws-lake-formation/

This is a guest post co-written with Ankit Jhalaria from GoDaddy.

GoDaddy is empowering everyday entrepreneurs by providing all the help and tools to succeed online. With more than 20 million customers worldwide, GoDaddy is the place people come to name their idea, build a professional website, attract customers, and manage their work.

GoDaddy is a data-driven company, and getting meaningful insights from data helps them drive business decisions to delight their customers. In 2018, GoDaddy began a large infrastructure revamp and partnered with AWS to innovate faster than ever before to meet the needs of its customer growth around the world. As part of this revamp, the GoDaddy Data Platform team wanted to set the company up for long-term success by creating a well-defined data strategy and setting goals to decentralize the ownership and processing of data.

In this post, we discuss how GoDaddy uses AWS Lake Formation to simplify security management and data governance at scale, and enable data as a service (DaaS) supporting organization-wide data accessibility with cross-account data sharing using a data mesh architecture.

The challenge

In the vast ocean of data, deriving useful insights is an art. Prior to the AWS partnership, GoDaddy had a shared Hadoop cluster on premises that various teams used to create and share datasets with other analysts for collaboration. As the teams grew, copies of data started to grow in the Hadoop Distributed File System (HDFS). Several teams started to build tooling to manage this challenge independently, duplicating efforts. Managing permissions on these data assets became harder. Making data discoverable across a growing number of data catalogs and systems is something that had started to become a big challenge. Although the cost of storage these days is relatively inexpensive, when there are several copies of the same data asset available, it makes it harder for analysts to efficiently and reliably use the data available to them. Business analysts need robust pipelines on key datasets that they rely upon to make business decisions.

Solution overview

In GoDaddy’s data mesh hub and spoke model, a central data catalog contains information about all the data products that exist in the company. In AWS terminology, this is the AWS Glue Data Catalog. The data platform team provides APIs, SDKs, and Airflow Operators as components that different teams use to interact with the catalog. Activities such as updating the metastore to reflect a new partition for a given data product, and occasionally running MSCK repair operations, are all handled in the central governance account, and Lake Formation is used to secure access to the Data Catalog.

The data platform team introduced a layer of data governance that ensures best practices for building data products are followed throughout the company. We provide the tooling to support data engineers and business analysts while leaving the domain experts to run their data pipelines. With this approach, we have well-curated data products that are intuitive and easy to understand for our business analysts.

A data product refers to an entity that powers insights for analytical purposes. In simple terms, this could refer to an actual dataset pointing to a location in Amazon Simple Storage Service (Amazon S3). Data producers are responsible for the processing of data and creating new snapshots or partitions depending on the business needs. In some cases, data is refreshed every 24 hours, and other cases, every hour. Data consumers come to the data mesh to consume data, and permissions are managed in the central governance account through Lake Formation. Lake Formation uses AWS Resource Access Manager (AWS RAM) to send resource shares to different consumer accounts to be able to access the data from the central governance account. We go into details about this functionality later in the post.

The following diagram illustrates the solution architecture.

Solution architecture illustrated

Defining metadata with the central schema repository

Data is only useful if end-users can derive meaningful insights from it—otherwise, it’s just noise. As part of onboarding with the data platform, a data producer registers their schema with the data platform along with relevant metadata. This is reviewed by the data governance team that ensures best practices for creating datasets are followed. We have automated some of the most common data governance review items. This is also the place where producers define a contract about reliable data deliveries, often referred to as Service Level Objective (SLO). After a contract is in place, the data platform team’s background processes monitor and send out alerts when data producers fail to meet their contract or SLO.

When managing permissions with Lake Formation, you register the Amazon S3 location of different S3 buckets. Lake Formation uses AWS RAM to share the named resource.

When managing resources with AWS RAM, the central governance account creates AWS RAM shares. The data platform provides a custom AWS Service Catalog product to accept AWS RAM shares in consumer accounts.

Having consistent schemas with meaningful names and descriptions makes the discovery of datasets easy. Every data producer who is a domain expert is responsible for creating well-defined schemas that business users use to generate insights to make key business decisions. Data producers register their schemas along with additional metadata with the data lake repository. Metadata includes information about the team responsible for the dataset, such as their SLO contract, description, and contact information. This information gets checked into a Git repository where automation kicks in and validates the request to make sure it conforms to standards and best practices. We use AWS CloudFormation templates to provision resources. The following code is a sample of what the registration metadata looks like.

Sample code of what the registration metadata looks like

As part of the registration process, automation steps run in the background to take care of the following on behalf of the data producer:

  • Register the producer’s Amazon S3 location of the data with Lake Formation – This allows us to use Lake Formation for fine-grained access to control the table in the AWS Glue Data Catalog that refers to this location as well as to the underlying data.
  • Create the underlying AWS Glue database and table – Based on the schema specified by the data producer along with the metadata, we create the underlying AWS Glue database and table in the central governance account. As part of this, we also use table properties of AWS Glue to store additional metadata to use later for analysis.
  • Define the SLO contract – Any business-critical dataset needs to have a well-defined SLO contract. As part of dataset registration, the data producer defines a contract with a cron expression that gets used by the data platform to create an event rule in Amazon EventBridge. This rule triggers an AWS Lambda function to watch for deliveries of the data and triggers an alert to the data producer’s Slack channel if they breach the contract.

Consuming data from the data mesh catalog

When a data consumer belonging to a given line of business (LOB) identifies the data product that they’re interested in, they submit a request to the central governance team containing their AWS account ID that they use to query the data. The data platform provides a portal to discover datasets across the company. After the request is approved, automation runs to create an AWS RAM share with the consumer account covering the AWS Glue database and tables mapped to the data product registered in the AWS Glue Data Catalog of the central governance account.

The following screenshot shows an example of a resource share.

Example of a resource share

The consumer data lake admin needs to accept the AWS RAM share and create a resource link in Lake Formation to start querying the shared dataset within their account. We automated this process by building an AWS Service Catalog product that runs in the consumer’s account as a Lambda function that accepts shares on behalf of consumers.

When the resource linked datasets are available in the consumer account, the consumer data lake admin provides grants to IAM users and roles mapping to data consumers within the account. These consumers (application or user persona) can now query the datasets using AWS analytics services of their choice like Amazon Athena and Amazon EMR based on the access privileges granted by the consumer data lake admin.

Day-to-day operations and metrics

Managing permissions using Lake Formation is one part of the overall ecosystem. After permissions have been granted, data producers create new snapshots of the data at a certain cadence that can vary from every 15 minutes to a day. Data producers are integrated with the data platform APIs that informs the platform about any new refreshes of the data. The data platform automatically writes a 0-byte _SUCCESS file for every dataset that gets refreshed, and notifies the subscribed consumer account via an Amazon Simple Notification Service (Amazon SNS) topic in the central governance account. Consumers use this as a signal to trigger their data pipelines and processes to start processing newer version of the data utilizing an event-driven approach.

There are over 2,000 data products built on the GoDaddy data mesh on AWS. Every day, there are thousands of updates to the AWS Glue metastore in the central data governance account. There are hundreds of data producers generating data every hour in a wide array of S3 buckets, and thousands of data consumers consuming data across a wide array of tools, including Athena, Amazon EMR, and Tableau from different AWS accounts.

Business outcomes

With the move to AWS, GoDaddy’s Data Platform team laid the foundations to build a modern data platform that has increased our velocity of building data products and delighting our customers. The data platform has successfully transitioned from a monolithic platform to a model where ownership of data has been decentralized. We accelerated the data platform adoption to over 10 lines of business and over 300 teams globally, and are successfully managing multiple petabytes of data spread across hundreds of accounts to help our business derive insights faster.

Conclusion

GoDaddy’s hub and spoke data mesh architecture built using Lake Formation simplifies security management and data governance at scale, to deliver data as a service supporting company-wide data accessibility. Our data mesh manages multiple petabytes of data across hundreds of accounts, enabling decentralized ownership of well-defined datasets with automation in place, which helps the business discover data assets quicker and derive business insights faster.

This post illustrates the use of Lake Formation to build a data mesh architecture that enables a DaaS model for a modernized enterprise data platform. For more information, see Design a data mesh architecture using AWS Lake Formation and AWS Glue.


About the Authors

Ankit Jhalaria is the Director Of Engineering on the Data Platform at GoDaddy. He has over 10 years of experience working in big data technologies. Outside of work, Ankit loves hiking, playing board games, building IoT projects, and contributing to open-source projects.

Harsh Vardhan is an AWS Solutions Architect, specializing in Analytics. He has over 6 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Kyle Tedeschi is a Principal Solutions Architect at AWS. He enjoys helping customers innovate, transform, and become leaders in their respective domains. Outside of work, Kyle is an avid snowboarder, car enthusiast, and traveler.

Get started with data integration from Amazon S3 to Amazon Redshift using AWS Glue interactive sessions

Post Syndicated from Vikas Omer original https://aws.amazon.com/blogs/big-data/get-started-with-data-integration-from-amazon-s3-to-amazon-redshift-using-aws-glue-interactive-sessions/

Organizations are placing a high priority on data integration, especially to support analytics, machine learning (ML), business intelligence (BI), and application development initiatives. Data is growing exponentially and is generated by increasingly diverse data sources. Data integration becomes challenging when processing data at scale and the inherent heavy lifting associated with infrastructure required to manage it. This is one of the key reasons why organizations are constantly looking for easy-to-use and low maintenance data integration solutions to move data from one location to another or to consolidate their business data from several sources into a centralized location to make strategic business decisions.

Most organizations use Spark for their big data processing needs. If you’re looking to simplify data integration, and don’t want the hassle of spinning up servers, managing resources, or setting up Spark clusters, we have the solution for you.

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, ML, and application development. AWS Glue provides both visual and code-based interfaces to make data integration simple and accessible for everyone.

If you prefer a code-based experience and want to interactively author data integration jobs, we recommend interactive sessions. Interactive sessions is a recently launched AWS Glue feature that allows you to interactively develop AWS Glue processes, run and test each step, and view the results.

There are different options to use interactive sessions. You can create and work with interactive sessions through the AWS Command Line Interface (AWS CLI) and API. You can also use Jupyter-compatible notebooks to visually author and test your notebook scripts. Interactive sessions provide a Jupyter kernel that integrates almost anywhere that Jupyter does, including integrating with IDEs such as PyCharm, IntelliJ, and Visual Studio Code. This enables you to author code in your local environment and run it seamlessly on the interactive session backend. You can also start a notebook through AWS Glue Studio; all the configuration steps are done for you so that you can explore your data and start developing your job script after only a few seconds. When the code is ready, you can configure, schedule, and monitor job notebooks as AWS Glue jobs.

If you haven’t tried AWS Glue interactive sessions before, this post is highly recommended. We work through a simple scenario where you might need to incrementally load data from Amazon Simple Storage Service (Amazon S3) into Amazon Redshift or transform and enrich your data before loading into Amazon Redshift. In this post, we use interactive sessions within an AWS Glue Studio notebook to load the NYC Taxi dataset into an Amazon Redshift Serverless cluster, query the loaded dataset, save our Jupyter notebook as a job, and schedule it to run using a cron expression. Let’s get started.

Solution overview

We walk you through the following steps:

  1. Set up an AWS Glue Jupyter notebook with interactive sessions.
  2. Use notebook’s magics, including AWS Glue connection and bookmarks.
  3. Read data from Amazon S3, and transform and load it into Redshift Serverless.
  4. Save the notebook as an AWS Glue job and schedule it to run.

Prerequisites

For this walkthrough, we must complete the following prerequisites:

  1. Upload Yellow Taxi Trip Records data and the taxi zone lookup table datasets into Amazon S3. Steps to do that are listed in the next section.
  2. Prepare the necessary AWS Identity and Access Management (IAM) policies and roles to work with AWS Glue Studio Jupyter notebooks, interactive sessions, and AWS Glue.
  3. Create the AWS Glue connection for Redshift Serverless.

Upload datasets into Amazon S3

Download Yellow Taxi Trip Records data and taxi zone lookup table data to your local environment. For this post, we download the January 2022 data for yellow taxi trip records data in Parquet format. The taxi zone lookup data is in CSV format. You can also download the data dictionary for the trip record dataset.

  1. On the Amazon S3 console, create a bucket called my-first-aws-glue-is-project-<random number> in the us-east-1 Region to store the data.S3 bucket names must be unique across all AWS accounts in all the Regions.
  2. Create folders nyc_yellow_taxi and taxi_zone_lookup in the bucket you just created and upload the files you downloaded.
    Your folder structures should look like the following screenshots.s3 yellow taxi datas3 lookup data

Prepare IAM policies and role

Let’s prepare the necessary IAM policies and role to work with AWS Glue Studio Jupyter notebooks and interactive sessions. To get started with notebooks in AWS Glue Studio, refer to Getting started with notebooks in AWS Glue Studio.

Create IAM policies for the AWS Glue notebook role

Create the policy AWSGlueInteractiveSessionPassRolePolicy with the following permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
        "Effect": "Allow",
        "Action": "iam:PassRole",
        "Resource":"arn:aws:iam::<AWS account ID>:role/AWSGlueServiceRole-GlueIS"
        }
    ]
}

This policy allows the AWS Glue notebook role to pass to interactive sessions so that the same role can be used in both places. Note that AWSGlueServiceRole-GlueIS is the role that we create for the AWS Glue Studio Jupyter notebook in a later step. Next, create the policy AmazonS3Access-MyFirstGlueISProject with the following permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<your s3 bucket name>",
                "arn:aws:s3:::<your s3 bucket name>/*"
            ]
        }
    ]
}

This policy allows the AWS Glue notebook role to access data in the S3 bucket.

Create an IAM role for the AWS Glue notebook

Create a new AWS Glue role called AWSGlueServiceRole-GlueIS with the following policies attached to it:

Create the AWS Glue connection for Redshift Serverless

Now we’re ready to configure a Redshift Serverless security group to connect with AWS Glue components.

  1. On the Redshift Serverless console, open the workgroup you’re using.
    You can find all the namespaces and workgroups on the Redshift Serverless dashboard.
  2. Under Data access, choose Network and security.
  3. Choose the link for the Redshift Serverless VPC security group.redshift serverless vpc security groupYou’re redirected to the Amazon Elastic Compute Cloud (Amazon EC2) console.
  4. In the Redshift Serverless security group details, under Inbound rules, choose Edit inbound rules.
  5. Add a self-referencing rule to allow AWS Glue components to communicate:
    1. For Type, choose All TCP.
    2. For Protocol, choose TCP.
    3. For Port range, include all ports.
    4. For Source, use the same security group as the group ID.
      redshift inbound security group
  6. Similarly, add the following outbound rules:
    1. A self-referencing rule with Type as All TCP, Protocol as TCP, Port range including all ports, and Destination as the same security group as the group ID.
    2. An HTTPS rule for Amazon S3 access. The s3-prefix-list-id value is required in the security group rule to allow traffic from the VPC to the Amazon S3 VPC endpoint.
      redshift outbound security group

If you don’t have an Amazon S3 VPC endpoint, you can create one on the Amazon Virtual Private Cloud (Amazon VPC) console.

s3 vpc endpoint

You can check the value for s3-prefix-list-id on the Managed prefix lists page on the Amazon VPC console.

s3 prefix list

Next, go to the Connectors page on AWS Glue Studio and create a new JDBC connection called redshiftServerless to your Redshift Serverless cluster (unless one already exists). You can find the Redshift Serverless endpoint details under your workgroup’s General Information section. The connection setting looks like the following screenshot.

redshift serverless connection page

Write interactive code on an AWS Glue Studio Jupyter notebook powered by interactive sessions

Now you can get started with writing interactive code using AWS Glue Studio Jupyter notebook powered by interactive sessions. Note that it’s a good practice to keep saving the notebook at regular intervals while you work through it.

  1. On the AWS Glue Studio console, create a new job.
  2. Select Jupyter Notebook and select Create a new notebook from scratch.
  3. Choose Create.
    glue interactive session create notebook
  4. For Job name, enter a name (for example, myFirstGlueISProject).
  5. For IAM Role, choose the role you created (AWSGlueServiceRole-GlueIS).
  6. Choose Start notebook job.
    glue interactive session notebook setupAfter the notebook is initialized, you can see some of the available magics and a cell with boilerplate code. To view all the magics of interactive sessions, run %help in a cell to print a full list. With the exception of %%sql, running a cell of only magics doesn’t start a session, but sets the configuration for the session that starts when you run your first cell of code.glue interactive session jupyter notebook initializationFor this post, we configure AWS Glue with version 3.0, three G.1X workers, idle timeout, and an Amazon Redshift connection with the help of available magics.
  7. Let’s enter the following magics into our first cell and run it:
    %glue_version 3.0
    %number_of_workers 3
    %worker_type G.1X
    %idle_timeout 60
    %connections redshiftServerless

    We get the following response:

    Welcome to the Glue Interactive Sessions Kernel
    For more information on available magic commands, please type %help in any new cell.
    
    Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
    Installed kernel version: 0.35 
    Setting Glue version to: 3.0
    Previous number of workers: 5
    Setting new number of workers to: 3
    Previous worker type: G.1X
    Setting new worker type to: G.1X
    Current idle_timeout is 2880 minutes.
    idle_timeout has been set to 60 minutes.
    Connections to be included:
    redshiftServerless

  8. Let’s run our first code cell (boilerplate code) to start an interactive notebook session within a few seconds:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
      
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)

    We get the following response:

    Authenticating with environment variables and user-defined glue_role_arn:arn:aws:iam::xxxxxxxxxxxx:role/AWSGlueServiceRole-GlueIS
    Attempting to use existing AssumeRole session credentials.
    Trying to create a Glue session for the kernel.
    Worker Type: G.1X
    Number of Workers: 3
    Session ID: 7c9eadb1-9f9b-424f-9fba-d0abc57e610d
    Applying the following default arguments:
    --glue_kernel_version 0.35
    --enable-glue-datacatalog true
    --job-bookmark-option job-bookmark-enable
    Waiting for session 7c9eadb1-9f9b-424f-9fba-d0abc57e610d to get into ready status...
    Session 7c9eadb1-9f9b-424f-9fba-d0abc57e610d has been created

  9. Next, read the NYC yellow taxi data from the S3 bucket into an AWS Glue dynamic frame:
    nyc_taxi_trip_input_dyf = glueContext.create_dynamic_frame.from_options(
        connection_type = "s3", 
        connection_options = {
            "paths": ["s3://<your-s3-bucket-name>/nyc_yellow_taxi/"]
        }, 
        format = "parquet",
        transformation_ctx = "nyc_taxi_trip_input_dyf"
    )

    Let’s count the number of rows, look at the schema and a few rows of the dataset.

  10. Count the rows with the following code:
    nyc_taxi_trip_input_df = nyc_taxi_trip_input_dyf.toDF()
    nyc_taxi_trip_input_df.count()

    We get the following response:

    2463931

  11. View the schema with the following code:
    nyc_taxi_trip_input_df.printSchema()

    We get the following response:

    root
     |-- VendorID: long (nullable = true)
     |-- tpep_pickup_datetime: timestamp (nullable = true)
     |-- tpep_dropoff_datetime: timestamp (nullable = true)
     |-- passenger_count: double (nullable = true)
     |-- trip_distance: double (nullable = true)
     |-- RatecodeID: double (nullable = true)
     |-- store_and_fwd_flag: string (nullable = true)
     |-- PULocationID: long (nullable = true)
     |-- DOLocationID: long (nullable = true)
     |-- payment_type: long (nullable = true)
     |-- fare_amount: double (nullable = true)
     |-- extra: double (nullable = true)
     |-- mta_tax: double (nullable = true)
     |-- tip_amount: double (nullable = true)
     |-- tolls_amount: double (nullable = true)
     |-- improvement_surcharge: double (nullable = true)
     |-- total_amount: double (nullable = true)
     |-- congestion_surcharge: double (nullable = true)
     |-- airport_fee: double (nullable = true)

  12. View a few rows of the dataset with the following code:
    nyc_taxi_trip_input_df.show(5)

    We get the following response:

    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    |VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    |       2| 2022-01-18 15:04:43|  2022-01-18 15:12:51|            1.0|         1.13|       1.0|                 N|         141|         229|           2|        7.0|  0.0|    0.5|       0.0|         0.0|                  0.3|        10.3|                 2.5|        0.0|
    |       2| 2022-01-18 15:03:28|  2022-01-18 15:15:52|            2.0|         1.36|       1.0|                 N|         237|         142|           1|        9.5|  0.0|    0.5|      2.56|         0.0|                  0.3|       15.36|                 2.5|        0.0|
    |       1| 2022-01-06 17:49:22|  2022-01-06 17:57:03|            1.0|          1.1|       1.0|                 N|         161|         229|           2|        7.0|  3.5|    0.5|       0.0|         0.0|                  0.3|        11.3|                 2.5|        0.0|
    |       2| 2022-01-09 20:00:55|  2022-01-09 20:04:14|            1.0|         0.56|       1.0|                 N|         230|         230|           1|        4.5|  0.5|    0.5|      1.66|         0.0|                  0.3|        9.96|                 2.5|        0.0|
    |       2| 2022-01-24 16:16:53|  2022-01-24 16:31:36|            1.0|         2.02|       1.0|                 N|         163|         234|           1|       10.5|  1.0|    0.5|       3.7|         0.0|                  0.3|        18.5|                 2.5|        0.0|
    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    only showing top 5 rows

  13. Now, read the taxi zone lookup data from the S3 bucket into an AWS Glue dynamic frame:
    nyc_taxi_zone_lookup_dyf = glueContext.create_dynamic_frame.from_options(
        connection_type = "s3", 
        connection_options = {
            "paths": ["s3://<your-s3-bucket-name>/taxi_zone_lookup/"]
        }, 
        format = "csv",
        format_options= {
            'withHeader': True
        },
        transformation_ctx = "nyc_taxi_zone_lookup_dyf"
    )

    Let’s count the number of rows, look at the schema and a few rows of the dataset.

  14. Count the rows with the following code:
    nyc_taxi_zone_lookup_df = nyc_taxi_zone_lookup_dyf.toDF()
    nyc_taxi_zone_lookup_df.count()

    We get the following response:

    265

  15. View the schema with the following code:
    nyc_taxi_zone_lookup_apply_mapping_dyf.toDF().printSchema()

    We get the following response:

    root
     |-- LocationID: string (nullable = true)
     |-- Borough: string (nullable = true)
     |-- Zone: string (nullable = true)
     |-- service_zone: string (nullable = true)

  16. View a few rows with the following code:
    nyc_taxi_zone_lookup_df.show(5)

    We get the following response:

    +----------+-------------+--------------------+------------+
    |LocationID|      Borough|                Zone|service_zone|
    +----------+-------------+--------------------+------------+
    |         1|          EWR|      Newark Airport|         EWR|
    |         2|       Queens|         Jamaica Bay|   Boro Zone|
    |         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
    |         4|    Manhattan|       Alphabet City| Yellow Zone|
    |         5|Staten Island|       Arden Heights|   Boro Zone|
    +----------+-------------+--------------------+------------+
    only showing top 5 rows

  17. Based on the data dictionary, lets recalibrate the data types of attributes in dynamic frames corresponding to both dynamic frames:
    nyc_taxi_trip_apply_mapping_dyf = ApplyMapping.apply(
        frame = nyc_taxi_trip_input_dyf, 
        mappings = [
            ("VendorID","Long","VendorID","Integer"), 
            ("tpep_pickup_datetime","Timestamp","tpep_pickup_datetime","Timestamp"), 
            ("tpep_dropoff_datetime","Timestamp","tpep_dropoff_datetime","Timestamp"), 
            ("passenger_count","Double","passenger_count","Integer"), 
            ("trip_distance","Double","trip_distance","Double"),
            ("RatecodeID","Double","RatecodeID","Integer"), 
            ("store_and_fwd_flag","String","store_and_fwd_flag","String"), 
            ("PULocationID","Long","PULocationID","Integer"), 
            ("DOLocationID","Long","DOLocationID","Integer"),
            ("payment_type","Long","payment_type","Integer"), 
            ("fare_amount","Double","fare_amount","Double"),
            ("extra","Double","extra","Double"), 
            ("mta_tax","Double","mta_tax","Double"),
            ("tip_amount","Double","tip_amount","Double"), 
            ("tolls_amount","Double","tolls_amount","Double"), 
            ("improvement_surcharge","Double","improvement_surcharge","Double"), 
            ("total_amount","Double","total_amount","Double"), 
            ("congestion_surcharge","Double","congestion_surcharge","Double"), 
            ("airport_fee","Double","airport_fee","Double")
        ],
        transformation_ctx = "nyc_taxi_trip_apply_mapping_dyf"
    )

    nyc_taxi_zone_lookup_apply_mapping_dyf = ApplyMapping.apply(
        frame = nyc_taxi_zone_lookup_dyf, 
        mappings = [ 
            ("LocationID","String","LocationID","Integer"), 
            ("Borough","String","Borough","String"), 
            ("Zone","String","Zone","String"), 
            ("service_zone","String", "service_zone","String")
        ],
        transformation_ctx = "nyc_taxi_zone_lookup_apply_mapping_dyf"
    )

  18. Now let’s check their schema:
    nyc_taxi_trip_apply_mapping_dyf.toDF().printSchema()

    We get the following response:

    root
     |-- VendorID: integer (nullable = true)
     |-- tpep_pickup_datetime: timestamp (nullable = true)
     |-- tpep_dropoff_datetime: timestamp (nullable = true)
     |-- passenger_count: integer (nullable = true)
     |-- trip_distance: double (nullable = true)
     |-- RatecodeID: integer (nullable = true)
     |-- store_and_fwd_flag: string (nullable = true)
     |-- PULocationID: integer (nullable = true)
     |-- DOLocationID: integer (nullable = true)
     |-- payment_type: integer (nullable = true)
     |-- fare_amount: double (nullable = true)
     |-- extra: double (nullable = true)
     |-- mta_tax: double (nullable = true)
     |-- tip_amount: double (nullable = true)
     |-- tolls_amount: double (nullable = true)
     |-- improvement_surcharge: double (nullable = true)
     |-- total_amount: double (nullable = true)
     |-- congestion_surcharge: double (nullable = true)
     |-- airport_fee: double (nullable = true)

    nyc_taxi_zone_lookup_apply_mapping_dyf.toDF().printSchema()

    We get the following response:

    root
     |-- LocationID: integer (nullable = true)
     |-- Borough: string (nullable = true)
     |-- Zone: string (nullable = true)
     |-- service_zone: string (nullable = true)

  19. Let’s add the column trip_duration to calculate the duration of each trip in minutes to the taxi trip dynamic frame:
    # Function to calculate trip duration in minutes
    def trip_duration(start_timestamp,end_timestamp):
        minutes_diff = (end_timestamp - start_timestamp).total_seconds() / 60.0
        return(minutes_diff)

    # Transformation function for each record
    def transformRecord(rec):
        rec["trip_duration"] = trip_duration(rec["tpep_pickup_datetime"], rec["tpep_dropoff_datetime"])
        return rec
    nyc_taxi_trip_final_dyf = Map.apply(
        frame = nyc_taxi_trip_apply_mapping_dyf, 
        f = transformRecord, 
        transformation_ctx = "nyc_taxi_trip_final_dyf"
    )

    Let’s count the number of rows, look at the schema and a few rows of the dataset after applying the above transformation.

  20. Get a record count with the following code:
    nyc_taxi_trip_final_df = nyc_taxi_trip_final_dyf.toDF()
    nyc_taxi_trip_final_df.count()

    We get the following response:

    2463931

  21. View the schema with the following code:
    nyc_taxi_trip_final_df.printSchema()

    We get the following response:

    root
     |-- extra: double (nullable = true)
     |-- tpep_dropoff_datetime: timestamp (nullable = true)
     |-- trip_duration: double (nullable = true)
     |-- trip_distance: double (nullable = true)
     |-- mta_tax: double (nullable = true)
     |-- improvement_surcharge: double (nullable = true)
     |-- DOLocationID: integer (nullable = true)
     |-- congestion_surcharge: double (nullable = true)
     |-- total_amount: double (nullable = true)
     |-- airport_fee: double (nullable = true)
     |-- payment_type: integer (nullable = true)
     |-- fare_amount: double (nullable = true)
     |-- RatecodeID: integer (nullable = true)
     |-- tpep_pickup_datetime: timestamp (nullable = true)
     |-- VendorID: integer (nullable = true)
     |-- PULocationID: integer (nullable = true)
     |-- tip_amount: double (nullable = true)
     |-- tolls_amount: double (nullable = true)
     |-- store_and_fwd_flag: string (nullable = true)
     |-- passenger_count: integer (nullable = true)

  22. View a few rows with the following code:
    nyc_taxi_trip_final_df.show(5)

    We get the following response:

    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    |extra|tpep_dropoff_datetime|     trip_duration|trip_distance|mta_tax|improvement_surcharge|DOLocationID|congestion_surcharge|total_amount|airport_fee|payment_type|fare_amount|RatecodeID|tpep_pickup_datetime|VendorID|PULocationID|tip_amount|tolls_amount|store_and_fwd_flag|passenger_count|
    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    |  0.0|  2022-01-18 15:12:51| 8.133333333333333|         1.13|    0.5|                  0.3|         229|                 2.5|        10.3|        0.0|           2|        7.0|         1| 2022-01-18 15:04:43|       2|         141|       0.0|         0.0|                 N|              1|
    |  0.0|  2022-01-18 15:15:52|              12.4|         1.36|    0.5|                  0.3|         142|                 2.5|       15.36|        0.0|           1|        9.5|         1| 2022-01-18 15:03:28|       2|         237|      2.56|         0.0|                 N|              2|
    |  3.5|  2022-01-06 17:57:03| 7.683333333333334|          1.1|    0.5|                  0.3|         229|                 2.5|        11.3|        0.0|           2|        7.0|         1| 2022-01-06 17:49:22|       1|         161|       0.0|         0.0|                 N|              1|
    |  0.5|  2022-01-09 20:04:14| 3.316666666666667|         0.56|    0.5|                  0.3|         230|                 2.5|        9.96|        0.0|           1|        4.5|         1| 2022-01-09 20:00:55|       2|         230|      1.66|         0.0|                 N|              1|
    |  1.0|  2022-01-24 16:31:36|14.716666666666667|         2.02|    0.5|                  0.3|         234|                 2.5|        18.5|        0.0|           1|       10.5|         1| 2022-01-24 16:16:53|       2|         163|       3.7|         0.0|                 N|              1|
    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    only showing top 5 rows

  23. Next, load both the dynamic frames into our Amazon Redshift Serverless cluster:
    nyc_taxi_trip_sink_dyf = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame = nyc_taxi_trip_final_dyf, 
        catalog_connection = "redshiftServerless", 
        connection_options =  {"dbtable": "public.f_nyc_yellow_taxi_trip","database": "dev"}, 
        redshift_tmp_dir = "s3://aws-glue-assets-<AWS-account-ID>-us-east-1/temporary/", 
        transformation_ctx = "nyc_taxi_trip_sink_dyf"
    )

    nyc_taxi_zone_lookup_sink_dyf = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame = nyc_taxi_zone_lookup_apply_mapping_dyf, 
        catalog_connection = "redshiftServerless", 
        connection_options = {"dbtable": "public.d_nyc_taxi_zone_lookup", "database": "dev"}, 
        redshift_tmp_dir = "s3://aws-glue-assets-<AWS-account-ID>-us-east-1/temporary/", 
        transformation_ctx = "nyc_taxi_zone_lookup_sink_dyf"
    )

    Now let’s validate the data loaded in Amazon Redshift Serverless cluster by running a few queries in Amazon Redshift query editor v2. You can also use your preferred query editor.

  24. First, we count the number of records and select a few rows in both the target tables (f_nyc_yellow_taxi_trip and d_nyc_taxi_zone_lookup):
    SELECT 'f_nyc_yellow_taxi_trip' AS table_name, COUNT(1) FROM "public"."f_nyc_yellow_taxi_trip"
    UNION ALL
    SELECT 'd_nyc_taxi_zone_lookup' AS table_name, COUNT(1) FROM "public"."d_nyc_taxi_zone_lookup";

    redshift table record count query output

    The number of records in f_nyc_yellow_taxi_trip (2,463,931) and d_nyc_taxi_zone_lookup (265) match the number of records in our input dynamic frame. This validates that all records from files in Amazon S3 have been successfully loaded into Amazon Redshift.

    You can view some of the records for each table with the following commands:

    SELECT * FROM public.f_nyc_yellow_taxi_trip LIMIT 10;

    redshift fact data select query

    SELECT * FROM public.d_nyc_taxi_zone_lookup LIMIT 10;

    redshift lookup data select query

  25. One of the insights that we want to generate from the datasets is to get the top five routes with their trip duration. Let’s run the SQL for that on Amazon Redshift:
    SELECT 
        CASE WHEN putzl.zone >= dotzl.zone 
            THEN putzl.zone || ' - ' || dotzl.zone 
            ELSE  dotzl.zone || ' - ' || putzl.zone 
        END AS "Route",
        COUNT(1) AS "Frequency",
        ROUND(SUM(trip_duration),1) AS "Total Trip Duration (mins)"
    FROM 
        public.f_nyc_yellow_taxi_trip ytt
    INNER JOIN 
        public.d_nyc_taxi_zone_lookup putzl ON ytt.pulocationid = putzl.locationid
    INNER JOIN 
        public.d_nyc_taxi_zone_lookup dotzl ON ytt.dolocationid = dotzl.locationid
    GROUP BY 
        "Route"
    ORDER BY 
        "Frequency" DESC, "Total Trip Duration (mins)" DESC
    LIMIT 5;

    redshift top 5 route query

Transform the notebook into an AWS Glue job and schedule it

Now that we have authored the code and tested its functionality, let’s save it as a job and schedule it.

Let’s first enable job bookmarks. Job bookmarks help AWS Glue maintain state information and prevent the reprocessing of old data. With job bookmarks, you can process new data when rerunning on a scheduled interval.

  1. Add the following magic command after the first cell that contains other magic commands initialized during authoring the code:
    %%configure
    {
        "--job-bookmark-option": "job-bookmark-enable"
    }

    To initialize job bookmarks, we run the following code with the name of the job as the default argument (myFirstGlueISProject for this post). Job bookmarks store the states for a job. You should always have job.init() in the beginning of the script and the job.commit() at the end of the script. These two functions are used to initialize the bookmark service and update the state change to the service. Bookmarks won’t work without calling them.

  2. Add the following piece of code after the boilerplate code:
    params = []
    if '--JOB_NAME' in sys.argv:
        params.append('JOB_NAME')
    args = getResolvedOptions(sys.argv, params)
    if 'JOB_NAME' in args:
        jobname = args['JOB_NAME']
    else:
        jobname = "myFirstGlueISProject"
    job.init(jobname, args)

  3. Then comment out all the lines of code that were authored to verify the desired outcome and aren’t necessary for the job to deliver its purpose:
    #nyc_taxi_trip_input_df = nyc_taxi_trip_input_dyf.toDF()
    #nyc_taxi_trip_input_df.count()
    #nyc_taxi_trip_input_df.printSchema()
    #nyc_taxi_trip_input_df.show(5)
    
    #nyc_taxi_zone_lookup_df = nyc_taxi_zone_lookup_dyf.toDF()
    #nyc_taxi_zone_lookup_df.count()
    #nyc_taxi_zone_lookup_df.printSchema()
    #nyc_taxi_zone_lookup_df.show(5)
    
    #nyc_taxi_trip_apply_mapping_dyf.toDF().printSchema()
    #nyc_taxi_zone_lookup_apply_mapping_dyf.toDF().printSchema()
    
    #nyc_taxi_trip_final_df = nyc_taxi_trip_final_dyf.toDF()
    #nyc_taxi_trip_final_df.count()
    #nyc_taxi_trip_final_df.printSchema()
    #nyc_taxi_trip_final_df.show(5)

  4. Save the notebook.
    glue interactive session save job
    You can check the corresponding script on the Script tab.glue interactive session script tabNote that job.commit() is automatically added at the end of the script.Let’s run the notebook as a job.
  5. First, truncate f_nyc_yellow_taxi_trip and d_nyc_taxi_zone_lookup tables in Amazon Redshift using the query editor v2 so that we don’t have duplicates in both the tables:
    truncate "public"."f_nyc_yellow_taxi_trip";
    truncate "public"."d_nyc_taxi_zone_lookup";

  6. Choose Run to run the job.
    glue interactive session run jobYou can check its status on the Runs tab.glue interactive session job run statusThe job completed in less than 5 minutes with G1.x 3 DPUs.
  7. Let’s check the count of records in f_nyc_yellow_taxi_trip and d_nyc_taxi_zone_lookup tables in Amazon Redshift:
    SELECT 'f_nyc_yellow_taxi_trip' AS table_name, COUNT(1) FROM "public"."f_nyc_yellow_taxi_trip"
    UNION ALL
    SELECT 'd_nyc_taxi_zone_lookup' AS table_name, COUNT(1) FROM "public"."d_nyc_taxi_zone_lookup";

    redshift count query output

    With job bookmarks enabled, even if you run the job again with no new files in corresponding folders in the S3 bucket, it doesn’t process the same files again. The following screenshot shows a subsequent job run in my environment, which completed in less than 2 minutes because there were no new files to process.

    glue interactive session job re-run

    Now let’s schedule the job.

  8. On the Schedules tab, choose Create schedule.
    glue interactive session create schedule
  9. For Name¸ enter a name (for example, myFirstGlueISProject-testSchedule).
  10. For Frequency, choose Custom.
  11. Enter a cron expression so the job runs every Monday at 6:00 AM.
  12. Add an optional description.
  13. Choose Create schedule.
    glue interactive session add schedule

The schedule has been saved and activated. You can edit, pause, resume, or delete the schedule from the Actions menu.

glue interactive session schedule action

Clean up

To avoid incurring future charges, delete the AWS resources you created.

  • Delete the AWS Glue job (myFirstGlueISProject for this post).
  • Delete the Amazon S3 objects and bucket (my-first-aws-glue-is-project-<random number> for this post).
  • Delete the AWS IAM policies and roles (AWSGlueInteractiveSessionPassRolePolicy, AmazonS3Access-MyFirstGlueISProject and AWSGlueServiceRole-GlueIS).
  • Delete the Amazon Redshift tables (f_nyc_yellow_taxi_trip and d_nyc_taxi_zone_lookup).
  • Delete the AWS Glue JDBC Connection (redshiftServerless).
  • Also delete the self-referencing Redshift Serverless security group, and Amazon S3 endpoint (if you created it while following the steps for this post).

Conclusion

In this post, we demonstrated how to do the following:

  • Set up an AWS Glue Jupyter notebook with interactive sessions
  • Use the notebook’s magics, including the AWS Glue connection onboarding and bookmarks
  • Read the data from Amazon S3, and transform and load it into Amazon Redshift Serverless
  • Configure magics to enable job bookmarks, save the notebook as an AWS Glue job, and schedule it using a cron expression

The goal of this post is to give you step-by-step fundamentals to get you going with AWS Glue Studio Jupyter notebooks and interactive sessions. You can set up an AWS Glue Jupyter notebook in minutes, start an interactive session in seconds, and greatly improve the development experience with AWS Glue jobs. Interactive sessions have a 1-minute billing minimum with cost control features that reduce the cost of developing data preparation applications. You can build and test applications from the environment of your choice, even on your local environment, using the interactive sessions backend.

Interactive sessions provide a faster, cheaper, and more flexible way to build and run data preparation and analytics applications. To learn more about interactive sessions, refer to Job development (interactive sessions), and start exploring a whole new development experience with AWS Glue. Additionally, check out the following posts to walk through more examples of using interactive sessions with different options:


About the Authors

Vikas blog picVikas Omer is a principal analytics specialist solutions architect at Amazon Web Services. Vikas has a strong background in analytics, customer experience management (CEM), and data monetization, with over 13 years of experience in the industry globally. With six AWS Certifications, including Analytics Specialty, he is a trusted analytics advocate to AWS customers and partners. He loves traveling, meeting customers, and helping them become successful in what they do.

Nori profile picNoritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys collaborating with different teams to deliver results like this post. In his spare time, he enjoys playing video games with his family.

Gal blog picGal Heyne is a Product Manager for AWS Glue and has over 15 years of experience as a product manager, data engineer and data architect. She is passionate about developing a deep understanding of customers’ business needs and collaborating with engineers to design elegant, powerful and easy to use data products. Gal has a Master’s degree in Data Science from UC Berkeley and she enjoys traveling, playing board games and going to music concerts.

Introducing ACK controller for Amazon EMR on EKS

Post Syndicated from Peter Dalbhanjan original https://aws.amazon.com/blogs/big-data/introducing-ack-controller-for-amazon-emr-on-eks/

AWS Controllers for Kubernetes (ACK) was announced in August, 2020, and now supports 14 AWS service controllers as generally available with an additional 12 in preview. The vision behind this initiative was simple: allow Kubernetes users to use the Kubernetes API to manage the lifecycle of AWS resources such as Amazon Simple Storage Service (Amazon S3) buckets or Amazon Relational Database Service (Amazon RDS) DB instances. For example, you can define an S3 bucket as a custom resource, create this bucket as part of your application deployment, and delete it when your application is retired.

Amazon EMR on EKS is a deployment option for EMR that allows organizations to run Apache Spark on Amazon Elastic Kubernetes Service (Amazon EKS) clusters. With EMR on EKS, the Spark jobs run using the Amazon EMR runtime for Apache Spark. This increases the performance of your Spark jobs so that they run faster and cost less than open source Apache Spark. Also, you can run Amazon EMR-based Apache Spark applications with other types of applications on the same EKS cluster to improve resource utilization and simplify infrastructure management.

Today, we’re excited to announce the ACK controller for Amazon EMR on EKS is generally available. Customers have told us that they like the declarative way of managing Apache Spark applications on EKS clusters. With the ACK controller for EMR on EKS, you can now define and run Amazon EMR jobs directly using the Kubernetes API. This lets you manage EMR on EKS resources directly using Kubernetes-native tools such as kubectl.

The controller pattern has been widely adopted by the Kubernetes community to manage the lifecycle of resources. In fact, Kubernetes has built-in controllers for built-in resources like Jobs or Deployment. These controllers continuously ensure that the observed state of a resource matches the desired state of the resource stored in Kubernetes. For example, if you define a deployment that has NGINX using three replicas, the deployment controller continuously watches and tries to maintain three replicas of NGINX pods. Using the same pattern, the ACK controller for EMR on EKS installs two custom resource definitions (CRDs): VirtualCluster and JobRun. When you create EMR virtual clusters, the controller tracks these as Kubernetes custom resources and calls the EMR on EKS service API (also known as emr-containers) to create and manage these resources. If you want to get a deeper understanding of how ACK works with AWS service APIs, and learn how ACK generates Kubernetes resources like CRDs, see blog post.

If you need a simple getting started tutorial, refer to Run Spark jobs using the ACK EMR on EKS controller. Typically, customers who run Apache Spark jobs on EKS clusters use higher level abstraction such as Argo Workflows, Apache Airflow, or AWS Step Functions, and use workflow-based orchestration in order to run their extract, transform, and load (ETL) jobs. This gives you a consistent experience running jobs while defining job pipelines using Directed Acyclic Graphs (DAGs). DAGs allow you organize your job steps with dependencies and relationships to say how they should run. Argo Workflows is a container-native workflow engine for orchestrating parallel jobs on Kubernetes.

In this post, we show you how to use Argo Workflows with the ACK controller for EMR on EKS to run Apache Spark jobs on EKS clusters.

Solution overview

In the following diagram, we show Argo Workflows submitting a request to the Kubernetes API using its orchestration mechanism.

We’re using Argo to showcase the possibilities with workflow orchestration in this post, but you can also submit jobs directly using kubectl (the Kubernetes command line tool). When Argo Workflows submits these requests to the Kubernetes API, the ACK controller for EMR on EKS reconciles VirtualCluster custom resources by invoking the EMR on EKS APIs.

Let’s go through an exercise of creating custom resources using the ACK controller for EMR on EKS and Argo Workflows.

Prerequisites

Your environment needs the following tools installed:

Install the ACK controller for EMR on EKS

You can either create an EKS cluster or re-use an existing one. We refer to the instructions in Run Spark jobs using the ACK EMR on EKS controller to set up our environment. Complete the following steps:

  1. Install the EKS cluster.
  2. Create IAM Identity mapping.
  3. Install emrcontainers-controller.
  4. Configure IRSA for the EMR on EKS controller.
  5. Create an EMR job execution role and configure IRSA.

At this stage, you should have an EKS cluster with proper role-based access control (RBAC) permissions so that Amazon EMR can run its jobs. You should also have the ACK controller for EMR on EKS installed and the EMR job execution role with IAM Roles for Service Account (IRSA) configurations so that they have the correct permissions to call EMR APIs.

Please note, we’re skipping the step to create an EMR virtual cluster because we want to create a custom resource using Argo Workflows. If you created this resource using the getting started tutorial, you can either delete the virtual cluster or create new IAM identity mapping using a different namespace.

Let’s validate the annotation for the EMR on EKS controller service account before proceeding:

# validate annotation
kubectl get pods -n $ACK_SYSTEM_NAMESPACE
CONTROLLER_POD_NAME=$(kubectl get pods -n $ACK_SYSTEM_NAMESPACE --selector=app.kubernetes.io/name=emrcontainers-chart -o jsonpath='{.items..metadata.name}')
kubectl describe pod -n $ACK_SYSTEM_NAMESPACE $CONTROLLER_POD_NAME | grep "^\s*AWS_"

The following code shows the expected results:

AWS_REGION:                      us-west-2
AWS_ENDPOINT_URL:
AWS_ROLE_ARN:                    arn:aws:iam::012345678910:role/ack-emrcontainers-controller
AWS_WEB_IDENTITY_TOKEN_FILE:     /var/run/secrets/eks.amazonaws.com/serviceaccount/token (http://eks.amazonaws.com/serviceaccount/token)

Check the logs of the controller:

kubectl logs ${CONTROLLER_POD_NAME} -n ${ACK_SYSTEM_NAMESPACE}

The following code is the expected outcome:

2022-11-02T18:52:33.588Z    INFO    controller.virtualcluster    Starting Controller    {"reconciler group": "emrcontainers.services.k8s.aws", "reconciler kind": "VirtualCluster"}
2022-11-02T18:52:33.588Z    INFO    controller.virtualcluster    Starting EventSource    {"reconciler group": "emrcontainers.services.k8s.aws", "reconciler kind": "VirtualCluster", "source": "kind source: *v1alpha1.VirtualCluster"}
2022-11-02T18:52:33.589Z    INFO    controller.virtualcluster    Starting Controller    {"reconciler group": "emrcontainers.services.k8s.aws", "reconciler kind": "VirtualCluster"}
2022-11-02T18:52:33.589Z    INFO    controller.jobrun    Starting EventSource    {"reconciler group": "emrcontainers.services.k8s.aws", "reconciler kind": "JobRun", "source": "kind source: *v1alpha1.JobRun"}
2022-11-02T18:52:33.589Z    INFO    controller.jobrun    Starting Controller    {"reconciler group": "emrcontainers.services.k8s.aws", "reconciler kind": "JobRun"}
...
2022-11-02T18:52:33.689Z    INFO    controller.jobrun    Starting workers    {"reconciler group": "emrcontainers.services.k8s.aws", "reconciler kind": "JobRun", "worker count": 1}
2022-11-02T18:52:33.689Z    INFO    controller.virtualcluster    Starting workers    {"reconciler group": "emrcontainers.services.k8s.aws", "reconciler kind": "VirtualCluster", "worker count": 1}

Now we’re ready to install Argo Workflows and use workflow orchestration to create EMR on EKS virtual clusters and submit jobs.

Install Argo Workflows

The following steps are meant for quick installation with a proof of concept in mind. This is not meant for a production install. We recommend reviewing the Argo documentation, security guidelines, and other considerations for a production install.

We install the argo CLI first. We have provided instructions to install the argo CLI using brew, which is compatible with the Mac operating system. If you use Linux or another OS, refer to Quick Start for installation steps.

brew install argo

Let’s create a namespace and install Argo Workflows on your EMR on EKS cluster:

kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.4.3/install.yaml

You can access the Argo UI locally by port-forwarding the argo-server deployment:

kubectl -n argo port-forward deploy/argo-server 2746:2746

You can access the web UI at https://localhost:2746. You will get a notice that “Your connection is not private” because Argo is using a self-signed certificate. It’s okay to choose Advanced and then Proceed to localhost.

Please note, you get an Access Denied error because we haven’t configured permissions yet. Let’s set up RBAC so that Argo Workflows has permissions to communicate with the Kubernetes API. We give admin permissions to argo serviceaccount in the argo and emr-ns namespaces.

Open another terminal window and run these commands:

# setup rbac 
kubectl create rolebinding default-admin --clusterrole=admin --serviceaccount=argo:default --namespace=argo
kubectl create rolebinding default-admin --clusterrole=admin --serviceaccount=argo:default --namespace=emr-ns

# extract bearer token to login into UI
SECRET=$(kubectl get sa default -n argo -o=jsonpath='{.secrets[0].name}')
ARGO_TOKEN="Bearer $(kubectl get secret $SECRET -n argo -o=jsonpath='{.data.token}' | base64 --decode)"
echo $ARGO_TOKEN

You now have a bearer token that we need to enter for client authentication.

You can now navigate to the Workflows tab and change the namespace to emr-ns to see the workflows under this namespace.

Let’s set up RBAC permissions and create a workflow that creates an EMR on EKS virtual cluster:

cat << EOF > argo-emrcontainers-vc-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: argo-emrcontainers-virtualcluster
rules:
  - apiGroups:
      - emrcontainers.services.k8s.aws
    resources:
      - virtualclusters
    verbs:
      - '*'
EOF

cat << EOF > argo-emrcontainers-jr-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: argo-emrcontainers-jobrun
rules:
  - apiGroups:
      - emrcontainers.services.k8s.aws
    resources:
      - jobruns
    verbs:
      - '*'
EOF

Let’s create these roles and a role binding:

# create argo clusterrole with permissions to emrcontainers.services.k8s.aws
kubectl apply -f argo-emrcontainers-vc-role.yaml
kubectl apply -f argo-emrcontainers-jr-role.yaml

# Give permissions for argo to use emr-containers clusterrole
kubectl create rolebinding argo-emrcontainers-virtualcluster --clusterrole=argo-emrcontainers-virtualcluster --serviceaccount=emr-ns:default -n emr-ns
kubectl create rolebinding argo-emrcontainers-jobrun --clusterrole=argo-emrcontainers-jobrun --serviceaccount=emr-ns:default -n emr-ns

Let’s recap what we have done so far. We created an EMR on EKS cluster, installed the ACK controller for EMR on EKS using Helm, installed the Argo CLI, installed Argo Workflows, gained access to the Argo UI, and set up RBAC permissions for Argo. RBAC permissions are required so that the default service account in the Argo namespace can use VirtualCluster and JobRun custom resources via the emrcontainers.services.k8s.aws API.

It’s time to create the EMR virtual cluster. The environment variables used in the following code are from the getting started guide, but you can change these to meet your environment:

export EKS_CLUSTER_NAME=ack-emr-eks
export EMR_NAMESPACE=emr-ns

cat << EOF > argo-emr-virtualcluster.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: emr-virtualcluster
spec:
  arguments: {}
  entrypoint: emr-virtualcluster
  templates:
  - name: emr-virtualcluster
    resource:
      action: create
      manifest: |
        apiVersion: emrcontainers.services.k8s.aws/v1alpha1
        kind: VirtualCluster
        metadata:
          name: my-ack-vc
        spec:
          name: my-ack-vc
          containerProvider:
            id: ${EKS_CLUSTER_NAME}
            type_: EKS
            info:
              eksInfo:
                namespace: ${EMR_NAMESPACE}
EOF

Use the following command to create an Argo Workflow for virtual cluster creation:

kubectl apply -f argo-emr-virtualcluster.yaml -n emr-ns
argo list -n emr-ns

The following code is the expected result from the Argo CLI:

NAME                 STATUS      AGE   DURATION   PRIORITY   MESSAGE
emr-virtualcluster   Succeeded   12m   11s        0 

Check the status of virtualcluster:

kubectl describe virtualcluster/my-ack-vc -n emr-ns

The following code is the expected result from the preceding command:

Name:         my-ack-vc
Namespace:    default
Labels:       <none>
Annotations:  <none>
API Version:  emrcontainers.services.k8s.aws/v1alpha1
Kind:         VirtualCluster
...
Status:
  Ack Resource Metadata:
    Arn:               arn:aws:emr-containers:us-west-2:012345678910:/virtualclusters/dxnqujbxexzri28ph1wspbxo0
    Owner Account ID:  012345678910
    Region:            us-west-2
  Conditions:
    Last Transition Time:  2022-11-03T15:34:10Z
    Message:               Resource synced successfully
    Reason:                
    Status:                True
    Type:                  ACK.ResourceSynced
  Id:                      dxnqujbxexzri28ph1wspbxo0
Events:                    <none>

If you run into issues, you can check Argo logs using the following command or through the console:

argo logs emr-virtualcluster -n emr-ns

You can also check controller logs as mentioned in the troubleshooting guide.

Because we have an EMR virtual cluster ready to accept jobs, we can start working on the prerequisites for job submission.

Create an S3 bucket and Amazon CloudWatch Logs group that are needed for the job (see the following code). If you already created these resources from the getting started tutorial, you can skip this step.

export RANDOM_ID1=$(LC_ALL=C tr -dc a-z0-9 </dev/urandom | head -c 8)

aws logs create-log-group --log-group-name=/emr-on-eks-logs/$EKS_CLUSTER_NAME
aws s3 mb s3://$EKS_CLUSTER_NAME-$RANDOM_ID1

We use the New York Citi Bike dataset, which has rider demographics and trip data information. Run the following command to copy the dataset into your S3 bucket:

export S3BUCKET=$EKS_CLUSTER_NAME-$RANDOM_ID1
aws s3 sync s3://tripdata/ s3://${S3BUCKET}/citibike/csv/

Copy the sample Spark application code to your S3 bucket:

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-2782/citibike-convert-csv-to-parquet.py s3://${S3BUCKET}/application/
aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-2782/citibike-ridership.py s3://${S3BUCKET}/application/
aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-2782/citibike-popular-stations.py s3://${S3BUCKET}/application/
aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-2782/citibike-trips-by-age.py s3://${S3BUCKET}/application/

Now, it’s time to run sample Spark job. Run the following to generate an Argo workflow submission template:

export RANDOM_ID2=$(LC_ALL=C tr -dc a-z0-9 </dev/urandom | head -c 8)

cat << EOF > argo-citibike-steps-jobrun.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: emr-citibike-${RANDOM_ID2}
spec:
  entrypoint: emr-citibike
  templates:
  - name: emr-citibike
    steps:
    - - name: emr-citibike-csv-parquet
        template: emr-citibike-csv-parquet
    - - name: emr-citibike-ridership
        template: emr-citibike-ridership
      - name: emr-citibike-popular-stations
        template: emr-citibike-popular-stations
      - name: emr-citibike-trips-by-age
        template: emr-citibike-trips-by-age

  # This is parent job that converts csv data to parquet
  - name: emr-citibike-csv-parquet
    resource:
      action: create
      successCondition: status.state == COMPLETED
      failureCondition: status.state == FAILED      
      manifest: |
        apiVersion: emrcontainers.services.k8s.aws/v1alpha1
        kind: JobRun
        metadata:
          name: my-ack-jobrun-csv-parquet-${RANDOM_ID2}
        spec:
          name: my-ack-jobrun-csv-parquet-${RANDOM_ID2}
          virtualClusterRef:
            from:
              name: my-ack-vc
          executionRoleARN: "${ACK_JOB_EXECUTION_ROLE_ARN}"
          releaseLabel: "emr-6.7.0-latest"
          jobDriver:
            sparkSubmitJobDriver:
              entryPoint: "s3://${S3BUCKET}/application/citibike-convert-csv-to-parquet.py"
              entryPointArguments: [${S3BUCKET}]
              sparkSubmitParameters: "--conf spark.executor.instances=2 --conf spark.executor.memory=1G --conf spark.executor.cores=1 --conf spark.driver.cores=1 --conf spark.sql.shuffle.partitions=60 --conf spark.dynamicAllocation.enabled=false"
          configurationOverrides: |
            ApplicationConfiguration: null
            MonitoringConfiguration:
              CloudWatchMonitoringConfiguration:
                LogGroupName: /emr-on-eks-logs/${EKS_CLUSTER_NAME}
                LogStreamNamePrefix: citibike
              S3MonitoringConfiguration:
                LogUri: s3://${S3BUCKET}/logs

  # This is a child job which runs after csv-parquet jobs is complete
  - name: emr-citibike-ridership
    resource:
      action: create
      manifest: |
        apiVersion: emrcontainers.services.k8s.aws/v1alpha1
        kind: JobRun
        metadata:
          name: my-ack-jobrun-ridership-${RANDOM_ID2}
        spec:
          name: my-ack-jobrun-ridership-${RANDOM_ID2}
          virtualClusterRef:
            from:
              name: my-ack-vc
          executionRoleARN: "${ACK_JOB_EXECUTION_ROLE_ARN}"
          releaseLabel: "emr-6.7.0-latest"
          jobDriver:
            sparkSubmitJobDriver:
              entryPoint: "s3://${S3BUCKET}/application/citibike-ridership.py"
              entryPointArguments: [${S3BUCKET}]
              sparkSubmitParameters: "--conf spark.executor.instances=2 --conf spark.executor.memory=1G --conf spark.executor.cores=1 --conf spark.driver.cores=1 --conf spark.sql.shuffle.partitions=60 --conf spark.dynamicAllocation.enabled=false"
          configurationOverrides: |
            ApplicationConfiguration: null
            MonitoringConfiguration:
              CloudWatchMonitoringConfiguration:
                LogGroupName: /emr-on-eks-logs/${EKS_CLUSTER_NAME}
                LogStreamNamePrefix: citibike
              S3MonitoringConfiguration:
                LogUri: s3://${S3BUCKET}/logs   

  # This is a child job which runs after csv-parquet jobs is complete
  - name: emr-citibike-popular-stations
    resource:
      action: create
      manifest: |
        apiVersion: emrcontainers.services.k8s.aws/v1alpha1
        kind: JobRun
        metadata:
          name: my-ack-jobrun-popular-stations-${RANDOM_ID2}
        spec:
          name: my-ack-jobrun-popular-stations-${RANDOM_ID2}
          virtualClusterRef:
            from:
              name: my-ack-vc
          executionRoleARN: "${ACK_JOB_EXECUTION_ROLE_ARN}"
          releaseLabel: "emr-6.7.0-latest"
          jobDriver:
            sparkSubmitJobDriver:
              entryPoint: "s3://${S3BUCKET}/application/citibike-popular-stations.py"
              entryPointArguments: [${S3BUCKET}]
              sparkSubmitParameters: "--conf spark.executor.instances=2 --conf spark.executor.memory=1G --conf spark.executor.cores=1 --conf spark.driver.cores=1 --conf spark.sql.shuffle.partitions=60 --conf spark.dynamicAllocation.enabled=false"
          configurationOverrides: |
            ApplicationConfiguration: null
            MonitoringConfiguration:
              CloudWatchMonitoringConfiguration:
                LogGroupName: /emr-on-eks-logs/${EKS_CLUSTER_NAME}
                LogStreamNamePrefix: citibike
              S3MonitoringConfiguration:
                LogUri: s3://${S3BUCKET}/logs             

  # This is a child job which runs after csv-parquet jobs is complete
  - name: emr-citibike-trips-by-age
    resource:
      action: create
      manifest: |
        apiVersion: emrcontainers.services.k8s.aws/v1alpha1
        kind: JobRun
        metadata:
          name: my-ack-jobrun-trips-by-age-${RANDOM_ID2}
        spec:
          name: my-ack-jobrun-trips-by-age-${RANDOM_ID2}
          virtualClusterRef:
            from:
              name: my-ack-vc
          executionRoleARN: "${ACK_JOB_EXECUTION_ROLE_ARN}"
          releaseLabel: "emr-6.7.0-latest"
          jobDriver:
            sparkSubmitJobDriver:
              entryPoint: "s3://${S3BUCKET}/application/citibike-trips-by-age.py"
              entryPointArguments: [${S3BUCKET}]
              sparkSubmitParameters: "--conf spark.executor.instances=2 --conf spark.executor.memory=1G --conf spark.executor.cores=1 --conf spark.driver.cores=1 --conf spark.sql.shuffle.partitions=60 --conf spark.dynamicAllocation.enabled=false"
          configurationOverrides: |
            ApplicationConfiguration: null
            MonitoringConfiguration:
              CloudWatchMonitoringConfiguration:
                LogGroupName: /emr-on-eks-logs/${EKS_CLUSTER_NAME}
                LogStreamNamePrefix: citibike
              S3MonitoringConfiguration:
                LogUri: s3://${S3BUCKET}/logs                        
EOF

Let’s run this job:

argo -n emr-ns submit --watch argo-citibike-steps-jobrun.yaml

The following code is the expected result:

Name:                emr-citibike-tp8dlo6c
Namespace:           emr-ns
ServiceAccount:      unset (will run with the default ServiceAccount)
Status:              Succeeded
Conditions:          
 PodRunning          False
 Completed           True
Created:             Mon Nov 07 15:29:34 -0500 (20 seconds ago)
Started:             Mon Nov 07 15:29:34 -0500 (20 seconds ago)
Finished:            Mon Nov 07 15:29:54 -0500 (now)
Duration:            20 seconds
Progress:            4/4
ResourcesDuration:   4s*(1 cpu),4s*(100Mi memory)
STEP                                  TEMPLATE                       PODNAME                                                         DURATION  MESSAGE
 ✔ emr-citibike-if32fvjd              emr-citibike                                                                                               
 ├───✔ emr-citibike-csv-parquet       emr-citibike-csv-parquet       emr-citibike-if32fvjd-emr-citibike-csv-parquet-140307921        2m          
 └─┬─✔ emr-citibike-popular-stations  emr-citibike-popular-stations  emr-citibike-if32fvjd-emr-citibike-popular-stations-1670101609  4s          
   ├─✔ emr-citibike-ridership         emr-citibike-ridership         emr-citibike-if32fvjd-emr-citibike-ridership-2463339702         4s          
   └─✔ emr-citibike-trips-by-age      emr-citibike-trips-by-age      emr-citibike-if32fvjd-emr-citibike-trips-by-age-3778285872      4s       

You can open another terminal and run the following command to check on the job status as well:

kubectl -n emr-ns get jobruns -w

You can also check the UI and look at the Argo logs, as shown in the following screenshot.

Clean up

Follow the instructions from the getting started tutorial to clean up the ACK controller for EMR on EKS and its resources. To delete Argo resources, use the following code:

kubectl delete -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.4.3/install.yaml
kubectl delete -f argo-emrcontainers-vc-role.yaml
kubectl delete -f argo-emrcontainers-jr-role.yaml
kubectl delete rolebinding argo-emrcontainers-virtualcluster -n emr-ns
kubectl delete rolebinding argo-emrcontainers-jobrun -n emr-ns
kubectl delete ns argo

Conclusion

In this post, we went through how to manage your Spark jobs on EKS clusters using the ACK controller for EMR on EKS. You can define Spark jobs in a declarative fashion and manage these resources using Kubernetes custom resources. We also reviewed how to use Argo Workflows to orchestrate these jobs to get a consistent job submission experience. You can take advantage of the rich features from Argo Workflows such as using DAGs to define multi-step workflows and specify dependencies within job steps, using the UI to visualize and manage the jobs, and defining retries and timeouts at the workflow or task level.

You can get started today by installing the ACK controller for EMR on EKS and start managing your Amazon EMR resources using Kubernetes-native methods.


About the authors

Peter Dalbhanjan is a Solutions Architect for AWS based in Herndon, VA. Peter is passionate about evangelizing and solving complex business problems using combination of AWS services and open source solutions. At AWS, Peter helps with designing and architecting variety of customer workloads.

Amine Hilaly is a Software Development Engineer at Amazon Web Services working on the Kubernetes and Open source related projects for about two years. Amine is a Go, open-source, and Kubernetes fanatic.

Announcing AWS Glue crawler support for Snowflake

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/announcing-aws-glue-crawler-support-for-snowflake/

For data lake customers who need to discover petabytes of data, AWS Glue crawlers are a popular way to scan data in the background, so you can focus on using the data to make better intelligent decisions. You may also have data in data warehouses such as Snowflake and want the ability to discover the data in the warehouse and combine with data from data lakes to derive insights. AWS Glue crawlers now support Snowflake, making it easier for you to understand updates to Snowflake schema and extract meaningful insights.

To crawl a Snowflake database, you can create and schedule an AWS Glue crawler with an JDBC URL with credential information from AWS Secrets Manager. A configuration option allows you to specify if you want the crawler to crawl the entire database or limit the tables by including the schema or table path and exclude patterns to reduce crawl time. With each run of the crawler, the crawler inspects and catalogs information, such as updates or deletes to Snowflake tables, external tables, views, and materialized views in the AWS Glue Data Catalog. For Snowflake columns with non-Hive compatible types, such as geography or geometry, the crawler extracts that information as a raw data type and makes it available in the Data Catalog.

In this post, we set up an AWS Glue crawler to crawl the OpenStreetMap geospatial dataset, which is freely available through Snowflake Marketplace. This dataset includes all of the OpenStreetMap location data for New York. OpenStreetMap maintains data about businesses, roads, trails, cafes, railway stations, and much more, from all over the world.

Overview of solution

Snowflake is a cloud data platform that provides data solutions from data warehousing to data science. Snowflake Computing is an AWS Advanced Technology Partner with AWS Competencies in Data & Analytics, Machine Learning, and Retail, as well as an AWS service validation for AWS PrivateLink.

In this solution, we use a sample use case involving points of interest in New York City, based on the following Snowflake quick start. Follow sections 1 and 2 to get access to sample geospatial data from Snowflake Marketplace. We show how to interpret the geography data type and understand the different formats. We use the AWS Glue crawler to crawl this OpenStreetMap geospatial dataset and make it available in the Data Catalog with the geography data type maintained where appropriate.

Prerequisites

To follow along, you need the following:

  • An AWS account.
  • An AWS Identity and Access Management (IAM) user with access to the following services:
  • An IAM role with access to run AWS Glue crawlers.
  • If the AWS account you use to follow this post uses AWS Lake Formation to manage permissions on the AWS Glue Data Catalog, make sure that you log in as a user with access to create databases and tables. For more information, refer to Implicit Lake Formation permissions.
  • A Snowflake Enterprise Edition account with permission to create storage integrations, ideally in the AWS us-east-1 Region or closest available trial Region, like us-east-2. If necessary, you can subscribe to a Snowflake trial account on AWS Marketplace.
    • On the Marketplace listing page, choose Continue to Subscribe, and then choose Accept Terms. You’re redirected to the Snowflake website to begin using the software. To complete your registration, choose Set Up Your Account.
    • If you’re new to Snowflake, consider completing the Snowflake in 20 Minutes tutorial. By the end of the tutorial, you should know how to create required Snowflake objects, including warehouses, databases, and tables for storing and querying data.
  • A Snowflake worksheet (query editor) and associated access to a Snowflake virtual warehouse (compute) and database (storage).
  • Access to an existing Snowflake account with the ACCOUNTADMIN role or the IMPORT SHARE privilege.

Create an AWS Glue connection to Snowflake

For this post, an AWS Glue connection to your Snowflake cluster is necessary. For more details about how to create it, follow the steps in Performing data transformations using Snowflake and AWS Glue. The following screenshot shows the configuration used to create a connection to the Snowflake cluster for this post.
configuration used to create a connection to the Snowflake cluster for this post.

Create an AWS Glue crawler

To create your crawler, complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
    1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Choose Create crawler.
    Choose Create crawler.
  3. For Name, enter a name (for example, glue-blog-snowflake-crawler).
  4. Choose Next.
    Choose Next
  5. For Is your data already mapped to Glue tables, select Not yet.
  6. In the Data sources section, choose Add a data source.
    6. In the Data sources section, choose Add a data source.

For this post, you use a JDBC dataset as a source.

  1. For Data source, choose JDBC.
  2. For Connection, select the connection that you created earlier (for this post, SA-snowflake-connection).
  3. For Include path, enter the path to the Snowflake database you created as a prerequisite (OSM_NEWYORK/NEW_YORK/%).
  4. For Additional metadata, choose COMMENTS and RAWTYPE.

This allows the crawler to harvest metadata related to comments and raw types like geospatial columns.

  1. Choose Add a JDBC data source.
  1. Choose Next.
    Choose Next
  2. For Existing IAM role¸ choose the role you created as a prerequisite (for this post, we use AWSGlueServiceRole-DefualtRole).
  3. Choose Next.
    Choose Next

Now let’s create an AWS Glue database.

  1. Under Target database, choose Add database.
    Under Target database, choose Add database.
  2. For Name, enter gluesnowdb.
  3. Choose Create database.
    Choose Create database.
  4. On the Set output and scheduling page, for Target database, choose the database you just created (gluesnowdb).
  5. For Table name prefix, enter blog_.
  6. For Frequency, choose On demand.
  7. Choose Next.
    Choose Next.
  8. Review the configuration and choose Create crawler.
    Review the configuration and choose Create crawler.

Run the AWS Glue crawler

To run the crawler, complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Choose the crawler you created.
    Choose the crawler you created.
  3. Choose Run crawler.
    Choose Run crawler

On the Crawler runs tab, you can see the current run of the crawler.
On the Crawler runs tab, you can see the current run of the crawler.

  1. Wait until the crawler run is complete.

As shown in the following screenshot, 27 tables were added.
As shown in the following screenshot, 27 tables were added.

Now let’s see how these tables look in the AWS Glue Data Catalog.

Explore the AWS Glue tables

Let’s explore the tables created by the crawler.

  1. On the AWS Glue console, chose Databases in the navigation pane.
    On the AWS Glue console, chose Databases in the navigation pane.
  2. Search for and choose the gluesnowdb database.
    Search for and choose the gluesnowdb database.

Now you can see the list of the tables created by the crawler.
Now you can see the list of the tables created by the crawler.

  1. Choose the blog_osm_newyork_new_york_v_osm_ny_amenity table.
    3. Choose the blog_osm_newyork_new_york_v_osm_ny_amenity table.

In the Schema section, you can see that the raw type was also harvested from the source Snowflake database.
In the Schema section, you can see that the raw type was also harvested from the source Snowflake database.

  1. Choose the Advanced properties tab.
  2. In the Table properties section, you can see that the classification is snowflake and the typeOfData is view.
    5. In the Table properties section, you can see that the classification is snowflake and the typeOfData is view.

Clean up

To avoid incurring future charges, and to clean up unused roles and policies, delete the resources you created: the CloudFormation stack, S3 bucket, AWS Glue crawler, AWS Glue database, and AWS Glue table.

Conclusion

AWS Glue crawlers now support Snowflake tables, views, and materialized views. Offering more options to integrate Snowflake databases to your AWS Glue Data Catalog. You can use AWS Glue crawlers to discover Snowflake datasets, extract schema information, and populate the Data Catalog.

In this post, we provided a procedure to set up AWS Glue crawlers to discover Snowflake tables, which reduces the time and cost needed to incrementally process Snowflake table data updates in the Data Catalog. To learn more about this feature, refer to the docs.

Special thanks to everyone who contributed to this crawler feature launch: Theo Xu, Hunny Vankawala, and Jessica Cheng.

Happy crawling!

Attribution

OpenStreetMap data by OpenStreetMap Foundation is licensed under Open Data Commons Open Database License (ODbL)


About the authors

Leonardo Gómez is a Senior Analytics Specialist Solutions Architect at AWS. Based in Toronto, Canada, he has over a decade of experience in data management, helping customers around the globe address their business and technical needs.

Bosco Albuquerque is a Sr. Partner Solutions Architect at AWS and has over 20 years of experience working with database and analytics products from enterprise database vendors and cloud providers. He has helped technology companies design and implement data analytics solutions and products.

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.